wcs/tests/test_sql.py

1004 lines
29 KiB
Python

import datetime
import os
import random
import shutil
import sys
import time
from quixote import cleanup
from wcs import formdef, publisher, fields
from wcs.formdef import FormDef
from wcs.formdata import Evolution
from wcs.workflows import Workflow, CommentableWorkflowStatusItem
from wcs import sql
import wcs.qommon.storage as st
from utilities import create_temporary_pub
import pytest
postgresql = pytest.mark.postgresql
try:
import psycopg2
except ImportError:
pass
def setup_module(module):
    """Create a temporary publisher backed by a throw-away PostgreSQL database.

    Sets the module globals ``pub`` and ``formdef``; note that ``formdef``
    initially names the imported ``wcs.formdef`` module and is rebound here
    to a FormDef instance shared by the tests.
    """
    global pub, formdef

    cleanup()

    pub = create_temporary_pub()
    pub.is_using_postgresql = lambda: True

    pg_user = os.environ['USER']
    admin_conn = psycopg2.connect(user=pg_user)
    # CREATE DATABASE is issued on an autocommit connection
    admin_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    admin_cur = admin_conn.cursor()
    database_name = 'wcstests%d' % random.randint(0, 100000)
    admin_cur.execute('CREATE DATABASE %s' % database_name)
    admin_cur.close()

    pub.cfg['postgresql'] = {'database': database_name, 'user': pg_user}

    fruits = ('apple', 'pear', 'peach', 'apricot')
    formdef = formdef.FormDef()
    formdef.name = 'tests'
    formdef.fields = [
        fields.StringField(id='0', label='string'),
        fields.EmailField(id='1', label='email'),
        fields.TextField(id='2', label='text'),
        fields.BoolField(id='3', label='bool'),
        fields.ItemField(id='4', label='item', items=fruits),
        fields.DateField(id='5', label='date'),
        fields.ItemsField(id='6', label='items', items=fruits),
    ]
    formdef.store()

    pub.initialize_sql()
    admin_conn.close()
def teardown_module(module):
    """Remove the temporary publisher directory and drop the test database.

    The publisher's own connection (if any) is closed first, then the
    database named in ``pub.cfg['postgresql']`` is dropped over a separate
    autocommit connection.  That connection and its cursor are always
    released, even if the DROP fails.
    """
    shutil.rmtree(pub.APP_DIR)
    if getattr(pub, 'pgconn', None):
        pub.pgconn.close()

    conn = psycopg2.connect(user=os.environ['USER'])
    try:
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        cur = conn.cursor()
        try:
            cur.execute('DROP DATABASE %s' % pub.cfg['postgresql']['database'])
        finally:
            cur.close()
    finally:
        # the original never closed this connection
        conn.close()
@postgresql
def test_sql_table_name_invalid_chars():
    """A formdef name containing punctuation must still yield a queryable table."""
    awkward_formdef = FormDef()
    awkward_formdef.name = 'test-some|char;are better/ignored'
    awkward_formdef.fields = []
    awkward_formdef.store()
    assert awkward_formdef.table_name is not None

    sql_class = awkward_formdef.data_class(mode='sql')
    assert sql_class.count() == 0
@postgresql
def test_sql_data_class():
    """Smoke test: building the SQL data class must not raise."""
    formdef.data_class(mode='sql')
@postgresql
def test_sql_count():
    """The shared formdef's table is expected to be empty at this point."""
    sql_class = formdef.data_class(mode='sql')
    assert sql_class.count() == 0
@postgresql
def test_sql_store():
    """Storing a new formdata must give it a truthy id."""
    record = formdef.data_class(mode='sql')()
    record.status = 'wf-0'
    record.user_id = '5'
    record.store()
    assert record.id
@postgresql
def test_sql_get():
    """A stored formdata can be fetched by id with its user_id preserved."""
    sql_class = formdef.data_class(mode='sql')

    stored = sql_class()
    stored.status = 'wf-0'
    stored.user_id = '5'
    stored.store()

    reloaded = sql_class.get(stored.id)
    assert reloaded.user_id == '5'
@postgresql
def test_sql_get_missing():
    """Fetching an unknown id raises KeyError."""
    sql_class = formdef.data_class(mode='sql')
    with pytest.raises(KeyError):
        sql_class.get(123456)
@postgresql
def test_sql_get_missing_ignore_errors():
    """With ignore_errors=True an unknown id yields None instead of raising."""
    sql_class = formdef.data_class(mode='sql')
    missing = sql_class.get(123456, ignore_errors=True)
    assert missing is None
def check_sql_field(no, value):
    """Round-trip ``value`` through field ``no`` of the shared formdef.

    Stores a formdata whose data is ``{no: value}``, reloads it by id and
    asserts the reloaded value compares equal to the one stored.
    """
    sql_class = formdef.data_class(mode='sql')

    stored = sql_class()
    stored.data = {no: value}
    stored.store()

    reloaded = sql_class.get(stored.id)
    assert reloaded.data.get(no) == value
