debian-python-whoosh/tests/test_fields.py

598 lines
18 KiB
Python

from __future__ import with_statement
from datetime import datetime, timedelta
import pytest
from whoosh import fields, qparser, query
from whoosh.compat import long_type, u, b, xrange
from whoosh.filedb.filestore import RamStorage
from whoosh.util import times
def test_schema_eq():
    """Schema equality: empty schemas compare equal, a copy equals its
    source field-by-field, and schemas with different field types differ."""
    first = fields.Schema()
    second = fields.Schema()
    assert first == second

    first = fields.Schema(id=fields.ID)
    second = first.copy()
    assert first["id"] == second["id"]
    assert first == second

    third = fields.Schema(id=fields.TEXT)
    assert first != third
def test_creation1():
    """Build a Schema incrementally with add() and check names, membership
    and field types."""
    schema = fields.Schema()
    for fieldname, fieldtype in [("content", fields.TEXT(phrase=True)),
                                 ("title", fields.TEXT(stored=True)),
                                 ("path", fields.ID(stored=True)),
                                 ("tags", fields.KEYWORD(stored=True)),
                                 ("quick", fields.NGRAM),
                                 ("note", fields.STORED)]:
        schema.add(fieldname, fieldtype)

    # names() reports field names in alphabetical order
    assert schema.names() == ["content", "note", "path", "quick", "tags",
                              "title"]
    assert "content" in schema
    assert "buzz" not in schema
    assert isinstance(schema["tags"], fields.KEYWORD)
def test_creation2():
    """Build a Schema from keyword arguments and check names/membership."""
    schema = fields.Schema(a=fields.ID(stored=True),
                           b=fields.ID,
                           c=fields.KEYWORD(scorable=True))
    assert schema.names() == ["a", "b", "c"]
    for fieldname in ("a", "b", "c"):
        assert fieldname in schema
def test_declarative():
    """A declarative SchemaClass is accepted by create_index both as a
    class and as an instance; non-schemas are rejected."""
    class MySchema(fields.SchemaClass):
        content = fields.TEXT
        title = fields.TEXT
        path = fields.ID
        date = fields.DATETIME

    expected = ["content", "date", "path", "title"]
    assert RamStorage().create_index(MySchema).schema.names() == expected
    assert RamStorage().create_index(MySchema()).schema.names() == expected

    # Passing something that is not a schema must fail loudly
    with pytest.raises(fields.FieldConfigurationError):
        RamStorage().create_index(object())
def test_declarative_inherit():
    """Fields declared on SchemaClass ancestors accumulate in subclasses."""
    class Parent(fields.SchemaClass):
        path = fields.ID
        date = fields.DATETIME

    class Child(Parent):
        content = fields.TEXT

    class Grandchild(Child):
        title = fields.TEXT

    schema = Grandchild()
    assert schema.names() == ["content", "date", "path", "title"]
def test_badnames():
    """Field names starting with an underscore or containing spaces are
    rejected with FieldConfigurationError."""
    schema = fields.Schema()
    for badname in ("_test", "a f"):
        with pytest.raises(fields.FieldConfigurationError):
            schema.add(badname, fields.ID)
