wcs/tests/test_sql.py

670 lines
18 KiB
Python

import datetime
import os
import random
import shutil
import sys
import time
from quixote import cleanup
from wcs import formdef, publisher, fields
from wcs.formdef import FormDef
from wcs.formdata import Evolution
from wcs import sql
import wcs.qommon.storage as st
from utilities import create_temporary_pub
import pytest
postgresql = pytest.mark.postgresql
try:
import psycopg2
except ImportError:
pass
def setup_module(module):
    """Create a temporary publisher, a scratch database and the shared formdef.

    Rebinds the module-level ``pub`` and ``formdef`` names: after this call
    ``formdef`` is the ``FormDef`` instance used by the tests, no longer the
    imported ``wcs.formdef`` module.
    """
    global pub, formdef

    cleanup()

    pub = create_temporary_pub()
    pub.has_site_option = lambda x: True

    dbname = 'wcstests%d' % random.randint(0, 100000)

    # Administrative connection, only needed to create the scratch database.
    # It is released as soon as the statement has run, even on failure.
    conn = psycopg2.connect(user=os.environ['USER'])
    try:
        # CREATE DATABASE cannot be executed inside a transaction block.
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        cur = conn.cursor()
        try:
            cur.execute('CREATE DATABASE %s' % dbname)
        finally:
            cur.close()
    finally:
        conn.close()

    pub.cfg['postgresql'] = {'database': dbname, 'user': os.environ['USER']}

    # Use the directly imported class so this does not depend on what the
    # global ``formdef`` name currently points to.
    formdef = FormDef()
    formdef.name = 'tests'
    formdef.fields = [
        fields.StringField(id='0', label='string'),
        fields.EmailField(id='1', label='email'),
        fields.TextField(id='2', label='text'),
        fields.BoolField(id='3', label='bool'),
        fields.ItemField(id='4', label='item', items=('apple', 'pear', 'peach', 'apricot')),
        fields.DateField(id='5', label='date'),
        fields.ItemsField(id='6', label='items', items=('apple', 'pear', 'peach', 'apricot')),
    ]
    formdef.store()

    sql.do_user_table()
def teardown_module(module):
    """Remove the temporary application directory and drop the scratch database."""
    shutil.rmtree(pub.APP_DIR)

    # Close the publisher's own connection (if any) before dropping the
    # database it points to.
    if hasattr(pub, 'pgconn') and pub.pgconn:
        pub.pgconn.close()

    conn = psycopg2.connect(user=os.environ['USER'])
    try:
        # DROP DATABASE cannot be executed inside a transaction block.
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        cur = conn.cursor()
        try:
            cur.execute('DROP DATABASE %s' % pub.cfg['postgresql']['database'])
        finally:
            cur.close()
    finally:
        # The original never closed this administrative connection.
        conn.close()
@postgresql
def test_sql_table_name_invalid_chars():
    """A form name full of punctuation still gets a table name and an empty table."""
    awkward_formdef = FormDef()
    awkward_formdef.name = 'test-some|char;are better/ignored'
    awkward_formdef.fields = []
    awkward_formdef.store()

    assert awkward_formdef.table_name is not None
    assert awkward_formdef.data_class(mode='sql').count() == 0
@postgresql
def test_sql_data_class():
    """Requesting the SQL data class of the shared formdef must not raise."""
    formdef.data_class(mode='sql')
@postgresql
def test_sql_count():
    """The shared formdef's SQL table reports zero rows at this point."""
    assert formdef.data_class(mode='sql').count() == 0
@postgresql
def test_sql_store():
    """Storing a formdata gives it a truthy id."""
    sql_class = formdef.data_class(mode='sql')

    record = sql_class()
    record.status, record.user_id = 'wf-0', '5'
    record.store()

    assert record.id
@postgresql
def test_sql_get():
    """A stored formdata can be fetched back by id with its user_id intact."""
    sql_class = formdef.data_class(mode='sql')

    record = sql_class()
    record.status, record.user_id = 'wf-0', '5'
    record.store()

    reloaded = sql_class.get(record.id)
    assert reloaded.user_id == '5'