@postgresql
def test_sql_field_string():
    """Round-trip a value through the string field (id '0')."""
    check_sql_field('0', 'hello world')
@postgresql
def test_sql_field_email():
    """Round-trip a value through the email field (id '1')."""
    check_sql_field('1', 'fred@example.com')
@postgresql
def test_sql_field_text():
    """Round-trip a value through the text field (id '2')."""
    check_sql_field('2', 'long text')
@postgresql
def test_sql_field_bool():
    """Round-trip both boolean values through the bool field (id '3')."""
    for flag in (False, True):
        check_sql_field('3', flag)
@postgresql
def test_sql_field_item():
    """Round-trip a value through the item field (id '4')."""
    check_sql_field('4', 'apricot')
@postgresql
def test_sql_field_date():
    """Round-trip today's date, as a time tuple, through the date field (id '5')."""
    today = datetime.date.today()
    check_sql_field('5', today.timetuple())
@postgresql
def test_sql_field_items():
    """Round-trip one- and two-element selections through the items field (id '6')."""
    for selection in (['apricot'], ['apricot', 'pear']):
        check_sql_field('6', selection)
@postgresql
def test_sql_change():
    """Re-storing a formdata with new data overwrites the stored value."""
    sql_class = formdef.data_class(mode='sql')

    record = sql_class()
    record.data = {'0': 'test'}
    record.store()
    record_id = record.id

    record = sql_class.get(record_id)
    assert record.data.get('0') == 'test'

    # modify the reloaded object and store it again
    record.data = {'0': 'test2'}
    record.store()

    record = sql_class.get(record_id)
    assert record.data.get('0') == 'test2'
@postgresql
def test_sql_remove():
    """After remove_self() the formdata can no longer be fetched."""
    sql_class = formdef.data_class(mode='sql')

    record = sql_class()
    record.data = {'0': 'test'}
    record.store()
    record_id = record.id

    record = sql_class.get(record_id)
    assert record.data.get('0') == 'test'

    record.remove_self()
    with pytest.raises(KeyError):
        sql_class.get(record_id)
@postgresql
def test_sql_wipe():
    """wipe() empties a table that holds at least one row."""
    sql_class = formdef.data_class(mode='sql')
    sql_class().store()
    assert sql_class.count() != 0

    sql_class.wipe()
    assert sql_class.count() == 0
@postgresql
def test_sql_evolution():
    """An evolution appended to a stored formdata is persisted with its comment."""
    sql_class = formdef.data_class(mode='sql')

    record = sql_class()
    record.just_created()
    record.store()
    record_id = record.id

    # just_created() is expected to leave exactly one evolution
    record = sql_class.get(record_id)
    assert len(record.evolution) == 1

    step = Evolution()
    step.time = time.localtime()
    step.status = record.status
    step.comment = 'hello world'
    record.evolution.append(step)
    record.store()

    record = sql_class.get(record_id)
    assert len(record.evolution) == 2
    assert record.evolution[-1].comment == 'hello world'
@postgresql
def test_sql_evolution_change():
    """Editing the comment of an existing evolution is persisted on store()."""
    sql_class = formdef.data_class(mode='sql')

    record = sql_class()
    record.just_created()
    record.store()
    record_id = record.id

    record = sql_class.get(record_id)
    assert len(record.evolution) == 1

    step = Evolution()
    step.time = time.localtime()
    step.status = record.status
    step.comment = 'hello world'
    record.evolution.append(step)
    record.store()

    record = sql_class.get(record_id)
    assert len(record.evolution) == 2
    assert record.evolution[-1].comment == 'hello world'

    # change the last evolution in place; no new evolution must appear
    record.evolution[-1].comment = 'foobar'
    record.store()

    record = sql_class.get(record_id)
    assert len(record.evolution) == 2
    assert record.evolution[-1].comment == 'foobar'
@postgresql
def test_sql_multiple_evolutions():
    """load_all_evolutions() can be applied to a full select() result."""
    sql_class = formdef.data_class(mode='sql')

    for index in range(20):
        record = sql_class()
        record.just_created()
        record.store()

        # reload before appending, as a separate request would
        record = sql_class.get(record.id)
        step = Evolution()
        step.time = time.localtime()
        step.status = record.status
        step.comment = 'hello world %d' % index
        record.evolution.append(step)
        record.store()

    everything = sql_class.select()
    sql_class.load_all_evolutions(everything)
    # only checks that the list is non-empty and that every object exposes
    # an _evolution attribute
    assert [item._evolution for item in everything]
