debian-python-whoosh/tests/test_nested.py

362 lines
15 KiB
Python

from __future__ import with_statement
from whoosh import fields, qparser, query, sorting
from whoosh.compat import u
from whoosh.filedb.filestore import RamStorage
from whoosh.util.testing import TempIndex
def test_nested_parent():
    """Check that NestedParent returns the parents of matching children.

    Three groups are indexed, each a "product" document followed by its
    part documents. The child query selects parts priced at 50, and the
    result must contain exactly the products that own such a part.
    """
    schema = fields.Schema(name=fields.ID(stored=True), type=fields.ID,
                           part=fields.ID, price=fields.NUMERIC)
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        # Each group() block holds one parent document and its children.
        with w.group():
            w.add_document(name=u("iPad"), type=u("product"))
            w.add_document(part=u("screen"), price=100)
            w.add_document(part=u("battery"), price=50)
            w.add_document(part=u("case"), price=20)

        with w.group():
            w.add_document(name=u("iPhone"), type=u("product"))
            w.add_document(part=u("screen"), price=60)
            w.add_document(part=u("battery"), price=30)
            w.add_document(part=u("case"), price=10)

        with w.group():
            w.add_document(name=u("Mac mini"), type=u("product"))
            w.add_document(part=u("hard drive"), price=50)
            w.add_document(part=u("case"), price=50)

    with ix.searcher() as s:
        pq = query.Term("type", "product")  # identifies parent documents
        cq = query.Term("price", 50)        # selects the wanted children
        q = query.NestedParent(pq, cq)

        r = s.search(q)
        # The iPad has a part at 50 and the Mac mini has two; the iPhone
        # has none, so it must be absent.
        assert sorted(hit["name"] for hit in r) == ["Mac mini", "iPad"]
def test_scoring():
    """Verify the order in which NestedParent returns matching parents.

    Each group is a "class" document followed by its "method" documents.
    The child query is the term "add" in the name field. The asserted
    order follows the number of method names that include the word "add"
    (Calculator 3, Index 2, Accumulator 1).
    """
    # NOTE(review): presumably the parent's score is derived from its
    # matching children's scores -- confirm against NestedParent.
    class_table = (
        ("Index", ("add document", "add reader", "close")),
        ("Accumulator", ("add", "get result")),
        ("Calculator", ("add", "add all", "add some", "multiply", "close")),
    )

    schema = fields.Schema(kind=fields.ID,
                           name=fields.KEYWORD(scorable=True, stored=True))
    index = RamStorage().create_index(schema)
    with index.writer() as writer:
        for class_name, method_names in class_table:
            # One group per class: the parent first, then its children.
            with writer.group():
                writer.add_document(kind=u("class"), name=u(class_name))
                for method_name in method_names:
                    writer.add_document(kind=u("method"),
                                        name=u(method_name))

    with index.searcher() as searcher:
        parents = query.Term("kind", "class")
        children = query.Term("name", "add")
        results = searcher.search(query.NestedParent(parents, children))
        ranked = [hit["name"] for hit in results]
        assert ranked == ["Calculator", "Index", "Accumulator"]
def test_missing():
    """Check NestedParent after some parent documents have been deleted.

    Four class/method groups are indexed and searched. The "Accumulator"
    and "Calculator" documents are then deleted by term on the name
    field, and the same nested query must return only the two remaining
    classes.
    """
    class_table = (
        ("Index", ("add document", "add reader", "close")),
        ("Accumulator", ("add", "get result")),
        ("Calculator", ("add", "add all", "add some", "multiply", "close")),
        ("Deleter", ("add", "delete")),
    )

    schema = fields.Schema(kind=fields.ID,
                           name=fields.KEYWORD(scorable=True, stored=True))
    index = RamStorage().create_index(schema)
    with index.writer() as writer:
        for class_name, method_names in class_table:
            with writer.group():
                writer.add_document(kind=u("class"), name=u(class_name))
                for method_name in method_names:
                    writer.add_document(kind=u("method"),
                                        name=u(method_name))

    # Before any deletion every class has an "add" method and is returned.
    with index.searcher() as searcher:
        nested = query.NestedParent(query.Term("kind", "class"),
                                    query.Term("name", "add"))
        results = searcher.search(nested)
        found = [hit["name"] for hit in results]
        assert found == ["Calculator", "Index", "Accumulator", "Deleter"]

    # Delete two of the documents by their name term.
    with index.writer() as writer:
        for doomed in ("Accumulator", "Calculator"):
            writer.delete_by_term("name", doomed)

    with index.searcher() as searcher:
        parents = query.Term("kind", "class")
        # Only two "class" documents are left for the parent query.
        assert len(list(parents.docs(searcher))) == 2

        nested = query.NestedParent(parents, query.Term("name", "add"))
        results = searcher.search(nested)
        assert [hit["name"] for hit in results] == ["Index", "Deleter"]