#def test_numeric_support():
# intf = fields.NUMERIC(int, shift_step=0)
# longf = fields.NUMERIC(int, bits=64, shift_step=0)
# floatf = fields.NUMERIC(float, shift_step=0)
#
# def roundtrip(obj, num):
# assert obj.from_bytes(obj.to_bytes(num)), num)
#
# roundtrip(intf, 0)
# roundtrip(intf, 12345)
# roundtrip(intf, -12345)
# roundtrip(longf, 0)
# roundtrip(longf, 85020450482)
# roundtrip(longf, -85020450482)
# roundtrip(floatf, 0)
# roundtrip(floatf, 582.592)
# roundtrip(floatf, -582.592)
# roundtrip(floatf, -99.42)
#
# from random import shuffle
#
# def roundtrip_sort(obj, start, end, step):
# count = start
# rng = []
# while count < end:
# rng.append(count)
# count += step
#
# scrambled = list(rng)
# shuffle(scrambled)
# round = [obj.from_text(t) for t
# in sorted([obj.to_text(n) for n in scrabled])]
# assert round, rng)
#
# roundtrip_sort(intf, -100, 100, 1)
# roundtrip_sort(longf, -58902, 58249, 43)
# roundtrip_sort(floatf, -99.42, 99.83, 2.38)
def test_index_numeric():
    # Indexing a NUMERIC value emits one encoded term per shift step (the
    # tiered terms used to accelerate range queries). Pin down the exact
    # term bytes for the value 1 in an unsigned and a signed 32-bit field;
    # the first byte of each term is the shift amount.
    schema = fields.Schema(a=fields.NUMERIC(int, 32, signed=False),
                           b=fields.NUMERIC(int, 32, signed=True))
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(a=1, b=1)
    with ix.searcher() as s:
        # Unsigned field: value bytes follow the shift byte unchanged
        assert list(s.lexicon("a")) == \
            [b('\x00\x00\x00\x00\x01'), b('\x04\x00\x00\x00\x00'),
             b('\x08\x00\x00\x00\x00'), b('\x0c\x00\x00\x00\x00'),
             b('\x10\x00\x00\x00\x00'), b('\x14\x00\x00\x00\x00'),
             b('\x18\x00\x00\x00\x00'), b('\x1c\x00\x00\x00\x00')]
        # Signed field: the sign bit is folded into the encoding, so the
        # byte patterns differ from the unsigned case
        assert list(s.lexicon("b")) == \
            [b('\x00\x80\x00\x00\x01'), b('\x04\x08\x00\x00\x00'),
             b('\x08\x00\x80\x00\x00'), b('\x0c\x00\x08\x00\x00'),
             b('\x10\x00\x00\x80\x00'), b('\x14\x00\x00\x08\x00'),
             b('\x18\x00\x00\x00\x80'), b('\x1c\x00\x00\x00\x08')]
def test_numeric():
    """Index int and float NUMERIC fields and query them via QueryParser."""
    schema = fields.Schema(id=fields.ID(stored=True),
                           integer=fields.NUMERIC(int),
                           floating=fields.NUMERIC(float))
    ix = RamStorage().create_index(schema)

    writer = ix.writer()
    for docid, intval, floatval in [("a", 5820, 1.2), ("b", 22, 2.3),
                                    ("c", 78, 3.4), ("d", 13, 4.5),
                                    ("e", 9, 5.6)]:
        writer.add_document(id=u(docid), integer=intval, floating=floatval)
    writer.commit()

    with ix.searcher() as searcher:
        qp = qparser.QueryParser("integer", schema)
        hits = searcher.search(qp.parse(u("5820")))
        assert len(hits) == 1
        assert hits[0]["id"] == "a"

    with ix.searcher() as searcher:
        hits = searcher.search(qp.parse("floating:4.5"))
        assert len(hits) == 1
        assert hits[0]["id"] == "d"

    # A lone "*" on a numeric field collapses to an Every query
    q = qp.parse("integer:*")
    assert q.__class__ == query.Every
    assert q.field() == "integer"

    # Partial wildcards are meaningless for numerics and parse to nothing
    q = qp.parse("integer:5?6")
    assert q == query.NullQuery
def test_decimal_numeric():
    """A NUMERIC field with decimal_places round-trips Decimal values
    through indexing and search."""
    from decimal import Decimal

    deci_field = fields.NUMERIC(int, decimal_places=4)
    schema = fields.Schema(id=fields.ID(stored=True), deci=deci_field)
    ix = RamStorage().create_index(schema)

    writer = ix.writer()
    for docid, text in [("a", "123.56"), ("b", "0.536255"),
                        ("c", "2.5255"), ("d", "58")]:
        writer.add_document(id=u(docid), deci=Decimal(text))
    writer.commit()

    with ix.searcher() as searcher:
        qp = qparser.QueryParser("deci", schema)

        hits = searcher.search(qp.parse(u("123.56")))
        assert len(hits) == 1
        assert hits[0]["id"] == "a"

        hits = searcher.search(qp.parse(u("0.536255")))
        assert len(hits) == 1
        assert hits[0]["id"] == "b"