@postgresql
def test_sql_get_ids_with_indexed_value():
    """Looking up by user_id returns only the formdatas of that user."""
    sql_class = formdef.data_class(mode='sql')
    sql_class.wipe()

    # one formdata without user, which must not be returned
    sql_class().store()

    expected_ids = []
    for _ in range(2):
        record = sql_class()
        record.user_id = '2'
        record.store()
        expected_ids.append(record.id)

    found = sql_class.get_ids_with_indexed_value('user_id', '2')
    assert set(found) == set(expected_ids)
@postgresql
def test_sql_get_ids_from_query():
    """get_ids_from_query() matches words contained in the text field."""
    sql_class = formdef.data_class(mode='sql')
    sql_class.wipe()

    texts = [
        'this is some reasonably long text',
        'hello world is still a classical example',
        'you would think other ideas of text would emerge',
    ]
    stored_ids = []
    for text in texts:
        record = sql_class()
        record.data = {'2': text}
        record.store()
        stored_ids.append(record.id)
    first_id, second_id, third_id = stored_ids

    # 'text' appears in the first and third entries only
    assert set(sql_class.get_ids_from_query('text')) == set([first_id, third_id])
    # 'classical' appears in the second entry only
    assert set(sql_class.get_ids_from_query('classical')) == set([second_id])
@postgresql
def test_sql_rollback_on_error():
    """A failed query must leave the connection usable for the next one."""
    sql_class = formdef.data_class(mode='sql')
    sql_class.wipe()

    # there is no FOOBAR column in the table, so this is expected to raise
    # a psycopg2.ProgrammingError (a psycopg2.Error subclass)
    with pytest.raises(psycopg2.Error):
        sql_class.get_ids_with_indexed_value('FOOBAR', '2')

    # a second statement after the failure must still succeed
    sql_class.wipe()
@postgresql
def test_sql_get_ids_with_indexed_value_dict():
    """Indexed lookup also works on the dict-valued workflow_roles attribute."""
    sql_class = formdef.data_class(mode='sql')
    sql_class.wipe()

    # one formdata without workflow roles, which must not be returned
    sql_class().store()

    expected_ids = []
    for _ in range(2):
        record = sql_class()
        record.workflow_roles = {'plop': '2'}
        record.store()
        expected_ids.append(record.id)

    found = sql_class.get_ids_with_indexed_value('workflow_roles', '2')
    assert set(found) == set(expected_ids)
@postgresql
def test_create_user():
    """Smoke test: a named SqlUser can be stored without error."""
    sql.SqlUser.wipe()

    pierre = sql.SqlUser()
    pierre.name = 'Pierre'
    pierre.store()
@postgresql
def test_get_user():
    """A stored SqlUser can be fetched back by its id."""
    sql.SqlUser.wipe()

    pierre = sql.SqlUser()
    pierre.name = 'Pierre'
    pierre.store()

    fetched = sql.SqlUser.get(pierre.id)
    assert fetched is not None
@postgresql
def test_get_missing_user():
    """Fetching an unknown user id raises KeyError."""
    sql.SqlUser.wipe()
    with pytest.raises(KeyError):
        sql.SqlUser.get(12345)
@postgresql
def test_get_missing_user_ignore_errors():
    """With ignore_errors=True an unknown user id yields None."""
    sql.SqlUser.wipe()
    missing = sql.SqlUser.get(12345, ignore_errors=True)
    assert missing is None
@postgresql
def test_get_users_with_role():
    """Only users holding the requested role are returned."""
    sql.SqlUser.wipe()

    with_role = sql.SqlUser()
    with_role.name = 'Pierre'
    with_role.roles = [1]
    with_role.store()

    without_role = sql.SqlUser()
    without_role.name = 'Papier'
    without_role.store()

    matches = sql.SqlUser.get_users_with_role(1)
    assert len(matches) == 1
@postgresql
def test_get_users_with_name_identifier():
    """Lookup by name identifier returns exactly the matching user."""
    sql.SqlUser.wipe()

    identified = sql.SqlUser()
    identified.name = 'Pierre'
    identified.name_identifiers = ['foo']
    identified.store()

    anonymous = sql.SqlUser()
    anonymous.name = 'Papier'
    anonymous.store()

    # the lookup is deliberately performed twice, once per assertion
    assert len(sql.SqlUser.get_users_with_name_identifier('foo')) == 1
    assert sql.SqlUser.get_users_with_name_identifier('foo')[0].name == 'Pierre'
