wcs/tests/test_storage.py

362 lines
9.8 KiB
Python

import datetime
import os
import sys
import shutil
import time
import random
import pytest
from quixote import cleanup
from wcs import publisher
from wcs.qommon.storage import StorableObject
import wcs.qommon.storage as st
from utilities import create_temporary_pub
def setup_module(module):
    """Prepare the module-wide fixtures before any test runs.

    Calls quixote's ``cleanup()`` and then creates a temporary publisher,
    kept in the module-level ``pub`` global so that ``teardown_module``
    can find it again.
    """
    global pub
    cleanup()
    pub = create_temporary_pub()
def teardown_module(module):
    """Delete the application directory of the publisher created at setup."""
    app_dir = pub.APP_DIR
    shutil.rmtree(app_dir)
class Foobar(StorableObject):
    """Minimal storable object used as the subject of the storage tests."""

    # storage name, suffixed with a random number drawn when the module is
    # imported
    _names = 'tests%s' % random.randint(0, 100000)

    # attributes declared as indexes / hashed indexes to the storage layer
    _indexes = ['unique_value']
    _hashed_indexes = ['value', 'dict_value']

    # default attribute values
    value = None
    unique_value = None
    dict_value = None
def test_store():
    """Store a first object and check it has been given the id 1."""
    obj = Foobar()
    obj.value = 'value'
    obj.unique_value = 'unique-value'
    obj.store()

    assert obj.id == 1
def test_get():
    """Load the object with id 1 and check its ``value`` attribute.

    NOTE(review): presumably relies on the object stored by ``test_store``
    having run earlier in the same session.
    """
    loaded = Foobar.get(1)
    assert loaded.value == 'value'
def test_get_on_index():
    """Look an object up through the ``unique_value`` index.

    NOTE(review): presumably relies on the object stored by ``test_store``
    having run earlier in the same session.
    """
    found = Foobar.get_on_index('unique-value', 'unique_value')
    assert found.value == 'value'
def test_remove_self():
    """Remove object 1 and check both flavours of a failed lookup."""
    Foobar.get(1).remove_self()

    # a plain get() on a removed object raises KeyError...
    with pytest.raises(KeyError):
        Foobar.get(1)

    # ... while ignore_errors=True turns it into a None result
    assert Foobar.get(1, ignore_errors=True) is None
def test_get_on_index_changes():
    """Check that the ``unique_value`` index follows a change of value."""
    obj = Foobar()
    obj.value = 'value'
    obj.unique_value = 'unique-value'
    obj.store()

    fetched = Foobar.get_on_index('unique-value', 'unique_value')
    assert fetched.value == 'value'

    # change the indexed attribute and store again
    fetched.unique_value = 'unique-value2'
    fetched.store()

    # the object is reachable under the new key...
    renamed = Foobar.get_on_index('unique-value2', 'unique_value')
    assert renamed.value == 'value'

    # ... and not under the old one anymore
    with pytest.raises(KeyError):
        Foobar.get_on_index('unique-value', 'unique_value')
def test_get_with_indexed_value():
    """Check lookups on the hashed ``value`` index.

    Three objects are stored, two of them sharing the value ``'value'``;
    only those two must be returned when looking that value up.
    """
    Foobar.wipe()

    fixtures = (
        ('value', 'unique-value'),
        ('value', 'unique-value2'),
        ('value1', 'unique-value3'),
    )
    for value, unique_value in fixtures:
        obj = Foobar()
        obj.value = value
        obj.unique_value = unique_value
        obj.store()

    found = Foobar.get_with_indexed_value('value', 'value')
    assert len(found) == 2

    found_unique_values = [obj.unique_value for obj in found]
    assert 'unique-value' in found_unique_values
    assert 'unique-value2' in found_unique_values
    assert 'unique-value3' not in found_unique_values
