281 lines
9.2 KiB
Python
281 lines
9.2 KiB
Python
from __future__ import with_statement
|
|
import inspect, random, sys
|
|
|
|
from whoosh import columns, fields, query
|
|
from whoosh.codec.whoosh3 import W3Codec
|
|
from whoosh.compat import b, u, BytesIO, bytes_type, text_type
|
|
from whoosh.compat import izip, xrange, dumps, loads
|
|
from whoosh.filedb import compound
|
|
from whoosh.filedb.filestore import RamStorage
|
|
from whoosh.util.testing import TempIndex, TempStorage
|
|
|
|
|
|
def test_pickleability():
|
|
# Ignore base classes
|
|
ignore = (columns.Column, columns.WrappedColumn, columns.ListColumn)
|
|
# Required arguments
|
|
init_args = {"ClampedNumericColumn": (columns.NumericColumn("B"),),
|
|
"FixedBytesColumn": (5,),
|
|
"FixedBytesListColumn": (5,),
|
|
"NumericColumn": ("i",),
|
|
"PickleColumn": (columns.VarBytesColumn(),),
|
|
"StructColumn": ("=if", (0, 0.0)),
|
|
}
|
|
|
|
coltypes = [c for _, c in inspect.getmembers(columns, inspect.isclass)
|
|
if issubclass(c, columns.Column) and not c in ignore]
|
|
|
|
for coltype in coltypes:
|
|
args = init_args.get(coltype.__name__, ())
|
|
try:
|
|
inst = coltype(*args)
|
|
except TypeError:
|
|
e = sys.exc_info()[1]
|
|
raise TypeError("Error instantiating %r: %s" % (coltype, e))
|
|
_ = loads(dumps(inst, -1))
|
|
|
|
|
|
def test_multistream():
|
|
domain = [("a", "12345"), ("b", "abc"), ("c", "AaBbC"),
|
|
("a", "678"), ("c", "cDdEeF"), ("b", "defgh"),
|
|
("b", "ijk"), ("c", "fGgHh"), ("a", "9abc")]
|
|
|
|
st = RamStorage()
|
|
msw = compound.CompoundWriter(st)
|
|
files = dict((name, msw.create_file(name)) for name in "abc")
|
|
for name, data in domain:
|
|
files[name].write(b(data))
|
|
f = st.create_file("test")
|
|
msw.save_as_compound(f)
|
|
|
|
f = st.open_file("test")
|
|
msr = compound.CompoundStorage(f)
|
|
assert msr.open_file("a").read() == b("123456789abc")
|
|
assert msr.open_file("b").read() == b("abcdefghijk")
|
|
assert msr.open_file("c").read() == b("AaBbCcDdEeFfGgHh")
|
|
|
|
|
|
def test_random_multistream():
|
|
letters = "abcdefghijklmnopqrstuvwxyz"
|
|
|
|
def randstring(n):
|
|
s = "".join(random.choice(letters) for _ in xrange(n))
|
|
return s.encode("latin1")
|
|
|
|
domain = {}
|
|
for _ in xrange(100):
|
|
name = randstring(random.randint(5, 10))
|
|
value = randstring(2500)
|
|
domain[name] = value
|
|
|
|
outfiles = dict((name, BytesIO(value)) for name, value in domain.items())
|
|
|
|
with TempStorage() as st:
|
|
msw = compound.CompoundWriter(st, buffersize=1024)
|
|
mfiles = {}
|
|
for name in domain:
|
|
mfiles[name] = msw.create_file(name)
|
|
while outfiles:
|
|
name = random.choice(list(outfiles.keys()))
|
|
v = outfiles[name].read(1000)
|
|
mfiles[name].write(v)
|
|
if len(v) < 1000:
|
|
del outfiles[name]
|
|
f = st.create_file("test")
|
|
msw.save_as_compound(f)
|
|
|
|
f = st.open_file("test")
|
|
msr = compound.CompoundStorage(f)
|
|
for name, value in domain.items():
|
|
assert msr.open_file(name).read() == value
|
|
msr.close()
|
|
|
|
|
|
def _rt(c, values, default):
|
|
# Continuous
|
|
st = RamStorage()
|
|
f = st.create_file("test1")
|
|
f.write(b("hello"))
|
|
w = c.writer(f)
|
|
for docnum, v in enumerate(values):
|
|
w.add(docnum, v)
|
|
w.finish(len(values))
|
|
length = f.tell() - 5
|
|
f.close()
|
|
|
|
f = st.open_file("test1")
|
|
r = c.reader(f, 5, length, len(values))
|
|
assert values == list(r)
|
|
for x in range(len(values)):
|
|
assert values[x] == r[x]
|
|
f.close()
|
|
|
|
# Sparse
|
|
doccount = len(values) * 7 + 15
|
|
target = [default] * doccount
|
|
|
|
f = st.create_file("test2")
|
|
f.write(b("hello"))
|
|
w = c.writer(f)
|
|
for docnum, v in izip(xrange(10, doccount, 7), values):
|
|
target[docnum] = v
|
|
w.add(docnum, v)
|
|
w.finish(doccount)
|
|
length = f.tell() - 5
|
|
f.close()
|
|
|
|
f = st.open_file("test2")
|
|
r = c.reader(f, 5, length, doccount)
|
|
assert target == list(r)
|
|
for x in range(doccount):
|
|
assert target[x] == r[x]
|
|
|
|
lr = r.load()
|
|
assert target == list(lr)
|
|
f.close()
|
|
|
|
|
|
def test_roundtrip():
|
|
_rt(columns.VarBytesColumn(),
|
|
[b("a"), b("ccc"), b("bbb"), b("e"), b("dd")], b(""))
|
|
_rt(columns.FixedBytesColumn(5),
|
|
[b("aaaaa"), b("eeeee"), b("ccccc"), b("bbbbb"), b("eeeee")],
|
|
b("\x00") * 5)
|
|
_rt(columns.RefBytesColumn(),
|
|
[b("a"), b("ccc"), b("bb"), b("ccc"), b("a"), b("bb")], b(""))
|
|
_rt(columns.RefBytesColumn(3),
|
|
[b("aaa"), b("bbb"), b("ccc"), b("aaa"), b("bbb"), b("ccc")],
|
|
b("\x00") * 3)
|
|
_rt(columns.StructColumn("ifH", (0, 0.0, 0)),
|
|
[(100, 1.5, 15000), (-100, -5.0, 0), (5820, 6.5, 462),
|
|
(-57829, -1.5, 6), (0, 0, 0)],
|
|
(0, 0.0, 0))
|
|
|
|
numcol = columns.NumericColumn
|
|
_rt(numcol("b"), [10, -20, 30, -25, 15], 0)
|
|
_rt(numcol("B"), [10, 20, 30, 25, 15], 0)
|
|
_rt(numcol("h"), [1000, -2000, 3000, -15000, 32000], 0)
|
|
_rt(numcol("H"), [1000, 2000, 3000, 15000, 50000], 0)
|
|
_rt(numcol("i"), [2 ** 16, -(2 ** 20), 2 ** 24, -(2 ** 28), 2 ** 30], 0)
|
|
_rt(numcol("I"), [2 ** 16, 2 ** 20, 2 ** 24, 2 ** 28, 2 ** 31 & 0xFFFFFFFF], 0)
|
|
_rt(numcol("q"), [10, -20, 30, -25, 15], 0)
|
|
_rt(numcol("Q"), [2 ** 35, 2 ** 40, 2 ** 48, 2 ** 52, 2 ** 63], 0)
|
|
_rt(numcol("f"), [1.5, -2.5, 3.5, -4.5, 1.25], 0)
|
|
_rt(numcol("d"), [1.5, -2.5, 3.5, -4.5, 1.25], 0)
|
|
|
|
c = columns.BitColumn(compress_at=10)
|
|
_rt(c, [bool(random.randint(0, 1)) for _ in xrange(70)], False)
|
|
_rt(c, [bool(random.randint(0, 1)) for _ in xrange(90)], False)
|
|
|
|
c = columns.PickleColumn(columns.VarBytesColumn())
|
|
_rt(c, [None, True, False, 100, -7, "hello"], None)
|
|
|
|
|
|
def test_multivalue():
|
|
schema = fields.Schema(s=fields.TEXT(sortable=True),
|
|
n=fields.NUMERIC(sortable=True))
|
|
ix = RamStorage().create_index(schema)
|
|
with ix.writer(codec=W3Codec()) as w:
|
|
w.add_document(s=u("alfa foxtrot charlie").split(), n=[100, 200, 300])
|
|
w.add_document(s=u("juliet bravo india").split(), n=[10, 20, 30])
|
|
|
|
with ix.reader() as r:
|
|
scr = r.column_reader("s")
|
|
assert list(scr) == ["alfa", "juliet"]
|
|
|
|
ncr = r.column_reader("n")
|
|
assert list(ncr) == [100, 10]
|
|
|
|
|
|
def test_column_field():
|
|
schema = fields.Schema(a=fields.TEXT(sortable=True),
|
|
b=fields.COLUMN(columns.RefBytesColumn()))
|
|
with TempIndex(schema, "columnfield") as ix:
|
|
with ix.writer(codec=W3Codec()) as w:
|
|
w.add_document(a=u("alfa bravo"), b=b("charlie delta"))
|
|
w.add_document(a=u("bravo charlie"), b=b("delta echo"))
|
|
w.add_document(a=u("charlie delta"), b=b("echo foxtrot"))
|
|
|
|
with ix.reader() as r:
|
|
assert r.has_column("a")
|
|
assert r.has_column("b")
|
|
|
|
cra = r.column_reader("a")
|
|
assert cra[0] == u("alfa bravo")
|
|
assert type(cra[0]) == text_type
|
|
|
|
crb = r.column_reader("b")
|
|
assert crb[0] == b("charlie delta")
|
|
assert type(crb[0]) == bytes_type
|
|
|
|
|
|
def test_column_query():
|
|
schema = fields.Schema(id=fields.STORED,
|
|
a=fields.ID(sortable=True),
|
|
b=fields.NUMERIC(sortable=True))
|
|
with TempIndex(schema, "columnquery") as ix:
|
|
with ix.writer(codec=W3Codec()) as w:
|
|
w.add_document(id=1, a=u("alfa"), b=10)
|
|
w.add_document(id=2, a=u("bravo"), b=20)
|
|
w.add_document(id=3, a=u("charlie"), b=30)
|
|
w.add_document(id=4, a=u("delta"), b=40)
|
|
w.add_document(id=5, a=u("echo"), b=50)
|
|
w.add_document(id=6, a=u("foxtrot"), b=60)
|
|
|
|
with ix.searcher() as s:
|
|
def check(q):
|
|
return [s.stored_fields(docnum)["id"] for docnum in q.docs(s)]
|
|
|
|
q = query.ColumnQuery("a", u("bravo"))
|
|
assert check(q) == [2]
|
|
|
|
q = query.ColumnQuery("b", 30)
|
|
assert check(q) == [3]
|
|
|
|
q = query.ColumnQuery("a", lambda v: v != u("delta"))
|
|
assert check(q) == [1, 2, 3, 5, 6]
|
|
|
|
q = query.ColumnQuery("b", lambda v: v > 30)
|
|
assert check(q) == [4, 5, 6]
|
|
|
|
|
|
def test_ref_switch():
|
|
import warnings
|
|
|
|
col = columns.RefBytesColumn()
|
|
|
|
def rw(size):
|
|
st = RamStorage()
|
|
|
|
f = st.create_file("test")
|
|
cw = col.writer(f)
|
|
for i in xrange(size):
|
|
cw.add(i, hex(i).encode("latin1"))
|
|
cw.finish(size)
|
|
length = f.tell()
|
|
f.close()
|
|
|
|
f = st.open_file("test")
|
|
cr = col.reader(f, 0, length, size)
|
|
for i in xrange(size):
|
|
v = cr[i]
|
|
# Column ignores additional unique values after 65535
|
|
if i <= 65535 - 1:
|
|
assert v == hex(i).encode("latin1")
|
|
else:
|
|
assert v == b('')
|
|
f.close()
|
|
|
|
rw(255)
|
|
|
|
# warnings.catch_warnings is not available in Python 2.5
|
|
if hasattr(warnings, "catch_warnings"):
|
|
# Column warns on additional unique values after 65535
|
|
with warnings.catch_warnings(record=True) as w:
|
|
# Cause all warnings to always be triggered.
|
|
warnings.simplefilter("always")
|
|
rw(65537)
|
|
|
|
assert len(w) == 2
|
|
assert issubclass(w[-1].category, UserWarning)
|