@postgresql
def test_urlname_change():
    """url_name follows the formdef name only while no formdata exists.

    The shared module-level ``formdef`` is only read and mutated here, never
    rebound, so no ``global`` statement is needed (the original declared a
    misspelled, nonexistent ``formef`` global).
    """
    data_class = formdef.data_class(mode='sql')
    data_class.wipe()

    # with an empty table, renaming the formdef also changes its url_name
    assert formdef.url_name == 'tests'
    formdef.name = 'tests2'
    formdef.store()
    assert formdef.url_name == 'tests2'

    formdef.name = 'tests'
    formdef.store()
    assert formdef.url_name == 'tests'

    data_class = formdef.data_class(mode='sql')
    formdata = data_class()
    formdata.status = 'wf-0'
    formdata.user_id = '5'
    formdata.store()

    # once a formdata is stored, a rename must leave url_name untouched and
    # keep the existing row reachable
    formdef.name = 'tests2'
    formdef.store()
    assert formdef.url_name == 'tests'
    assert data_class.count() == 1
@postgresql
def test_sql_table_add_and_remove_fields():
    """select() keeps working while fields are added to and removed from a formdef."""
    fruit_choices = ('apple', 'pear', 'peach', 'apricot')

    evolving = FormDef()
    evolving.name = 'tests and and remove fields'
    evolving.fields = []
    evolving.store()
    assert evolving.table_name is not None
    assert evolving.data_class(mode='sql').count() == 0

    # two initial fields, given ids only if they do not have one yet
    evolving.fields = [
        fields.StringField(label='string'),
        fields.EmailField(label='email'),
    ]
    for new_field in evolving.fields:
        if new_field.id is None:
            new_field.id = evolving.get_new_field_id()
    evolving.store()

    # add an item field
    evolving.fields.append(fields.ItemField(label='item', items=fruit_choices))
    evolving.fields[-1].id = evolving.get_new_field_id()
    evolving.store()
    evolving.data_class(mode='sql').select()

    # remove it again, remembering its id
    previous_id = evolving.fields[-1].id
    evolving.fields = evolving.fields[:-1]
    evolving.store()
    evolving.data_class(mode='sql').select()

    # add a string field with the same label; it must get a different id
    evolving.fields.append(fields.StringField(label='item'))
    evolving.fields[-1].id = evolving.get_new_field_id()
    evolving.store()
    assert evolving.fields[-1].id != previous_id
    evolving.data_class(mode='sql').select()

    # swap that string field for an item field once more
    evolving.fields = evolving.fields[:-1]
    evolving.fields.append(fields.ItemField(label='item', items=fruit_choices))
    evolving.fields[-1].id = evolving.get_new_field_id()
    evolving.store()
    evolving.data_class(mode='sql').select()
@postgresql
def test_sql_table_select():
    """select() honours callables, criteria objects and mixes of both."""
    select_formdef = FormDef()
    select_formdef.name = 'table select'
    select_formdef.fields = []
    select_formdef.store()

    sql_class = select_formdef.data_class(mode='sql')
    assert sql_class.count() == 0

    for _ in range(50):
        sql_class().store()
    assert sql_class.count() == 50

    def matching(clause=None):
        # number of rows returned by select() for the given clause
        if clause is None:
            return len(sql_class.select())
        return len(sql_class.select(clause))

    assert matching() == 50

    # callable filter versus equivalent criteria
    assert matching(lambda x: x.id < 26) == 25
    assert matching([st.Less('id', 26)]) == 25

    # several criteria are combined
    assert matching([st.Less('id', 25), st.GreaterOrEqual('id', 10)]) == 15
    assert matching([st.Less('id', 25), st.GreaterOrEqual('id', 10), lambda x: x.id >= 15]) == 10

    assert matching([st.NotEqual('id', 26)]) == 49
    assert matching([st.Contains('id', [24, 25, 26])]) == 3
    assert matching([st.Contains('id', [24, 25, 86])]) == 2
    assert matching([st.NotContains('id', [24, 25, 86])]) == 48
@postgresql
def test_sql_table_select_datetime():
    """Equal/Less/Greater criteria work on the receipt_time column."""
    dated_formdef = FormDef()
    dated_formdef.name = 'table select datetime'
    dated_formdef.fields = []
    dated_formdef.store()

    sql_class = dated_formdef.data_class(mode='sql')
    assert sql_class.count() == 0

    # 50 rows, one per consecutive day starting 2014-01-01
    start = datetime.datetime(2014, 1, 1)
    for day in range(50):
        row = sql_class()
        row.receipt_time = (start + datetime.timedelta(days=day)).timetuple()
        row.store()

    assert sql_class.count() == 50
    assert len(sql_class.select()) == 50

    assert len(sql_class.select(lambda x: x.receipt_time == start.timetuple())) == 1
    assert len(sql_class.select([st.Equal('receipt_time', start.timetuple())])) == 1

    before_day_20 = sql_class.select([
        st.Less('receipt_time', (start + datetime.timedelta(days=20)).timetuple())])
    assert len(before_day_20) == 20

    after_day_20 = sql_class.select([
        st.Greater('receipt_time', (start + datetime.timedelta(days=20)).timetuple())])
    assert len(after_day_20) == 29
