230 lines
6.2 KiB
Python
230 lines
6.2 KiB
Python
import datetime
|
|
import sys
|
|
import shutil
|
|
import tempfile
|
|
import random
|
|
|
|
import pytest
|
|
|
|
from quixote import cleanup
|
|
from wcs import publisher
|
|
from wcs.qommon.storage import StorableObject
|
|
import wcs.qommon.storage as st
|
|
|
|
def setup_module(module):
    """Create the publisher shared by every test in this module.

    Calls quixote's ``cleanup()``, points ``WcsPublisher.APP_DIR`` at a newly
    created temporary directory and keeps the resulting publisher in the
    module-level ``pub`` so that ``teardown_module`` can find the directory.
    """
    global pub

    cleanup()

    app_dir = tempfile.mkdtemp()
    publisher.WcsPublisher.APP_DIR = app_dir
    pub = publisher.WcsPublisher.create_publisher()
|
|
|
|
|
|
def teardown_module(module):
    """Delete the application directory of the module-level publisher."""
    app_dir = pub.APP_DIR
    shutil.rmtree(app_dir)
|
|
|
|
|
|
class Foobar(StorableObject):
    """Small ``StorableObject`` subclass used as the subject of these tests."""

    # Storage name: 'tests' followed by a random number picked when the class
    # body is executed.
    _names = 'tests%s' % random.randint(0, 100000)

    # Attribute names handed to the storage layer as index declarations; the
    # tests query them through get_on_index() and get_with_indexed_value().
    _indexes = ['unique_value']
    _hashed_indexes = ['value', 'dict_value']

    # Class-level defaults for the attributes the tests assign on instances.
    value = unique_value = dict_value = None
|
|
|
|
def test_store():
    """Store a new object and check that it is given the id 1."""
    obj = Foobar()
    obj.value = 'value'
    obj.unique_value = 'unique-value'
    obj.store()

    assert obj.id == 1
|
|
|
|
def test_get():
    """Fetch the object with id 1 and check its ``value`` attribute."""
    assert Foobar.get(1).value == 'value'
|
|
|
|
def test_get_on_index():
    """Look an object up through the ``unique_value`` index and check its ``value``."""
    assert Foobar.get_on_index('unique-value', 'unique_value').value == 'value'
|
|
|
|
def test_remove_self():
    """Remove object 1, then check both ways of looking up the missing id."""
    Foobar.get(1).remove_self()

    # a plain get() on the removed id must raise
    with pytest.raises(KeyError):
        Foobar.get(1)

    # with ignore_errors=True the same lookup gives None instead
    assert Foobar.get(1, ignore_errors=True) is None
|
|
|
|
def test_get_on_index_changes():
    """Check index lookups before and after an indexed attribute is changed."""
    created = Foobar()
    created.value = 'value'
    created.unique_value = 'unique-value'
    created.store()

    fetched = Foobar.get_on_index('unique-value', 'unique_value')
    assert fetched.value == 'value'

    # change the indexed attribute on the fetched copy and store it again
    fetched.unique_value = 'unique-value2'
    fetched.store()
    assert Foobar.get_on_index('unique-value2', 'unique_value').value == 'value'

    # the previous key must no longer resolve
    with pytest.raises(KeyError):
        Foobar.get_on_index('unique-value', 'unique_value')
|
|
|
|
def test_get_with_indexed_value():
    """Query the ``value`` index after storing two matching objects and one other."""
    Foobar.wipe()

    # (value, unique_value) for each stored object
    fixtures = (
        ('value', 'unique-value'),
        ('value', 'unique-value2'),
        ('value1', 'unique-value3'),
    )
    for value, unique_value in fixtures:
        obj = Foobar()
        obj.value = value
        obj.unique_value = unique_value
        obj.store()

    matches = Foobar.get_with_indexed_value('value', 'value')
    assert len(matches) == 2

    found = [match.unique_value for match in matches]
    assert 'unique-value' in found
    assert 'unique-value2' in found
    assert 'unique-value3' not in found
|
|
|
|
def test_get_with_indexed_value_changes():
    """Query the ``value`` index before and after a third object's ``value`` changes."""
    Foobar.wipe()

    # (value, unique_value) for each stored object
    fixtures = (
        ('value', 'unique-value'),
        ('value', 'unique-value2'),
        ('value1', 'unique-value3'),
    )
    for value, unique_value in fixtures:
        obj = Foobar()
        obj.value = value
        obj.unique_value = unique_value
        obj.store()

    matches = Foobar.get_with_indexed_value('value', 'value')
    assert len(matches) == 2

    found = [match.unique_value for match in matches]
    assert 'unique-value' in found
    assert 'unique-value2' in found
    assert 'unique-value3' not in found

    # give the third object the shared value and store it again
    third = Foobar.get_on_index('unique-value3', 'unique_value')
    third.value = 'value'
    third.store()

    matches = Foobar.get_with_indexed_value('value', 'value')
    assert len(matches) == 3
    assert 'unique-value3' in [match.unique_value for match in matches]
