# wcs/tests/test_storage.py
#
# 350 lines
# 9.2 KiB
# Python

import datetime
import os
import random
import time
import pytest
from quixote import cleanup
import wcs.qommon.storage as st
from wcs.qommon.storage import StorableObject, cache_umask
from .utilities import clean_temporary_pub, create_temporary_pub
def setup_module(module):
    """Prepare the shared state used by every test in this module.

    Calls quixote's ``cleanup()`` and then creates a temporary publisher,
    which is kept in the module-level global ``pub`` so that tests can
    reach it.
    """
    global pub
    cleanup()
    pub = create_temporary_pub()
def teardown_module(module):
    """Dispose of the temporary publisher once the module's tests are done."""
    clean_temporary_pub()
class Foobar(StorableObject):
    """Storable fixture class exercised by most tests in this module.

    It declares one entry in ``_indexes`` (``unique_value``) and one entry
    in ``_hashed_indexes`` (``value``).
    """

    # Randomised per test run; presumably the name of the on-disk storage
    # location -- confirm against StorableObject.
    _names = 'tests' + str(random.randint(0, 100000))
    _indexes = ['unique_value']
    _hashed_indexes = ['value']

    # Defaults for the two indexed attributes.
    unique_value = None
    value = None
class Foobar2(StorableObject):
    """Second storable fixture class, declared with the same indexes as Foobar.

    It gets its own randomly generated ``_names`` value and is used to check
    that cached objects of different classes are kept apart.
    """

    # Randomised per test run; presumably the name of the on-disk storage
    # location -- confirm against StorableObject.
    _names = 'tests' + str(random.randint(0, 100000))
    _indexes = ['unique_value']
    _hashed_indexes = ['value']

    # Defaults for the two indexed attributes.
    unique_value = None
    value = None
def test_store():
    """The first stored Foobar is given the string id '1'."""
    obj = Foobar()
    obj.unique_value = 'unique-value'
    obj.value = 'value'
    obj.store()

    assert obj.id == '1'
def test_get():
    """The object with id 1 can be loaded back with its stored value.

    Relies on an object with id 1 already being present in the store.
    """
    loaded = Foobar.get(1)
    assert loaded.value == 'value'
def test_get_on_index():
    """An object can be looked up through its ``unique_value`` index.

    Relies on an object with unique_value 'unique-value' already being stored.
    """
    found = Foobar.get_on_index('unique-value', 'unique_value')
    assert found.value == 'value'
def test_remove_self():
    """After remove_self(), get() raises KeyError unless errors are ignored."""
    Foobar.get(1).remove_self()

    # Plain lookup of the removed id must fail...
    with pytest.raises(KeyError):
        Foobar.get(1)

    # ...while ignore_errors=True turns the failure into a None result.
    assert Foobar.get(1, ignore_errors=True) is None
def test_get_on_index_changes():
    """Changing an indexed attribute moves the object to the new index key."""
    obj = Foobar()
    obj.unique_value = 'unique-value'
    obj.value = 'value'
    obj.store()

    obj = Foobar.get_on_index('unique-value', 'unique_value')
    assert obj.value == 'value'

    # Re-store under a different unique value.
    obj.unique_value = 'unique-value2'
    obj.store()

    relocated = Foobar.get_on_index('unique-value2', 'unique_value')
    assert relocated.value == 'value'

    # The old key must no longer resolve.
    with pytest.raises(KeyError):
        Foobar.get_on_index('unique-value', 'unique_value')
def test_select():
    """select() accepts a callable filter or a list of criteria objects."""
    Foobar.wipe()
    for number in range(1, 51):
        obj = Foobar()
        obj.unique_value = number
        obj.store()

    assert len(Foobar.select()) == 50

    # Each entry pairs a selection clause with the number of objects
    # (out of unique_value 1..50) it is expected to match.
    cases = [
        (lambda item: item.unique_value < 26, 25),
        ([st.Less('unique_value', 26)], 25),
        ([st.Not(st.Equal('unique_value', 25))], 49),
        ([st.Contains('unique_value', [24, 25, 26])], 3),
        ([st.Contains('unique_value', [24, 25, 86])], 2),
        ([st.Not(st.Contains('unique_value', [24, 25, 86]))], 48),
    ]
    for clause, expected in cases:
        assert len(Foobar.select(clause)) == expected
def test_select_order_by():
    """select(order_by=...) sorts ascending, or descending with a '-' prefix."""
    Foobar.wipe()
    # unique_value runs 50..1 while objects are stored in creation order,
    # so the two orderings are exact opposites of each other.
    for unique in range(50, 0, -1):
        obj = Foobar()
        obj.unique_value = unique
        obj.store()

    def ordered_ids(order):
        return [int(obj.id) for obj in Foobar.select(order_by=order)]

    ascending = list(range(1, 51))
    descending = list(range(50, 0, -1))

    assert ordered_ids('id') == ascending
    assert ordered_ids('-id') == descending
    assert ordered_ids('unique_value') == descending
    assert ordered_ids('-unique_value') == ascending