@postgresql
def test_select_limit_offset():
    """select() honours order_by, limit and offset.

    Expected ids are built with ``list(range(...))`` so the comparisons hold
    on Python 3 as well, where a list never compares equal to a range object.
    """
    test_formdef = FormDef()
    test_formdef.name = 'table select limit offset'
    test_formdef.fields = []
    test_formdef.store()
    data_class = test_formdef.data_class(mode='sql')
    assert data_class.count() == 0

    for _ in range(50):
        data_class().store()
    assert len(data_class.select()) == 50

    def ids(**kwargs):
        # ids returned by select() with the given keyword arguments
        return [x.id for x in data_class.select(**kwargs)]

    assert ids(order_by='id') == list(range(1, 51))
    assert ids(order_by='id', limit=10) == list(range(1, 11))
    assert ids(order_by='id', limit=10, offset=10) == list(range(11, 21))
    assert ids(order_by='id', limit=20, offset=20) == list(range(21, 41))
    assert ids(order_by='id', offset=10) == list(range(11, 51))

    # limit also applies when filtering with a callable
    assert len(data_class.select(lambda x: x.id > 10, limit=10)) == 10
@postgresql
def test_select_criteria_intersects():
    """Intersects matches rows whose items column shares a value with the list."""
    sql_class = formdef.data_class(mode='sql')
    sql_class.wipe()

    for chosen in (['apricot'], ['apricot', 'pear'], []):
        record = sql_class()
        record.status = 'wf-0'
        record.user_id = '5'
        record.data = {'6': chosen}
        record.store()

    def matching(fruit):
        # number of rows whose f6 column intersects [fruit]
        return len(sql_class.select([st.Intersects('f6', [fruit])]))

    assert matching('apricot') == 2
    assert matching('pear') == 1
    assert matching('apple') == 0
@postgresql
def test_count():
    """count() works both without arguments and with a list of criteria."""
    counted_formdef = FormDef()
    counted_formdef.name = 'table select count'
    counted_formdef.fields = []
    counted_formdef.store()

    sql_class = counted_formdef.data_class(mode='sql')
    assert sql_class.count() == 0

    for _ in range(50):
        sql_class().store()

    assert sql_class.count() == 50
    assert sql_class.count([st.Less('id', 26)]) == 25
@postgresql
def test_select_criteria_or_and():
    """Or/And criteria wrappers combine their sub-criteria as expected.

    Expected ids are built with ``list(range(...))``: on Python 3 a range
    object neither compares equal to a list nor supports ``+ [15]``.
    """
    test_formdef = FormDef()
    test_formdef.name = 'table select criteria or and'
    test_formdef.fields = []
    test_formdef.store()
    data_class = test_formdef.data_class(mode='sql')
    assert data_class.count() == 0

    for _ in range(50):
        data_class().store()

    def ids(criteria):
        # ids matching the criteria, in ascending id order
        return [x.id for x in data_class.select(criteria, order_by='id')]

    # an Or with a single member behaves like that member
    assert ids([st.Or([st.Less('id', 10)])]) == list(range(1, 10))
    assert ids([st.Or([st.Less('id', 10), st.Equal('id', 15)])]) == list(range(1, 10)) + [15]
    assert ids([st.And([st.Less('id', 10), st.Greater('id', 5)])]) == list(range(6, 10))
@postgresql
def test_sql_table_select_bool():
    """Equal criteria can filter on a boolean column."""
    bool_formdef = FormDef()
    bool_formdef.name = 'table select bool'
    bool_formdef.fields = [fields.BoolField(id='3', label='bool')]
    bool_formdef.store()

    sql_class = bool_formdef.data_class(mode='sql')
    assert sql_class.count() == 0

    for _ in range(50):
        row = sql_class()
        row.data = {'3': False}
        row.store()

    # flip only the last stored row to True; the row count must not change
    row.data = {'3': True}
    row.store()

    assert sql_class.count() == 50
    assert len(sql_class.select()) == 50
    assert len(sql_class.select([st.Equal('f3', True)])) == 1
    assert len(sql_class.select([st.Equal('f3', False)])) == 49
@postgresql
def test_sql_criteria_ilike():
    """ILike matches a string column regardless of case.

    Expected ids are built with ``list(range(...))`` so the comparisons hold
    on Python 3 as well, where a list never compares equal to a range object.
    """
    test_formdef = FormDef()
    # NOTE(review): same name as in test_sql_table_select_bool, which looks
    # like a copy/paste leftover; kept as-is since the name may affect the
    # table that gets created -- confirm before renaming.
    test_formdef.name = 'table select bool'
    test_formdef.fields = [fields.StringField(id='3', label='string')]
    test_formdef.store()
    data_class = test_formdef.data_class(mode='sql')
    assert data_class.count() == 0

    # the first 20 rows hold 'foo', the remaining 30 hold 'bar'
    for i in range(50):
        t = data_class()
        if i < 20:
            t.data = {'3': 'foo'}
        else:
            t.data = {'3': 'bar'}
        t.store()
    # NOTE(review): redundant second store kept from the original; it must
    # not create an extra row (see the count assertion below).
    t.store()

    assert data_class.count() == 50
    assert len(data_class.select()) == 50

    expected = list(range(21, 51))
    assert [x.id for x in data_class.select([st.ILike('f3', 'bar')], order_by='id')] == expected
    assert [x.id for x in data_class.select([st.ILike('f3', 'BAR')], order_by='id')] == expected