def test_nested_delete():
    """Exercise delete_by_query with NestedParent queries.

    Four class/method groups are indexed. Two rounds of deletion follow,
    each using a NestedParent query, and after each round the stored
    names left in the index are compared against the expected list.
    """
    class_table = (
        ("Index", ("add document", "add reader", "close")),
        ("Accumulator", ("add", "get result")),
        ("Calculator", ("add", "add all", "add some", "multiply", "close")),
        ("Deleter", ("add", "delete")),
    )

    schema = fields.Schema(kind=fields.ID,
                           name=fields.KEYWORD(scorable=True, stored=True))
    index = RamStorage().create_index(schema)
    with index.writer() as writer:
        for class_name, method_names in class_table:
            with writer.group():
                writer.add_document(kind=u("class"), name=u(class_name))
                for method_name in method_names:
                    writer.add_document(kind=u("method"),
                                        name=u(method_name))

    # Round 1: delete the "Accumulator" class.
    with index.writer() as writer:
        doomed = query.NestedParent(query.Term("kind", "class"),
                                    query.Term("name", "Accumulator"))
        writer.delete_by_query(doomed)

    # Accumulator AND ITS METHODS must be gone; everything else remains.
    with index.searcher() as searcher:
        class_hits = searcher.search(query.Term("kind", "class"))
        remaining_classes = sorted(hit["name"] for hit in class_hits)
        assert remaining_classes == ["Calculator", "Deleter", "Index"]

        stored_names = [stored["name"] for _, stored in searcher.iter_docs()]
        assert stored_names == ["Index", "add document", "add reader",
                                "close", "Calculator", "add", "add all",
                                "add some", "multiply", "close", "Deleter",
                                "add", "delete"]

    # Round 2: delete any class that has a "close" method.
    with index.writer() as writer:
        doomed = query.NestedParent(query.Term("kind", "class"),
                                    query.Term("name", "close"))
        writer.delete_by_query(doomed)

    # Those CLASSES AND their METHODS must be gone; only Deleter is left.
    with index.searcher() as searcher:
        stored_names = [stored["name"] for _, stored in searcher.iter_docs()]
        assert stored_names == ["Deleter", "add", "delete"]
def test_all_parents_deleted():
    """Check NestedParent when every parent document has been deleted.

    Four class/method groups are indexed, then all four "class" documents
    are deleted by term on the name field. A NestedParent search for
    classes with an "add" method must come back empty.
    """
    class_table = (
        ("Index", ("add document", "add reader", "close")),
        ("Accumulator", ("add", "get result")),
        ("Calculator", ("add", "add all", "add some", "multiply", "close")),
        ("Deleter", ("add", "delete")),
    )

    schema = fields.Schema(kind=fields.ID,
                           name=fields.KEYWORD(scorable=True, stored=True))
    index = RamStorage().create_index(schema)
    with index.writer() as writer:
        for class_name, method_names in class_table:
            with writer.group():
                writer.add_document(kind=u("class"), name=u(class_name))
                for method_name in method_names:
                    writer.add_document(kind=u("method"),
                                        name=u(method_name))

    # Delete each class by its name term, in the order it was indexed.
    with index.writer() as writer:
        for doomed in ("Index", "Accumulator", "Calculator", "Deleter"):
            writer.delete_by_term("name", doomed)

    with index.searcher() as searcher:
        nested = query.NestedParent(query.Term("kind", "class"),
                                    query.Term("name", "add"))
        results = searcher.search(nested)
        assert results.is_empty()
def test_everything_is_a_parent():
    """Check NestedParent when every document matches the parent query.

    Twelve documents with the same ``kind`` are indexed without grouping;
    their names cycle through one/two/three/four. The expected hits are
    the ids of the documents named "two" or "four".
    """
    schema = fields.Schema(id=fields.STORED, kind=fields.ID,
                           name=fields.ID(stored=True))
    k = u("alfa")
    name_cycle = (u("one"), u("two"), u("three"), u("four"))

    index = RamStorage().create_index(schema)
    with index.writer() as writer:
        # ids 0..11, with the four names repeated three times in order.
        for doc_id in range(12):
            writer.add_document(id=doc_id, kind=k,
                                name=name_cycle[doc_id % len(name_cycle)])

    with index.searcher() as searcher:
        parents = query.Term("kind", k)
        children = query.Or([query.Term("name", "two"),
                             query.Term("name", "four")])
        results = searcher.search(query.NestedParent(parents, children))
        assert [hit["id"] for hit in results] == [1, 3, 5, 7, 9, 11]
def test_no_parents():
    """Check NestedParent when no document matches the parent query.

    Twelve documents of kind "alfa" are indexed, but the parent query
    looks for kind "bravo". The nested search must come back empty even
    though the child query has matches.
    """
    schema = fields.Schema(id=fields.STORED, kind=fields.ID,
                           name=fields.ID(stored=True))
    k = u("alfa")
    name_cycle = (u("one"), u("two"), u("three"), u("four"))

    index = RamStorage().create_index(schema)
    with index.writer() as writer:
        # ids 0..11, with the four names repeated three times in order.
        for doc_id in range(12):
            writer.add_document(id=doc_id, kind=k,
                                name=name_cycle[doc_id % len(name_cycle)])

    with index.searcher() as searcher:
        parents = query.Term("kind", "bravo")  # matches nothing indexed
        children = query.Or([query.Term("name", "two"),
                             query.Term("name", "four")])
        results = searcher.search(query.NestedParent(parents, children))
        assert results.is_empty()