@postgresql
def test_sql_get_missing():
    """Fetching an id that does not exist raises KeyError."""
    sql_class = formdef.data_class(mode='sql')
    with pytest.raises(KeyError):
        sql_class.get(123456)
@postgresql
def test_sql_get_missing_ignore_errors():
    """With ignore_errors=True a missing id yields None instead of raising."""
    missing = formdef.data_class(mode='sql').get(123456, ignore_errors=True)
    assert missing is None
def check_sql_field(no, value):
    """Store ``value`` under field id ``no`` and assert it is read back equal.

    ``no`` is the field id used as key in the formdata ``data`` dictionary;
    ``value`` is the value expected to survive a store/get round trip.
    """
    sql_class = formdef.data_class(mode='sql')

    record = sql_class()
    record.data = {no: value}
    record.store()

    reloaded = sql_class.get(record.id)
    assert reloaded.data.get(no) == value
@postgresql
def test_sql_field_string():
    """Round-trip a value through the string field (id '0')."""
    check_sql_field(no='0', value='hello world')
@postgresql
def test_sql_field_email():
    """Round-trip a value through the email field (id '1')."""
    check_sql_field(no='1', value='fred@example.com')
@postgresql
def test_sql_field_text():
    """Round-trip a value through the text field (id '2')."""
    check_sql_field(no='2', value='long text')
@postgresql
def test_sql_field_bool():
    """Round-trip both boolean values through the bool field (id '3')."""
    for flag in (False, True):
        check_sql_field('3', flag)
@postgresql
def test_sql_field_item():
    """Round-trip a value through the item field (id '4')."""
    check_sql_field(no='4', value='apricot')
@postgresql
def test_sql_field_date():
    """Round-trip today's date, as a time tuple, through the date field (id '5')."""
    today = datetime.date.today()
    check_sql_field('5', today.timetuple())
@postgresql
def test_sql_field_items():
    """Round-trip one- and two-element lists through the items field (id '6')."""
    for selection in (['apricot'], ['apricot', 'pear']):
        check_sql_field('6', selection)
@postgresql
def test_sql_change():
    """Re-storing a formdata with new data overwrites the stored value."""
    sql_class = formdef.data_class(mode='sql')

    record = sql_class()
    record.data = {'0': 'test'}
    record.store()
    record_id = record.id

    record = sql_class.get(record_id)
    assert record.data.get('0') == 'test'

    # Modify the reloaded object and persist it again.
    record.data = {'0': 'test2'}
    record.store()

    record = sql_class.get(record_id)
    assert record.data.get('0') == 'test2'
@postgresql
def test_sql_remove():
    """After remove_self() the formdata can no longer be fetched."""
    sql_class = formdef.data_class(mode='sql')

    record = sql_class()
    record.data = {'0': 'test'}
    record.store()
    record_id = record.id

    record = sql_class.get(record_id)
    assert record.data.get('0') == 'test'

    record.remove_self()
    with pytest.raises(KeyError):
        sql_class.get(record_id)
@postgresql
def test_sql_wipe():
    """wipe() leaves the table empty."""
    sql_class = formdef.data_class(mode='sql')

    sql_class().store()
    assert sql_class.count() != 0

    sql_class.wipe()
    assert sql_class.count() == 0
@postgresql
def test_sql_evolution():
    """An evolution entry appended to a stored formdata is persisted."""
    sql_class = formdef.data_class(mode='sql')

    record = sql_class()
    record.just_created()
    record.store()
    record_id = record.id

    record = sql_class.get(record_id)
    assert len(record.evolution) == 1

    step = Evolution()
    step.time = time.localtime()
    step.status = record.status
    step.comment = 'hello world'
    record.evolution.append(step)
    record.store()

    record = sql_class.get(record_id)
    assert len(record.evolution) == 2
    assert record.evolution[-1].comment == 'hello world'
