670 lines
18 KiB
Python
670 lines
18 KiB
Python
import datetime
|
|
import os
|
|
import random
|
|
import shutil
|
|
import sys
|
|
import time
|
|
|
|
from quixote import cleanup
|
|
|
|
from wcs import formdef, publisher, fields
|
|
from wcs.formdef import FormDef
|
|
from wcs.formdata import Evolution
|
|
from wcs import sql
|
|
import wcs.qommon.storage as st
|
|
|
|
from utilities import create_temporary_pub
|
|
|
|
import pytest
|
|
postgresql = pytest.mark.postgresql
|
|
|
|
try:
|
|
import psycopg2
|
|
except ImportError:
|
|
pass
|
|
|
|
def setup_module(module):
|
|
global pub, formdef
|
|
|
|
cleanup()
|
|
|
|
pub = create_temporary_pub()
|
|
pub.has_site_option = lambda x: True
|
|
|
|
conn = psycopg2.connect(user=os.environ['USER'])
|
|
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
|
cur = conn.cursor()
|
|
dbname = 'wcstests%d' % random.randint(0, 100000)
|
|
cur.execute('CREATE DATABASE %s' % dbname)
|
|
cur.close()
|
|
|
|
pub.cfg['postgresql'] = {'database': dbname, 'user': os.environ['USER']}
|
|
|
|
formdef = formdef.FormDef()
|
|
formdef.name = 'tests'
|
|
formdef.fields = [
|
|
fields.StringField(id='0', label='string'),
|
|
fields.EmailField(id='1', label='email'),
|
|
fields.TextField(id='2', label='text'),
|
|
fields.BoolField(id='3', label='bool'),
|
|
fields.ItemField(id='4', label='item', items=('apple', 'pear', 'peach', 'apricot')),
|
|
fields.DateField(id='5', label='date'),
|
|
fields.ItemsField(id='6', label='items', items=('apple', 'pear', 'peach', 'apricot')),
|
|
]
|
|
formdef.store()
|
|
|
|
sql.do_user_table()
|
|
|
|
conn.close()
|
|
|
|
|
|
def teardown_module(module):
|
|
shutil.rmtree(pub.APP_DIR)
|
|
|
|
if hasattr(pub, 'pgconn') and pub.pgconn:
|
|
pub.pgconn.close()
|
|
|
|
conn = psycopg2.connect(user=os.environ['USER'])
|
|
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
|
cur = conn.cursor()
|
|
cur.execute('DROP DATABASE %s' % pub.cfg['postgresql']['database'])
|
|
cur.close()
|
|
|
|
@postgresql
|
|
def test_sql_table_name_invalid_chars():
|
|
test_formdef = FormDef()
|
|
test_formdef.name = 'test-some|char;are better/ignored'
|
|
test_formdef.fields = []
|
|
test_formdef.store()
|
|
assert test_formdef.table_name is not None
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
@postgresql
|
|
def test_sql_data_class():
|
|
data_class = formdef.data_class(mode='sql')
|
|
|
|
@postgresql
|
|
def test_sql_count():
|
|
data_class = formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
@postgresql
|
|
def test_sql_store():
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata = data_class()
|
|
formdata.status = 'wf-0'
|
|
formdata.user_id = '5'
|
|
formdata.store()
|
|
assert formdata.id
|
|
|
|
@postgresql
|
|
def test_sql_get():
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata = data_class()
|
|
formdata.status = 'wf-0'
|
|
formdata.user_id = '5'
|
|
formdata.store()
|
|
id = formdata.id
|
|
|
|
formdata = data_class.get(id)
|
|
assert formdata.user_id == '5'
|
|
|
|
@postgresql
|
|
def test_sql_get_missing():
|
|
data_class = formdef.data_class(mode='sql')
|
|
with pytest.raises(KeyError):
|
|
data_class.get(123456)
|
|
|
|
@postgresql
|
|
def test_sql_get_missing_ignore_errors():
|
|
data_class = formdef.data_class(mode='sql')
|
|
assert data_class.get(123456, ignore_errors=True) is None
|
|
|
|
def check_sql_field(no, value):
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata = data_class()
|
|
formdata.data = {no: value}
|
|
formdata.store()
|
|
id = formdata.id
|
|
|
|
formdata = data_class.get(id)
|
|
assert formdata.data.get(no) == value
|
|
|
|
@postgresql
|
|
def test_sql_field_string():
|
|
check_sql_field('0', 'hello world')
|
|
|
|
@postgresql
|
|
def test_sql_field_email():
|
|
check_sql_field('1', 'fred@example.com')
|
|
|
|
@postgresql
|
|
def test_sql_field_text():
|
|
check_sql_field('2', 'long text')
|
|
|
|
@postgresql
|
|
def test_sql_field_bool():
|
|
check_sql_field('3', False)
|
|
check_sql_field('3', True)
|
|
|
|
@postgresql
|
|
def test_sql_field_item():
|
|
check_sql_field('4', 'apricot')
|
|
|
|
@postgresql
|
|
def test_sql_field_date():
|
|
check_sql_field('5', datetime.date.today().timetuple())
|
|
|
|
@postgresql
|
|
def test_sql_field_items():
|
|
check_sql_field('6', ['apricot'])
|
|
check_sql_field('6', ['apricot', 'pear'])
|
|
|
|
@postgresql
|
|
def test_sql_change():
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata = data_class()
|
|
formdata.data = {'0': 'test'}
|
|
formdata.store()
|
|
id = formdata.id
|
|
|
|
formdata = data_class.get(id)
|
|
assert formdata.data.get('0') == 'test'
|
|
|
|
formdata.data = {'0': 'test2'}
|
|
formdata.store()
|
|
formdata = data_class.get(id)
|
|
assert formdata.data.get('0') == 'test2'
|
|
|
|
@postgresql
|
|
def test_sql_remove():
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata = data_class()
|
|
formdata.data = {'0': 'test'}
|
|
formdata.store()
|
|
id = formdata.id
|
|
|
|
formdata = data_class.get(id)
|
|
assert formdata.data.get('0') == 'test'
|
|
|
|
formdata.remove_self()
|
|
with pytest.raises(KeyError):
|
|
data_class.get(id)
|
|
|
|
@postgresql
|
|
def test_sql_wipe():
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata = data_class()
|
|
formdata.store()
|
|
|
|
assert data_class.count() != 0
|
|
data_class.wipe()
|
|
assert data_class.count() == 0
|
|
|
|
@postgresql
|
|
def test_sql_evolution():
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata = data_class()
|
|
formdata.just_created()
|
|
formdata.store()
|
|
id = formdata.id
|
|
|
|
formdata = data_class.get(id)
|
|
assert len(formdata.evolution) == 1
|
|
|
|
evo = Evolution()
|
|
evo.time = time.localtime()
|
|
evo.status = formdata.status
|
|
evo.comment = 'hello world'
|
|
formdata.evolution.append(evo)
|
|
formdata.store()
|
|
|
|
formdata = data_class.get(id)
|
|
assert len(formdata.evolution) == 2
|
|
assert formdata.evolution[-1].comment == 'hello world'
|
|
|
|
@postgresql
|
|
def test_sql_evolution_change():
|
|
data_class = formdef.data_class(mode='sql')
|
|
formdata = data_class()
|
|
formdata.just_created()
|
|
formdata.store()
|
|
id = formdata.id
|
|
|
|
formdata = data_class.get(id)
|
|
assert len(formdata.evolution) == 1
|
|
|
|
evo = Evolution()
|
|
evo.time = time.localtime()
|
|
evo.status = formdata.status
|
|
evo.comment = 'hello world'
|
|
formdata.evolution.append(evo)
|
|
formdata.store()
|
|
|
|
formdata = data_class.get(id)
|
|
assert len(formdata.evolution) == 2
|
|
assert formdata.evolution[-1].comment == 'hello world'
|
|
|
|
formdata.evolution[-1].comment = 'foobar'
|
|
formdata.store()
|
|
|
|
formdata = data_class.get(id)
|
|
assert len(formdata.evolution) == 2
|
|
assert formdata.evolution[-1].comment == 'foobar'
|
|
|
|
|
|
@postgresql
|
|
def test_sql_get_ids_with_indexed_value():
|
|
data_class = formdef.data_class(mode='sql')
|
|
data_class.wipe()
|
|
|
|
formdata = data_class()
|
|
formdata.store()
|
|
id1 = formdata.id
|
|
|
|
formdata = data_class()
|
|
formdata.user_id = '2'
|
|
formdata.store()
|
|
id2 = formdata.id
|
|
|
|
formdata = data_class()
|
|
formdata.user_id = '2'
|
|
formdata.store()
|
|
id3 = formdata.id
|
|
|
|
ids = data_class.get_ids_with_indexed_value('user_id', '2')
|
|
assert set(ids) == set([id2, id3])
|
|
|
|
@postgresql
|
|
def test_sql_get_ids_from_query():
|
|
data_class = formdef.data_class(mode='sql')
|
|
data_class.wipe()
|
|
|
|
formdata = data_class()
|
|
formdata.data = {'2': 'this is some reasonably long text'}
|
|
formdata.store()
|
|
id1 = formdata.id
|
|
|
|
formdata = data_class()
|
|
formdata.data = {'2': 'hello world is still a classical example'}
|
|
formdata.store()
|
|
id2 = formdata.id
|
|
|
|
formdata = data_class()
|
|
formdata.data = {'2': 'you would think other ideas of text would emerge'}
|
|
formdata.store()
|
|
id3 = formdata.id
|
|
|
|
ids = data_class.get_ids_from_query('text')
|
|
assert set(ids) == set([id1, id3])
|
|
|
|
ids = data_class.get_ids_from_query('classical')
|
|
assert set(ids) == set([id2])
|
|
|
|
|
|
@postgresql
|
|
def test_sql_rollback_on_error():
|
|
data_class = formdef.data_class(mode='sql')
|
|
data_class.wipe()
|
|
with pytest.raises(psycopg2.Error):
|
|
# this will raise a psycopg2.ProgrammingError as there's no FOOBAR
|
|
# column in the table.
|
|
data_class.get_ids_with_indexed_value('FOOBAR', '2')
|
|
data_class.wipe()
|
|
|
|
|
|
@postgresql
|
|
def test_sql_get_ids_with_indexed_value_dict():
|
|
data_class = formdef.data_class(mode='sql')
|
|
data_class.wipe()
|
|
|
|
formdata = data_class()
|
|
formdata.store()
|
|
id1 = formdata.id
|
|
|
|
formdata = data_class()
|
|
formdata.workflow_roles = {'plop': '2'}
|
|
formdata.store()
|
|
id2 = formdata.id
|
|
|
|
formdata = data_class()
|
|
formdata.workflow_roles = {'plop': '2'}
|
|
formdata.store()
|
|
id3 = formdata.id
|
|
|
|
ids = data_class.get_ids_with_indexed_value('workflow_roles', '2')
|
|
assert set(ids) == set([id2, id3])
|
|
|
|
@postgresql
|
|
def test_create_user():
|
|
sql.SqlUser.wipe()
|
|
|
|
user = sql.SqlUser()
|
|
user.name = 'Pierre'
|
|
user.store()
|
|
|
|
@postgresql
|
|
def test_get_user():
|
|
sql.SqlUser.wipe()
|
|
|
|
user = sql.SqlUser()
|
|
user.name = 'Pierre'
|
|
user.store()
|
|
|
|
assert sql.SqlUser.get(user.id) is not None
|
|
|
|
@postgresql
|
|
def test_get_missing_user():
|
|
sql.SqlUser.wipe()
|
|
|
|
with pytest.raises(KeyError):
|
|
sql.SqlUser.get(12345)
|
|
|
|
@postgresql
|
|
def test_get_missing_user_ignore_errors():
|
|
sql.SqlUser.wipe()
|
|
|
|
assert sql.SqlUser.get(12345, ignore_errors=True) is None
|
|
|
|
@postgresql
|
|
def test_get_users_with_role():
|
|
sql.SqlUser.wipe()
|
|
|
|
user = sql.SqlUser()
|
|
user.name = 'Pierre'
|
|
user.roles = [1]
|
|
user.store()
|
|
user_id = user.id
|
|
|
|
user = sql.SqlUser()
|
|
user.name = 'Papier'
|
|
user.store()
|
|
|
|
assert len(sql.SqlUser.get_users_with_role(1)) == 1
|
|
|
|
@postgresql
|
|
def test_get_users_with_name_identifier():
|
|
sql.SqlUser.wipe()
|
|
|
|
user = sql.SqlUser()
|
|
user.name = 'Pierre'
|
|
user.name_identifiers = ['foo']
|
|
user.store()
|
|
user_id = user.id
|
|
|
|
user = sql.SqlUser()
|
|
user.name = 'Papier'
|
|
user.store()
|
|
|
|
assert len(sql.SqlUser.get_users_with_name_identifier('foo')) == 1
|
|
assert sql.SqlUser.get_users_with_name_identifier('foo')[0].name == 'Pierre'
|
|
|
|
|
|
@postgresql
|
|
def test_urlname_change():
|
|
global formef
|
|
data_class = formdef.data_class(mode='sql')
|
|
data_class.wipe()
|
|
assert formdef.url_name == 'tests'
|
|
|
|
formdef.name = 'tests2'
|
|
formdef.store()
|
|
assert formdef.url_name == 'tests2'
|
|
|
|
formdef.name = 'tests'
|
|
formdef.store()
|
|
assert formdef.url_name == 'tests'
|
|
|
|
data_class = formdef.data_class(mode='sql')
|
|
|
|
formdata = data_class()
|
|
formdata.status = 'wf-0'
|
|
formdata.user_id = '5'
|
|
formdata.store()
|
|
|
|
formdef.name = 'tests2'
|
|
formdef.store()
|
|
assert formdef.url_name == 'tests'
|
|
|
|
assert data_class.count() == 1
|
|
|
|
@postgresql
|
|
def test_sql_table_add_and_remove_fields():
|
|
test_formdef = FormDef()
|
|
test_formdef.name = 'tests and and remove fields'
|
|
test_formdef.fields = []
|
|
test_formdef.store()
|
|
assert test_formdef.table_name is not None
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
test_formdef.fields = [
|
|
fields.StringField(label='string'),
|
|
fields.EmailField(label='email'),
|
|
]
|
|
|
|
for field in test_formdef.fields:
|
|
if field.id is None:
|
|
field.id = test_formdef.get_new_field_id()
|
|
test_formdef.store()
|
|
|
|
test_formdef.fields.append(
|
|
fields.ItemField(label='item', items=('apple', 'pear', 'peach', 'apricot')))
|
|
test_formdef.fields[-1].id = test_formdef.get_new_field_id()
|
|
test_formdef.store()
|
|
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
data_class.select()
|
|
|
|
previous_id = test_formdef.fields[-1].id
|
|
test_formdef.fields = test_formdef.fields[:-1]
|
|
test_formdef.store()
|
|
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
data_class.select()
|
|
|
|
test_formdef.fields.append(fields.StringField(label='item'))
|
|
test_formdef.fields[-1].id = test_formdef.get_new_field_id()
|
|
test_formdef.store()
|
|
|
|
assert test_formdef.fields[-1].id != previous_id
|
|
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
data_class.select()
|
|
|
|
test_formdef.fields = test_formdef.fields[:-1]
|
|
test_formdef.fields.append(
|
|
fields.ItemField(label='item', items=('apple', 'pear', 'peach', 'apricot')))
|
|
test_formdef.fields[-1].id = test_formdef.get_new_field_id()
|
|
test_formdef.store()
|
|
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
data_class.select()
|
|
|
|
|
|
@postgresql
|
|
def test_sql_table_select():
|
|
test_formdef = FormDef()
|
|
test_formdef.name = 'table select'
|
|
test_formdef.fields = []
|
|
test_formdef.store()
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
for i in range(50):
|
|
t = data_class()
|
|
t.store()
|
|
|
|
assert data_class.count() == 50
|
|
assert len(data_class.select()) == 50
|
|
|
|
assert len(data_class.select(lambda x: x.id < 26)) == 25
|
|
assert len(data_class.select([st.Less('id', 26)])) == 25
|
|
assert len(data_class.select([st.Less('id', 25), st.GreaterOrEqual('id', 10)])) == 15
|
|
assert len(data_class.select([st.Less('id', 25), st.GreaterOrEqual('id', 10), lambda x: x.id >= 15])) == 10
|
|
assert len(data_class.select([st.NotEqual('id', 26)])) == 49
|
|
|
|
assert len(data_class.select([st.Contains('id', [24, 25, 26])])) == 3
|
|
assert len(data_class.select([st.Contains('id', [24, 25, 86])])) == 2
|
|
assert len(data_class.select([st.NotContains('id', [24, 25, 86])])) == 48
|
|
|
|
|
|
@postgresql
|
|
def test_sql_table_select_datetime():
|
|
test_formdef = FormDef()
|
|
test_formdef.name = 'table select datetime'
|
|
test_formdef.fields = []
|
|
test_formdef.store()
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
d = datetime.datetime(2014, 1, 1)
|
|
for i in range(50):
|
|
t = data_class()
|
|
t.receipt_time = (d + datetime.timedelta(days=i)).timetuple()
|
|
t.store()
|
|
|
|
assert data_class.count() == 50
|
|
assert len(data_class.select()) == 50
|
|
|
|
assert len(data_class.select(lambda x: x.receipt_time == d.timetuple())) == 1
|
|
assert len(data_class.select([st.Equal('receipt_time', d.timetuple())])) == 1
|
|
assert len(data_class.select([
|
|
st.Less('receipt_time', (d + datetime.timedelta(days=20)).timetuple())])) == 20
|
|
assert len(data_class.select([
|
|
st.Greater('receipt_time', (d + datetime.timedelta(days=20)).timetuple())])) == 29
|
|
|
|
|
|
@postgresql
|
|
def test_select_limit_offset():
|
|
test_formdef = FormDef()
|
|
test_formdef.name = 'table select limit offset'
|
|
test_formdef.fields = []
|
|
test_formdef.store()
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
for i in range(50):
|
|
t = data_class()
|
|
t.store()
|
|
|
|
assert len(data_class.select()) == 50
|
|
assert [x.id for x in data_class.select(order_by='id')] == range(1, 51)
|
|
assert [x.id for x in data_class.select(order_by='id', limit=10)] == range(1, 11)
|
|
assert [x.id for x in data_class.select(order_by='id', limit=10, offset=10)] == range(11, 21)
|
|
assert [x.id for x in data_class.select(order_by='id', limit=20, offset=20)] == range(21, 41)
|
|
assert [x.id for x in data_class.select(order_by='id', offset=10)] == range(11, 51)
|
|
assert len([x.id for x in data_class.select(lambda x: x.id > 10, limit=10)]) == 10
|
|
|
|
|
|
@postgresql
|
|
def test_select_criteria_intersects():
|
|
data_class = formdef.data_class(mode='sql')
|
|
data_class.wipe()
|
|
formdata = data_class()
|
|
formdata.status = 'wf-0'
|
|
formdata.user_id = '5'
|
|
formdata.data = {'6': ['apricot']}
|
|
formdata.store()
|
|
|
|
formdata = data_class()
|
|
formdata.status = 'wf-0'
|
|
formdata.user_id = '5'
|
|
formdata.data = {'6': ['apricot', 'pear']}
|
|
formdata.store()
|
|
|
|
formdata = data_class()
|
|
formdata.status = 'wf-0'
|
|
formdata.user_id = '5'
|
|
formdata.data = {'6': []}
|
|
formdata.store()
|
|
|
|
assert len(data_class.select([st.Intersects('f6', ['apricot'])])) == 2
|
|
assert len(data_class.select([st.Intersects('f6', ['pear'])])) == 1
|
|
assert len(data_class.select([st.Intersects('f6', ['apple'])])) == 0
|
|
|
|
|
|
@postgresql
|
|
def test_count():
|
|
test_formdef = FormDef()
|
|
test_formdef.name = 'table select count'
|
|
test_formdef.fields = []
|
|
test_formdef.store()
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
for i in range(50):
|
|
t = data_class()
|
|
t.store()
|
|
|
|
assert data_class.count() == 50
|
|
assert data_class.count([st.Less('id', 26)]) == 25
|
|
|
|
|
|
@postgresql
|
|
def test_select_criteria_or_and():
|
|
test_formdef = FormDef()
|
|
test_formdef.name = 'table select criteria or and'
|
|
test_formdef.fields = []
|
|
test_formdef.store()
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
for i in range(50):
|
|
t = data_class()
|
|
t.store()
|
|
|
|
assert [x.id for x in data_class.select([st.Or([st.Less('id', 10)])], order_by='id')] == range(1, 10)
|
|
assert [x.id for x in data_class.select([st.Or([
|
|
st.Less('id', 10), st.Equal('id', 15)])], order_by='id')] == range(1, 10) + [15]
|
|
assert [x.id for x in data_class.select([st.And([
|
|
st.Less('id', 10), st.Greater('id', 5)])], order_by='id')] == range(6, 10)
|
|
|
|
|
|
@postgresql
|
|
def test_sql_table_select_bool():
|
|
test_formdef = FormDef()
|
|
test_formdef.name = 'table select bool'
|
|
test_formdef.fields = [fields.BoolField(id='3', label='bool')]
|
|
test_formdef.store()
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
for i in range(50):
|
|
t = data_class()
|
|
t.data = {'3': False}
|
|
t.store()
|
|
t.data = {'3': True}
|
|
t.store()
|
|
|
|
assert data_class.count() == 50
|
|
assert len(data_class.select()) == 50
|
|
assert len(data_class.select([st.Equal('f3', True)])) == 1
|
|
assert len(data_class.select([st.Equal('f3', False)])) == 49
|
|
|
|
|
|
@postgresql
|
|
def test_sql_criteria_ilike():
|
|
test_formdef = FormDef()
|
|
test_formdef.name = 'table select bool'
|
|
test_formdef.fields = [fields.StringField(id='3', label='string')]
|
|
test_formdef.store()
|
|
data_class = test_formdef.data_class(mode='sql')
|
|
assert data_class.count() == 0
|
|
|
|
for i in range(50):
|
|
t = data_class()
|
|
if i < 20:
|
|
t.data = {'3': 'foo'}
|
|
else:
|
|
t.data = {'3': 'bar'}
|
|
t.store()
|
|
t.store()
|
|
|
|
assert data_class.count() == 50
|
|
assert len(data_class.select()) == 50
|
|
|
|
assert [x.id for x in data_class.select([st.ILike('f3', 'bar')], order_by='id')] == range(21, 51)
|
|
assert [x.id for x in data_class.select([st.ILike('f3', 'BAR')], order_by='id')] == range(21, 51)
|