def test_numeric_parsing():
    """Open-ended and closed numeric ranges parse into NumericRange
    queries; a "*" endpoint is rejected."""
    schema = fields.Schema(id=fields.ID(stored=True), number=fields.NUMERIC)
    qp = qparser.QueryParser("number", schema)

    # "*" is not valid as a numeric endpoint
    assert qp.parse(u("[10 to *]")) == query.NullQuery

    # (query string, expected start, expected end)
    cases = [(u("[to 400]"), None, 400),
             (u("[10 to]"), 10, None),
             (u("[10 to 400]"), 10, 400)]
    for qstring, start, end in cases:
        parsed = qp.parse(qstring)
        assert parsed.__class__ is query.NumericRange
        assert parsed.start == start
        assert parsed.end == end
def test_numeric_ranges():
    """Inclusive ([ ]) and exclusive ({ }) numeric range syntax selects
    exactly the right documents."""
    schema = fields.Schema(id=fields.STORED, num=fields.NUMERIC)
    ix = RamStorage().create_index(schema)
    writer = ix.writer()
    for value in xrange(400):
        writer.add_document(id=value, num=value)
    writer.commit()

    with ix.searcher() as searcher:
        qp = qparser.QueryParser("num", schema)

        def check(qstring, expected):
            parsed = qp.parse(qstring)
            found = [searcher.stored_fields(docnum)["id"]
                     for docnum in parsed.docs(searcher)]
            assert found == expected

        # range() is inclusive-exclusive, hence the explicit "+ 1" below
        check("[10 to 390]", list(range(10, 390 + 1)))
        check("[100 to]", list(range(100, 400)))
        check("[to 350]", list(range(0, 350 + 1)))
        check("[16 to 255]", list(range(16, 255 + 1)))
        # Curly braces exclude the adjacent endpoint
        check("{10 to 390]", list(range(11, 390 + 1)))
        check("[10 to 390}", list(range(10, 390)))
        check("{10 to 390}", list(range(11, 390)))
        check("{16 to 255}", list(range(17, 255)))
def test_numeric_ranges_unsigned():
    """A lower-bounded NumericRange on an unsigned field only returns
    documents at or above the bound."""
    values = [1, 10, 100, 1000, 2, 20, 200, 2000, 9, 90, 900, 9000]
    schema = fields.Schema(num2=fields.NUMERIC(stored=True, signed=False))
    ix = RamStorage().create_index(schema)
    with ix.writer() as writer:
        for value in values:
            writer.add_document(num2=value)

    with ix.searcher() as searcher:
        # start=55, no upper bound, start inclusive
        q = query.NumericRange("num2", 55, None, True, False)
        results = searcher.search(q, limit=None)
        for hit in results:
            assert int(hit["num2"]) >= 55
def test_decimal_ranges():
    """Range queries over a decimal NUMERIC field honor inclusive and
    exclusive bracket syntax."""
    from decimal import Decimal

    schema = fields.Schema(id=fields.STORED,
                           num=fields.NUMERIC(int, decimal_places=2))
    ix = RamStorage().create_index(schema)

    writer = ix.writer()
    inc = Decimal("0.2")
    value = Decimal("0.0")
    for _ in xrange(500):
        writer.add_document(id=str(value), num=value)
        value += inc
    writer.commit()

    with ix.searcher() as searcher:
        qp = qparser.QueryParser("num", schema)

        def check(qstring, start, end):
            parsed = qp.parse(qstring)
            found = [searcher.stored_fields(docnum)["id"]
                     for docnum in parsed.docs(searcher)]
            # Rebuild the expected id list by stepping through the range
            expected = []
            current = Decimal(start)
            stop = Decimal(end)
            while current <= stop:
                expected.append(str(current))
                current += inc
            assert found == expected

        check("[10.2 to 80.8]", "10.2", "80.8")
        check("{10.2 to 80.8]", "10.4", "80.8")
        check("[10.2 to 80.8}", "10.2", "80.6")
        check("{10.2 to 80.8}", "10.4", "80.6")