def test_get_with_indexed_value_changes():
    """Check that the hashed ``value`` index follows a change of value.

    Starts from two objects out of three matching ``'value'``, then updates
    the third one so that all three must be returned.
    """
    Foobar.wipe()

    fixtures = (
        ('value', 'unique-value'),
        ('value', 'unique-value2'),
        ('value1', 'unique-value3'),
    )
    for value, unique_value in fixtures:
        obj = Foobar()
        obj.value = value
        obj.unique_value = unique_value
        obj.store()

    found = Foobar.get_with_indexed_value('value', 'value')
    assert len(found) == 2
    found_unique_values = [obj.unique_value for obj in found]
    assert 'unique-value' in found_unique_values
    assert 'unique-value2' in found_unique_values
    assert 'unique-value3' not in found_unique_values

    # give the third object the shared value and store it again
    third = Foobar.get_on_index('unique-value3', 'unique_value')
    third.value = 'value'
    third.store()

    found = Foobar.get_with_indexed_value('value', 'value')
    assert len(found) == 3
    assert 'unique-value3' in [obj.unique_value for obj in found]
def test_get_with_indexed_value_dict():
    """Check lookups on the hashed ``dict_value`` index.

    One object is stored without a ``dict_value``, two with
    ``{'plop': '2'}``; looking up ``'2'`` must return only the latter two.
    """
    Foobar.wipe()

    # first object: neither value nor dict_value is set
    bare = Foobar()
    bare.unique_value = 'unique-value'
    bare.store()

    # two more objects sharing the same dict content
    for value, unique_value in (('value', 'unique-value2'), ('value1', 'unique-value3')):
        obj = Foobar()
        obj.value = value
        obj.unique_value = unique_value
        obj.dict_value = {'plop': '2'}
        obj.store()

    found = Foobar.get_with_indexed_value('dict_value', '2')
    assert len(found) == 2

    found_unique_values = [obj.unique_value for obj in found]
    assert 'unique-value' not in found_unique_values
    assert 'unique-value2' in found_unique_values
    assert 'unique-value3' in found_unique_values
def test_get_with_indexed_value_dict_changes():
    """Check that the hashed ``dict_value`` index follows changes.

    The first object starts without a ``dict_value``, gains one matching the
    other two objects, then loses it again; the lookup count must go
    2 -> 3 -> 2.
    """
    Foobar.wipe()

    # first object: neither value nor dict_value is set
    first = Foobar()
    first.unique_value = 'unique-value'
    first.store()

    # two more objects sharing the same dict content
    for value, unique_value in (('value', 'unique-value2'), ('value1', 'unique-value3')):
        other = Foobar()
        other.value = value
        other.unique_value = unique_value
        other.dict_value = {'plop': '2'}
        other.store()

    found = Foobar.get_with_indexed_value('dict_value', '2')
    assert len(found) == 2
    found_unique_values = [obj.unique_value for obj in found]
    assert 'unique-value' not in found_unique_values
    assert 'unique-value2' in found_unique_values
    assert 'unique-value3' in found_unique_values

    # add the dict to the first object: it joins the results
    first.dict_value = {'plop': '2'}
    first.store()
    found = Foobar.get_with_indexed_value('dict_value', '2')
    assert len(found) == 3

    # drop the dict again: it leaves the results
    first.dict_value = None
    first.store()
    found = Foobar.get_with_indexed_value('dict_value', '2')
    assert len(found) == 2
def test_select():
    """Check ``select()`` with a callable and with criteria objects.

    Fifty objects are stored with ``unique_value`` running from 1 to 50.
    """
    Foobar.wipe()
    for number in range(1, 51):
        obj = Foobar()
        obj.unique_value = number
        obj.store()

    everything = Foobar.select()
    assert len(everything) == 50

    # filtering with a plain callable
    below_26 = Foobar.select(lambda obj: obj.unique_value < 26)
    assert len(below_26) == 25

    # filtering with a single criteria
    below_26 = Foobar.select([st.Less('unique_value', 26)])
    assert len(below_26) == 25

    # several criteria in the list: 10 <= unique_value < 25
    in_range = Foobar.select([
        st.Less('unique_value', 25),
        st.GreaterOrEqual('unique_value', 10),
    ])
    assert len(in_range) == 15

    all_but_25 = Foobar.select([st.NotEqual('unique_value', 25)])
    assert len(all_but_25) == 49

    # membership criteria; 86 is not among the stored values
    assert len(Foobar.select([st.Contains('unique_value', [24, 25, 26])])) == 3
    assert len(Foobar.select([st.Contains('unique_value', [24, 25, 86])])) == 2
    assert len(Foobar.select([st.NotContains('unique_value', [24, 25, 86])])) == 48
