1004 lines
29 KiB
Python
1004 lines
29 KiB
Python
import datetime
|
|
import os
|
|
import random
|
|
import shutil
|
|
import sys
|
|
import time
|
|
|
|
from quixote import cleanup
|
|
|
|
from wcs import formdef, publisher, fields
|
|
from wcs.formdef import FormDef
|
|
from wcs.formdata import Evolution
|
|
from wcs.workflows import Workflow, CommentableWorkflowStatusItem
|
|
from wcs import sql
|
|
import wcs.qommon.storage as st
|
|
|
|
from utilities import create_temporary_pub
|
|
|
|
import pytest
|
|
postgresql = pytest.mark.postgresql
|
|
|
|
try:
|
|
import psycopg2
|
|
except ImportError:
|
|
pass
|
|
|
|
def setup_module(module):
|
|
global pub, formdef
|
|
|
|
cleanup()
|
|
|
|
pub = create_temporary_pub()
|
|
pub.is_using_postgresql = lambda: True
|
|
|
|
conn = psycopg2.connect(user=os.environ['USER'])
|
|
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
|
cur = conn.cursor()
|
|
dbname = 'wcstests%d' % random.randint(0, 100000)
|
|
cur.execute('CREATE DATABASE %s' % dbname)
|
|
cur.close()
|
|
|
|
pub.cfg['postgresql'] = {'database': dbname, 'user': os.environ['USER']}
|
|
|
|
formdef = formdef.FormDef()
|
|
formdef.name = 'tests'
|
|
formdef.fields = [
|
|
fields.StringField(id='0', label='string'),
|
|
fields.EmailField(id='1', label='email'),
|
|
fields.TextField(id='2', label='text'),
|
|
fields.BoolField(id='3', label='bool'),
|
|
fields.ItemField(id='4', label='item', items=('apple', 'pear', 'peach', 'apricot')),
|
|
fields.DateField(id='5', label='date'),
|
|
fields.ItemsField(id='6', label='items', items=('apple', 'pear', 'peach', 'apricot')),
|
|
]
|
|
formdef.store()
|
|
|
|
pub.initialize_sql()
|
|
|
|
conn.close()
|
|
|
|
|
|
def teardown_module(module):
|
|
shutil.rmtree(pub.APP_DIR)
|
|
|
|
if hasattr(pub, 'pgconn') and pub.pgconn:
|
|
pub.pgconn.close()
|
|
|
|
conn = psycopg2.connect(user=os.environ['USER'])
|
|
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
|
cur = conn.cursor()
|
|
cur.execute('DROP DATABASE %s' % pub.cfg['postgresql']['database'])
|
|
cur.close()
|
|
|
|
@postgresql
|
|
def test_sql_table_name_invalid_chars():
|
|
test_formdef = FormDef()
|
|
test_formdef.name = 'test-some|char;are better/ignored'
|
|
test_formdef.fields = []
|
|
test_formdef.store()
|
|
assert test_formdef.table_name is not None
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
@postgresql
|
|
def test_sql_data_class():
|
|
data_class = formdef.data_class(mode='sql')
|
|
|
|
@postgresql
|
|
def test_sql_count():
|
|
data_class = formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
@postgresql
|
|
def test_sql_store():
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata = data_class()
|
|
formdata.status = 'wf-0'
|
|
formdata.user_id = '5'
|
|
formdata.store()
|
|
assert formdata.id
|
|
|
|
@postgresql
|
|
def test_sql_get():
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata = data_class()
|
|
formdata.status = 'wf-0'
|
|
formdata.user_id = '5'
|
|
formdata.store()
|
|
id = formdata.id
|
|
|
|
formdata = data_class.get(id)
|
|
assert formdata.user_id == '5'
|
|
|
|
@postgresql
|
|
def test_sql_get_missing():
|
|
data_class = formdef.data_class(mode='sql')
|
|
with pytest.raises(KeyError):
|
|
data_class.get(123456)
|
|
|
|
@postgresql
|
|
def test_sql_get_missing_ignore_errors():
|
|
data_class = formdef.data_class(mode='sql')
|
|
assert data_class.get(123456, ignore_errors=True) is None
|
|
|
|
def check_sql_field(no, value):
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata = data_class()
|
|
formdata.data = {no: value}
|
|
formdata.store()
|
|
id = formdata.id
|
|
|
|
formdata = data_class.get(id)
|
|
assert formdata.data.get(no) == value
|
|
|
|
@postgresql
|
|
def test_sql_field_string():
|
|
check_sql_field('0', 'hello world')
|
|
|
|
@postgresql
|
|
def test_sql_field_email():
|
|
check_sql_field('1', 'fred@example.com')
|
|
|
|
@postgresql
|
|
def test_sql_field_text():
|
|
check_sql_field('2', 'long text')
|
|
|
|
@postgresql
|
|
def test_sql_field_bool():
|
|
check_sql_field('3', False)
|
|
check_sql_field('3', True)
|
|
|
|
@postgresql
|
|
def test_sql_field_item():
|
|
check_sql_field('4', 'apricot')
|
|
|
|
@postgresql
|
|
def test_sql_field_date():
|
|
check_sql_field('5', datetime.date.today().timetuple())
|
|
|
|
@postgresql
|
|
def test_sql_field_items():
|
|
check_sql_field('6', ['apricot'])
|
|
check_sql_field('6', ['apricot', 'pear'])
|
|
|
|
@postgresql
|
|
def test_sql_change():
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata = data_class()
|
|
formdata.data = {'0': 'test'}
|
|
formdata.store()
|
|
id = formdata.id
|
|
|
|
formdata = data_class.get(id)
|
|
assert formdata.data.get('0') == 'test'
|
|
|
|
formdata.data = {'0': 'test2'}
|
|
formdata.store()
|
|
formdata = data_class.get(id)
|
|
assert formdata.data.get('0') == 'test2'
|
|
|
|
@postgresql
|
|
def test_sql_remove():
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata = data_class()
|
|
formdata.data = {'0': 'test'}
|
|
formdata.store()
|
|
id = formdata.id
|
|
|
|
formdata = data_class.get(id)
|
|
assert formdata.data.get('0') == 'test'
|
|
|
|
formdata.remove_self()
|
|
with pytest.raises(KeyError):
|
|
data_class.get(id)
|
|
|
|
@postgresql
|
|
def test_sql_wipe():
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata = data_class()
|
|
formdata.store()
|
|
|
|
assert data_class.count() != 0
|
|
data_class.wipe()
|
|
assert data_class.count() == 0
|
|
|
|
@postgresql
|
|
def test_sql_evolution():
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata = data_class()
|
|
formdata.just_created()
|
|
formdata.store()
|
|
id = formdata.id
|
|
|
|
formdata = data_class.get(id)
|
|
assert len(formdata.evolution) == 1
|
|
|
|
evo = Evolution()
|
|
evo.time = time.localtime()
|
|
evo.status = formdata.status
|
|
evo.comment = 'hello world'
|
|
formdata.evolution.append(evo)
|
|
formdata.store()
|
|
|
|
formdata = data_class.get(id)
|
|
assert len(formdata.evolution) == 2
|
|
assert formdata.evolution[-1].comment == 'hello world'
|
|
|
|
@postgresql
|
|
def test_sql_evolution_change():
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata = data_class()
|
|
formdata.just_created()
|
|
formdata.store()
|
|
id = formdata.id
|
|
|
|
formdata = data_class.get(id)
|
|
assert len(formdata.evolution) == 1
|
|
|
|
evo = Evolution()
|
|
evo.time = time.localtime()
|
|
evo.status = formdata.status
|
|
evo.comment = 'hello world'
|
|
formdata.evolution.append(evo)
|
|
formdata.store()
|
|
|
|
formdata = data_class.get(id)
|
|
assert len(formdata.evolution) == 2
|
|
assert formdata.evolution[-1].comment == 'hello world'
|
|
|
|
formdata.evolution[-1].comment = 'foobar'
|
|
formdata.store()
|
|
|
|
formdata = data_class.get(id)
|
|
assert len(formdata.evolution) == 2
|
|
assert formdata.evolution[-1].comment == 'foobar'
|
|
|
|
@postgresql
|
|
def test_sql_multiple_evolutions():
|
|
data_class = formdef.data_class(mode='sql')
|
|
for i in range(20):
|
|
formdata = data_class()
|
|
formdata.just_created()
|
|
formdata.store()
|
|
id = formdata.id
|
|
|
|
formdata = data_class.get(id)
|
|
|
|
evo = Evolution()
|
|
evo.time = time.localtime()
|
|
evo.status = formdata.status
|
|
evo.comment = 'hello world %d' % i
|
|
formdata.evolution.append(evo)
|
|
formdata.store()
|
|
|
|
values = data_class.select()
|
|
data_class.load_all_evolutions(values)
|
|
assert [x._evolution for x in values]
|
|
|
|
@postgresql
|
|
def test_sql_get_ids_with_indexed_value():
|
|
data_class = formdef.data_class(mode='sql')
|
|
data_class.wipe()
|
|
|
|
formdata = data_class()
|
|
formdata.store()
|
|
id1 = formdata.id
|
|
|
|
formdata = data_class()
|
|
formdata.user_id = '2'
|
|
formdata.store()
|
|
id2 = formdata.id
|
|
|
|
formdata = data_class()
|
|
formdata.user_id = '2'
|
|
formdata.store()
|
|
id3 = formdata.id
|
|
|
|
ids = data_class.get_ids_with_indexed_value('user_id', '2')
|
|
assert set(ids) == set([id2, id3])
|
|
|
|
@postgresql
|
|
def test_sql_get_ids_from_query():
|
|
data_class = formdef.data_class(mode='sql')
|
|
data_class.wipe()
|
|
|
|
formdata = data_class()
|
|
formdata.data = {'2': 'this is some reasonably long text'}
|
|
formdata.store()
|
|
id1 = formdata.id
|
|
|
|
formdata = data_class()
|
|
formdata.data = {'2': 'hello world is still a classical example'}
|
|
formdata.store()
|
|
id2 = formdata.id
|
|
|
|
formdata = data_class()
|
|
formdata.data = {'2': 'you would think other ideas of text would emerge'}
|
|
formdata.store()
|
|
id3 = formdata.id
|
|
|
|
ids = data_class.get_ids_from_query('text')
|
|
assert set(ids) == set([id1, id3])
|
|
|
|
ids = data_class.get_ids_from_query('classical')
|
|
assert set(ids) == set([id2])
|
|
|
|
|
|
@postgresql
|
|
def test_sql_rollback_on_error():
|
|
data_class = formdef.data_class(mode='sql')
|
|
data_class.wipe()
|
|
with pytest.raises(psycopg2.Error):
|
|
# this will raise a psycopg2.ProgrammingError as there's no FOOBAR
|
|
# column in the table.
|
|
data_class.get_ids_with_indexed_value('FOOBAR', '2')
|
|
data_class.wipe()
|
|
|
|
|
|
@postgresql
|
|
def test_sql_get_ids_with_indexed_value_dict():
|
|
data_class = formdef.data_class(mode='sql')
|
|
data_class.wipe()
|
|
|
|
formdata = data_class()
|
|
formdata.store()
|
|
id1 = formdata.id
|
|
|
|
formdata = data_class()
|
|
formdata.workflow_roles = {'plop': '2'}
|
|
formdata.store()
|
|
id2 = formdata.id
|
|
|
|
formdata = data_class()
|
|
formdata.workflow_roles = {'plop': '2'}
|
|
formdata.store()
|
|
id3 = formdata.id
|
|
|
|
ids = data_class.get_ids_with_indexed_value('workflow_roles', '2')
|
|
assert set(ids) == set([id2, id3])
|
|
|
|
@postgresql
|
|
def test_create_user():
|
|
sql.SqlUser.wipe()
|
|
|
|
user = sql.SqlUser()
|
|
user.name = 'Pierre'
|
|
user.store()
|
|
|
|
@postgresql
|
|
def test_get_user():
|
|
sql.SqlUser.wipe()
|
|
|
|
user = sql.SqlUser()
|
|
user.name = 'Pierre'
|
|
user.store()
|
|
|
|
assert sql.SqlUser.get(user.id) is not None
|
|
|
|
@postgresql
|
|
def test_get_missing_user():
|
|
sql.SqlUser.wipe()
|
|
|
|
with pytest.raises(KeyError):
|
|
sql.SqlUser.get(12345)
|
|
|
|
@postgresql
|
|
def test_get_missing_user_ignore_errors():
|
|
sql.SqlUser.wipe()
|
|
|
|
assert sql.SqlUser.get(12345, ignore_errors=True) is None
|
|
|
|
@postgresql
|
|
def test_get_users_with_role():
|
|
sql.SqlUser.wipe()
|
|
|
|
user = sql.SqlUser()
|
|
user.name = 'Pierre'
|
|
user.roles = [1]
|
|
user.store()
|
|
user_id = user.id
|
|
|
|
user = sql.SqlUser()
|
|
user.name = 'Papier'
|
|
user.store()
|
|
|
|
assert len(sql.SqlUser.get_users_with_role(1)) == 1
|
|
|
|
@postgresql
|
|
def test_get_users_with_name_identifier():
|
|
sql.SqlUser.wipe()
|
|
|
|
user = sql.SqlUser()
|
|
user.name = 'Pierre'
|
|
user.name_identifiers = ['foo']
|
|
user.store()
|
|
user_id = user.id
|
|
|
|
user = sql.SqlUser()
|
|
user.name = 'Papier'
|
|
user.store()
|
|
|
|
assert len(sql.SqlUser.get_users_with_name_identifier('foo')) == 1
|
|
assert sql.SqlUser.get_users_with_name_identifier('foo')[0].name == 'Pierre'
|
|
|
|
|
|
@postgresql
|
|
def test_urlname_change():
|
|
global formef
|
|
data_class = formdef.data_class(mode='sql')
|
|
data_class.wipe()
|
|
assert formdef.url_name == 'tests'
|
|
|
|
formdef.name = 'tests2'
|
|
formdef.store()
|
|
assert formdef.url_name == 'tests2'
|
|
|
|
formdef.name = 'tests'
|
|
formdef.store()
|
|
assert formdef.url_name == 'tests'
|
|
|
|
data_class = formdef.data_class(mode='sql')
|
|
|
|
formdata = data_class()
|
|
formdata.status = 'wf-0'
|
|
formdata.user_id = '5'
|
|
formdata.store()
|
|
|
|
formdef.name = 'tests2'
|
|
formdef.store()
|
|
assert formdef.url_name == 'tests'
|
|
|
|
assert data_class.count() == 1
|
|
|
|
@postgresql
|
|
def test_sql_table_add_and_remove_fields():
|
|
test_formdef = FormDef()
|
|
test_formdef.name = 'tests and and remove fields'
|
|
test_formdef.fields = []
|
|
test_formdef.store()
|
|
assert test_formdef.table_name is not None
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
test_formdef.fields = [
|
|
fields.StringField(label='string'),
|
|
fields.EmailField(label='email'),
|
|
]
|
|
|
|
for field in test_formdef.fields:
|
|
if field.id is None:
|
|
field.id = test_formdef.get_new_field_id()
|
|
test_formdef.store()
|
|
|
|
test_formdef.fields.append(
|
|
fields.ItemField(label='item', items=('apple', 'pear', 'peach', 'apricot')))
|
|
test_formdef.fields[-1].id = test_formdef.get_new_field_id()
|
|
test_formdef.store()
|
|
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
data_class.select()
|
|
|
|
previous_id = test_formdef.fields[-1].id
|
|
test_formdef.fields = test_formdef.fields[:-1]
|
|
test_formdef.store()
|
|
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
data_class.select()
|
|
|
|
test_formdef.fields.append(fields.StringField(label='item'))
|
|
test_formdef.fields[-1].id = test_formdef.get_new_field_id()
|
|
test_formdef.store()
|
|
|
|
assert test_formdef.fields[-1].id != previous_id
|
|
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
data_class.select()
|
|
|
|
test_formdef.fields = test_formdef.fields[:-1]
|
|
test_formdef.fields.append(
|
|
fields.ItemField(label='item', items=('apple', 'pear', 'peach', 'apricot')))
|
|
test_formdef.fields[-1].id = test_formdef.get_new_field_id()
|
|
test_formdef.store()
|
|
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
data_class.select()
|
|
|
|
|
|
@postgresql
|
|
def test_sql_table_select():
|
|
test_formdef = FormDef()
|
|
test_formdef.name = 'table select'
|
|
test_formdef.fields = []
|
|
test_formdef.store()
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
for i in range(50):
|
|
t = data_class()
|
|
t.store()
|
|
|
|
assert data_class.count() == 50
|
|
assert len(data_class.select()) == 50
|
|
|
|
assert len(data_class.select(lambda x: x.id < 26)) == 25
|
|
assert len(data_class.select([st.Less('id', 26)])) == 25
|
|
assert len(data_class.select([st.Less('id', 25), st.GreaterOrEqual('id', 10)])) == 15
|
|
assert len(data_class.select([st.Less('id', 25), st.GreaterOrEqual('id', 10), lambda x: x.id >= 15])) == 10
|
|
assert len(data_class.select([st.NotEqual('id', 26)])) == 49
|
|
|
|
assert len(data_class.select([st.Contains('id', [24, 25, 26])])) == 3
|
|
assert len(data_class.select([st.Contains('id', [24, 25, 86])])) == 2
|
|
assert len(data_class.select([st.NotContains('id', [24, 25, 86])])) == 48
|
|
|
|
|
|
@postgresql
|
|
def test_sql_table_select_datetime():
|
|
test_formdef = FormDef()
|
|
test_formdef.name = 'table select datetime'
|
|
test_formdef.fields = []
|
|
test_formdef.store()
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
d = datetime.datetime(2014, 1, 1)
|
|
for i in range(50):
|
|
t = data_class()
|
|
t.receipt_time = (d + datetime.timedelta(days=i)).timetuple()
|
|
t.store()
|
|
|
|
assert data_class.count() == 50
|
|
assert len(data_class.select()) == 50
|
|
|
|
assert len(data_class.select(lambda x: x.receipt_time == d.timetuple())) == 1
|
|
assert len(data_class.select([st.Equal('receipt_time', d.timetuple())])) == 1
|
|
assert len(data_class.select([
|
|
st.Less('receipt_time', (d + datetime.timedelta(days=20)).timetuple())])) == 20
|
|
assert len(data_class.select([
|
|
st.Greater('receipt_time', (d + datetime.timedelta(days=20)).timetuple())])) == 29
|
|
|
|
|
|
@postgresql
|
|
def test_select_limit_offset():
|
|
test_formdef = FormDef()
|
|
test_formdef.name = 'table select limit offset'
|
|
test_formdef.fields = []
|
|
test_formdef.store()
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
for i in range(50):
|
|
t = data_class()
|
|
t.store()
|
|
|
|
assert len(data_class.select()) == 50
|
|
assert [x.id for x in data_class.select(order_by='id')] == range(1, 51)
|
|
assert [x.id for x in data_class.select(order_by='id', limit=10)] == range(1, 11)
|
|
assert [x.id for x in data_class.select(order_by='id', limit=10, offset=10)] == range(11, 21)
|
|
assert [x.id for x in data_class.select(order_by='id', limit=20, offset=20)] == range(21, 41)
|
|
assert [x.id for x in data_class.select(order_by='id', offset=10)] == range(11, 51)
|
|
assert len([x.id for x in data_class.select(lambda x: x.id > 10, limit=10)]) == 10
|
|
|
|
|
|
@postgresql
|
|
def test_select_criteria_intersects():
|
|
data_class = formdef.data_class(mode='sql')
|
|
data_class.wipe()
|
|
formdata = data_class()
|
|
formdata.status = 'wf-0'
|
|
formdata.user_id = '5'
|
|
formdata.data = {'6': ['apricot']}
|
|
formdata.store()
|
|
|
|
formdata = data_class()
|
|
formdata.status = 'wf-0'
|
|
formdata.user_id = '5'
|
|
formdata.data = {'6': ['apricot', 'pear']}
|
|
formdata.store()
|
|
|
|
formdata = data_class()
|
|
formdata.status = 'wf-0'
|
|
formdata.user_id = '5'
|
|
formdata.data = {'6': []}
|
|
formdata.store()
|
|
|
|
assert len(data_class.select([st.Intersects('f6', ['apricot'])])) == 2
|
|
assert len(data_class.select([st.Intersects('f6', ['pear'])])) == 1
|
|
assert len(data_class.select([st.Intersects('f6', ['apple'])])) == 0
|
|
|
|
|
|
@postgresql
|
|
def test_count():
|
|
test_formdef = FormDef()
|
|
test_formdef.name = 'table select count'
|
|
test_formdef.fields = []
|
|
test_formdef.store()
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
for i in range(50):
|
|
t = data_class()
|
|
t.store()
|
|
|
|
assert data_class.count() == 50
|
|
assert data_class.count([st.Less('id', 26)]) == 25
|
|
|
|
|
|
@postgresql
|
|
def test_select_criteria_or_and():
|
|
test_formdef = FormDef()
|
|
test_formdef.name = 'table select criteria or and'
|
|
test_formdef.fields = []
|
|
test_formdef.store()
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
for i in range(50):
|
|
t = data_class()
|
|
t.store()
|
|
|
|
assert [x.id for x in data_class.select([st.Or([st.Less('id', 10)])], order_by='id')] == range(1, 10)
|
|
assert [x.id for x in data_class.select([st.Or([
|
|
st.Less('id', 10), st.Equal('id', 15)])], order_by='id')] == range(1, 10) + [15]
|
|
assert [x.id for x in data_class.select([st.And([
|
|
st.Less('id', 10), st.Greater('id', 5)])], order_by='id')] == range(6, 10)
|
|
|
|
|
|
@postgresql
|
|
def test_sql_table_select_bool():
|
|
test_formdef = FormDef()
|
|
test_formdef.name = 'table select bool'
|
|
test_formdef.fields = [fields.BoolField(id='3', label='bool')]
|
|
test_formdef.store()
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
for i in range(50):
|
|
t = data_class()
|
|
t.data = {'3': False}
|
|
t.store()
|
|
t.data = {'3': True}
|
|
t.store()
|
|
|
|
assert data_class.count() == 50
|
|
assert len(data_class.select()) == 50
|
|
assert len(data_class.select([st.Equal('f3', True)])) == 1
|
|
assert len(data_class.select([st.Equal('f3', False)])) == 49
|
|
|
|
|
|
@postgresql
|
|
def test_sql_criteria_ilike():
|
|
test_formdef = FormDef()
|
|
test_formdef.name = 'table select bool'
|
|
test_formdef.fields = [fields.StringField(id='3', label='string')]
|
|
test_formdef.store()
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
for i in range(50):
|
|
t = data_class()
|
|
if i < 20:
|
|
t.data = {'3': 'foo'}
|
|
else:
|
|
t.data = {'3': 'bar'}
|
|
t.store()
|
|
t.store()
|
|
|
|
assert data_class.count() == 50
|
|
assert len(data_class.select()) == 50
|
|
|
|
assert [x.id for x in data_class.select([st.ILike('f3', 'bar')], order_by='id')] == range(21, 51)
|
|
assert [x.id for x in data_class.select([st.ILike('f3', 'BAR')], order_by='id')] == range(21, 51)
|
|
|
|
def table_exists(cur, table_name):
|
|
cur.execute('''SELECT COUNT(*) FROM information_schema.tables
|
|
WHERE table_name = %s''', (table_name,))
|
|
return bool(cur.fetchone()[0] == 1)
|
|
|
|
def column_exists_in_table(cur, table_name, column_name):
|
|
cur.execute('''SELECT COUNT(*) FROM information_schema.columns
|
|
WHERE table_name = %s
|
|
AND column_name = %s''', (table_name, column_name))
|
|
return bool(cur.fetchone()[0] == 1)
|
|
|
|
@postgresql
|
|
def test_sql_level():
|
|
conn, cur = sql.get_connection_and_cursor()
|
|
cur.execute('DROP TABLE wcs_meta')
|
|
assert sql.get_sql_level(conn, cur) == 0
|
|
sql.migrate()
|
|
assert sql.get_sql_level(conn, cur) == sql.SQL_LEVEL
|
|
|
|
# insert negative SQL level, to trigger an error, and check it's not
|
|
# changed.
|
|
cur.execute('''UPDATE wcs_meta SET value = %s WHERE key = %s''',
|
|
(str(-1), 'sql_level'))
|
|
assert sql.get_sql_level(conn, cur) == -1
|
|
with pytest.raises(RuntimeError):
|
|
sql.migrate()
|
|
assert sql.get_sql_level(conn, cur) == -1
|
|
|
|
conn.commit()
|
|
cur.close()
|
|
|
|
@postgresql
|
|
def test_migration_1_tracking_code():
|
|
conn, cur = sql.get_connection_and_cursor()
|
|
cur.execute('DROP TABLE wcs_meta')
|
|
cur.execute('DROP TABLE tracking_codes')
|
|
sql.migrate()
|
|
assert table_exists(cur, 'tracking_codes')
|
|
assert table_exists(cur, 'wcs_meta')
|
|
conn.commit()
|
|
cur.close()
|
|
|
|
@postgresql
|
|
def test_migration_2_formdef_id_in_views():
|
|
conn, cur = sql.get_connection_and_cursor()
|
|
cur.execute('UPDATE wcs_meta SET value = 1 WHERE key = %s', ('sql_level',))
|
|
cur.execute('DROP VIEW wcs_all_forms')
|
|
|
|
# hack a formdef table the wrong way, to check it is reconstructed
|
|
# properly before the views are created
|
|
formdef.fields[4] = fields.StringField(id='4', label='item')
|
|
cur.execute('DROP VIEW wcs_view_1_tests')
|
|
cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN f4_display')
|
|
sql.redo_views(conn, cur, formdef, rebuild_global_views=False)
|
|
formdef.fields[4] = fields.ItemField(id='4', label='item', items=('apple', 'pear', 'peach', 'apricot')),
|
|
assert table_exists(cur, 'wcs_view_1_tests')
|
|
assert not column_exists_in_table(cur, 'wcs_view_1_tests', 'f4_display')
|
|
|
|
view_names = []
|
|
cur.execute('''SELECT table_name FROM information_schema.views
|
|
WHERE table_name LIKE %s''', ('wcs\_view\_%',))
|
|
while True:
|
|
row = cur.fetchone()
|
|
if row is None:
|
|
break
|
|
view_names.append(row[0])
|
|
|
|
fake_formdef = FormDef()
|
|
common_fields = sql.get_view_fields(fake_formdef)
|
|
# remove formdef_id for the purpose of this test
|
|
common_fields.remove([x for x in common_fields if x[1] == 'formdef_id'][0])
|
|
|
|
union = ' UNION '.join(['''SELECT %s FROM %s''' % (
|
|
', '.join([y[1] for y in common_fields]), x) for x in view_names])
|
|
assert not 'formdef_id' in union
|
|
cur.execute('''CREATE VIEW wcs_all_forms AS %s''' % union)
|
|
|
|
sql.migrate()
|
|
|
|
assert column_exists_in_table(cur, 'wcs_all_forms', 'formdef_id')
|
|
|
|
conn.commit()
|
|
cur.close()
|
|
|
|
@postgresql
|
|
def test_migration_6_actions_roles():
|
|
conn, cur = sql.get_connection_and_cursor()
|
|
cur.execute('UPDATE wcs_meta SET value = 5 WHERE key = %s', ('sql_level',))
|
|
cur.execute('DROP VIEW wcs_all_forms')
|
|
|
|
# hack a formdef table the wrong way, to check it is reconstructed
|
|
# properly before the views are created
|
|
formdef.fields[4] = fields.StringField(id='4', label='item')
|
|
cur.execute('DROP VIEW wcs_view_1_tests')
|
|
cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN actions_roles_array')
|
|
sql.drop_views(formdef, conn, cur)
|
|
formdef.fields[4] = fields.ItemField(id='4', label='item', items=('apple', 'pear', 'peach', 'apricot')),
|
|
assert not column_exists_in_table(cur, 'formdata_1_tests', 'actions_roles_array')
|
|
|
|
sql.migrate()
|
|
|
|
assert column_exists_in_table(cur, 'formdata_1_tests', 'actions_roles_array')
|
|
assert column_exists_in_table(cur, 'wcs_view_1_tests', 'actions_roles_array')
|
|
assert column_exists_in_table(cur, 'wcs_all_forms', 'actions_roles_array')
|
|
|
|
conn.commit()
|
|
cur.close()
|
|
|
|
|
|
def drop_formdef_tables():
|
|
conn, cur = sql.get_connection_and_cursor()
|
|
cur.execute('''SELECT table_name FROM information_schema.tables''')
|
|
table_names = []
|
|
while True:
|
|
row = cur.fetchone()
|
|
if row is None:
|
|
break
|
|
table_names.append(row[0])
|
|
for table_name in table_names:
|
|
if table_name.startswith('formdata_'):
|
|
cur.execute('DROP TABLE %s CASCADE' % table_name)
|
|
|
|
|
|
@postgresql
|
|
def test_is_at_endpoint():
|
|
drop_formdef_tables()
|
|
conn, cur = sql.get_connection_and_cursor()
|
|
|
|
wf = Workflow(name='test endpoint')
|
|
st1 = wf.add_status('Status1', 'st1')
|
|
st2 = wf.add_status('Status2', 'st2')
|
|
|
|
commentable = CommentableWorkflowStatusItem()
|
|
commentable.id = '_commentable'
|
|
commentable.by = ['_submitter', '_receiver']
|
|
st1.items.append(commentable)
|
|
commentable.parent = st1
|
|
|
|
wf.store()
|
|
assert [x.id for x in wf.get_endpoint_status()] == ['st2']
|
|
|
|
formdef = FormDef()
|
|
formdef.name = 'test endpoint'
|
|
formdef.fields = []
|
|
formdef.workflow = wf
|
|
formdef.store()
|
|
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata = data_class()
|
|
formdata.status = 'wf-st1'
|
|
formdata.store()
|
|
formdata = data_class()
|
|
formdata.status = 'wf-st2'
|
|
formdata.store()
|
|
|
|
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms''')
|
|
assert bool(cur.fetchone()[0] == 2)
|
|
|
|
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE status = 'wf-st1' ''')
|
|
assert bool(cur.fetchone()[0] == 1)
|
|
|
|
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE is_at_endpoint = true''')
|
|
assert bool(cur.fetchone()[0] == 1)
|
|
|
|
# check a change to workflow is reflected in the database
|
|
st1.forced_endpoint = True
|
|
wf.store()
|
|
assert [x.id for x in wf.get_endpoint_status()] == ['st1', 'st2']
|
|
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE is_at_endpoint = true''')
|
|
assert bool(cur.fetchone()[0] == 2)
|
|
|
|
@postgresql
|
|
def test_views_fts():
|
|
drop_formdef_tables()
|
|
conn, cur = sql.get_connection_and_cursor()
|
|
|
|
formdef = FormDef()
|
|
formdef.name = 'test fts'
|
|
formdef.fields = [
|
|
fields.StringField(id='0', label='string'),
|
|
]
|
|
formdef.store()
|
|
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata1 = data_class()
|
|
formdata1.data = {'0': 'foo bar'}
|
|
formdata1.store()
|
|
|
|
formdata2 = data_class()
|
|
formdata2.data = {'0': 'foo'}
|
|
formdata2.store()
|
|
|
|
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE fts @@ plainto_tsquery(%s)''', ('foo',))
|
|
assert bool(cur.fetchone()[0] == 2)
|
|
|
|
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE fts @@ plainto_tsquery(%s)''', ('bar',))
|
|
assert bool(cur.fetchone()[0] == 1)
|
|
|
|
@postgresql
|
|
def test_select_any_formdata():
|
|
drop_formdef_tables()
|
|
conn, cur = sql.get_connection_and_cursor()
|
|
|
|
now = datetime.datetime.now()
|
|
|
|
cnt = 0
|
|
for i in range(5):
|
|
formdef = FormDef()
|
|
formdef.name = 'test any %d' % i
|
|
formdef.fields = []
|
|
formdef.store()
|
|
|
|
data_class = formdef.data_class(mode='sql')
|
|
for j in range(20):
|
|
formdata = data_class()
|
|
formdata.just_created()
|
|
formdata.user_id = '%s' % ((i+j)%11)
|
|
# set receipt_time to make sure all entries are unique.
|
|
formdata.receipt_time = (now + datetime.timedelta(seconds=cnt)).timetuple()
|
|
formdata.status = ['wf-new', 'wf-accepted', 'wf-rejected', 'wf-finished'][(i+j)%4]
|
|
formdata.store()
|
|
cnt += 1
|
|
|
|
# test generic select
|
|
objects = sql.AnyFormData.select()
|
|
assert len(objects) == 100
|
|
|
|
# make sure valid formdefs are used
|
|
assert len([x for x in objects if x.formdef.name == 'test any 0']) == 20
|
|
assert len([x for x in objects if x.formdef.name == 'test any 1']) == 20
|
|
|
|
# test sorting
|
|
objects = sql.AnyFormData.select(order_by='receipt_time')
|
|
assert len(objects) == 100
|
|
|
|
objects2 = sql.AnyFormData.select(order_by='-receipt_time')
|
|
assert [(x.formdef_id, x.id) for x in objects2] == list(reversed(
|
|
[(x.formdef_id, x.id) for x in objects]))
|
|
|
|
# test clauses
|
|
objects2 = sql.AnyFormData.select([st.Equal('user_id', '0')])
|
|
assert len(objects2) == len([x for x in objects if x.user_id == '0'])
|
|
|
|
objects2 = sql.AnyFormData.select([st.Equal('is_at_endpoint', True)])
|
|
assert len(objects2) == len([x for x in objects if x.status in ('wf-rejected', 'wf-finished')])
|
|
|
|
# test offset/limit
|
|
objects2 = sql.AnyFormData.select(order_by='receipt_time', limit=10, offset=0)
|
|
assert [(x.formdef_id, x.id) for x in objects2] == [(x.formdef_id, x.id) for x in objects][:10]
|
|
|
|
objects2 = sql.AnyFormData.select(order_by='receipt_time', limit=10, offset=20)
|
|
assert [(x.formdef_id, x.id) for x in objects2] == [(x.formdef_id, x.id) for x in objects][20:30]
|
|
|
|
@postgresql
|
|
def test_actions_roles():
|
|
drop_formdef_tables()
|
|
conn, cur = sql.get_connection_and_cursor()
|
|
|
|
wf = Workflow(name='test endpoint')
|
|
st1 = wf.add_status('Status1', 'st1')
|
|
st2 = wf.add_status('Status2', 'st2')
|
|
|
|
commentable = CommentableWorkflowStatusItem()
|
|
commentable.id = '_commentable'
|
|
commentable.by = ['_submitter', '1']
|
|
st1.items.append(commentable)
|
|
commentable.parent = st1
|
|
|
|
wf.store()
|
|
assert [x.id for x in wf.get_endpoint_status()] == ['st2']
|
|
|
|
formdef = FormDef()
|
|
formdef.name = 'test actions roles'
|
|
formdef.fields = []
|
|
formdef.workflow = wf
|
|
formdef.store()
|
|
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata = data_class()
|
|
formdata.status = 'wf-st1'
|
|
formdata.store()
|
|
formdata_id = formdata.id
|
|
formdata = data_class()
|
|
formdata.status = 'wf-st2'
|
|
formdata.store()
|
|
|
|
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms''')
|
|
assert bool(cur.fetchone()[0] == 2)
|
|
|
|
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms
|
|
WHERE actions_roles_array && ARRAY['5', '1', '4']''')
|
|
assert bool(cur.fetchone()[0] == 1)
|
|
|
|
# check a change to workflow is reflected in the database
|
|
st1.items[0].by = ['2', '3']
|
|
wf.store()
|
|
|
|
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms
|
|
WHERE actions_roles_array && ARRAY['5', '1', '4']''')
|
|
assert bool(cur.fetchone()[0] == 0)
|
|
|
|
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms
|
|
WHERE actions_roles_array && ARRAY['2', '3']''')
|
|
assert bool(cur.fetchone()[0] == 1)
|
|
|
|
# using criterias
|
|
criterias = [st.Intersects('actions_roles_array', ['2', '3'])]
|
|
total_count = sql.AnyFormData.count(criterias)
|
|
formdatas = sql.AnyFormData.select(criterias)
|
|
assert total_count == 1
|
|
assert formdatas[0].id == formdata_id
|