def table_exists(cur, table_name):
    """Return True if information_schema lists exactly one table called ``table_name``.

    ``cur`` is a DB-API cursor; the name is passed as a query parameter.
    """
    query = ('SELECT COUNT(*) FROM information_schema.tables\n'
             'WHERE table_name = %s')
    cur.execute(query, (table_name,))
    (matches,) = cur.fetchone()[:1]
    return matches == 1
def column_exists_in_table(cur, table_name, column_name):
    """Return True if information_schema lists exactly one such column.

    ``cur`` is a DB-API cursor; table and column names are passed as query
    parameters.
    """
    query = ('SELECT COUNT(*) FROM information_schema.columns\n'
             'WHERE table_name = %s\n'
             'AND column_name = %s')
    cur.execute(query, (table_name, column_name))
    (matches,) = cur.fetchone()[:1]
    return matches == 1
@postgresql
def test_sql_level():
    """migrate() brings the schema to SQL_LEVEL and refuses an invalid level."""
    conn, cur = sql.get_connection_and_cursor()

    # without the meta table the level is reported as 0
    cur.execute('DROP TABLE wcs_meta')
    assert sql.get_sql_level(conn, cur) == 0

    sql.migrate()
    assert sql.get_sql_level(conn, cur) == sql.SQL_LEVEL

    # insert negative SQL level, to trigger an error, and check it's not
    # changed.
    negative_level = str(-1)
    cur.execute('''UPDATE wcs_meta SET value = %s WHERE key = %s''',
                (negative_level, 'sql_level'))
    assert sql.get_sql_level(conn, cur) == -1
    with pytest.raises(RuntimeError):
        sql.migrate()
    assert sql.get_sql_level(conn, cur) == -1

    conn.commit()
    cur.close()
@postgresql
def test_migration_1_tracking_code():
    """migrate() recreates the wcs_meta and tracking_codes tables when missing."""
    conn, cur = sql.get_connection_and_cursor()

    for dropped in ('wcs_meta', 'tracking_codes'):
        cur.execute('DROP TABLE %s' % dropped)

    sql.migrate()

    assert table_exists(cur, 'tracking_codes')
    assert table_exists(cur, 'wcs_meta')

    conn.commit()
    cur.close()
@postgresql
def test_migration_2_formdef_id_in_views():
    """migrate() from level 1 rebuilds wcs_all_forms with a formdef_id column.

    Fixes over the original: the shared formdef's field 4 is restored to a
    real ItemField (a stray trailing comma used to store a 1-tuple instead),
    and the LIKE pattern is a raw string so its backslashes are not treated
    as (invalid) escape sequences.
    """
    conn, cur = sql.get_connection_and_cursor()
    cur.execute('UPDATE wcs_meta SET value = 1 WHERE key = %s', ('sql_level',))
    cur.execute('DROP VIEW wcs_all_forms')

    # hack a formdef table the wrong way, to check it is reconstructed
    # properly before the views are created
    formdef.fields[4] = fields.StringField(id='4', label='item')
    cur.execute('DROP VIEW wcs_view_1_tests')
    cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN f4_display')
    sql.redo_views(conn, cur, formdef, rebuild_global_views=False)
    # put the original item field back on the shared formdef
    formdef.fields[4] = fields.ItemField(
        id='4', label='item', items=('apple', 'pear', 'peach', 'apricot'))
    assert table_exists(cur, 'wcs_view_1_tests')
    assert not column_exists_in_table(cur, 'wcs_view_1_tests', 'f4_display')

    # collect the per-formdef views; the backslashes make the underscores
    # literal in the LIKE pattern
    cur.execute('SELECT table_name FROM information_schema.views\n'
                'WHERE table_name LIKE %s', (r'wcs\_view\_%',))
    view_names = [row[0] for row in cur.fetchall()]

    fake_formdef = FormDef()
    common_fields = sql.get_view_fields(fake_formdef)
    # remove formdef_id for the purpose of this test
    common_fields.remove([x for x in common_fields if x[1] == 'formdef_id'][0])

    # rebuild the global view the old way, i.e. without formdef_id
    column_list = ', '.join([y[1] for y in common_fields])
    union = ' UNION '.join(
        ['''SELECT %s FROM %s''' % (column_list, x) for x in view_names])
    assert 'formdef_id' not in union
    cur.execute('''CREATE VIEW wcs_all_forms AS %s''' % union)

    sql.migrate()
    assert column_exists_in_table(cur, 'wcs_all_forms', 'formdef_id')

    conn.commit()
    cur.close()