def test_select_datetime():
    """Check ``select()`` criteria against time tuples.

    Fifty objects are stored with ``receipt_time`` set to consecutive days
    starting on 2014-01-01.
    """
    Foobar.wipe()

    d = datetime.datetime(2014, 1, 1)
    for day_offset in range(50):
        obj = Foobar()
        receipt_day = d + datetime.timedelta(days=day_offset)
        obj.receipt_time = receipt_day.timetuple()
        obj.store()

    assert len(Foobar.select()) == 50

    # exact match on the first day, with a callable then with a criteria
    assert len(Foobar.select(lambda obj: obj.receipt_time == d.timetuple())) == 1
    assert len(Foobar.select([st.Equal('receipt_time', d.timetuple())])) == 1

    # strictly before / strictly after the 21st day
    before = Foobar.select([
        st.Less('receipt_time', (d + datetime.timedelta(days=20)).timetuple())])
    assert len(before) == 20
    after = Foobar.select([
        st.Greater('receipt_time', (d + datetime.timedelta(days=20)).timetuple())])
    assert len(after) == 29
def test_select_limit_offset():
    """Check the ``limit`` and ``offset`` parameters of ``select()``.

    Fifty objects are stored; the ids returned by ordered selections are
    compared with the expected id ranges.
    """
    Foobar.wipe()
    for _ in range(50):
        Foobar().store()

    assert len(Foobar.select()) == 50

    # Expected ids are built with list(range(...)): on Python 3 range() is a
    # lazy sequence that never compares equal to a list, so a bare range()
    # would make these assertions fail whatever select() returns.
    assert [x.id for x in Foobar.select(order_by='id', limit=10)] == list(range(1, 11))
    assert [x.id for x in Foobar.select(order_by='id', limit=10, offset=10)] == list(range(11, 21))
    assert [x.id for x in Foobar.select(order_by='id', limit=20, offset=20)] == list(range(21, 41))
    assert [x.id for x in Foobar.select(order_by='id', offset=10)] == list(range(11, 51))
def test_select_criteria_overlaps():
    """Check the ``Intersects`` criteria against a list attribute.

    Three objects are stored with ``a`` set to ``[1, 2]``, ``[]`` and
    ``[2, 3]``.
    """
    Foobar.wipe()
    for a_value in ([1, 2], [], [2, 3]):
        obj = Foobar()
        obj.a = a_value
        obj.store()

    # (values looked for, number of objects expected to match)
    expectations = (
        ([1], 1),
        ([2], 2),
        ([4], 0),
        ([1, 2, 3], 2),
    )
    for wanted, expected_count in expectations:
        assert len(Foobar.select([st.Intersects('a', wanted)])) == expected_count
def test_count():
    """Check ``count()`` with and without criteria on fifty stored objects."""
    Foobar.wipe()
    for _ in range(1, 51):
        obj = Foobar()
        obj.store()

    total = Foobar.count()
    assert total == 50

    below_26 = Foobar.count([st.Less('id', 26)])
    assert below_26 == 25
def test_select_criteria_or_and():
    """Check the ``Or`` and ``And`` criteria combinators of ``select()``.

    Fifty objects are stored; ordered selections on ``id`` are compared
    with the expected id lists.
    """
    Foobar.wipe()
    for _ in range(50):
        Foobar().store()

    assert len(Foobar.select()) == 50

    # Expected ids are built with list(range(...)): on Python 3 a range()
    # never compares equal to a list and cannot be concatenated with one
    # (range(1, 10) + [15] raises TypeError).
    assert [x.id for x in Foobar.select([st.Or([st.Less('id', 10)])], order_by='id')] == list(range(1, 10))
    assert [x.id for x in Foobar.select([st.Or([
        st.Less('id', 10), st.Equal('id', 15)])], order_by='id')] == list(range(1, 10)) + [15]
    assert [x.id for x in Foobar.select([st.And([st.Less('id', 10),
        st.Greater('id', 5)])], order_by='id')] == list(range(6, 10))