def test_numeric_errors():
    """Indexing a value outside the 16-bit signed range raises ValueError."""
    numfield = fields.NUMERIC(int, bits=16, signed=True)
    fields.Schema(f=numfield)
    # One past each end of the signed 16-bit range [-32768, 32767]
    for out_of_range in (-32769, 32768):
        with pytest.raises(ValueError):
            list(numfield.index(out_of_range))
def test_nontext_document():
    """searcher.documents() can match on numeric, datetime and boolean
    field values."""
    schema = fields.Schema(id=fields.STORED, num=fields.NUMERIC,
                           date=fields.DATETIME, even=fields.BOOLEAN)
    ix = RamStorage().create_index(schema)

    base = datetime.now()
    writer = ix.writer()
    for i in xrange(50):
        writer.add_document(id=i, num=i, date=base + timedelta(days=i),
                            even=not (i % 2))
    writer.commit()

    with ix.searcher() as searcher:
        def check(kwargs, expected):
            found = [doc['id'] for doc in searcher.documents(**kwargs)]
            assert found == expected

        check({"num": 49}, [49])
        check({"date": base + timedelta(days=30)}, [30])
        check({"even": True}, list(range(0, 50, 2)))
def test_nontext_update():
    """update_document accepts numeric and datetime values as the unique
    key fields without raising."""
    schema = fields.Schema(id=fields.STORED, num=fields.NUMERIC(unique=True),
                           date=fields.DATETIME(unique=True))
    ix = RamStorage().create_index(schema)

    base = datetime.now()
    writer = ix.writer()
    for i in xrange(10):
        writer.add_document(id=i, num=i, date=base + timedelta(days=i))
    writer.commit()

    writer = ix.writer()
    # Replace documents keyed by the unique numeric field...
    for num, docid in [(8, "a"), (2, "b"), (4, "c")]:
        writer.update_document(num=num, id=docid)
    # ...and by the unique datetime field
    for days, docid in [(5, "d"), (1, "e"), (7, "f")]:
        writer.update_document(date=base + timedelta(days=days), id=docid)
    writer.commit()
def test_datetime():
    """DATETIME fields support exact dates, year-month prefixes, and
    month-granularity range queries."""
    dtf = fields.DATETIME(stored=True)
    schema = fields.Schema(id=fields.ID(stored=True), date=dtf)
    storage = RamStorage()
    ix = storage.create_index(schema)

    writer = ix.writer()
    for month in xrange(1, 12):
        for day in xrange(1, 28):
            writer.add_document(id=u("%s-%s") % (month, day),
                                date=datetime(2010, month, day, 14, 0, 0))
    writer.commit()

    with ix.searcher() as searcher:
        qp = qparser.QueryParser("id", schema)

        # Compact exact date matches exactly one doc, stored as a datetime
        hits = searcher.search(qp.parse("date:20100523"))
        assert len(hits) == 1
        assert hits[0]["id"] == "5-23"
        stored_date = hits[0]["date"]
        assert stored_date.__class__ is datetime
        assert stored_date.month == 5
        assert stored_date.day == 23

        # A year-month prefix matches every day indexed in that month
        hits = searcher.search(qp.parse("date:'2010 02'"))
        assert len(hits) == 27

        # A month range expands to full-month boundary instants
        q = qp.parse(u("date:[2010-05 to 2010-08]"))
        assert q.__class__ is query.NumericRange
        startdt = datetime(2010, 5, 1, 0, 0, 0, 0)
        enddt = datetime(2010, 8, 31, 23, 59, 59, 999999)
        assert q.start == times.datetime_to_long(startdt)
        assert q.end == times.datetime_to_long(enddt)