@postgresql
def test_migration_6_actions_roles():
    """migrate() from level 5 adds actions_roles_array to the table and views.

    Fix over the original: the shared formdef's field 4 is restored to a
    real ItemField; a stray trailing comma used to store a 1-tuple instead.
    """
    conn, cur = sql.get_connection_and_cursor()
    cur.execute('UPDATE wcs_meta SET value = 5 WHERE key = %s', ('sql_level',))
    cur.execute('DROP VIEW wcs_all_forms')

    # hack a formdef table the wrong way, to check it is reconstructed
    # properly before the views are created
    formdef.fields[4] = fields.StringField(id='4', label='item')
    cur.execute('DROP VIEW wcs_view_1_tests')
    cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN actions_roles_array')
    sql.drop_views(formdef, conn, cur)
    # put the original item field back on the shared formdef
    formdef.fields[4] = fields.ItemField(
        id='4', label='item', items=('apple', 'pear', 'peach', 'apricot'))
    assert not column_exists_in_table(cur, 'formdata_1_tests', 'actions_roles_array')

    sql.migrate()

    # the column must be back on the table, its view and the global view
    assert column_exists_in_table(cur, 'formdata_1_tests', 'actions_roles_array')
    assert column_exists_in_table(cur, 'wcs_view_1_tests', 'actions_roles_array')
    assert column_exists_in_table(cur, 'wcs_all_forms', 'actions_roles_array')

    conn.commit()
    cur.close()
def drop_formdef_tables():
    """Drop (with CASCADE) every table whose name starts with ``formdata_``."""
    conn, cur = sql.get_connection_and_cursor()

    # read the full list first, as the same cursor is reused for the DROPs
    cur.execute('''SELECT table_name FROM information_schema.tables''')
    formdata_tables = [
        row[0] for row in cur.fetchall() if row[0].startswith('formdata_')]

    for name in formdata_tables:
        cur.execute('DROP TABLE %s CASCADE' % name)
@postgresql
def test_is_at_endpoint():
    """The is_at_endpoint column of wcs_all_forms follows the workflow definition."""
    drop_formdef_tables()
    conn, cur = sql.get_connection_and_cursor()

    def scalar(query):
        # run a query and return the first column of its first row
        cur.execute(query)
        return cur.fetchone()[0]

    # st1 carries a commentable action, st2 has no item at all
    wf = Workflow(name='test endpoint')
    st1 = wf.add_status('Status1', 'st1')
    wf.add_status('Status2', 'st2')

    commentable = CommentableWorkflowStatusItem()
    commentable.id = '_commentable'
    commentable.by = ['_submitter', '_receiver']
    st1.items.append(commentable)
    commentable.parent = st1
    wf.store()
    assert [x.id for x in wf.get_endpoint_status()] == ['st2']

    endpoint_formdef = FormDef()
    endpoint_formdef.name = 'test endpoint'
    endpoint_formdef.fields = []
    endpoint_formdef.workflow = wf
    endpoint_formdef.store()

    # one formdata in each status
    sql_class = endpoint_formdef.data_class(mode='sql')
    for status in ('wf-st1', 'wf-st2'):
        record = sql_class()
        record.status = status
        record.store()

    assert scalar('''SELECT COUNT(*) FROM wcs_all_forms''') == 2
    assert scalar('''SELECT COUNT(*) FROM wcs_all_forms WHERE status = 'wf-st1' ''') == 1
    assert scalar('''SELECT COUNT(*) FROM wcs_all_forms WHERE is_at_endpoint = true''') == 1

    # check a change to workflow is reflected in the database
    st1.forced_endpoint = True
    wf.store()
    assert [x.id for x in wf.get_endpoint_status()] == ['st1', 'st2']
    assert scalar('''SELECT COUNT(*) FROM wcs_all_forms WHERE is_at_endpoint = true''') == 2
