debian-python-whoosh/tests/test_searching.py

#encoding: utf-8
from __future__ import with_statement
import copy
from datetime import datetime, timedelta
import pytest
from whoosh import analysis, fields, index, qparser, query, searching, scoring
from whoosh.codec.whoosh3 import W3Codec
from whoosh.compat import b, u, text_type
from whoosh.compat import xrange, permutations, izip_longest
from whoosh.filedb.filestore import RamStorage
from whoosh.util.testing import TempIndex


def make_index():
    s = fields.Schema(key=fields.ID(stored=True),
                      name=fields.TEXT,
                      value=fields.TEXT)
    st = RamStorage()
    ix = st.create_index(s)
    w = ix.writer()
    w.add_document(key=u("A"), name=u("Yellow brown"),
                   value=u("Blue red green render purple?"))
    w.add_document(key=u("B"), name=u("Alpha beta"),
                   value=u("Gamma delta epsilon omega."))
    w.add_document(key=u("C"), name=u("One two"),
                   value=u("Three rendered four five."))
    w.add_document(key=u("D"), name=u("Quick went"),
                   value=u("Every red town."))
    w.add_document(key=u("E"), name=u("Yellow uptown"),
                   value=u("Interest rendering outer photo!"))
    w.commit()
    return ix


def _get_keys(stored_fields):
    return sorted([d.get("key") for d in stored_fields])


def _docs(q, s):
    return _get_keys([s.stored_fields(docnum) for docnum
                      in q.docs(s)])


def _run_query(q, target):
    ix = make_index()
    with ix.searcher() as s:
        assert target == _docs(q, s)
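
# The helpers above build one small five-document index and compare the sorted
# stored "key" values of a query's matches against an expected list, so most of
# the individual query tests below reduce to a single _run_query() call.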


def test_empty_index():
    schema = fields.Schema(key=fields.ID(stored=True), value=fields.TEXT)
    st = RamStorage()
    with pytest.raises(index.EmptyIndexError):
        st.open_index(schema=schema)


def test_docs_method():
    ix = make_index()
    with ix.searcher() as s:
        assert _get_keys(s.documents(name="yellow")) == ["A", "E"]
        assert _get_keys(s.documents(value="red")) == ["A", "D"]
        assert _get_keys(s.documents()) == ["A", "B", "C", "D", "E"]


def test_term():
    _run_query(query.Term("name", u("yellow")), [u("A"), u("E")])
    _run_query(query.Term("value", u("zeta")), [])
    _run_query(query.Term("value", u("red")), [u("A"), u("D")])


def test_require():
    _run_query(query.Require(query.Term("value", u("red")),
                             query.Term("name", u("yellow"))),
               [u("A")])


def test_and():
    _run_query(query.And([query.Term("value", u("red")),
                          query.Term("name", u("yellow"))]),
               [u("A")])
    # Missing
    _run_query(query.And([query.Term("value", u("ochre")),
                          query.Term("name", u("glonk"))]),
               [])


def test_or():
    _run_query(query.Or([query.Term("value", u("red")),
                         query.Term("name", u("yellow"))]),
               [u("A"), u("D"), u("E")])
    # Missing
    _run_query(query.Or([query.Term("value", u("ochre")),
                         query.Term("name", u("glonk"))]),
               [])
    _run_query(query.Or([]), [])


def test_ors():
    domain = u("alfa bravo charlie delta").split()
    s = fields.Schema(num=fields.STORED, text=fields.TEXT)
    st = RamStorage()
    ix = st.create_index(s)
    with ix.writer() as w:
        for i, ls in enumerate(permutations(domain)):
            w.add_document(num=i, text=" ".join(ls))

    with ix.searcher() as s:
        qs = [query.Term("text", word) for word in domain]
        for i in xrange(1, len(domain)):
            q = query.Or(qs[:i])
            r1 = [(hit.docnum, hit.score) for hit in s.search(q, limit=None)]
            q.binary_matcher = True
            r2 = [(hit.docnum, hit.score) for hit in s.search(q, limit=None)]
            for item1, item2 in izip_longest(r1, r2):
                assert item1[0] == item2[0]
                assert item1[1] == item2[1]


def test_not():
    _run_query(query.And([query.Or([query.Term("value", u("red")),
                                    query.Term("name", u("yellow"))]),
                          query.Not(query.Term("name", u("quick")))]),
               [u("A"), u("E")])


def test_topnot():
    _run_query(query.Not(query.Term("value", "red")), [u("B"), "C", "E"])
    _run_query(query.Not(query.Term("name", "yellow")), [u("B"), u("C"),
                                                         u("D")])


def test_andnot():
    _run_query(query.AndNot(query.Term("name", u("yellow")),
                            query.Term("value", u("purple"))),
               [u("E")])


def test_andnot2():
    schema = fields.Schema(a=fields.ID(stored=True))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(a=u("bravo"))
    w.add_document(a=u("echo"))
    w.add_document(a=u("juliet"))
    w.commit()
    w = ix.writer()
    w.add_document(a=u("kilo"))
    w.add_document(a=u("foxtrot"))
    w.add_document(a=u("charlie"))
    w.commit(merge=False)
    w = ix.writer()
    w.delete_by_term("a", u("echo"))
    w.add_document(a=u("alfa"))
    w.add_document(a=u("india"))
    w.add_document(a=u("delta"))
    w.commit(merge=False)

    with ix.searcher() as s:
        q = query.TermRange("a", u("bravo"), u("k"))
        qr = [hit["a"] for hit in s.search(q)]
        assert " ".join(sorted(qr)) == "bravo charlie delta foxtrot india juliet"

        oq = query.Or([query.Term("a", "bravo"), query.Term("a", "delta")])
        oqr = [hit["a"] for hit in s.search(oq)]
        assert " ".join(sorted(oqr)) == "bravo delta"

        anq = query.AndNot(q, oq)
        m = anq.matcher(s)
        r = s.search(anq)
        assert list(anq.docs(s)) == sorted(hit.docnum for hit in r)
        assert " ".join(sorted(hit["a"] for hit in r)) == "charlie foxtrot india juliet"


def test_variations():
    _run_query(query.Variations("value", u("render")),
               [u("A"), u("C"), u("E")])


def test_wildcard():
    _run_query(query.Or([query.Wildcard('value', u('*red*')),
                         query.Wildcard('name', u('*yellow*'))]),
               [u("A"), u("C"), u("D"), u("E")])
    # Missing
    _run_query(query.Wildcard('value', 'glonk*'), [])


def test_not2():
    schema = fields.Schema(name=fields.ID(stored=True), value=fields.TEXT)
    storage = RamStorage()
    ix = storage.create_index(schema)
    writer = ix.writer()
    writer.add_document(name=u("a"), value=u("alfa bravo charlie delta echo"))
    writer.add_document(name=u("b"),
                        value=u("bravo charlie delta echo foxtrot"))
    writer.add_document(name=u("c"),
                        value=u("charlie delta echo foxtrot golf"))
    writer.add_document(name=u("d"), value=u("delta echo golf hotel india"))
    writer.add_document(name=u("e"), value=u("echo golf hotel india juliet"))
    writer.commit()

    with ix.searcher() as s:
        p = qparser.QueryParser("value", None)
        results = s.search(p.parse("echo NOT golf"))
        assert sorted([d["name"] for d in results]) == ["a", "b"]

        results = s.search(p.parse("echo NOT bravo"))
        assert sorted([d["name"] for d in results]) == ["c", "d", "e"]

    ix.delete_by_term("value", u("bravo"))
    with ix.searcher() as s:
        results = s.search(p.parse("echo NOT charlie"))
        assert sorted([d["name"] for d in results]) == ["d", "e"]

# def test_or_minmatch():
# schema = fields.Schema(k=fields.STORED, v=fields.TEXT)
# st = RamStorage()
# ix = st.create_index(schema)
#
# w = ix.writer()
# w.add_document(k=1, v=u("alfa bravo charlie delta echo"))
# w.add_document(k=2, v=u("bravo charlie delta echo foxtrot"))
# w.add_document(k=3, v=u("charlie delta echo foxtrot golf"))
# w.add_document(k=4, v=u("delta echo foxtrot golf hotel"))
# w.add_document(k=5, v=u("echo foxtrot golf hotel india"))
# w.add_document(k=6, v=u("foxtrot golf hotel india juliet"))
# w.commit()
#
# s = ix.searcher()
# q = Or([Term("v", "echo"), Term("v", "foxtrot")], minmatch=2)
# r = s.search(q)
# assert sorted(d["k"] for d in r), [2, 3, 4, 5])


def test_range():
    schema = fields.Schema(id=fields.ID(stored=True), content=fields.TEXT)
    st = RamStorage()
    ix = st.create_index(schema)
    w = ix.writer()
    w.add_document(id=u("A"), content=u("alfa bravo charlie delta echo"))
    w.add_document(id=u("B"), content=u("bravo charlie delta echo foxtrot"))
    w.add_document(id=u("C"), content=u("charlie delta echo foxtrot golf"))
    w.add_document(id=u("D"), content=u("delta echo foxtrot golf hotel"))
    w.add_document(id=u("E"), content=u("echo foxtrot golf hotel india"))
    w.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("content", schema)

        q = qp.parse(u("charlie [delta TO foxtrot]"))
        assert q.__class__ == query.And
        assert q[0].__class__ == query.Term
        assert q[1].__class__ == query.TermRange
        assert q[1].start == "delta"
        assert q[1].end == "foxtrot"
        assert not q[1].startexcl
        assert not q[1].endexcl
        ids = sorted([d['id'] for d in s.search(q)])
        assert ids == [u('A'), u('B'), u('C')]

        q = qp.parse(u("foxtrot {echo TO hotel]"))
        assert q.__class__ == query.And
        assert q[0].__class__ == query.Term
        assert q[1].__class__ == query.TermRange
        assert q[1].start == "echo"
        assert q[1].end == "hotel"
        assert q[1].startexcl
        assert not q[1].endexcl
        ids = sorted([d['id'] for d in s.search(q)])
        assert ids == [u('B'), u('C'), u('D'), u('E')]

        q = qp.parse(u("{bravo TO delta}"))
        assert q.__class__ == query.TermRange
        assert q.start == "bravo"
        assert q.end == "delta"
        assert q.startexcl
        assert q.endexcl
        ids = sorted([d['id'] for d in s.search(q)])
        assert ids == [u('A'), u('B'), u('C')]

        # Shouldn't match anything
        q = qp.parse(u("[1 to 10]"))
        assert q.__class__ == query.TermRange
        assert len(s.search(q)) == 0


def test_range_clusiveness():
    schema = fields.Schema(id=fields.ID(stored=True))
    st = RamStorage()
    ix = st.create_index(schema)
    w = ix.writer()
    for letter in u("abcdefg"):
        w.add_document(id=letter)
    w.commit()

    with ix.searcher() as s:
        def check(startexcl, endexcl, string):
            q = query.TermRange("id", "b", "f", startexcl, endexcl)
            r = "".join(sorted(d['id'] for d in s.search(q)))
            assert r == string

        check(False, False, "bcdef")
        check(True, False, "cdef")
        check(True, True, "cde")
        check(False, True, "bcde")


def test_open_ranges():
    schema = fields.Schema(id=fields.ID(stored=True))
    st = RamStorage()
    ix = st.create_index(schema)
    w = ix.writer()
    for letter in u("abcdefg"):
        w.add_document(id=letter)
    w.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("id", schema)

        def check(qstring, result):
            q = qp.parse(qstring)
            r = "".join(sorted([d['id'] for d in s.search(q)]))
            assert r == result

        check(u("[b TO]"), "bcdefg")
        check(u("[TO e]"), "abcde")
        check(u("[b TO d]"), "bcd")
        check(u("{b TO]"), "cdefg")
        check(u("[TO e}"), "abcd")
        check(u("{b TO d}"), "c")


def test_open_numeric_ranges():
    domain = range(0, 1000, 7)

    schema = fields.Schema(num=fields.NUMERIC(stored=True))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    for i in domain:
        w.add_document(num=i)
    w.commit()

    qp = qparser.QueryParser("num", schema)
    with ix.searcher() as s:
        q = qp.parse("[100 to]")
        r = [hit["num"] for hit in s.search(q, limit=None)]
        assert r == [n for n in domain if n >= 100]

        q = qp.parse("[to 500]")
        r = [hit["num"] for hit in s.search(q, limit=None)]
        assert r == [n for n in domain if n <= 500]


def test_open_date_ranges():
    basedate = datetime(2011, 1, 24, 6, 25, 0, 0)
    domain = [basedate + timedelta(days=n) for n in xrange(-20, 20)]

    schema = fields.Schema(date=fields.DATETIME(stored=True))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    for d in domain:
        w.add_document(date=d)
    w.commit()

    with ix.searcher() as s:
        # Without date parser
        qp = qparser.QueryParser("date", schema)
        q = qp.parse("[2011-01-10 to]")
        r = [hit["date"] for hit in s.search(q, limit=None)]
        assert len(r) > 0
        target = [d for d in domain if d >= datetime(2011, 1, 10, 6, 25)]
        assert r == target

        q = qp.parse("[to 2011-01-30]")
        r = [hit["date"] for hit in s.search(q, limit=None)]
        assert len(r) > 0
        target = [d for d in domain if d <= datetime(2011, 1, 30, 6, 25)]
        assert r == target

        # With date parser
        from whoosh.qparser.dateparse import DateParserPlugin
        qp.add_plugin(DateParserPlugin(basedate))

        q = qp.parse("[10 jan 2011 to]")
        r = [hit["date"] for hit in s.search(q, limit=None)]
        assert len(r) > 0
        target = [d for d in domain if d >= datetime(2011, 1, 10, 6, 25)]
        assert r == target

        q = qp.parse("[to 30 jan 2011]")
        r = [hit["date"] for hit in s.search(q, limit=None)]
        assert len(r) > 0
        target = [d for d in domain if d <= datetime(2011, 1, 30, 6, 25)]
        assert r == target


def test_negated_unlimited_ranges():
    # Whoosh should treat u("[to]") as if it was "*"
    schema = fields.Schema(id=fields.ID(stored=True), num=fields.NUMERIC,
                           date=fields.DATETIME)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    from string import ascii_letters
    domain = text_type(ascii_letters)

    dt = datetime.now()
    for i, letter in enumerate(domain):
        w.add_document(id=letter, num=i, date=dt + timedelta(days=i))
    w.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("id", schema)

        nq = qp.parse(u("NOT [to]"))
        assert nq.__class__ == query.Not
        q = nq.query
        assert q.__class__ == query.Every
        assert "".join(h["id"] for h in s.search(q, limit=None)) == domain
        assert not list(nq.docs(s))

        nq = qp.parse(u("NOT num:[to]"))
        assert nq.__class__ == query.Not
        q = nq.query
        assert q.__class__ == query.NumericRange
        assert q.start is None
        assert q.end is None
        assert "".join(h["id"] for h in s.search(q, limit=None)) == domain
        assert not list(nq.docs(s))

        nq = qp.parse(u("NOT date:[to]"))
        assert nq.__class__ == query.Not
        q = nq.query
        assert q.__class__ == query.Every
        assert "".join(h["id"] for h in s.search(q, limit=None)) == domain
        assert not list(nq.docs(s))


def test_keyword_or():
    schema = fields.Schema(a=fields.ID(stored=True), b=fields.KEYWORD)
    st = RamStorage()
    ix = st.create_index(schema)
    w = ix.writer()
    w.add_document(a=u("First"), b=u("ccc ddd"))
    w.add_document(a=u("Second"), b=u("aaa ddd"))
    w.add_document(a=u("Third"), b=u("ccc eee"))
    w.commit()

    qp = qparser.QueryParser("b", schema)
    with ix.searcher() as s:
        qr = qp.parse(u("b:ccc OR b:eee"))
        assert qr.__class__ == query.Or
        r = s.search(qr)
        assert len(r) == 2
        assert r[0]["a"] == "Third"
        assert r[1]["a"] == "First"


def test_merged():
    schema = fields.Schema(id=fields.ID(stored=True), content=fields.TEXT)
    with TempIndex(schema) as ix:
        with ix.writer() as w:
            w.add_document(id=u("alfa"), content=u("alfa"))
            w.add_document(id=u("bravo"), content=u("bravo"))

        with ix.searcher() as s:
            r = s.search(query.Term("content", u("bravo")))
            assert len(r) == 1
            assert r[0]["id"] == "bravo"

        with ix.writer() as w:
            w.add_document(id=u("charlie"), content=u("charlie"))
            w.optimize = True

        assert len(ix._segments()) == 1
        with ix.searcher() as s:
            r = s.search(query.Term("content", u("bravo")))
            assert len(r) == 1
            assert r[0]["id"] == "bravo"


def test_multireader():
    sc = fields.Schema(id=fields.ID(stored=True), content=fields.TEXT)
    st = RamStorage()
    ix = st.create_index(sc)
    w = ix.writer()
    w.add_document(id=u("alfa"), content=u("alfa"))
    w.add_document(id=u("bravo"), content=u("bravo"))
    w.add_document(id=u("charlie"), content=u("charlie"))
    w.add_document(id=u("delta"), content=u("delta"))
    w.add_document(id=u("echo"), content=u("echo"))
    w.add_document(id=u("foxtrot"), content=u("foxtrot"))
    w.add_document(id=u("golf"), content=u("golf"))
    w.add_document(id=u("hotel"), content=u("hotel"))
    w.add_document(id=u("india"), content=u("india"))
    w.commit()

    with ix.searcher() as s:
        r = s.search(query.Term("content", u("bravo")))
        assert len(r) == 1
        assert r[0]["id"] == "bravo"

    w = ix.writer()
    w.add_document(id=u("juliet"), content=u("juliet"))
    w.add_document(id=u("kilo"), content=u("kilo"))
    w.add_document(id=u("lima"), content=u("lima"))
    w.add_document(id=u("mike"), content=u("mike"))
    w.add_document(id=u("november"), content=u("november"))
    w.add_document(id=u("oscar"), content=u("oscar"))
    w.add_document(id=u("papa"), content=u("papa"))
    w.add_document(id=u("quebec"), content=u("quebec"))
    w.add_document(id=u("romeo"), content=u("romeo"))
    w.commit()
    assert len(ix._segments()) == 2

    #r = ix.reader()
    #assert r.__class__.__name__ == "MultiReader"
    #pr = r.postings("content", u("bravo"))

    with ix.searcher() as s:
        r = s.search(query.Term("content", u("bravo")))
        assert len(r) == 1
        assert r[0]["id"] == "bravo"


def test_posting_phrase():
    schema = fields.Schema(name=fields.ID(stored=True), value=fields.TEXT)
    storage = RamStorage()
    ix = storage.create_index(schema)
    writer = ix.writer()
    writer.add_document(name=u("A"),
                        value=u("Little Miss Muffet sat on a tuffet"))
    writer.add_document(name=u("B"), value=u("Miss Little Muffet tuffet"))
    writer.add_document(name=u("C"), value=u("Miss Little Muffet tuffet sat"))
    writer.add_document(name=u("D"),
                        value=u("Gibberish blonk falunk miss muffet sat " +
                                "tuffet garbonzo"))
    writer.add_document(name=u("E"), value=u("Blah blah blah pancakes"))
    writer.commit()

    with ix.searcher() as s:
        def names(results):
            return sorted([fields['name'] for fields in results])

        q = query.Phrase("value", [u("little"), u("miss"), u("muffet"),
                                   u("sat"), u("tuffet")])
        m = q.matcher(s)
        assert m.__class__.__name__ == "SpanNear2Matcher"

        r = s.search(q)
        assert names(r) == ["A"]
        assert len(r) == 1

        q = query.Phrase("value", [u("miss"), u("muffet"), u("sat"),
                                   u("tuffet")])
        assert names(s.search(q)) == ["A", "D"]

        q = query.Phrase("value", [u("falunk"), u("gibberish")])
        r = s.search(q)
        assert not names(r)
        assert len(r) == 0

        q = query.Phrase("value", [u("gibberish"), u("falunk")], slop=2)
        assert names(s.search(q)) == ["D"]

        q = query.Phrase("value", [u("blah")] * 4)
        assert not names(s.search(q))  # blah blah blah blah

        q = query.Phrase("value", [u("blah")] * 3)
        m = q.matcher(s)
        assert names(s.search(q)) == ["E"]


def test_phrase_score():
    schema = fields.Schema(name=fields.ID(stored=True), value=fields.TEXT)
    storage = RamStorage()
    ix = storage.create_index(schema)
    writer = ix.writer()
    writer.add_document(name=u("A"),
                        value=u("Little Miss Muffet sat on a tuffet"))
    writer.add_document(name=u("D"),
                        value=u("Gibberish blonk falunk miss muffet sat " +
                                "tuffet garbonzo"))
    writer.add_document(name=u("E"), value=u("Blah blah blah pancakes"))
    writer.add_document(name=u("F"),
                        value=u("Little miss muffet little miss muffet"))
    writer.commit()

    with ix.searcher() as s:
        q = query.Phrase("value", [u("little"), u("miss"), u("muffet")])
        m = q.matcher(s)
        assert m.id() == 0
        score1 = m.weight()
        assert score1 > 0
        m.next()
        assert m.id() == 3
        assert m.weight() > score1
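
# Note on test_phrase_score above: document 0 ("A") contains the phrase
# "little miss muffet" once and document 3 ("F") contains it twice, so the
# phrase matcher's weight for the second document must be strictly higher.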


def test_stop_phrase():
    schema = fields.Schema(title=fields.TEXT(stored=True))
    storage = RamStorage()
    ix = storage.create_index(schema)
    writer = ix.writer()
    writer.add_document(title=u("Richard of York"))
    writer.add_document(title=u("Lily the Pink"))
    writer.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("title", schema)
        q = qp.parse(u("richard of york"))
        assert q.__unicode__() == "(title:richard AND title:york)"
        assert len(s.search(q)) == 1
        #q = qp.parse(u("lily the pink"))
        #assert len(s.search(q)), 1)
        assert len(s.find("title", u("lily the pink"))) == 1


def test_phrase_order():
    tfield = fields.TEXT(stored=True, analyzer=analysis.SimpleAnalyzer())
    schema = fields.Schema(text=tfield)
    storage = RamStorage()
    ix = storage.create_index(schema)
    writer = ix.writer()
    for ls in permutations(["ape", "bay", "can", "day"], 4):
        writer.add_document(text=u(" ").join(ls))
    writer.commit()

    with ix.searcher() as s:
        def result(q):
            r = s.search(q, limit=None, sortedby=None)
            return sorted([d['text'] for d in r])

        q = query.Phrase("text", ["bay", "can", "day"])
        assert result(q) == [u('ape bay can day'), u('bay can day ape')]


def test_phrase_sameword():
    schema = fields.Schema(id=fields.STORED, text=fields.TEXT)
    storage = RamStorage()
    ix = storage.create_index(schema)
    writer = ix.writer()
    writer.add_document(id=1, text=u("The film Linda Linda Linda is good"))
    writer.add_document(id=2, text=u("The model Linda Evangelista is pretty"))
    writer.commit()

    with ix.searcher() as s:
        r = s.search(query.Phrase("text", ["linda", "linda", "linda"]),
                     limit=None)
        assert len(r) == 1
        assert r[0]["id"] == 1


def test_phrase_multi():
    schema = fields.Schema(id=fields.STORED, text=fields.TEXT)
    ix = RamStorage().create_index(schema)

    domain = u("alfa bravo charlie delta echo").split()
    w = None
    for i, ls in enumerate(permutations(domain)):
        if w is None:
            w = ix.writer()
        w.add_document(id=i, text=u(" ").join(ls))
        if not i % 30:
            w.commit()
            w = None
    if w is not None:
        w.commit()

    with ix.searcher() as s:
        q = query.Phrase("text", ["alfa", "bravo"])
        _ = s.search(q)


def test_missing_field_scoring():
    schema = fields.Schema(name=fields.TEXT(stored=True),
                           hobbies=fields.TEXT(stored=True))
    with TempIndex(schema) as ix:
        with ix.writer() as w:
            w.add_document(name=u('Frank'), hobbies=u('baseball, basketball'))

        with ix.reader() as r:
            assert r.field_length("hobbies") == 2
            assert r.field_length("name") == 1

        with ix.writer() as w:
            w.add_document(name=u('Jonny'))

        with ix.searcher() as s:
            assert s.field_length("hobbies") == 2
            assert s.field_length("name") == 2

            parser = qparser.MultifieldParser(['name', 'hobbies'], schema)
            q = parser.parse(u("baseball"))
            result = s.search(q)
            assert len(result) == 1
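
# Note on test_missing_field_scoring above: field_length() is the total number
# of terms indexed in a field across all documents, so "hobbies" stays at 2
# after the second document is added without any hobbies value.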


def test_search_fieldname_underscores():
    s = fields.Schema(my_name=fields.ID(stored=True), my_value=fields.TEXT)
    st = RamStorage()
    ix = st.create_index(s)
    w = ix.writer()
    w.add_document(my_name=u("Green"), my_value=u("It's not easy being green"))
    w.add_document(my_name=u("Red"),
                   my_value=u("Hopping mad like a playground ball"))
    w.commit()

    qp = qparser.QueryParser("my_value", schema=s)
    with ix.searcher() as s:
        r = s.search(qp.parse(u("my_name:Green")))
        assert r[0]['my_name'] == "Green"


def test_short_prefix():
    s = fields.Schema(name=fields.ID, value=fields.TEXT)
    qp = qparser.QueryParser("value", schema=s)
    q = qp.parse(u("s*"))
    assert q.__class__.__name__ == "Prefix"
    assert q.text == "s"


def test_weighting():
    from whoosh.scoring import Weighting, BaseScorer

    schema = fields.Schema(id=fields.ID(stored=True),
                           n_comments=fields.STORED)
    st = RamStorage()
    ix = st.create_index(schema)
    w = ix.writer()
    w.add_document(id=u("1"), n_comments=5)
    w.add_document(id=u("2"), n_comments=12)
    w.add_document(id=u("3"), n_comments=2)
    w.add_document(id=u("4"), n_comments=7)
    w.commit()

    # Fake Weighting implementation
    class CommentWeighting(Weighting):
        def scorer(self, searcher, fieldname, text, qf=1):
            return self.CommentScorer(searcher.stored_fields)

        class CommentScorer(BaseScorer):
            def __init__(self, stored_fields):
                self.stored_fields = stored_fields

            def score(self, matcher):
                sf = self.stored_fields(matcher.id())
                ncomments = sf.get("n_comments", 0)
                return ncomments

    with ix.searcher(weighting=CommentWeighting()) as s:
        q = query.TermRange("id", u("1"), u("4"), constantscore=False)

        r = s.search(q)
        ids = [fs["id"] for fs in r]
        assert ids == ["2", "4", "1", "3"]
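
# Editor's sketch: the same "score by stored comment count" idea can be
# written more compactly with scoring.FunctionWeighting, which this module
# also exercises further down (see test_pos_scorer). The helper name below is
# hypothetical and is not part of the original test suite.
def _comment_count_weighting():
    def comment_score_fn(searcher, fieldname, text, matcher):
        # Score each matching document purely by its stored n_comments value.
        return searcher.stored_fields(matcher.id()).get("n_comments", 0)

    return scoring.FunctionWeighting(comment_score_fn)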


def test_dismax():
    schema = fields.Schema(id=fields.STORED,
                           f1=fields.TEXT, f2=fields.TEXT, f3=fields.TEXT)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=1, f1=u("alfa bravo charlie delta"),
                   f2=u("alfa alfa alfa"),
                   f3=u("alfa echo foxtrot hotel india"))
    w.commit()

    with ix.searcher(weighting=scoring.Frequency()) as s:
        assert list(s.documents(f1="alfa")) == [{"id": 1}]
        assert list(s.documents(f2="alfa")) == [{"id": 1}]
        assert list(s.documents(f3="alfa")) == [{"id": 1}]

        qs = [query.Term("f1", "alfa"), query.Term("f2", "alfa"),
              query.Term("f3", "alfa")]
        dm = query.DisjunctionMax(qs)
        r = s.search(dm)
        assert r.score(0) == 3.0


def test_deleted_wildcard():
    schema = fields.Schema(id=fields.ID(stored=True))
    st = RamStorage()
    ix = st.create_index(schema)
    w = ix.writer()
    w.add_document(id=u("alfa"))
    w.add_document(id=u("bravo"))
    w.add_document(id=u("charlie"))
    w.add_document(id=u("delta"))
    w.add_document(id=u("echo"))
    w.add_document(id=u("foxtrot"))
    w.commit()

    w = ix.writer()
    w.delete_by_term("id", "bravo")
    w.delete_by_term("id", "delta")
    w.delete_by_term("id", "echo")
    w.commit()

    with ix.searcher() as s:
        r = s.search(query.Every("id"))
        assert sorted([d['id'] for d in r]) == ["alfa", "charlie", "foxtrot"]


def test_missing_wildcard():
    schema = fields.Schema(id=fields.ID(stored=True), f1=fields.TEXT,
                           f2=fields.TEXT)
    st = RamStorage()
    ix = st.create_index(schema)
    w = ix.writer()
    w.add_document(id=u("1"), f1=u("alfa"), f2=u("apple"))
    w.add_document(id=u("2"), f1=u("bravo"))
    w.add_document(id=u("3"), f1=u("charlie"), f2=u("candy"))
    w.add_document(id=u("4"), f2=u("donut"))
    w.add_document(id=u("5"))
    w.commit()

    with ix.searcher() as s:
        r = s.search(query.Every("id"))
        assert sorted([d['id'] for d in r]) == ["1", "2", "3", "4", "5"]

        r = s.search(query.Every("f1"))
        assert sorted([d['id'] for d in r]) == ["1", "2", "3"]

        r = s.search(query.Every("f2"))
        assert sorted([d['id'] for d in r]) == ["1", "3", "4"]


def test_finalweighting():
    from whoosh.scoring import Frequency

    schema = fields.Schema(id=fields.ID(stored=True),
                           summary=fields.TEXT,
                           n_comments=fields.STORED)
    st = RamStorage()
    ix = st.create_index(schema)
    w = ix.writer()
    w.add_document(id=u("1"), summary=u("alfa bravo"), n_comments=5)
    w.add_document(id=u("2"), summary=u("alfa"), n_comments=12)
    w.add_document(id=u("3"), summary=u("bravo"), n_comments=2)
    w.add_document(id=u("4"), summary=u("bravo bravo"), n_comments=7)
    w.commit()

    class CommentWeighting(Frequency):
        use_final = True

        def final(self, searcher, docnum, score):
            ncomments = searcher.stored_fields(docnum).get("n_comments", 0)
            return ncomments

    with ix.searcher(weighting=CommentWeighting()) as s:
        q = qparser.QueryParser("summary", None).parse("alfa OR bravo")
        r = s.search(q)
        ids = [fs["id"] for fs in r]
        assert ["2", "4", "1", "3"] == ids
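
# Note on test_finalweighting above: when use_final is set, the weighting
# model's final() hook runs once per matched document after normal scoring and
# its return value replaces the score, so the stored comment count becomes the
# effective ranking key and yields the 12-7-5-2 ordering asserted.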


def test_outofdate():
    schema = fields.Schema(id=fields.ID(stored=True))
    st = RamStorage()
    ix = st.create_index(schema)
    w = ix.writer()
    w.add_document(id=u("1"))
    w.add_document(id=u("2"))
    w.commit()

    s = ix.searcher()
    assert s.up_to_date()

    w = ix.writer()
    w.add_document(id=u("3"))
    w.add_document(id=u("4"))

    assert s.up_to_date()
    w.commit()
    assert not s.up_to_date()

    s = s.refresh()
    assert s.up_to_date()
    s.close()


def test_find_missing():
    schema = fields.Schema(id=fields.ID, text=fields.KEYWORD(stored=True))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=u("1"), text=u("alfa"))
    w.add_document(id=u("2"), text=u("bravo"))
    w.add_document(text=u("charlie"))
    w.add_document(id=u("4"), text=u("delta"))
    w.add_document(text=u("echo"))
    w.add_document(id=u("6"), text=u("foxtrot"))
    w.add_document(text=u("golf"))
    w.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("text", schema)
        q = qp.parse(u("NOT id:*"))
        r = s.search(q, limit=None)
        assert list(h["text"] for h in r) == ["charlie", "echo", "golf"]


def test_ngram_phrase():
    f = fields.NGRAM(minsize=2, maxsize=2, phrase=True)
    schema = fields.Schema(text=f, path=fields.ID(stored=True))
    ix = RamStorage().create_index(schema)
    writer = ix.writer()
    writer.add_document(text=u('\u9AD8\u6821\u307E\u3067\u306F\u6771\u4EAC'
                               '\u3067\u3001\u5927\u5B66\u304B\u3089\u306F'
                               '\u4EAC\u5927\u3067\u3059\u3002'),
                        path=u('sample'))
    writer.commit()

    with ix.searcher() as s:
        p = qparser.QueryParser("text", schema)

        q = p.parse(u('\u6771\u4EAC\u5927\u5B66'))
        assert len(s.search(q)) == 1

        q = p.parse(u('"\u6771\u4EAC\u5927\u5B66"'))
        assert len(s.search(q)) == 0

        q = p.parse(u('"\u306F\u6771\u4EAC\u3067"'))
        assert len(s.search(q)) == 1


def test_ordered():
    domain = u("alfa bravo charlie delta echo foxtrot").split(" ")

    schema = fields.Schema(f=fields.TEXT(stored=True))
    ix = RamStorage().create_index(schema)
    writer = ix.writer()
    for ls in permutations(domain):
        writer.add_document(f=u(" ").join(ls))
    writer.commit()

    with ix.searcher() as s:
        q = query.Ordered([query.Term("f", u("alfa")),
                           query.Term("f", u("charlie")),
                           query.Term("f", u("echo"))])
        r = s.search(q)
        for hit in r:
            ls = hit["f"].split()
            assert "alfa" in ls
            assert "charlie" in ls
            assert "echo" in ls

            a = ls.index("alfa")
            c = ls.index("charlie")
            e = ls.index("echo")
            assert a < c and c < e, repr(ls)


def test_otherwise():
    schema = fields.Schema(id=fields.STORED, f=fields.TEXT)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=1, f=u("alfa one two"))
    w.add_document(id=2, f=u("alfa three four"))
    w.add_document(id=3, f=u("bravo four five"))
    w.add_document(id=4, f=u("bravo six seven"))
    w.commit()

    with ix.searcher() as s:
        q = query.Otherwise(query.Term("f", u("alfa")),
                            query.Term("f", u("six")))
        assert [d["id"] for d in s.search(q)] == [1, 2]

        q = query.Otherwise(query.Term("f", u("tango")),
                            query.Term("f", u("four")))
        assert [d["id"] for d in s.search(q)] == [2, 3]

        q = query.Otherwise(query.Term("f", u("tango")),
                            query.Term("f", u("nine")))
        assert [d["id"] for d in s.search(q)] == []


def test_fuzzyterm():
    schema = fields.Schema(id=fields.STORED, f=fields.TEXT)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=1, f=u("alfa bravo charlie delta"))
    w.add_document(id=2, f=u("bravo charlie delta echo"))
    w.add_document(id=3, f=u("charlie delta echo foxtrot"))
    w.add_document(id=4, f=u("delta echo foxtrot golf"))
    w.commit()

    with ix.searcher() as s:
        q = query.FuzzyTerm("f", "brave")
        assert [d["id"] for d in s.search(q)] == [1, 2]


def test_fuzzyterm2():
    schema = fields.Schema(id=fields.STORED, f=fields.TEXT(spelling=True))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=1, f=u("alfa bravo charlie delta"))
    w.add_document(id=2, f=u("bravo charlie delta echo"))
    w.add_document(id=3, f=u("charlie delta echo foxtrot"))
    w.add_document(id=4, f=u("delta echo foxtrot golf"))
    w.commit()

    with ix.searcher() as s:
        assert list(s.reader().terms_within("f", u("brave"), 1)) == ["bravo"]
        q = query.FuzzyTerm("f", "brave")
        assert [d["id"] for d in s.search(q)] == [1, 2]


def test_multireader_not():
    schema = fields.Schema(id=fields.STORED, f=fields.TEXT)

    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=0, f=u("alfa bravo chralie"))
    w.add_document(id=1, f=u("bravo chralie delta"))
    w.add_document(id=2, f=u("charlie delta echo"))
    w.add_document(id=3, f=u("delta echo foxtrot"))
    w.add_document(id=4, f=u("echo foxtrot golf"))
    w.commit()

    with ix.searcher() as s:
        q = query.And([query.Term("f", "delta"),
                       query.Not(query.Term("f", "delta"))])
        r = s.search(q)
        assert len(r) == 0

    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=5, f=u("alfa bravo chralie"))
    w.add_document(id=6, f=u("bravo chralie delta"))
    w.commit(merge=False)
    w = ix.writer()
    w.add_document(id=7, f=u("charlie delta echo"))
    w.add_document(id=8, f=u("delta echo foxtrot"))
    w.commit(merge=False)
    w = ix.writer()
    w.add_document(id=9, f=u("echo foxtrot golf"))
    w.add_document(id=10, f=u("foxtrot golf delta"))
    w.commit(merge=False)
    assert len(ix._segments()) > 1

    with ix.searcher() as s:
        q = query.And([query.Term("f", "delta"),
                       query.Not(query.Term("f", "delta"))])
        r = s.search(q)
        assert len(r) == 0


def test_boost_phrase():
    schema = fields.Schema(title=fields.TEXT(field_boost=5.0, stored=True),
                           text=fields.TEXT)
    ix = RamStorage().create_index(schema)
    domain = u("alfa bravo charlie delta").split()
    w = ix.writer()
    for ls in permutations(domain):
        t = u(" ").join(ls)
        w.add_document(title=t, text=t)
    w.commit()

    q = query.Or([query.Term("title", u("alfa")),
                  query.Term("title", u("bravo")),
                  query.Phrase("text", [u("bravo"), u("charlie"), u("delta")])
                  ])

    def boost_phrases(q):
        if isinstance(q, query.Phrase):
            q.boost *= 1000.0
            return q
        else:
            return q.apply(boost_phrases)
    q = boost_phrases(q)

    with ix.searcher() as s:
        r = s.search(q, limit=None)
        for hit in r:
            if "bravo charlie delta" in hit["title"]:
                assert hit.score > 100.0


def test_filter():
    schema = fields.Schema(id=fields.STORED, path=fields.ID, text=fields.TEXT)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=1, path=u("/a/1"), text=u("alfa bravo charlie"))
    w.add_document(id=2, path=u("/b/1"), text=u("bravo charlie delta"))
    w.add_document(id=3, path=u("/c/1"), text=u("charlie delta echo"))
    w.commit(merge=False)
    w = ix.writer()
    w.add_document(id=4, path=u("/a/2"), text=u("delta echo alfa"))
    w.add_document(id=5, path=u("/b/2"), text=u("echo alfa bravo"))
    w.add_document(id=6, path=u("/c/2"), text=u("alfa bravo charlie"))
    w.commit(merge=False)
    w = ix.writer()
    w.add_document(id=7, path=u("/a/3"), text=u("bravo charlie delta"))
    w.add_document(id=8, path=u("/b/3"), text=u("charlie delta echo"))
    w.add_document(id=9, path=u("/c/3"), text=u("delta echo alfa"))
    w.commit(merge=False)

    with ix.searcher() as s:
        fq = query.Or([query.Prefix("path", "/a"),
                       query.Prefix("path", "/b")])
        r = s.search(query.Term("text", "alfa"), filter=fq)
        assert [d["id"] for d in r] == [1, 4, 5]

        r = s.search(query.Term("text", "bravo"), filter=fq)
        assert [d["id"] for d in r] == [1, 2, 5, 7, ]


def test_fieldboost():
    schema = fields.Schema(id=fields.STORED, a=fields.TEXT, b=fields.TEXT)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=0, a=u("alfa bravo charlie"), b=u("echo foxtrot india"))
    w.add_document(id=1, a=u("delta bravo charlie"), b=u("alfa alfa alfa"))
    w.add_document(id=2, a=u("alfa alfa alfa"), b=u("echo foxtrot india"))
    w.add_document(id=3, a=u("alfa sierra romeo"), b=u("alfa tango echo"))
    w.add_document(id=4, a=u("bravo charlie delta"), b=u("alfa foxtrot india"))
    w.add_document(id=5, a=u("alfa alfa echo"), b=u("tango tango tango"))
    w.add_document(id=6, a=u("alfa bravo echo"), b=u("alfa alfa tango"))
    w.commit()

    def field_booster(fieldname, factor=2.0):
        "Returns a function which will boost the given field in a query tree"
        def booster_fn(obj):
            if obj.is_leaf() and obj.field() == fieldname:
                obj = copy.deepcopy(obj)
                obj.boost *= factor
                return obj
            else:
                return obj
        return booster_fn

    with ix.searcher() as s:
        q = query.Or([query.Term("a", u("alfa")),
                      query.Term("b", u("alfa"))])
        q = q.accept(field_booster("a", 100.0))
        assert text_type(q) == text_type("(a:alfa^100.0 OR b:alfa)")
        r = s.search(q)
        assert [hit["id"] for hit in r] == [2, 5, 6, 3, 0, 1, 4]
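
# Note on test_fieldboost above: Query.accept() applies the given function to
# each node of the query tree, so field_booster("a", 100.0) multiplies the
# boost of every leaf on field "a"; that is why documents matching "alfa" in
# field "a" rank ahead of those matching only in "b" in the asserted order.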


def test_andmaybe_quality():
    schema = fields.Schema(id=fields.STORED, title=fields.TEXT(stored=True),
                           year=fields.NUMERIC)
    ix = RamStorage().create_index(schema)

    domain = [(u('Alpha Bravo Charlie Delta'), 2000),
              (u('Echo Bravo Foxtrot'), 2000), (u('Bravo Golf Hotel'), 2002),
              (u('Bravo India'), 2002), (u('Juliet Kilo Bravo'), 2004),
              (u('Lima Bravo Mike'), 2004)]
    w = ix.writer()
    for title, year in domain:
        w.add_document(title=title, year=year)
    w.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("title", ix.schema)
        q = qp.parse(u("title:bravo ANDMAYBE year:2004"))

        titles = [hit["title"] for hit in s.search(q, limit=None)[:2]]
        assert "Juliet Kilo Bravo" in titles

        titles = [hit["title"] for hit in s.search(q, limit=2)]
        assert "Juliet Kilo Bravo" in titles


def test_collect_limit():
    schema = fields.Schema(id=fields.STORED, text=fields.TEXT)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id="a", text=u("alfa bravo charlie delta echo"))
    w.add_document(id="b", text=u("bravo charlie delta echo foxtrot"))
    w.add_document(id="c", text=u("charlie delta echo foxtrot golf"))
    w.add_document(id="d", text=u("delta echo foxtrot golf hotel"))
    w.add_document(id="e", text=u("echo foxtrot golf hotel india"))
    w.commit()

    with ix.searcher() as s:
        r = s.search(query.Term("text", u("golf")), limit=10)
        assert len(r) == 3
        count = 0
        for _ in r:
            count += 1
        assert count == 3

    w = ix.writer()
    w.add_document(id="f", text=u("foxtrot golf hotel india juliet"))
    w.add_document(id="g", text=u("golf hotel india juliet kilo"))
    w.add_document(id="h", text=u("hotel india juliet kilo lima"))
    w.add_document(id="i", text=u("india juliet kilo lima mike"))
    w.add_document(id="j", text=u("juliet kilo lima mike november"))
    w.commit(merge=False)

    with ix.searcher() as s:
        r = s.search(query.Term("text", u("golf")), limit=20)
        assert len(r) == 5
        count = 0
        for _ in r:
            count += 1
        assert count == 5


def test_scorer():
    schema = fields.Schema(key=fields.TEXT(stored=True))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(key=u("alfa alfa alfa"))
    w.add_document(key=u("alfa alfa alfa alfa"))
    w.add_document(key=u("alfa alfa"))
    w.commit()
    w = ix.writer()
    w.add_document(key=u("alfa alfa alfa alfa alfa alfa"))
    w.add_document(key=u("alfa"))
    w.add_document(key=u("alfa alfa alfa alfa alfa"))
    w.commit(merge=False)

    # dw = scoring.DebugModel()
    # s = ix.searcher(weighting=dw)
    # r = s.search(query.Term("key", "alfa"))
    # log = dw.log
    # assert log, [('key', 'alfa', 0, 3.0, 3),
    #              ('key', 'alfa', 1, 4.0, 4),
    #              ('key', 'alfa', 2, 2.0, 2),
    #              ('key', 'alfa', 0, 6.0, 6),
    #              ('key', 'alfa', 1, 1.0, 1),
    #              ('key', 'alfa', 2, 5.0, 5)])


def test_pos_scorer():
    ana = analysis.SimpleAnalyzer()
    schema = fields.Schema(id=fields.STORED, key=fields.TEXT(analyzer=ana))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=0, key=u("0 0 1 0 0 0"))
    w.add_document(id=1, key=u("0 0 0 1 0 0"))
    w.add_document(id=2, key=u("0 1 0 0 0 0"))
    w.commit()
    w = ix.writer()
    w.add_document(id=3, key=u("0 0 0 0 0 1"))
    w.add_document(id=4, key=u("1 0 0 0 0 0"))
    w.add_document(id=5, key=u("0 0 0 0 1 0"))
    w.commit(merge=False)

    def pos_score_fn(searcher, fieldname, text, matcher):
        poses = matcher.value_as("positions")
        return 1.0 / (poses[0] + 1)
    pos_weighting = scoring.FunctionWeighting(pos_score_fn)

    s = ix.searcher(weighting=pos_weighting)
    r = s.search(query.Term("key", "1"))
    assert [hit["id"] for hit in r] == [4, 2, 0, 1, 5, 3]
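
# Note on test_pos_scorer above: scoring.FunctionWeighting wraps a plain
# scoring function; here each match is scored as 1.0 / (first position of the
# term + 1), so documents where "1" appears earlier in the field rank higher,
# which gives the 4, 2, 0, 1, 5, 3 ordering asserted.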
# def test_too_many_prefix_positions():
# schema = fields.Schema(id=fields.STORED, text=fields.TEXT)
# ix = RamStorage().create_index(schema)
# with ix.writer() as w:
# for i in xrange(200):
# text = u("a%s" % i)
# w.add_document(id=i, text=text)
#
# q = query.Prefix("text", u("a"))
# q.TOO_MANY_CLAUSES = 100
#
# with ix.searcher() as s:
# m = q.matcher(s)
# assert m.supports("positions")
# items = list(m.items_as("positions"))
# assert [(i, [0]) for i in xrange(200)] == items


def test_collapse():
    from whoosh import collectors

    # id, text, size, tag
    domain = [("a", "blah blah blah", 5, "x"),
              ("b", "blah", 3, "y"),
              ("c", "blah blah blah blah", 2, "z"),
              ("d", "blah blah", 4, "x"),
              ("e", "bloop", 1, "-"),
              ("f", "blah blah blah blah blah", 6, "x"),
              ("g", "blah", 8, "w"),
              ("h", "blah blah", 7, "=")]

    schema = fields.Schema(id=fields.STORED, text=fields.TEXT,
                           size=fields.NUMERIC,
                           tag=fields.KEYWORD(sortable=True))
    ix = RamStorage().create_index(schema)
    with ix.writer(codec=W3Codec()) as w:
        for id, text, size, tag in domain:
            w.add_document(id=u(id), text=u(text), size=size, tag=u(tag))

    with ix.searcher() as s:
        q = query.Term("text", "blah")
        r = s.search(q, limit=None)
        assert " ".join(hit["id"] for hit in r) == "f c a d h b g"

        col = s.collector(limit=3)
        col = collectors.CollapseCollector(col, "tag")
        s.search_with_collector(q, col)
        r = col.results()
        assert " ".join(hit["id"] for hit in r) == "f c h"

        col = s.collector(limit=None)
        col = collectors.CollapseCollector(col, "tag")
        s.search_with_collector(q, col)
        r = col.results()
        assert " ".join(hit["id"] for hit in r) == "f c h b g"

        r = s.search(query.Every(), sortedby="size")
        assert " ".join(hit["id"] for hit in r) == "e c b d a f h g"

        col = s.collector(sortedby="size")
        col = collectors.CollapseCollector(col, "tag")
        s.search_with_collector(query.Every(), col)
        r = col.results()
        assert " ".join(hit["id"] for hit in r) == "e c b d h g"
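
# Note on test_collapse above: CollapseCollector wraps another collector and
# keeps only the first hit, in the current ranking order, for each distinct
# value of the collapse field. Only one "x"-tagged document therefore survives
# in each collapsed list: "f" when ranking by score and "d" when sorting by
# size.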


def test_collapse_nocolumn():
    from whoosh import collectors

    # id, text, size, tag
    domain = [("a", "blah blah blah", 5, "x"),
              ("b", "blah", 3, "y"),
              ("c", "blah blah blah blah", 2, "z"),
              ("d", "blah blah", 4, "x"),
              ("e", "bloop", 1, "-"),
              ("f", "blah blah blah blah blah", 6, "x"),
              ("g", "blah", 8, "w"),
              ("h", "blah blah", 7, "=")]

    schema = fields.Schema(id=fields.STORED, text=fields.TEXT,
                           size=fields.NUMERIC,
                           tag=fields.KEYWORD)
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        for id, text, size, tag in domain:
            w.add_document(id=u(id), text=u(text), size=size, tag=u(tag))

    with ix.searcher() as s:
        q = query.Term("text", "blah")
        r = s.search(q, limit=None)
        assert " ".join(hit["id"] for hit in r) == "f c a d h b g"

        col = s.collector(limit=3)
        col = collectors.CollapseCollector(col, "tag")
        s.search_with_collector(q, col)
        r = col.results()
        assert " ".join(hit["id"] for hit in r) == "f c h"

        col = s.collector(limit=None)
        col = collectors.CollapseCollector(col, "tag")
        s.search_with_collector(q, col)
        r = col.results()
        assert " ".join(hit["id"] for hit in r) == "f c h b g"

        r = s.search(query.Every(), sortedby="size")
        assert " ".join(hit["id"] for hit in r) == "e c b d a f h g"

        col = s.collector(sortedby="size")
        col = collectors.CollapseCollector(col, "tag")
        s.search_with_collector(query.Every(), col)
        r = col.results()
        assert " ".join(hit["id"] for hit in r) == "e c b d h g"


def test_collapse_length():
    domain = u("alfa apple agnostic aplomb arc "
               "bravo big braid beer "
               "charlie crouch car "
               "delta dog "
               "echo "
               "foxtrot fold flip "
               "golf gym goop"
               ).split()
    schema = fields.Schema(key=fields.ID(sortable=True),
                           word=fields.ID(stored=True))
    ix = RamStorage().create_index(schema)
    with ix.writer(codec=W3Codec()) as w:
        for word in domain:
            w.add_document(key=word[0], word=word)

    with ix.searcher() as s:
        q = query.Every()

        def check(r):
            words = " ".join(hit["word"] for hit in r)
            assert words == "alfa bravo charlie delta echo foxtrot golf"
            assert r.scored_length() == 7
            assert len(r) == 7

        r = s.search(q, collapse="key", collapse_limit=1, limit=None)
        check(r)

        r = s.search(q, collapse="key", collapse_limit=1, limit=50)
        check(r)

        r = s.search(q, collapse="key", collapse_limit=1, limit=10)
        check(r)


def test_collapse_length_nocolumn():
    domain = u("alfa apple agnostic aplomb arc "
               "bravo big braid beer "
               "charlie crouch car "
               "delta dog "
               "echo "
               "foxtrot fold flip "
               "golf gym goop"
               ).split()
    schema = fields.Schema(key=fields.ID(),
                           word=fields.ID(stored=True))
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        for word in domain:
            w.add_document(key=word[0], word=word)

    with ix.searcher() as s:
        q = query.Every()

        def check(r):
            words = " ".join(hit["word"] for hit in r)
            assert words == "alfa bravo charlie delta echo foxtrot golf"
            assert r.scored_length() == 7
            assert len(r) == 7

        r = s.search(q, collapse="key", collapse_limit=1, limit=None)
        check(r)

        r = s.search(q, collapse="key", collapse_limit=1, limit=50)
        check(r)

        r = s.search(q, collapse="key", collapse_limit=1, limit=10)
        check(r)


def test_collapse_order():
    from whoosh import sorting

    schema = fields.Schema(id=fields.STORED,
                           price=fields.NUMERIC(sortable=True),
                           rating=fields.NUMERIC(sortable=True),
                           tag=fields.ID(sortable=True))
    ix = RamStorage().create_index(schema)
    with ix.writer(codec=W3Codec()) as w:
        w.add_document(id="a", price=10, rating=1, tag=u("x"))
        w.add_document(id="b", price=80, rating=3, tag=u("y"))
        w.add_document(id="c", price=60, rating=1, tag=u("z"))
        w.add_document(id="d", price=30, rating=2)
        w.add_document(id="e", price=50, rating=3, tag=u("x"))
        w.add_document(id="f", price=20, rating=1, tag=u("y"))
        w.add_document(id="g", price=50, rating=2, tag=u("z"))
        w.add_document(id="h", price=90, rating=5)
        w.add_document(id="i", price=50, rating=5, tag=u("x"))
        w.add_document(id="j", price=40, rating=1, tag=u("y"))
        w.add_document(id="k", price=50, rating=4, tag=u("z"))
        w.add_document(id="l", price=70, rating=2)

    with ix.searcher() as s:
        def check(kwargs, target):
            r = s.search(query.Every(), limit=None, **kwargs)
            assert " ".join(hit["id"] for hit in r) == target

        price = sorting.FieldFacet("price", reverse=True)
        rating = sorting.FieldFacet("rating", reverse=True)
        tag = sorting.FieldFacet("tag")

        check(dict(sortedby=price), "h b l c e g i k j d f a")
        check(dict(sortedby=price, collapse=tag), "h b l c e d")
        check(dict(sortedby=price, collapse=tag, collapse_order=rating),
              "h b l i k d")


def test_collapse_order_nocolumn():
    from whoosh import sorting

    schema = fields.Schema(id=fields.STORED,
                           price=fields.NUMERIC(),
                           rating=fields.NUMERIC(),
                           tag=fields.ID())
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(id="a", price=10, rating=1, tag=u("x"))
        w.add_document(id="b", price=80, rating=3, tag=u("y"))
        w.add_document(id="c", price=60, rating=1, tag=u("z"))
        w.add_document(id="d", price=30, rating=2)
        w.add_document(id="e", price=50, rating=3, tag=u("x"))
        w.add_document(id="f", price=20, rating=1, tag=u("y"))
        w.add_document(id="g", price=50, rating=2, tag=u("z"))
        w.add_document(id="h", price=90, rating=5)
        w.add_document(id="i", price=50, rating=5, tag=u("x"))
        w.add_document(id="j", price=40, rating=1, tag=u("y"))
        w.add_document(id="k", price=50, rating=4, tag=u("z"))
        w.add_document(id="l", price=70, rating=2)

    with ix.searcher() as s:
        def check(kwargs, target):
            r = s.search(query.Every(), limit=None, **kwargs)
            assert " ".join(hit["id"] for hit in r) == target

        price = sorting.FieldFacet("price", reverse=True)
        rating = sorting.FieldFacet("rating", reverse=True)
        tag = sorting.FieldFacet("tag")

        check(dict(sortedby=price), "h b l c e g i k j d f a")
        check(dict(sortedby=price, collapse=tag), "h b l c e d")
        check(dict(sortedby=price, collapse=tag, collapse_order=rating),
              "h b l i k d")


def test_coord():
    from whoosh.matching import CoordMatcher

    schema = fields.Schema(id=fields.STORED, hits=fields.STORED,
                           tags=fields.KEYWORD)
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(id=0, hits=0, tags=u("blah blah blah blah"))
        w.add_document(id=1, hits=0, tags=u("echo echo blah blah"))
        w.add_document(id=2, hits=1, tags=u("bravo charlie delta echo"))
        w.add_document(id=3, hits=2, tags=u("charlie delta echo foxtrot"))
        w.add_document(id=4, hits=3, tags=u("delta echo foxtrot golf"))
        w.add_document(id=5, hits=3, tags=u("echo foxtrot golf hotel"))
        w.add_document(id=6, hits=2, tags=u("foxtrot golf hotel india"))
        w.add_document(id=7, hits=1, tags=u("golf hotel india juliet"))
        w.add_document(id=8, hits=0, tags=u("foxtrot foxtrot foo foo"))
        w.add_document(id=9, hits=0, tags=u("foo foo foo foo"))

    og = qparser.OrGroup.factory(0.99)
    qp = qparser.QueryParser("tags", schema, group=og)
    q = qp.parse("golf foxtrot echo")
    assert q.__class__ == query.Or
    assert q.scale == 0.99

    with ix.searcher() as s:
        m = q.matcher(s)
        assert type(m) == CoordMatcher

        r = s.search(q, optimize=False)
        assert [hit["id"] for hit in r] == [4, 5, 3, 6, 1, 8, 2, 7]
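
# Note on test_coord above: OrGroup.factory(0.99) gives the parsed Or query a
# scale value, which makes it match with a CoordMatcher that rewards documents
# containing more of the OR'd terms; the three-term matches (ids 4 and 5)
# therefore rank ahead of the two- and one-term matches in the asserted order.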


def test_keyword_search():
    schema = fields.Schema(tags=fields.KEYWORD)
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(tags=u("keyword1 keyword2 keyword3 keyword4 keyword5"))

    with ix.searcher() as s:
        r = s.search_page(query.Term("tags", "keyword3"), 1)
        assert r


def test_groupedby_with_terms():
    schema = fields.Schema(content=fields.TEXT, organism=fields.ID)
    ix = RamStorage().create_index(schema)

    with ix.writer() as w:
        w.add_document(organism=u("mus"), content=u("IPFSTD1 IPFSTD_kdwq134 Kaminski-all Study00:00:00"))
        w.add_document(organism=u("mus"), content=u("IPFSTD1 IPFSTD_kdwq134 Kaminski-all Study"))
        w.add_document(organism=u("hs"), content=u("This is the first document we've added!"))

    with ix.searcher() as s:
        q = qparser.QueryParser("content", schema=ix.schema).parse(u("IPFSTD1"))
        r = s.search(q, groupedby=["organism"], terms=True)
        assert len(r) == 2
        assert r.groups("organism") == {"mus": [1, 0]}
        assert r.has_matched_terms()
        assert r.matched_terms() == set([('content', b('ipfstd1'))])


def test_score_length():
    schema = fields.Schema(a=fields.TEXT, b=fields.TEXT)
    ix = RamStorage().create_index(schema)

    with ix.writer() as w:
        w.add_document(a=u("alfa bravo charlie"))
        w.add_document(b=u("delta echo foxtrot"))
        w.add_document(a=u("golf hotel india"))

    with ix.writer() as w:
        w.merge = False
        w.add_document(b=u("juliet kilo lima"))
        # In the second segment, there is an "a" field here, but in the
        # corresponding document in the first segment, the field doesn't
        # exist, so if the scorer is getting segment offsets wrong, scoring
        # this document will error
        w.add_document(a=u("mike november oskar"))
        w.add_document(b=u("papa quebec romeo"))

    with ix.searcher() as s:
        assert not s.is_atomic()
        p = s.postings("a", "mike")
        while p.is_active():
            docnum = p.id()
            score = p.score()
            p.next()


def test_terms_with_filter():
    schema = fields.Schema(text=fields.TEXT)
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(text=u("alfa bravo charlie delta"))
        w.add_document(text=u("bravo charlie delta echo"))
        w.add_document(text=u("charlie delta echo foxtrot"))
        w.add_document(text=u("delta echo foxtrot golf"))
        w.add_document(text=u("echo foxtrot golf hotel"))
        w.add_document(text=u("foxtrot golf hotel alfa"))
        w.add_document(text=u("golf hotel alfa bravo"))
        w.add_document(text=u("hotel alfa bravo charlie"))

    with ix.searcher() as s:
        workingset = set([1, 2, 3])
        q = query.Term("text", u("foxtrot"))

        r = s.search_page(q, pagenum=1, pagelen=5, terms=True,
                          filter=workingset)
        assert r.scored_length() == 2
        assert [hit.docnum for hit in r] == [2, 3]


def test_terms_to_bytes():
    schema = fields.Schema(a=fields.TEXT, b=fields.NUMERIC, id=fields.STORED)
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(id=0, a=u("alfa bravo"), b=100)
        w.add_document(id=1, a=u("bravo charlie"), b=200)
        w.add_document(id=2, a=u("charlie delta"), b=100)
        w.add_document(id=3, a=u("delta echo"), b=200)

    with ix.searcher() as s:
        t1 = query.Term("b", 200)
        t2 = query.Term("a", "bravo")
        q = query.And([t1, t2])
        r = s.search(q)
        assert [hit["id"] for hit in r] == [1]


def test_issue_334():
    schema = fields.Schema(
        kind=fields.ID(stored=True),
        name=fields.ID(stored=True),
        returns=fields.ID(stored=True),
    )
    ix = RamStorage().create_index(schema)

    with ix.writer() as w:
        with w.group():
            w.add_document(kind=u('class'), name=u('Index'))
            w.add_document(kind=u('method'), name=u('add document'),
                           returns=u('void'))
            w.add_document(kind=u('method'), name=u('add reader'),
                           returns=u('void'))
            w.add_document(kind=u('method'), name=u('close'),
                           returns=u('void'))
        with w.group():
            w.add_document(kind=u('class'), name=u('Accumulator'))
            w.add_document(kind=u('method'), name=u('add'),
                           returns=u('void'))
            w.add_document(kind=u('method'), name=u('get result'),
                           returns=u('number'))
        with w.group():
            w.add_document(kind=u('class'), name=u('Calculator'))
            w.add_document(kind=u('method'), name=u('add'),
                           returns=u('number'))
            w.add_document(kind=u('method'), name=u('add all'),
                           returns=u('number'))
            w.add_document(kind=u('method'), name=u('add some'),
                           returns=u('number'))
            w.add_document(kind=u('method'), name=u('multiply'),
                           returns=u('number'))
            w.add_document(kind=u('method'), name=u('close'),
                           returns=u('void'))
        with w.group():
            w.add_document(kind=u('class'), name=u('Deleter'))
            w.add_document(kind=u('method'), name=u('add'),
                           returns=u('void'))
            w.add_document(kind=u('method'), name=u('delete'),
                           returns=u('void'))

    with ix.searcher() as s:
        pq = query.Term('kind', 'class')
        cq = query.Term('name', 'Calculator')
        q = query.NestedChildren(pq, cq) & query.Term('returns', 'void')
        r = s.search(q)
        assert len(r) == 1
        assert r[0]["name"] == u("close")


def test_find_decimals():
    from decimal import Decimal

    schema = fields.Schema(name=fields.KEYWORD(stored=True),
                           num=fields.NUMERIC(Decimal, decimal_places=5))
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(name=u("alfa"), num=Decimal("1.5"))
        w.add_document(name=u("bravo"), num=Decimal("2.1"))
        w.add_document(name=u("charlie"), num=Decimal("5.3"))
        w.add_document(name=u("delta"), num=Decimal(3))
        w.add_document(name=u("echo"), num=Decimal("3.00001"))
        w.add_document(name=u("foxtrot"), num=Decimal("3"))

    qp = qparser.QueryParser("name", ix.schema)
    q = qp.parse("num:3.0")
    assert isinstance(q, query.Term)

    with ix.searcher() as s:
        r = s.search(q)
        names = " ".join(sorted(hit["name"] for hit in r))
        assert names == "delta foxtrot"