@postgresql
def test_sql_evolution_change():
    """Editing the comment of an existing evolution entry is persisted."""
    sql_class = formdef.data_class(mode='sql')

    record = sql_class()
    record.just_created()
    record.store()
    record_id = record.id

    record = sql_class.get(record_id)
    assert len(record.evolution) == 1

    step = Evolution()
    step.time = time.localtime()
    step.status = record.status
    step.comment = 'hello world'
    record.evolution.append(step)
    record.store()

    record = sql_class.get(record_id)
    assert len(record.evolution) == 2
    assert record.evolution[-1].comment == 'hello world'

    # Change the last entry in place; no new entry must be created.
    record.evolution[-1].comment = 'foobar'
    record.store()

    record = sql_class.get(record_id)
    assert len(record.evolution) == 2
    assert record.evolution[-1].comment == 'foobar'
@postgresql
def test_sql_get_ids_with_indexed_value():
    """Only formdatas whose user_id is '2' are returned by the indexed lookup."""
    data_class = formdef.data_class(mode='sql')
    data_class.wipe()

    # A first formdata without user_id; it must not be part of the result
    # (its id was previously kept in an unused local).
    data_class().store()

    formdata = data_class()
    formdata.user_id = '2'
    formdata.store()
    id2 = formdata.id

    formdata = data_class()
    formdata.user_id = '2'
    formdata.store()
    id3 = formdata.id

    ids = data_class.get_ids_with_indexed_value('user_id', '2')
    assert set(ids) == set([id2, id3])
@postgresql
def test_sql_get_ids_from_query():
    """Text queries return the ids of the formdatas whose text field matches."""
    sql_class = formdef.data_class(mode='sql')
    sql_class.wipe()

    texts = (
        'this is some reasonably long text',
        'hello world is still a classical example',
        'you would think other ideas of text would emerge',
    )
    stored_ids = []
    for text in texts:
        record = sql_class()
        record.data = {'2': text}
        record.store()
        stored_ids.append(record.id)
    id1, id2, id3 = stored_ids

    assert set(sql_class.get_ids_from_query('text')) == set([id1, id3])
    assert set(sql_class.get_ids_from_query('classical')) == set([id2])
@postgresql
def test_sql_rollback_on_error():
    """A failed query must not prevent later queries from running."""
    sql_class = formdef.data_class(mode='sql')
    sql_class.wipe()

    # The table has no FOOBAR column, so psycopg2 raises a ProgrammingError
    # (a subclass of psycopg2.Error).
    with pytest.raises(psycopg2.Error):
        sql_class.get_ids_with_indexed_value('FOOBAR', '2')

    # Must still work after the error above.
    sql_class.wipe()
@postgresql
def test_sql_get_ids_with_indexed_value_dict():
    """Indexed lookup on workflow_roles (a dict) matches on the dict's values."""
    data_class = formdef.data_class(mode='sql')
    data_class.wipe()

    # A first formdata without workflow roles; it must not be part of the
    # result (its id was previously kept in an unused local).
    data_class().store()

    formdata = data_class()
    formdata.workflow_roles = {'plop': '2'}
    formdata.store()
    id2 = formdata.id

    formdata = data_class()
    formdata.workflow_roles = {'plop': '2'}
    formdata.store()
    id3 = formdata.id

    ids = data_class.get_ids_with_indexed_value('workflow_roles', '2')
    assert set(ids) == set([id2, id3])
@postgresql
def test_create_user():
    """Storing a new SqlUser must not raise."""
    sql.SqlUser.wipe()

    pierre = sql.SqlUser()
    pierre.name = 'Pierre'
    pierre.store()
@postgresql
def test_get_user():
    """A stored SqlUser can be fetched back by id."""
    sql.SqlUser.wipe()

    pierre = sql.SqlUser()
    pierre.name = 'Pierre'
    pierre.store()

    fetched = sql.SqlUser.get(pierre.id)
    assert fetched is not None
@postgresql
def test_get_missing_user():
    """Fetching a user id that does not exist raises KeyError."""
    user_class = sql.SqlUser
    user_class.wipe()
    with pytest.raises(KeyError):
        user_class.get(12345)
@postgresql
def test_get_missing_user_ignore_errors():
    """With ignore_errors=True a missing user id yields None."""
    user_class = sql.SqlUser
    user_class.wipe()
    missing = user_class.get(12345, ignore_errors=True)
    assert missing is None