def test_boolean():
    """BOOLEAN fields match the synonyms true/yes and false/no."""
    schema = fields.Schema(id=fields.ID(stored=True),
                           done=fields.BOOLEAN(stored=True))
    ix = RamStorage().create_index(schema)

    writer = ix.writer()
    for docid, done in [("a", True), ("b", False), ("c", True),
                        ("d", False), ("e", True)]:
        writer.add_document(id=u(docid), done=done)
    writer.commit()

    with ix.searcher() as searcher:
        qp = qparser.QueryParser("id", schema)

        # Both "true" and "yes" match the True documents
        for true_word in ("done:true", "done:yes"):
            hits = searcher.search(qp.parse(true_word))
            assert sorted([hit["id"] for hit in hits]) == ["a", "c", "e"]
            assert all(hit["done"] for hit in hits)

        # "false" parses to a Term whose text is the boolean False itself,
        # which encodes to the single byte b"f"
        q = qp.parse("done:false")
        assert q.__class__ == query.Term
        assert q.text is False
        assert schema["done"].to_bytes(False) == b("f")

        hits = searcher.search(q)
        assert sorted([hit["id"] for hit in hits]) == ["b", "d"]
        assert not any(hit["done"] for hit in hits)

        hits = searcher.search(qp.parse("done:no"))
        assert sorted([hit["id"] for hit in hits]) == ["b", "d"]
        assert not any(hit["done"] for hit in hits)
def test_boolean2():
    """Single-letter 't'/'f' query terms work on a BOOLEAN field, even
    with no schema given to the parser."""
    schema = fields.Schema(t=fields.TEXT(stored=True),
                           b=fields.BOOLEAN(stored=True))
    ix = RamStorage().create_index(schema)

    writer = ix.writer()
    writer.add_document(t=u('some kind of text'), b=False)
    writer.add_document(t=u('some other kind of text'), b=False)
    writer.add_document(t=u('some more text'), b=False)
    writer.add_document(t=u('some again'), b=True)
    writer.commit()

    with ix.searcher() as searcher:
        false_q = qparser.QueryParser('b', None).parse(u('f'))
        true_q = qparser.QueryParser('b', None).parse(u('t'))

        assert len(searcher.search(false_q)) == 3
        assert [hit["b"] for hit in searcher.search(true_q)] == [True]
        assert [hit["b"] for hit in searcher.search(false_q)] == [False] * 3
def test_boolean3():
    """A Term query built from a boolean's encoded bytes matches the
    right document."""
    schema = fields.Schema(t=fields.TEXT(stored=True, field_boost=5),
                           b=fields.BOOLEAN(stored=True),
                           c=fields.TEXT)
    ix = RamStorage().create_index(schema)

    with ix.writer() as writer:
        writer.add_document(t=u("with hardcopy"), b=True, c=u("alfa"))
        writer.add_document(t=u("no hardcopy"), b=False, c=u("bravo"))

    with ix.searcher() as searcher:
        # Encode True exactly as the field stores it and query on the bytes
        q = query.Term("b", schema["b"].to_bytes(True))
        titles = [hit["t"] for hit in searcher.search(q)]
        assert titles == ["with hardcopy"]
def test_boolean_strings():
    """String values ("true"/"True"/"false"/"False", bytes or unicode)
    index as booleans and are found by any of the textual query forms."""
    schema = fields.Schema(i=fields.STORED, b=fields.BOOLEAN(stored=True))
    ix = RamStorage().create_index(schema)

    with ix.writer() as writer:
        values = ["true", "True", "false", "False",
                  u("true"), u("True"), u("false"), u("False")]
        for num, value in enumerate(values):
            writer.add_document(i=num, b=value)

    with ix.searcher() as searcher:
        qp = qparser.QueryParser("b", ix.schema)

        def check(qstring, expected):
            results = searcher.search(qp.parse(qstring), limit=None)
            assert [hit["i"] for hit in results] == expected

        trues = [0, 1, 4, 5]
        falses = [2, 3, 6, 7]
        for qstring in ("true", "True", "t"):
            check(qstring, trues)
        for qstring in ("false", "False", "f"):
            check(qstring, falses)