def test_select_datetime():
    """Equal/Less/Greater criteria work on time-tuple attributes."""
    Foobar.wipe()
    start = datetime.datetime(2014, 1, 1)
    one_day = datetime.timedelta(days=1)

    # 50 objects with receipt times on 50 consecutive days from `start`.
    for offset in range(50):
        obj = Foobar()
        obj.receipt_time = (start + offset * one_day).timetuple()
        obj.store()

    assert len(Foobar.select()) == 50

    first_day = start.timetuple()
    assert len(Foobar.select(lambda item: item.receipt_time == first_day)) == 1
    assert len(Foobar.select([st.Equal('receipt_time', first_day)])) == 1

    # Day 20 splits the data into 20 earlier and 29 later objects.
    pivot = (start + 20 * one_day).timetuple()
    assert len(Foobar.select([st.Less('receipt_time', pivot)])) == 20
    assert len(Foobar.select([st.Greater('receipt_time', pivot)])) == 29
def test_select_limit_offset():
    """select() honours ``limit`` and ``offset`` when ordering by id."""
    Foobar.wipe()
    for _ in range(50):
        Foobar().store()

    assert len(Foobar.select()) == 50

    def window(**paging):
        return [int(obj.id) for obj in Foobar.select(order_by='id', **paging)]

    assert window(limit=10) == list(range(1, 11))
    assert window(limit=10, offset=10) == list(range(11, 21))
    assert window(limit=20, offset=20) == list(range(21, 41))
    assert window(offset=10) == list(range(11, 51))
def test_select_criteria_overlaps():
    """Intersects matches objects whose list attribute shares an element."""
    Foobar.wipe()
    for members in ([1, 2], [], [2, 3]):
        obj = Foobar()
        obj.a = members
        obj.store()

    # (probe list, number of stored objects sharing an element with it)
    cases = [
        ([1], 1),
        ([2], 2),
        ([4], 0),
        ([1, 2, 3], 2),
    ]
    for probe, expected in cases:
        assert len(Foobar.select([st.Intersects('a', probe)])) == expected
def test_count():
    """count() returns the total, or the number matching given criteria."""
    Foobar.wipe()
    for number in range(1, 51):
        obj = Foobar()
        obj.value = number
        obj.store()

    assert Foobar.count() == 50
    assert Foobar.count([st.Less('value', 26)]) == 25
def test_select_criteria_or_and():
    """Or/And criteria combine sub-criteria; an empty Or matches nothing."""
    Foobar.wipe()
    for number in range(1, 51):
        obj = Foobar()
        obj.value = number
        obj.store()

    assert len(Foobar.select()) == 50

    def matching_ids(criterion, order):
        return [int(obj.id) for obj in Foobar.select([criterion], order_by=order)]

    below_ten = list(range(1, 10))

    assert matching_ids(st.Or([]), 'id') == []
    assert matching_ids(st.Or([st.Less('value', 10)]), 'id') == below_ten
    # This case is ordered by value rather than by id.
    assert (
        matching_ids(st.Or([st.Less('value', 10), st.Equal('value', 15)]), 'value')
        == below_ten + [15]
    )
    assert (
        matching_ids(st.And([st.Less('value', 10), st.Greater('value', 5)]), 'id')
        == list(range(6, 10))
    )
def test_store_async():
    """An object stored with ``async_op=True`` becomes readable shortly after.

    Polls every 0.1 second; gives up with an exception once more than two
    seconds have elapsed without the object showing up.
    """
    Foobar.wipe()
    obj = Foobar()
    obj.unique_value = 'unique-value'
    obj.value = 'value'
    obj.store(async_op=True)

    started = time.time()
    while True:
        time.sleep(0.1)
        try:
            Foobar.get(1)
        except KeyError:
            # not written yet, fall through to the timeout check
            pass
        else:
            break
        if time.time() - started > 2:
            raise Exception('timeout')

    obj = Foobar.get(1)
    assert obj.value == 'value'
def test_items():
    """items() yields (key, object) pairs whose key matches the object's id."""
    Foobar.wipe()
    for _ in range(50):
        Foobar().store()

    pairs = sorted((int(key), int(obj.id)) for key, obj in Foobar.items())
    assert pairs == [(number, number) for number in range(1, 51)]
def test_reversed_order():
    """Descending id order combined with a limit returns the highest ids."""
    Foobar.wipe()
    for _ in range(50):
        Foobar().store()

    assert len(Foobar.select()) == 50

    newest_ids = [int(obj.id) for obj in Foobar.select(order_by='-id', limit=10)]
    assert newest_ids == list(range(50, 40, -1))
