454 lines
12 KiB
Python
454 lines
12 KiB
Python
import datetime
|
|
import os
|
|
import sys
|
|
import shutil
|
|
import time
|
|
import random
|
|
|
|
import pytest
|
|
|
|
from quixote import cleanup
|
|
from wcs import publisher
|
|
from wcs.qommon.storage import StorableObject, cache_umask
|
|
import wcs.qommon.storage as st
|
|
|
|
from utilities import create_temporary_pub
|
|
|
|
|
|
def setup_module(module):
|
|
cleanup()
|
|
global pub
|
|
pub = create_temporary_pub()
|
|
|
|
|
|
def teardown_module(module):
|
|
shutil.rmtree(pub.APP_DIR)
|
|
|
|
|
|
class Foobar(StorableObject):
|
|
_names = 'tests%s' % random.randint(0, 100000)
|
|
_indexes = ['unique_value']
|
|
_hashed_indexes = ['value', 'dict_value']
|
|
|
|
value = None
|
|
unique_value = None
|
|
dict_value = None
|
|
|
|
|
|
def test_store():
|
|
test = Foobar()
|
|
test.value = 'value'
|
|
test.unique_value = 'unique-value'
|
|
test.store()
|
|
assert test.id == '1'
|
|
|
|
|
|
def test_get():
|
|
test = Foobar.get(1)
|
|
assert test.value == 'value'
|
|
|
|
|
|
def test_get_on_index():
|
|
test = Foobar.get_on_index('unique-value', 'unique_value')
|
|
assert test.value == 'value'
|
|
|
|
|
|
def test_remove_self():
|
|
test = Foobar.get(1)
|
|
test.remove_self()
|
|
with pytest.raises(KeyError):
|
|
test = Foobar.get(1)
|
|
test = Foobar.get(1, ignore_errors=True)
|
|
assert test is None
|
|
|
|
|
|
def test_get_on_index_changes():
|
|
test = Foobar()
|
|
test.value = 'value'
|
|
test.unique_value = 'unique-value'
|
|
test.store()
|
|
test = Foobar.get_on_index('unique-value', 'unique_value')
|
|
assert test.value == 'value'
|
|
|
|
test.unique_value = 'unique-value2'
|
|
test.store()
|
|
test = Foobar.get_on_index('unique-value2', 'unique_value')
|
|
assert test.value == 'value'
|
|
|
|
with pytest.raises(KeyError):
|
|
test = Foobar.get_on_index('unique-value', 'unique_value')
|
|
|
|
|
|
def test_get_with_indexed_value():
|
|
Foobar.wipe()
|
|
|
|
test = Foobar()
|
|
test.value = 'value'
|
|
test.unique_value = 'unique-value'
|
|
test.store()
|
|
|
|
test = Foobar()
|
|
test.value = 'value'
|
|
test.unique_value = 'unique-value2'
|
|
test.store()
|
|
|
|
test = Foobar()
|
|
test.value = 'value1'
|
|
test.unique_value = 'unique-value3'
|
|
test.store()
|
|
|
|
tests = list(Foobar.get_with_indexed_value('value', 'value'))
|
|
assert len(tests) == 2
|
|
assert 'unique-value' in [x.unique_value for x in tests]
|
|
assert 'unique-value2' in [x.unique_value for x in tests]
|
|
assert 'unique-value3' not in [x.unique_value for x in tests]
|
|
|
|
|
|
def test_get_with_indexed_value_changes():
|
|
Foobar.wipe()
|
|
|
|
test = Foobar()
|
|
test.value = 'value'
|
|
test.unique_value = 'unique-value'
|
|
test.store()
|
|
|
|
test = Foobar()
|
|
test.value = 'value'
|
|
test.unique_value = 'unique-value2'
|
|
test.store()
|
|
|
|
test = Foobar()
|
|
test.value = 'value1'
|
|
test.unique_value = 'unique-value3'
|
|
test.store()
|
|
|
|
tests = list(Foobar.get_with_indexed_value('value', 'value'))
|
|
assert len(tests) == 2
|
|
assert 'unique-value' in [x.unique_value for x in tests]
|
|
assert 'unique-value2' in [x.unique_value for x in tests]
|
|
assert 'unique-value3' not in [x.unique_value for x in tests]
|
|
|
|
test = Foobar.get_on_index('unique-value3', 'unique_value')
|
|
test.value = 'value'
|
|
test.store()
|
|
tests = list(Foobar.get_with_indexed_value('value', 'value'))
|
|
assert len(tests) == 3
|
|
assert 'unique-value3' in [x.unique_value for x in tests]
|
|
|
|
|
|
def test_get_with_indexed_value_dict():
|
|
Foobar.wipe()
|
|
|
|
test = Foobar()
|
|
test.unique_value = 'unique-value'
|
|
test.store()
|
|
|
|
test = Foobar()
|
|
test.value = 'value'
|
|
test.unique_value = 'unique-value2'
|
|
test.dict_value = {'plop': '2'}
|
|
test.store()
|
|
|
|
test = Foobar()
|
|
test.value = 'value1'
|
|
test.unique_value = 'unique-value3'
|
|
test.dict_value = {'plop': '2'}
|
|
test.store()
|
|
|
|
tests = list(Foobar.get_with_indexed_value('dict_value', '2'))
|
|
assert len(tests) == 2
|
|
assert 'unique-value' not in [x.unique_value for x in tests]
|
|
assert 'unique-value2' in [x.unique_value for x in tests]
|
|
assert 'unique-value3' in [x.unique_value for x in tests]
|
|
|
|
|
|
def test_get_with_indexed_value_dict_changes():
|
|
Foobar.wipe()
|
|
|
|
test1 = Foobar()
|
|
test1.unique_value = 'unique-value'
|
|
test1.store()
|
|
|
|
test2 = Foobar()
|
|
test2.value = 'value'
|
|
test2.unique_value = 'unique-value2'
|
|
test2.dict_value = {'plop': '2'}
|
|
test2.store()
|
|
|
|
test3 = Foobar()
|
|
test3.value = 'value1'
|
|
test3.unique_value = 'unique-value3'
|
|
test3.dict_value = {'plop': '2'}
|
|
test3.store()
|
|
|
|
tests = list(Foobar.get_with_indexed_value('dict_value', '2'))
|
|
assert len(tests) == 2
|
|
assert 'unique-value' not in [x.unique_value for x in tests]
|
|
assert 'unique-value2' in [x.unique_value for x in tests]
|
|
assert 'unique-value3' in [x.unique_value for x in tests]
|
|
|
|
test1.dict_value = {'plop': '2'}
|
|
test1.store()
|
|
|
|
tests = list(Foobar.get_with_indexed_value('dict_value', '2'))
|
|
assert len(tests) == 3
|
|
|
|
test1.dict_value = None
|
|
test1.store()
|
|
|
|
tests = list(Foobar.get_with_indexed_value('dict_value', '2'))
|
|
assert len(tests) == 2
|
|
|
|
|
|
def test_select():
|
|
Foobar.wipe()
|
|
|
|
for x in range(1, 51):
|
|
test = Foobar()
|
|
test.unique_value = x
|
|
test.store()
|
|
|
|
assert len(Foobar.select()) == 50
|
|
|
|
assert len(Foobar.select(lambda x: x.unique_value < 26)) == 25
|
|
assert len(Foobar.select([st.Less('unique_value', 26)])) == 25
|
|
assert len(Foobar.select([st.Less('unique_value', 25), st.GreaterOrEqual('unique_value', 10)])) == 15
|
|
assert len(Foobar.select([st.NotEqual('unique_value', 25)])) == 49
|
|
assert len(Foobar.select([st.Contains('unique_value', [24, 25, 26])])) == 3
|
|
assert len(Foobar.select([st.Contains('unique_value', [24, 25, 86])])) == 2
|
|
assert len(Foobar.select([st.NotContains('unique_value', [24, 25, 86])])) == 48
|
|
|
|
|
|
def test_select_order_by():
|
|
Foobar.wipe()
|
|
|
|
for x in range(1, 51):
|
|
test = Foobar()
|
|
test.unique_value = 51-x
|
|
test.store()
|
|
|
|
assert [int(x.id) for x in Foobar.select(order_by='id')] == list(range(1, 51))
|
|
assert [int(x.id) for x in Foobar.select(order_by='-id')] == list(range(50, 0, -1))
|
|
assert [int(x.id) for x in Foobar.select(order_by='unique_value')] == list(range(50, 0, -1))
|
|
assert [int(x.id) for x in Foobar.select(order_by='-unique_value')] == list(range(1, 51))
|
|
|
|
|
|
def test_select_datetime():
|
|
Foobar.wipe()
|
|
|
|
d = datetime.datetime(2014, 1, 1)
|
|
for i in range(50):
|
|
test = Foobar()
|
|
test.receipt_time = (d + datetime.timedelta(days=i)).timetuple()
|
|
test.store()
|
|
|
|
assert len(Foobar.select()) == 50
|
|
|
|
assert len(Foobar.select(lambda x: x.receipt_time == d.timetuple())) == 1
|
|
assert len(Foobar.select([st.Equal('receipt_time', d.timetuple())])) == 1
|
|
assert len(Foobar.select([
|
|
st.Less('receipt_time', (d + datetime.timedelta(days=20)).timetuple())])) == 20
|
|
assert len(Foobar.select([
|
|
st.Greater('receipt_time', (d + datetime.timedelta(days=20)).timetuple())])) == 29
|
|
|
|
|
|
def test_select_limit_offset():
|
|
Foobar.wipe()
|
|
|
|
for x in range(50):
|
|
test = Foobar()
|
|
test.store()
|
|
|
|
assert len(Foobar.select()) == 50
|
|
assert [int(x.id) for x in Foobar.select(order_by='id', limit=10)] == list(range(1, 11))
|
|
assert [int(x.id) for x in Foobar.select(order_by='id', limit=10, offset=10)] == list(range(11, 21))
|
|
assert [int(x.id) for x in Foobar.select(order_by='id', limit=20, offset=20)] == list(range(21, 41))
|
|
assert [int(x.id) for x in Foobar.select(order_by='id', offset=10)] == list(range(11, 51))
|
|
|
|
|
|
def test_select_criteria_overlaps():
|
|
Foobar.wipe()
|
|
|
|
test = Foobar()
|
|
test.a = [1, 2]
|
|
test.store()
|
|
|
|
test = Foobar()
|
|
test.a = []
|
|
test.store()
|
|
|
|
test = Foobar()
|
|
test.a = [2, 3]
|
|
test.store()
|
|
|
|
assert len(Foobar.select([st.Intersects('a', [1])])) == 1
|
|
assert len(Foobar.select([st.Intersects('a', [2])])) == 2
|
|
assert len(Foobar.select([st.Intersects('a', [4])])) == 0
|
|
assert len(Foobar.select([st.Intersects('a', [1, 2, 3])])) == 2
|
|
|
|
|
|
def test_count():
|
|
Foobar.wipe()
|
|
|
|
for x in range(50):
|
|
test = Foobar()
|
|
test.value = x + 1
|
|
test.store()
|
|
|
|
assert Foobar.count() == 50
|
|
assert Foobar.count([st.Less('value', 26)]) == 25
|
|
|
|
|
|
def test_select_criteria_or_and():
|
|
Foobar.wipe()
|
|
|
|
for x in range(50):
|
|
test = Foobar()
|
|
test.value = x+1
|
|
test.store()
|
|
|
|
assert len(Foobar.select()) == 50
|
|
|
|
assert [int(x.id) for x in Foobar.select([st.Or([st.Less('value', 10)])], order_by='id')] == list(range(1, 10))
|
|
assert [int(x.id) for x in Foobar.select([st.Or([
|
|
st.Less('value', 10), st.Equal('value', 15)])], order_by='value')] == list(range(1, 10)) + [15]
|
|
assert [int(x.id) for x in Foobar.select([st.And([st.Less('value', 10),
|
|
st.Greater('value', 5)])], order_by='id')] == list(range(6, 10))
|
|
|
|
|
|
def test_select_criteria_null():
|
|
Foobar.wipe()
|
|
|
|
for x in range(50):
|
|
test = Foobar()
|
|
if x % 3:
|
|
test.value = None
|
|
else:
|
|
test.value = x
|
|
test.store()
|
|
|
|
assert len(Foobar.select()) == 50
|
|
|
|
assert len(Foobar.select([st.Null('value')])) == 33
|
|
assert len(Foobar.select([st.NotNull('value')])) == 17
|
|
|
|
|
|
def test_select_criteria_ilike():
|
|
Foobar.wipe()
|
|
|
|
for x in range(50):
|
|
test = Foobar()
|
|
if x < 20:
|
|
test.foo = 'foo'
|
|
else:
|
|
test.foo = 'bar'
|
|
test.store()
|
|
|
|
test.foo = None # makes sure it doesn't break on None
|
|
test.store()
|
|
|
|
assert len(Foobar.select()) == 50
|
|
|
|
assert [int(x.id) for x in Foobar.select([st.ILike('foo', 'bar')], order_by='id')] == list(range(21, 50))
|
|
assert [int(x.id) for x in Foobar.select([st.ILike('foo', 'BAR')], order_by='id')] == list(range(21, 50))
|
|
|
|
|
|
def test_store_async():
|
|
Foobar.wipe()
|
|
|
|
test = Foobar()
|
|
test.value = 'value'
|
|
test.unique_value = 'unique-value'
|
|
test.store(async_op=True)
|
|
t0 = time.time()
|
|
while True:
|
|
time.sleep(0.1)
|
|
try:
|
|
test = Foobar.get(1)
|
|
break
|
|
except KeyError:
|
|
pass
|
|
if time.time()-t0 > 2:
|
|
raise Exception('timeout')
|
|
test = Foobar.get(1)
|
|
assert test.value == 'value'
|
|
|
|
|
|
def test_items():
|
|
Foobar.wipe()
|
|
|
|
for x in range(50):
|
|
test = Foobar()
|
|
test.store()
|
|
|
|
assert sorted([(int(x), int(y.id)) for (x, y) in Foobar.items()]) == list(zip(range(1,51), range(1, 51)))
|
|
|
|
|
|
def test_reversed_order():
|
|
Foobar.wipe()
|
|
|
|
for x in range(50):
|
|
test = Foobar()
|
|
test.store()
|
|
|
|
assert len(Foobar.select()) == 50
|
|
assert [int(x.id) for x in Foobar.select(order_by='-id', limit=10)] == list(range(50, 40, -1))
|
|
|
|
|
|
def test_destroy_rebuild_index():
|
|
test_get_with_indexed_value()
|
|
assert os.path.exists(os.path.join(Foobar.get_objects_dir(), '.indexes'))
|
|
Foobar.destroy_indexes()
|
|
assert not os.path.exists(os.path.join(Foobar.get_objects_dir(), '.indexes'))
|
|
|
|
tests = list(Foobar.get_with_indexed_value('value', 'value'))
|
|
assert len(tests) == 2
|
|
assert 'unique-value' in [x.unique_value for x in tests]
|
|
assert 'unique-value2' in [x.unique_value for x in tests]
|
|
assert 'unique-value3' not in [x.unique_value for x in tests]
|
|
|
|
# the indexes should have been rebuilt automatically
|
|
assert os.path.exists(os.path.join(Foobar.get_objects_dir(), '.indexes'))
|
|
|
|
|
|
def test_concurrent_hashed_indexes():
|
|
Foobar.wipe()
|
|
|
|
children_pids = []
|
|
for i in range(10):
|
|
pid = os.fork()
|
|
if pid:
|
|
children_pids.append(pid)
|
|
else:
|
|
for j in range(10):
|
|
test = Foobar()
|
|
test.value = 'x'
|
|
test.dict_value = {'plop': random.randint(0, 10)}
|
|
test.store(async_op=False)
|
|
os._exit(0)
|
|
|
|
for pid in children_pids:
|
|
os.waitpid(pid, 0)
|
|
|
|
assert Foobar.count() == 100
|
|
for i in range(10):
|
|
manual_selection = set([x.id for x in Foobar.select() if x.dict_value == {'plop': i}])
|
|
index_selection = set([x.id for x in Foobar.get_with_indexed_value('dict_value', i)])
|
|
assert manual_selection == index_selection
|
|
|
|
|
|
def test_umask():
|
|
test = Foobar()
|
|
test.value = 'value'
|
|
test.unique_value = 'unique-value'
|
|
|
|
os.umask(0o022)
|
|
cache_umask()
|
|
test.store()
|
|
assert (os.stat(test.get_object_filename()).st_mode % 0o1000) == 0o644
|
|
|
|
os.umask(0o002)
|
|
cache_umask()
|
|
test.store()
|
|
assert (os.stat(test.get_object_filename()).st_mode % 0o1000) == 0o664
|