def test_boolean_find_deleted():
    """Documents deleted by a boolean term (every b=True doc here) must
    never surface through Every, wildcard, or NOT queries.

    Bug fix: ``count`` was initialized and used as the stored doc id but
    never incremented, so every document was written with i=0; it now
    increments per document so each doc gets a distinct stored id.
    """
    # "Random" string of ones and zeros representing deleted and undeleted
    domain = "1110001010001110010101000101001011101010001011111101000101010101"
    schema = fields.Schema(i=fields.STORED, b=fields.BOOLEAN(stored=True))
    ix = RamStorage().create_index(schema)

    count = 0
    # Create multiple segments just in case
    for _ in xrange(5):
        w = ix.writer()
        for c in domain:
            w.add_document(i=count, b=(c == "1"))
            count += 1  # fix: distinct stored id per document
        w.commit(merge=False)

    # Delete documents where "b" is True
    with ix.writer() as w:
        w.delete_by_term("b", "t")

    with ix.searcher() as s:
        # Double check that documents with b=True are all deleted
        # (use a name that doesn't shadow the compat b() helper)
        reader = s.reader()
        for docnum in xrange(s.doc_count_all()):
            stored_b = s.stored_fields(docnum)["b"]
            assert stored_b == reader.is_deleted(docnum)

        # Try doing a search for documents where b=True
        qp = qparser.QueryParser("b", ix.schema)
        q = qp.parse("b:t")
        r = s.search(q, limit=None)
        assert len(r) == 0

        # Make sure Every query doesn't match deleted docs
        r = s.search(qp.parse("*"), limit=None)
        assert not any(hit["b"] for hit in r)
        assert not any(reader.is_deleted(hit.docnum) for hit in r)

        r = s.search(qp.parse("*:*"), limit=None)
        assert not any(hit["b"] for hit in r)
        assert not any(reader.is_deleted(hit.docnum) for hit in r)

        # Make sure Not query doesn't match deleted docs
        q = qp.parse("NOT b:t")
        r = s.search(q, limit=None)
        assert not any(hit["b"] for hit in r)
        assert not any(reader.is_deleted(hit.docnum) for hit in r)

        r = s.search(q, limit=5)
        assert not any(hit["b"] for hit in r)
        assert not any(reader.is_deleted(hit.docnum) for hit in r)
def test_boolean_multifield():
    """Searching a word through a MultifieldParser that includes a BOOLEAN
    field returns exactly the b=True documents for this query."""
    schema = fields.Schema(name=fields.TEXT(stored=True),
                           bit=fields.BOOLEAN(stored=True))
    ix = RamStorage().create_index(schema)

    with ix.writer() as writer:
        for name, bit in [('audi', True), ('vw', False), ('porsche', False),
                          ('ferrari', True), ('citroen', False)]:
            writer.add_document(name=u(name), bit=bit)

    with ix.searcher() as searcher:
        qp = qparser.MultifieldParser(["name", "bit"], schema)
        results = searcher.search(qp.parse(u("boop")))
        assert sorted(hit["name"] for hit in results) == ["audi", "ferrari"]
        assert len(results) == 2
def test_missing_field():
    """Querying for a field that isn't in the schema raises KeyError."""
    ix = RamStorage().create_index(fields.Schema())
    with ix.searcher() as searcher:
        with pytest.raises(KeyError):
            searcher.document_numbers(id=u("test"))
def test_token_boost():
    """Per-token boosts from an analyzer survive into the indexed postings.

    Each posting is (termbytes, frequency, weight, encoded value); the
    secondary metaphone token "FF" carries the reduced 0.5 weight.
    """
    from whoosh.analysis import RegexTokenizer, DoubleMetaphoneFilter

    analyzer = RegexTokenizer() | DoubleMetaphoneFilter()
    field = fields.TEXT(analyzer=analyzer, phrase=False)
    postings = sorted(field.index(u("spruce view")))
    assert postings == [(b('F'), 1, 1.0, b('\x00\x00\x00\x01')),
                        (b('FF'), 1, 0.5, b('\x00\x00\x00\x01')),
                        (b('SPRS'), 1, 1.0, b('\x00\x00\x00\x01')),
                        ]