def test_nested_children():
    """Check that NestedChildren returns the children of matching parents.

    Three groups are indexed, each an "album" document followed by three
    "track" documents numbered 1 to 3. The test checks the children of
    all albums, the children of a single album, and a sorted search.
    """
    album_table = (
        ("alfa bravo charlie",
         ("delta echo foxtrot", "golf hotel india", "juliet kilo lima")),
        ("mike november oskar",
         ("papa quebec romeo", "sierra tango ultra", "victor whiskey xray")),
        ("yankee zulu one",
         ("two three four", "five six seven", "eight nine ten")),
    )

    schema = fields.Schema(t=fields.ID(stored=True),
                           track=fields.NUMERIC(stored=True),
                           album_name=fields.TEXT(stored=True),
                           song_name=fields.TEXT(stored=True))
    index = RamStorage().create_index(schema)
    with index.writer() as writer:
        for album_title, song_titles in album_table:
            with writer.group():
                writer.add_document(t=u("album"), album_name=u(album_title))
                for position, song_title in enumerate(song_titles):
                    # Track numbers are 1-based.
                    writer.add_document(t=u("track"), track=position + 1,
                                        song_name=u(song_title))

    with index.searcher() as searcher:
        albums = query.Term("t", "album")
        november = query.Term("album_name", "november")

        # Children of every album: all nine tracks and no album document.
        results = searcher.search(query.NestedChildren(albums, albums),
                                  limit=None)
        assert len(results) == 9
        assert [str(hit["t"]) for hit in results] == ["track"] * 9

        # Children of the second album only; its tracks are docs 5, 6, 7.
        november_tracks = query.NestedChildren(albums, november)
        assert list(november_tracks.docs(searcher)) == [5, 6, 7]
        results = searcher.search(november_tracks, limit=None)
        assert len(results) == 3
        song_names = [str(hit["song_name"]) for hit in results]
        assert song_names == ["papa quebec romeo",
                              "sierra tango ultra",
                              "victor whiskey xray"]

        # Children of the third album, sorted by stored song name:
        # "eight ..." (3), "five ..." (2), "two ..." (1).
        zulu_tracks = query.NestedChildren(albums,
                                           query.Term("album_name", "zulu"))
        by_song_name = sorting.StoredFieldFacet("song_name")
        results = searcher.search(zulu_tracks, sortedby=by_song_name)
        assert [hit["track"] for hit in results] == [3, 2, 1]
def test_nested_skip():
    """Check a NestedChildren query used as one clause of an And query.

    One "book" document is indexed, followed by four "chapter" documents.
    The test runs the children query alone, a plain chapter filter alone,
    and finally their conjunction, which must yield only "chapter_3".
    """
    # NOTE(review): judging by the test name, this presumably targets the
    # matcher's skipping logic when combined with another matcher -- confirm.
    schema = fields.Schema(
        id=fields.ID(unique=True, stored=True),
        name=fields.TEXT(stored=True),
        name_ngrams=fields.NGRAMWORDS(minsize=4, field_boost=1.2),
        type=fields.TEXT,
    )

    # (id, name, type) triples; the book comes first, then its chapters.
    domain = [
        (u"book_1", u"The Dark Knight Returns", u"book"),
        (u"chapter_1", u"The Dark Knight Returns", u"chapter"),
        (u"chapter_2", u"The Dark Knight Triumphant", u"chapter"),
        (u"chapter_3", u"Hunt the Dark Knight", u"chapter"),
        (u"chapter_4", u"The Dark Knight Falls", u"chapter")
    ]

    with TempIndex(schema) as ix:
        with ix.writer() as w:
            # "doc_id" rather than "id" so the builtin is not shadowed.
            for doc_id, name, typ in domain:
                w.add_document(id=doc_id, name=name, name_ngrams=name,
                               type=typ)

        # The searcher is opened inside the TempIndex context so the index
        # is still open while it is queried.
        with ix.searcher() as s:
            all_parents = query.Term("type", "book")
            wanted_parents = query.Term("name", "dark")
            children_of_wanted_parents = query.NestedChildren(all_parents,
                                                              wanted_parents)

            # The nested query alone returns all four chapters.
            r1 = s.search(children_of_wanted_parents)
            assert r1.scored_length() == 4
            assert [hit["id"] for hit in r1] == ["chapter_1", "chapter_2",
                                                 "chapter_3", "chapter_4"]

            # The plain filter alone returns the single "hunt" chapter.
            wanted_children = query.And([query.Term("type", "chapter"),
                                         query.Term("name", "hunt")])
            r2 = s.search(wanted_children)
            assert r2.scored_length() == 1
            assert [hit["id"] for hit in r2] == ["chapter_3"]

            # Both combined must still return that one chapter.
            complex_query = query.And([children_of_wanted_parents,
                                       wanted_children])
            r3 = s.search(complex_query)
            assert r3.scored_length() == 1
            assert [hit["id"] for hit in r3] == ["chapter_3"]