|
|
|
|
def test_get_with_indexed_value_dict():
    """Query the ``dict_value`` index with '2' after storing dicts ``{'plop': '2'}``."""
    Foobar.wipe()

    # first object: only unique_value is assigned, dict_value keeps its default
    plain = Foobar()
    plain.unique_value = 'unique-value'
    plain.store()

    # two more objects, each with its own {'plop': '2'} dictionary
    for value, unique_value in (('value', 'unique-value2'), ('value1', 'unique-value3')):
        obj = Foobar()
        obj.value = value
        obj.unique_value = unique_value
        obj.dict_value = {'plop': '2'}
        obj.store()

    matches = Foobar.get_with_indexed_value('dict_value', '2')
    assert len(matches) == 2

    found = [match.unique_value for match in matches]
    assert 'unique-value' not in found
    assert 'unique-value2' in found
    assert 'unique-value3' in found
|
|
|
|
|
|
def test_get_with_indexed_value_dict_changes():
    """Query the ``dict_value`` index while one object gains and then loses its dict."""
    Foobar.wipe()

    # first object: only unique_value is assigned, dict_value keeps its default
    plain = Foobar()
    plain.unique_value = 'unique-value'
    plain.store()

    # two more objects, each with its own {'plop': '2'} dictionary
    for value, unique_value in (('value', 'unique-value2'), ('value1', 'unique-value3')):
        obj = Foobar()
        obj.value = value
        obj.unique_value = unique_value
        obj.dict_value = {'plop': '2'}
        obj.store()

    matches = Foobar.get_with_indexed_value('dict_value', '2')
    assert len(matches) == 2

    found = [match.unique_value for match in matches]
    assert 'unique-value' not in found
    assert 'unique-value2' in found
    assert 'unique-value3' in found

    # once the first object gets the same dictionary it must be counted too
    plain.dict_value = {'plop': '2'}
    plain.store()
    assert len(Foobar.get_with_indexed_value('dict_value', '2')) == 3

    # and after the dictionary is reset to None the count goes back to two
    plain.dict_value = None
    plain.store()
    assert len(Foobar.get_with_indexed_value('dict_value', '2')) == 2
|
|
|
|
|
|
def test_select():
    """Run ``select()`` unfiltered, with a callable and with lists of criteria."""
    Foobar.wipe()

    # store 50 objects whose unique_value runs from 1 to 50
    for number in range(1, 51):
        obj = Foobar()
        obj.unique_value = number
        obj.store()

    assert len(Foobar.select()) == 50

    # the same "below 26" filter, first as a callable, then as a criterion
    assert len(Foobar.select(lambda item: item.unique_value < 26)) == 25
    below_26 = [st.Less('unique_value', 26)]
    assert len(Foobar.select(below_26)) == 25

    # two criteria in one list: values 10 to 24
    from_10_to_24 = [st.Less('unique_value', 25), st.GreaterOrEqual('unique_value', 10)]
    assert len(Foobar.select(from_10_to_24)) == 15

    not_25 = [st.NotEqual('unique_value', 25)]
    assert len(Foobar.select(not_25)) == 49

    # membership criteria; 86 was never stored
    assert len(Foobar.select([st.Contains('unique_value', [24, 25, 26])])) == 3
    assert len(Foobar.select([st.Contains('unique_value', [24, 25, 86])])) == 2
    assert len(Foobar.select([st.NotContains('unique_value', [24, 25, 86])])) == 48
|
|
|
|
|
|
def test_select_datetime():
    """Run ``select()`` against ``receipt_time`` values stored as time tuples."""
    Foobar.wipe()

    # 50 objects, one per consecutive day starting on 2014-01-01
    start = datetime.datetime(2014, 1, 1)
    for offset in range(50):
        obj = Foobar()
        obj.receipt_time = (start + datetime.timedelta(days=offset)).timetuple()
        obj.store()

    assert len(Foobar.select()) == 50

    # exact match on the first day, as a callable and as a criterion
    first_day = start.timetuple()
    assert len(Foobar.select(lambda item: item.receipt_time == first_day)) == 1
    assert len(Foobar.select([st.Equal('receipt_time', first_day)])) == 1

    # strict comparisons around day 20: offsets 0-19 are below, 21-49 are above
    day_20 = (start + datetime.timedelta(days=20)).timetuple()
    assert len(Foobar.select([st.Less('receipt_time', day_20)])) == 20
    assert len(Foobar.select([st.Greater('receipt_time', day_20)])) == 29
|