def test_destroy_rebuild_index():
    """Destroyed indexes are recreated when an indexed lookup is performed."""
    Foobar.wipe()
    fixtures = [
        ('value', 'unique-value'),
        ('value', 'unique-value2'),
        ('value1', 'unique-value3'),
    ]
    for value, unique_value in fixtures:
        obj = Foobar()
        obj.value = value
        obj.unique_value = unique_value
        obj.store()

    def indexes_present():
        return os.path.exists(os.path.join(Foobar.get_objects_dir(), '.indexes'))

    assert indexes_present()
    Foobar.destroy_indexes()
    assert not indexes_present()

    # Two of the three stored objects carry value == 'value'.
    assert len(Foobar.get_ids_with_indexed_value('value', 'value')) == 2
    # The lookup above should have rebuilt the indexes automatically.
    assert indexes_present()
def test_concurrent_hashed_indexes():
    """Hashed indexes stay consistent when several processes store at once.

    Ten child processes are forked; each stores ten objects whose ``value``
    is a one-element list ``[str(n)]`` for n in 0..9.  Afterwards the ids
    returned by the hashed index for each ``str(n)`` must equal the ids
    found by scanning every stored object.
    """
    Foobar.wipe()
    children_pids = []
    for _ in range(10):
        pid = os.fork()
        if pid:
            children_pids.append(pid)
            continue

        # Child process.  It must never return into the test runner, even
        # when storing fails, otherwise the forked copy would carry on
        # running the remainder of the test session; hence the finally.
        exit_code = 1
        try:
            for number in range(10):
                test = Foobar()
                test.value = [str(number)]
                test.store(async_op=False)
            exit_code = 0
        finally:
            os._exit(exit_code)

    # Reap every child before asserting so that none is left as a zombie,
    # then make sure each one exited normally with code 0.
    statuses = [os.waitpid(pid, 0)[1] for pid in children_pids]
    assert statuses == [0] * len(children_pids)

    assert Foobar.count() == 100
    for number in range(10):
        manual_selection = {x.id for x in Foobar.select() if x.value == [str(number)]}
        index_selection = set(Foobar.get_ids_with_indexed_value('value', str(number)))
        assert manual_selection == index_selection
def test_umask():
    """Stored object files get permissions derived from the cached umask.

    With umask 0o022 the file mode must be 0o644, with 0o002 it must be
    0o664.  The process umask that was in effect before the test is put
    back afterwards (and cached again) so that the change does not leak
    into other tests.
    """
    test = Foobar()
    test.value = 'value'
    test.unique_value = 'unique-value'

    # os.umask() returns the previous mask, which is restored in `finally`.
    previous_umask = os.umask(0o022)
    try:
        # cache_umask() is called after every umask change; presumably the
        # storage module keeps its own copy of the mask -- confirm against
        # wcs.qommon.storage.
        cache_umask()
        test.store()
        assert (os.stat(test.get_object_filename()).st_mode % 0o1000) == 0o644

        os.umask(0o002)
        cache_umask()
        test.store()
        assert (os.stat(test.get_object_filename()).st_mode % 0o1000) == 0o664
    finally:
        os.umask(previous_umask)
        cache_umask()
def test_publisher_cache():
    """cached_get() and ``use_cache=True`` lookups return shared instances.

    Cached objects are shared within one class but kept apart between
    Foobar and Foobar2, even though both store an object with id '1'.
    """
    pub.reset_caches()
    Foobar.wipe()
    Foobar2.wipe()

    # One object per class, with identical attribute values.
    first = Foobar()
    first.unique_value = 'unique-value'
    first.value = 'value'
    first.store()

    second = Foobar2()
    second.unique_value = 'unique-value'
    second.value = 'value'
    second.store()

    cached = Foobar.cached_get('1')
    assert cached.value == 'value'
    # A second cached lookup hands back the very same object.
    assert Foobar.cached_get('1') is cached

    # Index lookups only share the cached instance when asked to.
    assert Foobar.get_on_index('unique-value', 'unique_value') is not cached
    assert Foobar.get_on_index('unique-value', 'unique_value', use_cache=True) is cached

    # The other class has its own cached instance for the same id.
    assert Foobar2.cached_get('1') is not cached
    assert Foobar2.cached_get('1') is Foobar2.get_on_index(
        'unique-value', 'unique_value', use_cache=True
    )

    # Unknown index name: KeyError, or None when errors are ignored.
    with pytest.raises(KeyError):
        Foobar2.get_on_index('unique-value', 'invalid', use_cache=True)
    assert (
        Foobar2.get_on_index('unique-value', 'invalid', use_cache=True, ignore_errors=True)
        is None
    )