@postgresql
def test_views_fts():
    """The fts column of wcs_all_forms supports full-text queries on field data."""
    drop_formdef_tables()
    conn, cur = sql.get_connection_and_cursor()

    fts_formdef = FormDef()
    fts_formdef.name = 'test fts'
    fts_formdef.fields = [
        fields.StringField(id='0', label='string'),
    ]
    fts_formdef.store()

    sql_class = fts_formdef.data_class(mode='sql')
    for content in ('foo bar', 'foo'):
        record = sql_class()
        record.data = {'0': content}
        record.store()

    def hits(word):
        # number of rows in the global view matching the word
        cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE fts @@ plainto_tsquery(%s)''', (word,))
        return cur.fetchone()[0]

    # 'foo' is in both rows, 'bar' only in the first
    assert hits('foo') == 2
    assert hits('bar') == 1
@postgresql
def test_select_any_formdata():
    """AnyFormData.select() spans all formdefs and supports sort, criteria and paging."""
    drop_formdef_tables()
    conn, cur = sql.get_connection_and_cursor()

    statuses = ['wf-new', 'wf-accepted', 'wf-rejected', 'wf-finished']
    base_time = datetime.datetime.now()
    created = 0

    # 5 formdefs with 20 formdatas each
    for i in range(5):
        any_formdef = FormDef()
        any_formdef.name = 'test any %d' % i
        any_formdef.fields = []
        any_formdef.store()

        sql_class = any_formdef.data_class(mode='sql')
        for j in range(20):
            record = sql_class()
            record.just_created()
            record.user_id = str((i + j) % 11)
            # set receipt_time to make sure all entries are unique.
            record.receipt_time = (base_time + datetime.timedelta(seconds=created)).timetuple()
            record.status = statuses[(i + j) % 4]
            record.store()
            created += 1

    def keys(items):
        # (formdef_id, id) pairs identifying each object across formdefs
        return [(x.formdef_id, x.id) for x in items]

    # test generic select
    everything = sql.AnyFormData.select()
    assert len(everything) == 100

    # make sure valid formdefs are used
    for name in ('test any 0', 'test any 1'):
        assert len([x for x in everything if x.formdef.name == name]) == 20

    # test sorting
    ascending = sql.AnyFormData.select(order_by='receipt_time')
    assert len(ascending) == 100
    descending = sql.AnyFormData.select(order_by='-receipt_time')
    assert keys(descending) == list(reversed(keys(ascending)))

    # test clauses
    by_user = sql.AnyFormData.select([st.Equal('user_id', '0')])
    assert len(by_user) == len([x for x in ascending if x.user_id == '0'])

    at_endpoint = sql.AnyFormData.select([st.Equal('is_at_endpoint', True)])
    assert len(at_endpoint) == len(
        [x for x in ascending if x.status in ('wf-rejected', 'wf-finished')])

    # test offset/limit
    page = sql.AnyFormData.select(order_by='receipt_time', limit=10, offset=0)
    assert keys(page) == keys(ascending)[:10]

    page = sql.AnyFormData.select(order_by='receipt_time', limit=10, offset=20)
    assert keys(page) == keys(ascending)[20:30]
@postgresql
def test_actions_roles():
    """actions_roles_array in wcs_all_forms follows the roles allowed to act in a status."""
    drop_formdef_tables()
    conn, cur = sql.get_connection_and_cursor()

    def scalar(query):
        # run a query and return the first column of its first row
        cur.execute(query)
        return cur.fetchone()[0]

    # st1 has a commentable action open to the submitter and role '1'
    wf = Workflow(name='test endpoint')
    st1 = wf.add_status('Status1', 'st1')
    wf.add_status('Status2', 'st2')

    commentable = CommentableWorkflowStatusItem()
    commentable.id = '_commentable'
    commentable.by = ['_submitter', '1']
    st1.items.append(commentable)
    commentable.parent = st1
    wf.store()
    assert [x.id for x in wf.get_endpoint_status()] == ['st2']

    roles_formdef = FormDef()
    roles_formdef.name = 'test actions roles'
    roles_formdef.fields = []
    roles_formdef.workflow = wf
    roles_formdef.store()

    sql_class = roles_formdef.data_class(mode='sql')

    in_st1 = sql_class()
    in_st1.status = 'wf-st1'
    in_st1.store()
    formdata_id = in_st1.id

    in_st2 = sql_class()
    in_st2.status = 'wf-st2'
    in_st2.store()

    overlaps_1 = ('SELECT COUNT(*) FROM wcs_all_forms\n'
                  "WHERE actions_roles_array && ARRAY['5', '1', '4']")
    overlaps_2_3 = ('SELECT COUNT(*) FROM wcs_all_forms\n'
                    "WHERE actions_roles_array && ARRAY['2', '3']")

    assert scalar('''SELECT COUNT(*) FROM wcs_all_forms''') == 2
    assert scalar(overlaps_1) == 1

    # check a change to workflow is reflected in the database
    st1.items[0].by = ['2', '3']
    wf.store()
    assert scalar(overlaps_1) == 0
    assert scalar(overlaps_2_3) == 1

    # using criterias
    criterias = [st.Intersects('actions_roles_array', ['2', '3'])]
    total_count = sql.AnyFormData.count(criterias)
    formdatas = sql.AnyFormData.select(criterias)
    assert total_count == 1
    assert formdatas[0].id == formdata_id