@postgresql
def test_get_users_with_role():
    """Only the user holding role 1 is returned by get_users_with_role(1)."""
    sql.SqlUser.wipe()

    # User with the role (the unused ``user_id`` local was dropped).
    user = sql.SqlUser()
    user.name = 'Pierre'
    user.roles = [1]
    user.store()

    # User without any role.
    user = sql.SqlUser()
    user.name = 'Papier'
    user.store()

    assert len(sql.SqlUser.get_users_with_role(1)) == 1
@postgresql
def test_get_users_with_name_identifier():
    """Only the user carrying name identifier 'foo' is returned by the lookup."""
    sql.SqlUser.wipe()

    # User with the name identifier (the unused ``user_id`` local was dropped).
    user = sql.SqlUser()
    user.name = 'Pierre'
    user.name_identifiers = ['foo']
    user.store()

    # User without any name identifier.
    user = sql.SqlUser()
    user.name = 'Papier'
    user.store()

    # Query once and check both the size and the content of the result.
    matches = sql.SqlUser.get_users_with_name_identifier('foo')
    assert len(matches) == 1
    assert matches[0].name == 'Pierre'
@postgresql
def test_urlname_change():
    """url_name follows the form name only while the form has no data.

    The original declared ``global formef`` (a typo for ``formdef``); the
    declaration is dropped because the shared formdef is only read and
    mutated here, never rebound.
    """
    data_class = formdef.data_class(mode='sql')
    data_class.wipe()

    # No formdata stored: renaming the form changes its url_name.
    assert formdef.url_name == 'tests'
    formdef.name = 'tests2'
    formdef.store()
    assert formdef.url_name == 'tests2'

    formdef.name = 'tests'
    formdef.store()
    assert formdef.url_name == 'tests'

    data_class = formdef.data_class(mode='sql')
    formdata = data_class()
    formdata.status = 'wf-0'
    formdata.user_id = '5'
    formdata.store()

    # A formdata now exists: renaming must keep the url_name and the data.
    formdef.name = 'tests2'
    formdef.store()
    assert formdef.url_name == 'tests'
    assert data_class.count() == 1
@postgresql
def test_sql_table_add_and_remove_fields():
    """Adding and removing fields keeps the SQL table selectable.

    After each change of the field list the formdef is stored and a
    ``select()`` is run against a freshly obtained data class.
    """
    fruit_choices = ('apple', 'pear', 'peach', 'apricot')

    changing_formdef = FormDef()
    changing_formdef.name = 'tests and and remove fields'
    changing_formdef.fields = []
    changing_formdef.store()
    assert changing_formdef.table_name is not None
    assert changing_formdef.data_class(mode='sql').count() == 0

    # Add two fields at once, allocating ids for those that have none.
    changing_formdef.fields = [
        fields.StringField(label='string'),
        fields.EmailField(label='email'),
    ]
    for new_field in changing_formdef.fields:
        if new_field.id is None:
            new_field.id = changing_formdef.get_new_field_id()
    changing_formdef.store()

    # Append an item field; its id is allocated after it joined the list.
    item_field = fields.ItemField(label='item', items=fruit_choices)
    changing_formdef.fields.append(item_field)
    item_field.id = changing_formdef.get_new_field_id()
    changing_formdef.store()
    changing_formdef.data_class(mode='sql').select()

    # Drop the last field again.
    previous_id = changing_formdef.fields[-1].id
    changing_formdef.fields = changing_formdef.fields[:-1]
    changing_formdef.store()
    changing_formdef.data_class(mode='sql').select()

    # Append a string field with the same label; it must get a new id.
    string_field = fields.StringField(label='item')
    changing_formdef.fields.append(string_field)
    string_field.id = changing_formdef.get_new_field_id()
    changing_formdef.store()
    assert changing_formdef.fields[-1].id != previous_id
    changing_formdef.data_class(mode='sql').select()

    # Replace the last field by an item field within a single store().
    changing_formdef.fields = changing_formdef.fields[:-1]
    replacement_field = fields.ItemField(label='item', items=fruit_choices)
    changing_formdef.fields.append(replacement_field)
    replacement_field.id = changing_formdef.get_new_field_id()
    changing_formdef.store()
    changing_formdef.data_class(mode='sql').select()