def test_select_criteria_ilike():
    """Check the ``ILike`` criteria, including its case insensitivity.

    Fifty objects are stored: the first twenty with ``foo = 'foo'``, the
    others with ``foo = 'bar'``.  The very last one is then stored again
    with ``foo = None``, so the expected matches are ids 21 to 49.
    """
    Foobar.wipe()
    for x in range(50):
        test = Foobar()
        if x < 20:
            test.foo = 'foo'
        else:
            test.foo = 'bar'
        test.store()
    test.foo = None  # makes sure it doesn't break on None
    test.store()

    assert len(Foobar.select()) == 50

    # Expected ids are built with list(range(...)): on Python 3 a range()
    # never compares equal to a list.
    expected_ids = list(range(21, 50))
    assert [x.id for x in Foobar.select([st.ILike('foo', 'bar')], order_by='id')] == expected_ids
    assert [x.id for x in Foobar.select([st.ILike('foo', 'BAR')], order_by='id')] == expected_ids
def test_store_async():
    """Check storing an object with the ``async`` option of ``store()``.

    The id must be set as soon as ``store()`` returns; the test then waits
    until ``Foobar.keys()`` is not empty before loading the object back.

    Raises:
        Exception: if no key shows up within about two seconds.
    """
    Foobar.wipe()
    test = Foobar()
    test.value = 'value'
    test.unique_value = 'unique-value'
    # "async" is a reserved keyword since Python 3.7, where writing it as a
    # literal keyword argument is a syntax error that prevents the whole
    # module from being imported.  Passing it through a dict performs the
    # very same call and parses on every Python version.
    test.store(**{'async': True})
    assert test.id == 1

    # poll the storage, giving up after two seconds
    t0 = time.time()
    while not Foobar.keys():
        time.sleep(0.1)
        if time.time() - t0 > 2:
            raise Exception('timeout')

    test = Foobar.get(1)
    assert test.value == 'value'
def test_items():
    """Check that ``items()`` yields one (key, object) pair per stored object.

    Fifty objects are stored; once keys and object ids are converted to
    integers and sorted, the pairs must be (1, 1) ... (50, 50).
    """
    Foobar.wipe()
    for _ in range(50):
        Foobar().store()

    pairs = sorted([(int(x), int(y.id)) for (x, y) in Foobar.items()])
    # zip() is a one-shot iterator on Python 3 and never compares equal to a
    # list, so it is materialised before the comparison.
    assert pairs == list(zip(range(1, 51), range(1, 51)))
def test_reversed_order():
    """Check that ``order_by='-id'`` returns objects by decreasing id.

    Fifty objects are stored; the first ten of the reversed selection must
    be ids 50 down to 41.
    """
    Foobar.wipe()
    for _ in range(50):
        Foobar().store()

    assert len(Foobar.select()) == 50

    # list(range(...)) because a Python 3 range() never compares equal to a
    # list.
    assert [x.id for x in Foobar.select(order_by='-id', limit=10)] == list(range(50, 40, -1))
def test_destroy_rebuild_index():
    """Check that destroyed indexes are rebuilt on the next indexed lookup.

    Reuses ``test_get_with_indexed_value`` to populate the storage, removes
    the ``.indexes`` directory with ``destroy_indexes()`` and runs the same
    lookup again.
    """
    def indexes_dir_exists():
        # the path is recomputed on every call, as the checks are done at
        # different points in time
        return os.path.exists(os.path.join(Foobar.get_objects_dir(), '.indexes'))

    test_get_with_indexed_value()
    assert indexes_dir_exists()

    Foobar.destroy_indexes()
    assert not indexes_dir_exists()

    found = Foobar.get_with_indexed_value('value', 'value')
    assert len(found) == 2
    found_unique_values = [obj.unique_value for obj in found]
    assert 'unique-value' in found_unique_values
    assert 'unique-value2' in found_unique_values
    assert 'unique-value3' not in found_unique_values

    # the indexes should have been rebuilt automatically
    assert indexes_dir_exists()