@postgresql
def test_sql_table_select():
    """select() honours callables, criteria objects and mixes of both."""
    select_formdef = FormDef()
    select_formdef.name = 'table select'
    select_formdef.fields = []
    select_formdef.store()

    sql_class = select_formdef.data_class(mode='sql')
    assert sql_class.count() == 0

    for _ in range(50):
        sql_class().store()

    assert sql_class.count() == 50
    assert len(sql_class.select()) == 50

    # (clause, expected number of rows), checked in the listed order.
    expectations = [
        (lambda x: x.id < 26, 25),
        ([st.Less('id', 26)], 25),
        ([st.Less('id', 25), st.GreaterOrEqual('id', 10)], 15),
        ([st.Less('id', 25), st.GreaterOrEqual('id', 10), lambda x: x.id >= 15], 10),
        ([st.NotEqual('id', 26)], 49),
        ([st.Contains('id', [24, 25, 26])], 3),
        ([st.Contains('id', [24, 25, 86])], 2),
        ([st.NotContains('id', [24, 25, 86])], 48),
    ]
    for clause, expected in expectations:
        assert len(sql_class.select(clause)) == expected
@postgresql
def test_sql_table_select_datetime():
    """Criteria on receipt_time compare correctly against time tuples."""
    dated_formdef = FormDef()
    dated_formdef.name = 'table select datetime'
    dated_formdef.fields = []
    dated_formdef.store()

    sql_class = dated_formdef.data_class(mode='sql')
    assert sql_class.count() == 0

    # Fifty formdatas, one per consecutive day starting on 2014-01-01.
    start = datetime.datetime(2014, 1, 1)
    for day in range(50):
        row = sql_class()
        row.receipt_time = (start + datetime.timedelta(days=day)).timetuple()
        row.store()

    assert sql_class.count() == 50
    assert len(sql_class.select()) == 50

    assert len(sql_class.select(lambda x: x.receipt_time == start.timetuple())) == 1
    assert len(sql_class.select([st.Equal('receipt_time', start.timetuple())])) == 1

    earlier = st.Less(
        'receipt_time', (start + datetime.timedelta(days=20)).timetuple())
    assert len(sql_class.select([earlier])) == 20

    later = st.Greater(
        'receipt_time', (start + datetime.timedelta(days=20)).timetuple())
    assert len(sql_class.select([later])) == 29
@postgresql
def test_select_limit_offset():
    """select() honours the limit and offset arguments.

    Expected ids are built with ``list(range(...))``: on Python 3 a list
    never compares equal to a bare ``range`` object, so the original
    comparisons could only pass on Python 2.
    """
    test_formdef = FormDef()
    test_formdef.name = 'table select limit offset'
    test_formdef.fields = []
    test_formdef.store()
    data_class = test_formdef.data_class(mode='sql')
    assert data_class.count() == 0

    for i in range(50):
        t = data_class()
        t.store()

    assert len(data_class.select()) == 50
    assert [x.id for x in data_class.select(order_by='id')] == list(range(1, 51))
    assert [x.id for x in data_class.select(order_by='id', limit=10)] == list(range(1, 11))
    assert [x.id for x in data_class.select(order_by='id', limit=10, offset=10)] == list(range(11, 21))
    assert [x.id for x in data_class.select(order_by='id', limit=20, offset=20)] == list(range(21, 41))
    assert [x.id for x in data_class.select(order_by='id', offset=10)] == list(range(11, 51))
    assert len([x.id for x in data_class.select(lambda x: x.id > 10, limit=10)]) == 10
@postgresql
def test_select_criteria_intersects():
    """st.Intersects matches rows whose items field shares a value with the list."""
    sql_class = formdef.data_class(mode='sql')
    sql_class.wipe()

    # Three formdatas differing only by the content of the items field.
    for selection in (['apricot'], ['apricot', 'pear'], []):
        record = sql_class()
        record.status = 'wf-0'
        record.user_id = '5'
        record.data = {'6': selection}
        record.store()

    assert len(sql_class.select([st.Intersects('f6', ['apricot'])])) == 2
    assert len(sql_class.select([st.Intersects('f6', ['pear'])])) == 1
    assert len(sql_class.select([st.Intersects('f6', ['apple'])])) == 0
@postgresql
def test_count():
    """count() works both without and with criteria."""
    counted_formdef = FormDef()
    counted_formdef.name = 'table select count'
    counted_formdef.fields = []
    counted_formdef.store()

    sql_class = counted_formdef.data_class(mode='sql')
    assert sql_class.count() == 0

    for _ in range(50):
        sql_class().store()

    assert sql_class.count() == 50
    assert sql_class.count([st.Less('id', 26)]) == 25
@postgresql
def test_select_criteria_or_and():
    """st.Or and st.And combine nested criteria as expected.

    Expected ids are built with ``list(range(...))``: on Python 3 a list
    never compares equal to a bare ``range`` object and ``range + list``
    raises TypeError, so the original only worked on Python 2.
    """
    test_formdef = FormDef()
    test_formdef.name = 'table select criteria or and'
    test_formdef.fields = []
    test_formdef.store()
    data_class = test_formdef.data_class(mode='sql')
    assert data_class.count() == 0

    for i in range(50):
        t = data_class()
        t.store()

    assert [x.id for x in data_class.select(
        [st.Or([st.Less('id', 10)])], order_by='id')] == list(range(1, 10))
    assert [x.id for x in data_class.select(
        [st.Or([st.Less('id', 10), st.Equal('id', 15)])],
        order_by='id')] == list(range(1, 10)) + [15]
    assert [x.id for x in data_class.select(
        [st.And([st.Less('id', 10), st.Greater('id', 5)])],
        order_by='id')] == list(range(6, 10))
@postgresql
def test_sql_table_select_bool():
    """st.Equal on a boolean column separates True rows from False rows."""
    bool_formdef = FormDef()
    bool_formdef.name = 'table select bool'
    bool_formdef.fields = [fields.BoolField(id='3', label='bool')]
    bool_formdef.store()

    sql_class = bool_formdef.data_class(mode='sql')
    assert sql_class.count() == 0

    row = None
    for _ in range(50):
        row = sql_class()
        row.data = {'3': False}
        row.store()

    # Flip the last stored row to True: 1 True row, 49 False rows.
    row.data = {'3': True}
    row.store()

    assert sql_class.count() == 50
    assert len(sql_class.select()) == 50
    assert len(sql_class.select([st.Equal('f3', True)])) == 1
    assert len(sql_class.select([st.Equal('f3', False)])) == 49
@postgresql
def test_sql_criteria_ilike():
    """st.ILike matches string values regardless of case.

    Expected ids are built with ``list(range(...))`` so the comparison also
    holds on Python 3, where a list never equals a bare ``range`` object.
    """
    test_formdef = FormDef()
    # NOTE(review): this name duplicates the one used by
    # test_sql_table_select_bool; it looks like a copy-paste leftover and is
    # kept unchanged here.
    test_formdef.name = 'table select bool'
    test_formdef.fields = [fields.StringField(id='3', label='string')]
    test_formdef.store()
    data_class = test_formdef.data_class(mode='sql')
    assert data_class.count() == 0

    # 20 rows holding 'foo' followed by 30 rows holding 'bar'.  Each row is
    # stored exactly once; the original issued a redundant second store().
    for i in range(50):
        t = data_class()
        if i < 20:
            t.data = {'3': 'foo'}
        else:
            t.data = {'3': 'bar'}
        t.store()

    assert data_class.count() == 50
    assert len(data_class.select()) == 50
    assert [x.id for x in data_class.select(
        [st.ILike('f3', 'bar')], order_by='id')] == list(range(21, 51))
    assert [x.id for x in data_class.select(
        [st.ILike('f3', 'BAR')], order_by='id')] == list(range(21, 51))