wcs/tests/test_sql.py

2114 lines
62 KiB
Python

# -*- coding: utf-8 -*-
from __future__ import print_function
import datetime
import os
import random
import shutil
import string
import sys
import time
from django.core.management import call_command
from quixote import cleanup
from wcs.qommon import force_str
from wcs import publisher, fields
from wcs.formdef import FormDef
from wcs.formdata import Evolution
from wcs.roles import Role
from wcs.workflows import Workflow, CommentableWorkflowStatusItem, WorkflowCriticalityLevel
from wcs.wf.jump import JumpWorkflowStatusItem
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
from wcs import sql
import wcs.qommon.storage as st
from utilities import create_temporary_pub, clean_temporary_pub
import pytest
postgresql = pytest.mark.postgresql
try:
import psycopg2
except ImportError:
pass
def setup_module(module):
global pub, formdef
cleanup()
pub = create_temporary_pub(sql_mode=True)
formdef = FormDef()
formdef.name = 'tests'
formdef.fields = [
fields.StringField(id='0', label='string'),
fields.EmailField(id='1', label='email'),
fields.TextField(id='2', label='text'),
fields.BoolField(id='3', label='bool'),
fields.ItemField(id='4', label='item', items=('apple', 'pear', 'peach', 'apricot')),
fields.DateField(id='5', label='date'),
fields.ItemsField(id='6', label='items', items=('apple', 'pear', 'peach', 'apricot')),
]
formdef.store()
def teardown_module(module):
clean_temporary_pub()
@postgresql
def test_sql_table_name_invalid_chars():
test_formdef = FormDef()
test_formdef.name = 'test-some|char;are better/ignored'
test_formdef.fields = []
test_formdef.store()
assert test_formdef.table_name is not None
data_class = test_formdef.data_class(mode='sql')
assert data_class.count() == 0
@postgresql
def test_sql_data_class():
data_class = formdef.data_class(mode='sql')
@postgresql
def test_sql_count():
data_class = formdef.data_class(mode='sql')
assert data_class.count() == 0
@postgresql
def test_sql_store():
data_class = formdef.data_class(mode='sql')
formdata = data_class()
formdata.status = 'wf-0'
formdata.user_id = '5'
formdata.store()
assert formdata.id
@postgresql
def test_sql_get():
data_class = formdef.data_class(mode='sql')
formdata = data_class()
formdata.status = 'wf-0'
formdata.user_id = '5'
formdata.store()
id = formdata.id
formdata = data_class.get(id)
assert formdata.user_id == '5'
assert formdata.status == 'wf-0'
@postgresql
def test_sql_store_channel():
data_class = formdef.data_class(mode='sql')
formdata = data_class()
formdata.status = 'wf-0'
formdata.user_id = '5'
formdata.submission_channel = 'mail'
formdata.store()
assert data_class.get(formdata.id).submission_channel == 'mail'
formdata.submission_channel = None
formdata.store()
assert data_class.get(formdata.id).submission_channel is None
@postgresql
def test_sql_get_missing():
data_class = formdef.data_class(mode='sql')
with pytest.raises(KeyError):
data_class.get(123456)
with pytest.raises(KeyError):
data_class.get('xxx')
@postgresql
def test_sql_get_missing_ignore_errors():
data_class = formdef.data_class(mode='sql')
assert data_class.get(123456, ignore_errors=True) is None
assert data_class.get('xxx', ignore_errors=True) is None
assert data_class.get(None, ignore_errors=True) is None
def check_sql_field(no, value):
data_class = formdef.data_class(mode='sql')
formdata = data_class()
formdata.data = {no: value}
formdata.store()
id = formdata.id
formdata = data_class.get(id)
assert formdata.data.get(no) == value
@postgresql
def test_sql_field_string():
check_sql_field('0', 'hello world')
check_sql_field('0', 'élo world')
check_sql_field('0', None)
@postgresql
def test_sql_field_email():
check_sql_field('1', 'fred@example.com')
@postgresql
def test_sql_field_text():
check_sql_field('2', 'long text')
check_sql_field('2', 'long tèxt')
@postgresql
def test_sql_field_bool():
check_sql_field('3', False)
check_sql_field('3', True)
@postgresql
def test_sql_field_item():
check_sql_field('4', 'apricot')
@postgresql
def test_sql_field_date():
check_sql_field('5', datetime.date.today().timetuple())
@postgresql
def test_sql_field_items():
check_sql_field('6', ['apricot'])
check_sql_field('6', ['apricot', 'pear'])
check_sql_field('6', ['pomme', 'poire', 'pêche'])
@postgresql
def test_sql_geoloc():
test_formdef = FormDef()
test_formdef.name = 'geoloc'
test_formdef.fields = []
test_formdef.geolocations = {'base': 'Plop'}
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
formdata = data_class()
formdata.data = {}
formdata.store() # NULL geolocation
formdata2 = data_class.get(formdata.id)
assert not formdata2.geolocations
formdata.geolocations = {'base': {'lat': 12, 'lon': 21}}
formdata.store()
formdata2 = data_class.get(formdata.id)
assert formdata2.geolocations == formdata.geolocations
formdata.geolocations = {}
formdata.store()
formdata2 = data_class.get(formdata.id)
assert formdata2.geolocations == formdata.geolocations
@postgresql
def test_sql_multi_geoloc():
test_formdef = FormDef()
test_formdef.name = 'geoloc'
test_formdef.fields = []
test_formdef.geolocations = {'base': 'Plop'}
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
formdata = data_class()
formdata.data = {}
formdata.geolocations = {'base': {'lat': 12, 'lon': 21}}
formdata.store()
formdata2 = data_class.get(formdata.id)
assert formdata2.geolocations == formdata.geolocations
test_formdef.geolocations = {'base': 'Plop', '2nd': 'XXX'}
test_formdef.store()
formdata.geolocations = {'base': {'lat': 12, 'lon': 21}, '2nd': {'lat': -34, 'lon': -12}}
formdata.store()
formdata2 = data_class.get(formdata.id)
assert formdata2.geolocations == formdata.geolocations
test_formdef.geolocations = {'base': 'Plop'}
test_formdef.store()
formdata2 = data_class.get(formdata.id)
assert formdata2.geolocations == {'base': {'lat': 12, 'lon': 21}}
@postgresql
def test_sql_change():
data_class = formdef.data_class(mode='sql')
formdata = data_class()
formdata.data = {'0': 'test'}
formdata.store()
id = formdata.id
formdata = data_class.get(id)
assert formdata.data.get('0') == 'test'
formdata.data = {'0': 'test2'}
formdata.store()
formdata = data_class.get(id)
assert formdata.data.get('0') == 'test2'
@postgresql
def test_sql_remove():
data_class = formdef.data_class(mode='sql')
formdata = data_class()
formdata.data = {'0': 'test'}
formdata.store()
id = formdata.id
formdata = data_class.get(id)
assert formdata.data.get('0') == 'test'
formdata.remove_self()
with pytest.raises(KeyError):
data_class.get(id)
@postgresql
def test_sql_wipe():
data_class = formdef.data_class(mode='sql')
formdata = data_class()
formdata.store()
assert data_class.count() != 0
data_class.wipe()
assert data_class.count() == 0
@postgresql
def test_sql_evolution():
data_class = formdef.data_class(mode='sql')
formdata = data_class()
formdata.just_created()
formdata.store()
id = formdata.id
formdata = data_class.get(id)
assert len(formdata.evolution) == 1
evo = Evolution()
evo.time = time.localtime()
evo.status = formdata.status
evo.comment = 'hello world'
formdata.evolution.append(evo)
formdata.store()
formdata = data_class.get(id)
assert len(formdata.evolution) == 2
assert formdata.evolution[-1].comment == 'hello world'
@postgresql
def test_sql_evolution_change():
data_class = formdef.data_class(mode='sql')
formdata = data_class()
formdata.just_created()
formdata.store()
id = formdata.id
formdata = data_class.get(id)
assert len(formdata.evolution) == 1
evo = Evolution()
evo.time = time.localtime()
evo.status = formdata.status
evo.comment = 'hello world'
formdata.evolution.append(evo)
formdata.store()
formdata = data_class.get(id)
assert len(formdata.evolution) == 2
assert formdata.evolution[-1].comment == 'hello world'
formdata.evolution[-1].comment = 'foobar'
formdata.store()
formdata = data_class.get(id)
assert len(formdata.evolution) == 2
assert formdata.evolution[-1].comment == 'foobar'
@postgresql
def test_sql_multiple_evolutions():
data_class = formdef.data_class(mode='sql')
for i in range(20):
formdata = data_class()
formdata.just_created()
formdata.store()
id = formdata.id
formdata = data_class.get(id)
evo = Evolution()
evo.time = time.localtime()
evo.status = formdata.status
evo.comment = 'hello world %d' % i
formdata.evolution.append(evo)
formdata.store()
values = data_class.select()
data_class.load_all_evolutions(values)
assert [x._evolution for x in values]
@postgresql
def test_sql_get_ids_with_indexed_value():
data_class = formdef.data_class(mode='sql')
data_class.wipe()
formdata = data_class()
formdata.store()
id1 = formdata.id
formdata = data_class()
formdata.user_id = '2'
formdata.store()
id2 = formdata.id
formdata = data_class()
formdata.user_id = '2'
formdata.store()
id3 = formdata.id
ids = data_class.get_ids_with_indexed_value('user_id', '2')
assert set(ids) == set([id2, id3])
@postgresql
def test_sql_get_ids_from_query():
data_class = formdef.data_class(mode='sql')
data_class.wipe()
formdata = data_class()
formdata.data = {'2': 'this is some reasonably long text'}
formdata.store()
id1 = formdata.id
formdata = data_class()
formdata.data = {'2': 'hello world is still a classical example'}
formdata.store()
id2 = formdata.id
formdata = data_class()
formdata.data = {'2': 'you would think other ideas of text would emerge'}
formdata.store()
id3 = formdata.id
ids = data_class.get_ids_from_query('text')
assert set(ids) == set([id1, id3])
ids = data_class.get_ids_from_query('classical')
assert set(ids) == set([id2])
@postgresql
def test_sql_rollback_on_error():
data_class = formdef.data_class(mode='sql')
data_class.wipe()
with pytest.raises(psycopg2.Error):
# this will raise a psycopg2.ProgrammingError as there's no FOOBAR
# column in the table.
data_class.get_ids_with_indexed_value('FOOBAR', '2')
data_class.wipe()
@postgresql
def test_sql_get_ids_with_indexed_value_dict():
data_class = formdef.data_class(mode='sql')
data_class.wipe()
formdata = data_class()
formdata.store()
id1 = formdata.id
formdata = data_class()
formdata.workflow_roles = {'plop': '2'}
formdata.store()
id2 = formdata.id
formdata = data_class()
formdata.workflow_roles = {'plop': '2'}
formdata.store()
id3 = formdata.id
ids = data_class.get_ids_with_indexed_value('workflow_roles', '2')
assert set(ids) == set([id2, id3])
@postgresql
def test_create_user():
sql.SqlUser.wipe()
user = sql.SqlUser()
user.name = 'Pierre'
user.store()
@postgresql
def test_get_user():
sql.SqlUser.wipe()
user = sql.SqlUser()
user.name = 'Pierre'
user.store()
assert sql.SqlUser.get(user.id) is not None
@postgresql
def test_get_missing_user():
sql.SqlUser.wipe()
with pytest.raises(KeyError):
sql.SqlUser.get(12345)
@postgresql
def test_get_missing_user_ignore_errors():
sql.SqlUser.wipe()
assert sql.SqlUser.get(12345, ignore_errors=True) is None
@postgresql
def test_user_formdef():
sql.SqlUser.wipe()
from wcs.admin.settings import UserFieldsFormDef
formdef = UserFieldsFormDef(pub)
formdef.fields = [fields.StringField(id='3', label='test', type='string')]
formdef.store()
user = sql.SqlUser()
user.name = 'Pierre'
user.form_data = {'3': 'Papier'}
user.store()
assert sql.SqlUser.get(user.id, ignore_errors=True).form_data['3'] == 'Papier'
del pub.cfg['users']['formdef']
pub.write_cfg()
@postgresql
def test_get_users_with_role():
sql.SqlUser.wipe()
user = sql.SqlUser()
user.name = 'Pierre'
user.roles = [1]
user.store()
user_id = user.id
user = sql.SqlUser()
user.name = 'Papier'
user.store()
assert len(sql.SqlUser.get_users_with_role(1)) == 1
@postgresql
def test_get_users_with_name_identifier():
sql.SqlUser.wipe()
user = sql.SqlUser()
user.name = 'Pierre'
user.name_identifiers = ['foo']
user.store()
user_id = user.id
user = sql.SqlUser()
user.name = 'Papier'
user.store()
assert len(sql.SqlUser.get_users_with_name_identifier('foo')) == 1
assert sql.SqlUser.get_users_with_name_identifier('foo')[0].name == 'Pierre'
@postgresql
def test_get_users_fts():
sql.SqlUser.wipe()
user = sql.SqlUser()
user.name = 'Pierre'
user.name_identifiers = ['foo']
user.store()
user_id = user.id
user = sql.SqlUser()
user.name = 'Papier'
user.store()
assert len(sql.SqlUser.get_ids_from_query('pierre')) == 1
assert sql.SqlUser.get(sql.SqlUser.get_ids_from_query('pierre')[0]).id == user_id
@postgresql
def test_get_users_formdef_fts():
sql.SqlUser.wipe()
from wcs.admin.settings import UserFieldsFormDef
formdef = UserFieldsFormDef(pub)
formdef.fields = [fields.StringField(id='3', label='test', type='string')]
formdef.store()
user = sql.SqlUser()
user.name = 'Pierre'
user.form_data = {'3': 'Papier'}
user.store()
user_id = user.id
assert len(sql.SqlUser.get_ids_from_query('pierre papier')) == 1
assert sql.SqlUser.get(sql.SqlUser.get_ids_from_query('pierre papier')[0]).id == user_id
assert len(sql.SqlUser.get_ids_from_query('papier pierre')) == 1
assert sql.SqlUser.get(sql.SqlUser.get_ids_from_query('papier pierre')[0]).id == user_id
del pub.cfg['users']['formdef']
pub.write_cfg()
@postgresql
def test_urlname_change():
global formef
data_class = formdef.data_class(mode='sql')
data_class.wipe()
assert formdef.url_name == 'tests'
formdef.name = 'tests2'
formdef.store()
assert formdef.url_name == 'tests'
formdef.name = 'tests'
formdef.store()
assert formdef.url_name == 'tests'
data_class = formdef.data_class(mode='sql')
formdata = data_class()
formdata.status = 'wf-0'
formdata.user_id = '5'
formdata.store()
formdef.name = 'tests2'
formdef.store()
assert formdef.url_name == 'tests'
assert data_class.count() == 1
@postgresql
def test_sql_table_add_and_remove_fields():
test_formdef = FormDef()
test_formdef.name = 'tests and and remove fields'
test_formdef.fields = []
test_formdef.store()
assert test_formdef.table_name is not None
data_class = test_formdef.data_class(mode='sql')
assert data_class.count() == 0
test_formdef.fields = [
fields.StringField(label='string'),
fields.EmailField(label='email'),
]
for field in test_formdef.fields:
if field.id is None:
field.id = test_formdef.get_new_field_id()
test_formdef.store()
test_formdef.fields.append(fields.ItemField(label='item', items=('apple', 'pear', 'peach', 'apricot')))
test_formdef.fields[-1].id = test_formdef.get_new_field_id()
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
data_class.select()
previous_id = test_formdef.fields[-1].id
test_formdef.fields = test_formdef.fields[:-1]
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
data_class.select()
test_formdef.fields.append(fields.StringField(label='item'))
test_formdef.fields[-1].id = test_formdef.get_new_field_id()
test_formdef.store()
assert test_formdef.fields[-1].id != previous_id
data_class = test_formdef.data_class(mode='sql')
data_class.select()
test_formdef.fields = test_formdef.fields[:-1]
test_formdef.fields.append(fields.ItemField(label='item', items=('apple', 'pear', 'peach', 'apricot')))
test_formdef.fields[-1].id = test_formdef.get_new_field_id()
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
data_class.select()
@postgresql
def test_sql_table_wipe_and_drop():
test_formdef = FormDef()
test_formdef.name = 'tests wipe and drop'
test_formdef.fields = []
test_formdef.store()
assert test_formdef.table_name is not None
data_class = test_formdef.data_class(mode='sql')
assert data_class.count() == 0
conn, cur = sql.get_connection_and_cursor()
assert table_exists(cur, test_formdef.table_name)
conn.commit()
cur.close()
data_class.wipe(drop=True)
conn, cur = sql.get_connection_and_cursor()
assert not table_exists(cur, test_formdef.table_name)
assert not table_exists(cur, test_formdef.table_name + '_evolutions')
conn.commit()
cur.close()
test_formdef.store()
conn, cur = sql.get_connection_and_cursor()
assert table_exists(cur, test_formdef.table_name)
@postgresql
def test_sql_indexes():
test_formdef = FormDef()
test_formdef.name = 'tests indexes'
test_formdef.fields = []
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
assert data_class.count() == 0
conn, cur = sql.get_connection_and_cursor()
assert index_exists(cur, test_formdef.table_name + '_evolutions_fid')
conn.commit()
cur.close()
data_class.wipe(drop=True)
conn, cur = sql.get_connection_and_cursor()
assert not index_exists(cur, test_formdef.table_name + '_evolutions_fid')
conn.commit()
cur.close()
@postgresql
def test_sql_table_select():
test_formdef = FormDef()
test_formdef.name = 'table select'
test_formdef.fields = []
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
assert data_class.count() == 0
for i in range(50):
t = data_class()
t.store()
assert data_class.count() == 50
assert len(data_class.select()) == 50
assert len(data_class.select(lambda x: x.id < 26)) == 25
assert len(data_class.select([st.Less('id', 26)])) == 25
assert len(data_class.select([st.Less('id', 25), st.GreaterOrEqual('id', 10)])) == 15
assert (
len(data_class.select([st.Less('id', 25), st.GreaterOrEqual('id', 10), lambda x: x.id >= 15])) == 10
)
assert len(data_class.select([st.NotEqual('id', 26)])) == 49
assert len(data_class.select([st.Contains('id', [])])) == 0
assert len(data_class.select([st.Contains('id', [24, 25, 26])])) == 3
assert len(data_class.select([st.Contains('id', [24, 25, 86])])) == 2
assert len(data_class.select([st.NotContains('id', [24, 25, 86])])) == 48
assert len(data_class.select([st.NotContains('id', [])])) == 50
@postgresql
def test_sql_table_select_iterator():
test_formdef = FormDef()
test_formdef.name = 'table select'
test_formdef.fields = []
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
assert data_class.count() == 0
for i in range(50):
t = data_class()
t.store()
assert data_class.count() == 50
with pytest.raises(TypeError):
assert len(data_class.select(iterator=True)) == 50
# TypeError: object of type 'generator' has no len()
assert len(list(data_class.select(iterator=True))) == 50
assert len(list(data_class.select(lambda x: True, iterator=True))) == 50
assert len(list(data_class.select(lambda x: x.id < 26, iterator=True))) == 25
assert len(list(data_class.select([st.Less('id', 26)], iterator=True))) == 25
assert len(list(data_class.select([st.Less('id', 25), st.GreaterOrEqual('id', 10)], iterator=True))) == 15
assert (
len(
list(
data_class.select(
[st.Less('id', 25), st.GreaterOrEqual('id', 10), lambda x: x.id >= 15], iterator=True
)
)
)
== 10
)
@postgresql
def test_sql_table_select_datetime():
test_formdef = FormDef()
test_formdef.name = 'table select datetime'
test_formdef.fields = []
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
assert data_class.count() == 0
d = datetime.datetime(2014, 1, 1)
for i in range(50):
t = data_class()
t.receipt_time = (d + datetime.timedelta(days=i)).timetuple()
t.store()
assert data_class.count() == 50
assert len(data_class.select()) == 50
assert len(data_class.select(lambda x: x.receipt_time == d.timetuple())) == 1
assert len(data_class.select([st.Equal('receipt_time', d.timetuple())])) == 1
assert (
len(data_class.select([st.Less('receipt_time', (d + datetime.timedelta(days=20)).timetuple())])) == 20
)
assert (
len(data_class.select([st.Greater('receipt_time', (d + datetime.timedelta(days=20)).timetuple())]))
== 29
)
@postgresql
def test_select_limit_offset():
test_formdef = FormDef()
test_formdef.name = 'table select limit offset'
test_formdef.fields = []
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
assert data_class.count() == 0
for i in range(50):
t = data_class()
t.store()
assert len(data_class.select()) == 50
for iterator in (False, True):
for func_clause in (lambda x: True, None):
assert [x.id for x in data_class.select(func_clause, order_by='id', iterator=iterator)] == list(
range(1, 51)
)
assert [
x.id for x in data_class.select(func_clause, order_by='id', limit=10, iterator=iterator)
] == list(range(1, 11))
assert [
x.id
for x in data_class.select(func_clause, order_by='id', limit=10, offset=10, iterator=iterator)
] == list(range(11, 21))
assert [
x.id
for x in data_class.select(func_clause, order_by='id', limit=20, offset=20, iterator=iterator)
] == list(range(21, 41))
assert [
x.id for x in data_class.select(func_clause, order_by='id', offset=10, iterator=iterator)
] == list(range(11, 51))
assert len([x.id for x in data_class.select(lambda x: x.id > 10, limit=10, iterator=iterator)]) == 10
@postgresql
def test_select_criteria_intersects():
data_class = formdef.data_class(mode='sql')
data_class.wipe()
formdata = data_class()
formdata.status = 'wf-0'
formdata.user_id = '5'
formdata.data = {'6': ['apricot']}
formdata.store()
formdata = data_class()
formdata.status = 'wf-0'
formdata.user_id = '5'
formdata.data = {'6': ['apricot', 'pear']}
formdata.store()
formdata = data_class()
formdata.status = 'wf-0'
formdata.user_id = '5'
formdata.data = {'6': []}
formdata.store()
assert len(data_class.select([st.Intersects('f6', ['apricot'])])) == 2
assert len(data_class.select([st.Intersects('f6', ['pear'])])) == 1
assert len(data_class.select([st.Intersects('f6', ['apple'])])) == 0
@postgresql
def test_count():
test_formdef = FormDef()
test_formdef.name = 'table select count'
test_formdef.fields = []
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
assert data_class.count() == 0
for i in range(50):
t = data_class()
t.store()
assert data_class.count() == 50
assert data_class.count([st.Less('id', 26)]) == 25
@postgresql
def test_select_criteria_or_and():
test_formdef = FormDef()
test_formdef.name = 'table select criteria or and'
test_formdef.fields = []
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
assert data_class.count() == 0
for i in range(50):
t = data_class()
t.store()
assert [int(x.id) for x in data_class.select([st.Or([])], order_by='id')] == []
assert [x.id for x in data_class.select([st.Or([st.Less('id', 10)])], order_by='id')] == list(
range(1, 10)
)
assert [
x.id for x in data_class.select([st.Or([st.Less('id', 10), st.Equal('id', 15)])], order_by='id')
] == list(range(1, 10)) + [15]
assert [
x.id for x in data_class.select([st.And([st.Less('id', 10), st.Greater('id', 5)])], order_by='id')
] == list(range(6, 10))
@postgresql
def test_select_criteria_null():
test_formdef = FormDef()
test_formdef.name = 'table select criteria null'
test_formdef.fields = []
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
assert data_class.count() == 0
for x in range(50):
t = data_class()
if x % 3:
t.submission_channel = None
else:
t.submission_channel = 'mail'
t.store()
assert len(data_class.select([st.Null('submission_channel')])) == 33
assert len(data_class.select([st.NotNull('submission_channel')])) == 17
@postgresql
def test_sql_table_select_bool():
test_formdef = FormDef()
test_formdef.name = 'table select bool'
test_formdef.fields = [fields.BoolField(id='3', label='bool')]
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
assert data_class.count() == 0
for i in range(50):
t = data_class()
t.data = {'3': False}
t.store()
t.data = {'3': True}
t.store()
assert data_class.count() == 50
assert len(data_class.select()) == 50
assert len(data_class.select([st.Equal('f3', True)])) == 1
assert len(data_class.select([st.Equal('f3', False)])) == 49
@postgresql
def test_sql_criteria_ilike():
test_formdef = FormDef()
test_formdef.name = 'table select bool'
test_formdef.fields = [fields.StringField(id='3', label='string')]
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
assert data_class.count() == 0
for i in range(50):
t = data_class()
if i < 20:
t.data = {'3': 'foo'}
else:
t.data = {'3': 'bar'}
t.store()
t.store()
assert data_class.count() == 50
assert len(data_class.select()) == 50
assert [x.id for x in data_class.select([st.ILike('f3', 'bar')], order_by='id')] == list(range(21, 51))
assert [x.id for x in data_class.select([st.ILike('f3', 'BAR')], order_by='id')] == list(range(21, 51))
@postgresql
def test_sql_criteria_fts():
test_formdef = FormDef()
test_formdef.name = 'table select fts'
test_formdef.fields = [fields.StringField(id='3', label='string')]
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
assert data_class.count() == 0
for i in range(50):
t = data_class()
if i < 20:
t.data = {'3': 'foo'}
else:
t.data = {'3': 'bar'}
t.just_created()
t.store()
assert data_class.count() == 50
assert len(data_class.select()) == 50
assert set(data_class.get_ids_from_query('BAR')) == set(range(21, 51))
assert [x.id for x in data_class.select([st.FtsMatch('BAR')], order_by='id')] == list(range(21, 51))
# check fts against data in history
assert len(data_class.select([st.FtsMatch('XXX')])) == 0
formdata1 = data_class.select([st.FtsMatch('BAR')])[0]
formdata1.evolution[0].comment = 'XXX'
formdata1.store()
assert len(data_class.select([st.FtsMatch('XXX')])) == 1
assert data_class.select([st.FtsMatch('XXX')])[0].id == formdata1.id
assert len(data_class.select([st.FtsMatch('yyy')])) == 0
item = RegisterCommenterWorkflowStatusItem()
item.comment = '<span>ÿÿÿ</span>'
item.perform(formdata1)
assert formdata1.evolution[-1].display_parts()[-1] == '<span>ÿÿÿ</span>'
formdata1.store()
assert len(data_class.select([st.FtsMatch('yyy')])) == 1
assert len(data_class.select([st.FtsMatch('span')])) == 0
assert data_class.count([st.FtsMatch('Pierre')]) == 0
sql.SqlUser.wipe()
user = sql.SqlUser()
user.name = 'Pierre'
user.store()
t.user_id = user.id
t.store()
assert data_class.count([st.FtsMatch('Pierre')]) == 1
# check unaccent
user = sql.SqlUser()
user.name = force_str(u'Frédéric')
user.store()
t.user_id = user.id
t.store()
assert data_class.count([st.FtsMatch(user.name)]) == 1
assert data_class.count([st.FtsMatch('Frederic')]) == 1
# check looking up a display id
assert len(data_class.get_ids_from_query(formdata1.id_display)) == 1
assert len(data_class.select([st.FtsMatch(formdata1.id_display)])) == 1
assert data_class.select([st.FtsMatch(formdata1.id_display)])[0].id_display == formdata1.id_display
def table_exists(cur, table_name):
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_name = %s''',
(table_name,),
)
return bool(cur.fetchone()[0] == 1)
def column_exists_in_table(cur, table_name, column_name):
cur.execute(
'''SELECT COUNT(*) FROM information_schema.columns
WHERE table_name = %s
AND column_name = %s''',
(table_name, column_name),
)
return bool(cur.fetchone()[0] == 1)
def index_exists(cur, index_name):
cur.execute(
'''SELECT COUNT(*) FROM pg_indexes
WHERE schemaname = 'public'
AND indexname = %s''',
(index_name,),
)
return bool(cur.fetchone()[0] == 1)
@postgresql
def test_sql_level():
conn, cur = sql.get_connection_and_cursor()
cur.execute('DROP TABLE wcs_meta')
assert sql.get_sql_level(conn, cur) == 0
sql.migrate()
assert sql.get_sql_level(conn, cur) == sql.SQL_LEVEL[0]
# insert negative SQL level, to trigger an error, and check it's not
# changed.
cur.execute('''UPDATE wcs_meta SET value = %s WHERE key = %s''', (str(-1), 'sql_level'))
assert sql.get_sql_level(conn, cur) == -1
with pytest.raises(RuntimeError):
sql.migrate()
assert sql.get_sql_level(conn, cur) == -1
conn.commit()
cur.close()
def migration_level(cur):
cur.execute('SELECT value FROM wcs_meta WHERE key = %s', ('sql_level',))
row = cur.fetchone()
return int(row[0])
@postgresql
def test_migration_1_tracking_code():
conn, cur = sql.get_connection_and_cursor()
cur.execute('DROP TABLE wcs_meta')
cur.execute('DROP TABLE tracking_codes')
sql.migrate()
assert table_exists(cur, 'tracking_codes')
assert table_exists(cur, 'wcs_meta')
assert migration_level(cur) >= 1
conn.commit()
cur.close()
@postgresql
def test_migration_2_formdef_id_in_views():
conn, cur = sql.get_connection_and_cursor()
cur.execute('UPDATE wcs_meta SET value = 1 WHERE key = %s', ('sql_level',))
cur.execute('DROP VIEW wcs_all_forms CASCADE')
# hack a formdef table the wrong way, to check it is reconstructed
# properly before the views are created
formdef.fields[4] = fields.StringField(id='4', label='item')
cur.execute('DROP VIEW wcs_view_1_tests')
cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN f4_display')
sql.redo_views(conn, cur, formdef, rebuild_global_views=False)
formdef.fields[4] = fields.ItemField(id='4', label='item', items=('apple', 'pear', 'peach', 'apricot'))
assert table_exists(cur, 'wcs_view_1_tests')
assert not column_exists_in_table(cur, 'wcs_view_1_tests', 'f4_display')
view_names = []
cur.execute(
'''SELECT table_name FROM information_schema.views
WHERE table_name LIKE %s''',
('wcs\_view\_%',),
)
while True:
row = cur.fetchone()
if row is None:
break
view_names.append(row[0])
fake_formdef = FormDef()
common_fields = sql.get_view_fields(fake_formdef)
# remove formdef_id for the purpose of this test
common_fields.remove([x for x in common_fields if x[1] == 'formdef_id'][0])
union = ' UNION '.join(
['''SELECT %s FROM %s''' % (', '.join([y[1] for y in common_fields]), x) for x in view_names]
)
assert not 'formdef_id' in union
cur.execute('''CREATE VIEW wcs_all_forms AS %s''' % union)
sql.migrate()
assert column_exists_in_table(cur, 'wcs_all_forms', 'formdef_id')
assert migration_level(cur) >= 2
conn.commit()
cur.close()
@postgresql
def test_migration_6_actions_roles():
conn, cur = sql.get_connection_and_cursor()
cur.execute('UPDATE wcs_meta SET value = 5 WHERE key = %s', ('sql_level',))
cur.execute('DROP VIEW wcs_all_forms CASCADE')
# hack a formdef table the wrong way, to check it is reconstructed
# properly before the views are created
formdef.fields[4] = fields.StringField(id='4', label='item')
cur.execute('DROP VIEW wcs_view_1_tests')
cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN actions_roles_array')
sql.drop_views(formdef, conn, cur)
formdef.fields[4] = fields.ItemField(id='4', label='item', items=('apple', 'pear', 'peach', 'apricot'))
assert not column_exists_in_table(cur, 'formdata_1_tests', 'actions_roles_array')
sql.migrate()
assert column_exists_in_table(cur, 'formdata_1_tests', 'actions_roles_array')
assert column_exists_in_table(cur, 'wcs_view_1_tests', 'actions_roles_array')
assert column_exists_in_table(cur, 'wcs_all_forms', 'actions_roles_array')
assert migration_level(cur) >= 6
conn.commit()
cur.close()
@postgresql
def test_migration_10_submission_channel():
conn, cur = sql.get_connection_and_cursor()
cur.execute('UPDATE wcs_meta SET value = 9 WHERE key = %s', ('sql_level',))
cur.execute('DROP VIEW wcs_all_forms CASCADE')
# hack a formdef table the wrong way, to check it is reconstructed
# properly before the views are created
formdef.fields[4] = fields.StringField(id='4', label='item')
cur.execute('DROP VIEW wcs_view_1_tests')
cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN submission_channel')
sql.drop_views(formdef, conn, cur)
formdef.fields[4] = fields.ItemField(id='4', label='item', items=('apple', 'pear', 'peach', 'apricot'))
assert not column_exists_in_table(cur, 'formdata_1_tests', 'submission_channel')
sql.migrate()
assert column_exists_in_table(cur, 'formdata_1_tests', 'submission_channel')
assert column_exists_in_table(cur, 'wcs_view_1_tests', 'submission_channel')
assert column_exists_in_table(cur, 'wcs_all_forms', 'submission_channel')
assert migration_level(cur) >= 10
conn.commit()
cur.close()
@postgresql
def test_migration_12_users_fts():
conn, cur = sql.get_connection_and_cursor()
cur.execute('UPDATE wcs_meta SET value = 11 WHERE key = %s', ('sql_level',))
sql.SqlUser.wipe()
user = sql.SqlUser()
user.name = 'Pierre'
user.store()
# remove the fts column
cur.execute('ALTER TABLE users DROP COLUMN fts')
assert not column_exists_in_table(cur, 'users', 'fts')
sql.migrate()
assert column_exists_in_table(cur, 'users', 'fts')
assert migration_level(cur) >= 12
# no fts, migration only prepare re-index
assert len(sql.SqlUser.get_ids_from_query('pierre')) == 0
assert sql.is_reindex_needed('user', conn=conn, cur=cur) is True
assert sql.is_reindex_needed('formdata', conn=conn, cur=cur) is True
sql.reindex()
assert sql.is_reindex_needed('user', conn=conn, cur=cur) is False
assert sql.is_reindex_needed('formdata', conn=conn, cur=cur) is False
# make sure the fts is filled after the migration
assert len(sql.SqlUser.get_ids_from_query('pierre')) == 1
conn.commit()
cur.close()
@postgresql
def test_migration_21_users_ascii_name():
conn, cur = sql.get_connection_and_cursor()
cur.execute('UPDATE wcs_meta SET value = 11 WHERE key = %s', ('sql_level',))
sql.SqlUser.wipe()
user = sql.SqlUser()
user.name = 'Jean Sénisme'
user.store()
# remove the ascii_name column
cur.execute('ALTER TABLE users DROP COLUMN ascii_name')
assert not column_exists_in_table(cur, 'users', 'ascii_name')
sql.migrate()
assert column_exists_in_table(cur, 'users', 'ascii_name')
assert migration_level(cur) >= 21
# no fts, migration only prepare re-index
assert sql.SqlUser.count([st.Equal('ascii_name', 'jean senisme')]) == 0
assert sql.is_reindex_needed('user', conn=conn, cur=cur) is True
assert sql.is_reindex_needed('formdata', conn=conn, cur=cur) is True
sql.reindex()
assert sql.is_reindex_needed('user', conn=conn, cur=cur) is False
assert sql.is_reindex_needed('formdata', conn=conn, cur=cur) is False
# make sure the ascii_name is filled after the migration
assert sql.SqlUser.count([st.Equal('ascii_name', 'jean senisme')]) == 1
conn.commit()
cur.close()
@postgresql
def test_migration_24_evolution_index():
formdef = FormDef()
formdef.name = 'tests migration 24'
formdef.fields = []
formdef.store()
conn, cur = sql.get_connection_and_cursor()
cur.execute('DROP INDEX %s_evolutions_fid' % formdef.table_name)
cur.execute('UPDATE wcs_meta SET value = 23 WHERE key = %s', ('sql_level',))
conn.commit()
cur.close()
conn, cur = sql.get_connection_and_cursor()
assert not index_exists(cur, formdef.table_name + '_evolutions_fid')
conn.commit()
cur.close()
sql.migrate()
conn, cur = sql.get_connection_and_cursor()
assert index_exists(cur, formdef.table_name + '_evolutions_fid')
conn.commit()
cur.close()
@postgresql
def test_migration_38_user_deleted():
conn, cur = sql.get_connection_and_cursor()
cur.execute('UPDATE wcs_meta SET value = 37 WHERE key = %s', ('sql_level',))
conn.commit()
cur.close()
sql.SqlUser.wipe()
user = sql.SqlUser()
user.name = 'Jean Sénisme'
user.store()
assert sql.SqlUser.count() == 1
conn, cur = sql.get_connection_and_cursor()
cur.execute('ALTER TABLE users DROP COLUMN deleted_timestamp')
assert not column_exists_in_table(cur, 'users', 'deleted_timestamp')
sql.migrate()
assert column_exists_in_table(cur, 'users', 'ascii_name')
assert migration_level(cur) >= 38
assert sql.SqlUser.count() == 1
assert not sql.SqlUser.get(id=user.id).deleted_timestamp
def drop_formdef_tables():
conn, cur = sql.get_connection_and_cursor()
cur.execute('''SELECT table_name FROM information_schema.tables''')
table_names = []
while True:
row = cur.fetchone()
if row is None:
break
table_names.append(row[0])
for table_name in table_names:
if table_name.startswith('formdata_'):
cur.execute('DROP TABLE %s CASCADE' % table_name)
@postgresql
def test_is_at_endpoint():
drop_formdef_tables()
conn, cur = sql.get_connection_and_cursor()
wf = Workflow(name='test endpoint')
st1 = wf.add_status('Status1', 'st1')
st2 = wf.add_status('Status2', 'st2')
commentable = CommentableWorkflowStatusItem()
commentable.id = '_commentable'
commentable.by = ['_submitter', '_receiver']
st1.items.append(commentable)
commentable.parent = st1
wf.store()
assert [x.id for x in wf.get_endpoint_status()] == ['st2']
formdef = FormDef()
formdef.name = 'test endpoint'
formdef.fields = []
formdef.workflow = wf
formdef.store()
data_class = formdef.data_class(mode='sql')
formdata = data_class()
formdata.status = 'wf-st1'
formdata.store()
formdata = data_class()
formdata.status = 'wf-st2'
formdata.store()
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms''')
assert bool(cur.fetchone()[0] == 2)
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE status = 'wf-st1' ''')
assert bool(cur.fetchone()[0] == 1)
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE is_at_endpoint = true''')
assert bool(cur.fetchone()[0] == 1)
# check a change to workflow is reflected in the database
st1.forced_endpoint = True
wf.store()
assert [x.id for x in wf.get_endpoint_status()] == ['st1', 'st2']
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE is_at_endpoint = true''')
assert bool(cur.fetchone()[0] == 2)
@postgresql
def test_views_fts():
drop_formdef_tables()
conn, cur = sql.get_connection_and_cursor()
formdef = FormDef()
formdef.name = 'test fts'
formdef.fields = [
fields.StringField(id='0', label='string'),
]
formdef.store()
data_class = formdef.data_class(mode='sql')
formdata1 = data_class()
formdata1.data = {'0': 'foo bar'}
formdata1.store()
formdata2 = data_class()
formdata2.data = {'0': 'foo'}
formdata2.store()
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE fts @@ plainto_tsquery(%s)''', ('foo',))
assert bool(cur.fetchone()[0] == 2)
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE fts @@ plainto_tsquery(%s)''', ('bar',))
assert bool(cur.fetchone()[0] == 1)
@postgresql
def test_select_any_formdata():
drop_formdef_tables()
conn, cur = sql.get_connection_and_cursor()
now = datetime.datetime.now()
cnt = 0
for i in range(5):
formdef = FormDef()
formdef.name = 'test any %d' % i
formdef.fields = []
formdef.store()
data_class = formdef.data_class(mode='sql')
for j in range(20):
formdata = data_class()
formdata.just_created()
formdata.user_id = '%s' % ((i + j) % 11)
# set receipt_time to make sure all entries are unique.
formdata.receipt_time = (now + datetime.timedelta(seconds=cnt)).timetuple()
formdata.status = ['wf-new', 'wf-accepted', 'wf-rejected', 'wf-finished'][(i + j) % 4]
if j < 5:
formdata.submission_channel = 'mail'
formdata.store()
cnt += 1
# test generic select
objects = sql.AnyFormData.select()
assert len(objects) == 100
# make sure valid formdefs are used
assert len([x for x in objects if x.formdef.name == 'test any 0']) == 20
assert len([x for x in objects if x.formdef.name == 'test any 1']) == 20
# test sorting
objects = sql.AnyFormData.select(order_by='receipt_time')
assert len(objects) == 100
objects2 = sql.AnyFormData.select(order_by='-receipt_time')
assert [(x.formdef_id, x.id) for x in objects2] == list(reversed([(x.formdef_id, x.id) for x in objects]))
# test clauses
objects2 = sql.AnyFormData.select([st.Equal('user_id', '0')])
assert len(objects2) == len([x for x in objects if x.user_id == '0'])
objects2 = sql.AnyFormData.select([st.Equal('is_at_endpoint', True)])
assert len(objects2) == len([x for x in objects if x.status in ('wf-rejected', 'wf-finished')])
objects2 = sql.AnyFormData.select([st.Equal('submission_channel', 'mail')])
assert len(objects2) == len([x for x in objects if x.submission_channel == 'mail'])
assert objects2[0].submission_channel == 'mail'
# test offset/limit
objects2 = sql.AnyFormData.select(order_by='receipt_time', limit=10, offset=0)
assert [(x.formdef_id, x.id) for x in objects2] == [(x.formdef_id, x.id) for x in objects][:10]
objects2 = sql.AnyFormData.select(order_by='receipt_time', limit=10, offset=20)
assert [(x.formdef_id, x.id) for x in objects2] == [(x.formdef_id, x.id) for x in objects][20:30]
@postgresql
def test_load_all_evolutions_on_any_formdata():
drop_formdef_tables()
conn, cur = sql.get_connection_and_cursor()
now = datetime.datetime.now()
cnt = 0
for i in range(5):
formdef = FormDef()
formdef.name = 'test any %d' % i
formdef.fields = []
formdef.store()
data_class = formdef.data_class(mode='sql')
for j in range(20):
formdata = data_class()
formdata.just_created()
formdata.user_id = '%s' % ((i + j) % 11)
# set receipt_time to make sure all entries are unique.
formdata.receipt_time = (now + datetime.timedelta(seconds=cnt)).timetuple()
formdata.status = ['wf-new', 'wf-accepted', 'wf-rejected', 'wf-finished'][(i + j) % 4]
formdata.store()
cnt += 1
objects = sql.AnyFormData.select()
assert len(objects) == 100
assert len([x for x in objects if x._evolution is None]) == 100
sql.AnyFormData.load_all_evolutions(objects)
assert len([x for x in objects if x._evolution is not None]) == 100
@postgresql
def test_geoloc_in_global_view():
drop_formdef_tables()
conn, cur = sql.get_connection_and_cursor()
now = datetime.datetime.now()
formdef = FormDef()
formdef.name = 'test no geoloc'
formdef.fields = []
formdef.store()
formdef2 = FormDef()
formdef2.name = 'test with geoloc'
formdef2.fields = []
formdef2.geolocations = {'base': 'Plop'}
formdef2.store()
data_class = formdef.data_class(mode='sql')
formdata = data_class()
formdata.just_created()
formdata.store()
data_class = formdef2.data_class(mode='sql')
formdata = data_class()
formdata.just_created()
formdata.geolocations = {'base': {'lat': 12, 'lon': 21}}
formdata.store()
# test generic select
objects = sql.AnyFormData.select()
assert len(objects) == 2
# test clauses
objects2 = sql.AnyFormData.select([st.Null('geoloc_base_x')])
assert len(objects2) == 1
assert not objects2[0].geolocations
objects2 = sql.AnyFormData.select([st.NotNull('geoloc_base_x')])
assert len(objects2) == 1
assert int(objects2[0].geolocations['base']['lat']) == formdata.geolocations['base']['lat']
assert int(objects2[0].geolocations['base']['lon']) == formdata.geolocations['base']['lon']
@postgresql
def test_actions_roles():
drop_formdef_tables()
conn, cur = sql.get_connection_and_cursor()
wf = Workflow(name='test endpoint')
st1 = wf.add_status('Status1', 'st1')
st2 = wf.add_status('Status2', 'st2')
commentable = CommentableWorkflowStatusItem()
commentable.id = '_commentable'
commentable.by = ['_submitter', '1']
st1.items.append(commentable)
commentable.parent = st1
wf.store()
assert [x.id for x in wf.get_endpoint_status()] == ['st2']
formdef = FormDef()
formdef.name = 'test actions roles'
formdef.fields = []
formdef.workflow = wf
formdef.store()
data_class = formdef.data_class(mode='sql')
formdata = data_class()
formdata.status = 'wf-st1'
formdata.store()
formdata_id = formdata.id
formdata = data_class()
formdata.status = 'wf-st2'
formdata.store()
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms''')
assert bool(cur.fetchone()[0] == 2)
cur.execute(
'''SELECT COUNT(*) FROM wcs_all_forms
WHERE actions_roles_array && ARRAY['5', '1', '4']'''
)
assert bool(cur.fetchone()[0] == 1)
# check a change to workflow is reflected in the database
st1.items[0].by = ['2', '3']
wf.store()
cur.execute(
'''SELECT COUNT(*) FROM wcs_all_forms
WHERE actions_roles_array && ARRAY['5', '1', '4']'''
)
assert bool(cur.fetchone()[0] == 0)
cur.execute(
'''SELECT COUNT(*) FROM wcs_all_forms
WHERE actions_roles_array && ARRAY['2', '3']'''
)
assert bool(cur.fetchone()[0] == 1)
# using criterias
criterias = [st.Intersects('actions_roles_array', ['2', '3'])]
total_count = sql.AnyFormData.count(criterias)
formdatas = sql.AnyFormData.select(criterias)
assert total_count == 1
assert formdatas[0].id == formdata_id
@postgresql
def test_last_update_time():
drop_formdef_tables()
conn, cur = sql.get_connection_and_cursor()
wf = Workflow(name='test last update time')
st1 = wf.add_status('Status1', 'st1')
commentable = CommentableWorkflowStatusItem()
commentable.id = '_commentable'
commentable.by = ['_submitter', '_receiver']
st1.items.append(commentable)
commentable.parent = st1
wf.store()
formdef = FormDef()
formdef.name = 'test last update time'
formdef.fields = []
formdef.workflow = wf
formdef.store()
data_class = formdef.data_class(mode='sql')
formdata1 = data_class()
formdata1.status = 'wf-st1'
formdata1.just_created()
formdata1.evolution[0].comment = 'comment'
formdata1.jump_status('st1') # will add another evolution entry
formdata1.evolution[0].time = datetime.datetime(2015, 1, 1, 0, 0, 0).timetuple()
formdata1.evolution[1].time = datetime.datetime(2015, 1, 2, 0, 0, 0).timetuple()
formdata1.store()
formdata2 = data_class()
formdata2.status = 'wf-st1'
formdata2.just_created()
formdata2.evolution[0].comment = 'comment'
formdata2.jump_status('st1') # will add another evolution entry
formdata2.evolution[0].time = datetime.datetime(2015, 1, 3, 0, 0, 0).timetuple()
formdata2.evolution[1].time = datetime.datetime(2015, 1, 4, 0, 0, 0).timetuple()
formdata2.store()
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms''')
assert bool(cur.fetchone()[0] == 2)
cur.execute('''SELECT id FROM wcs_all_forms WHERE last_update_time = '2015-01-02 00:00' ''')
assert bool(cur.fetchone()[0] == formdata1.id)
cur.execute('''SELECT id FROM wcs_all_forms WHERE last_update_time = '2015-01-04 00:00' ''')
assert bool(cur.fetchone()[0] == formdata2.id)
@postgresql
def test_view_formdef_name():
drop_formdef_tables()
conn, cur = sql.get_connection_and_cursor()
formdef1 = FormDef()
formdef1.name = 'test formdef name 1'
formdef1.fields = []
formdef1.store()
data_class = formdef1.data_class()
formdata1 = data_class()
formdata1.just_created()
formdata1.store()
formdef2 = FormDef()
formdef2.name = 'test formdef name 2'
formdef2.fields = []
formdef2.store()
data_class = formdef2.data_class()
formdata2 = data_class()
formdata2.just_created()
formdata2.store()
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms''')
assert bool(cur.fetchone()[0] == 2)
cur.execute('''SELECT formdef_id FROM wcs_all_forms WHERE formdef_name = 'test formdef name 1' ''')
assert bool(str(cur.fetchone()[0]) == str(formdef1.id))
cur.execute('''SELECT formdef_id FROM wcs_all_forms WHERE formdef_name = 'test formdef name 2' ''')
assert bool(str(cur.fetchone()[0]) == str(formdef2.id))
@postgresql
def test_view_user_name():
drop_formdef_tables()
conn, cur = sql.get_connection_and_cursor()
formdef1 = FormDef()
formdef1.name = 'test user name'
formdef1.fields = []
formdef1.store()
sql.SqlUser.wipe()
user = sql.SqlUser()
user.name = 'Foobar'
user.store()
data_class = formdef1.data_class()
formdata1 = data_class()
formdata1.just_created()
formdata1.store()
data_class = formdef1.data_class()
formdata2 = data_class()
formdata2.user_id = user.id
formdata2.just_created()
formdata2.store()
cur.execute('''SELECT user_name FROM wcs_all_forms WHERE id = %s ''', (formdata1.id,))
assert bool(cur.fetchone()[0] is None)
cur.execute('''SELECT user_name FROM wcs_all_forms WHERE id = %s ''', (formdata2.id,))
assert bool(cur.fetchone()[0] == user.name)
@postgresql
def test_select_formdata_after_formdef_removal():
drop_formdef_tables()
conn, cur = sql.get_connection_and_cursor()
now = datetime.datetime.now()
for i in range(2):
formdef = FormDef()
formdef.name = 'test formdef removal'
formdef.fields = []
formdef.store()
data_class = formdef.data_class(mode='sql')
formdata = data_class()
formdata.just_created()
formdata.store()
# test generic select
objects = sql.AnyFormData.select()
assert len(objects) == 2
formdef.remove_self()
objects = sql.AnyFormData.select()
assert len(objects) == 1
@postgresql
def test_views_submission_info():
drop_formdef_tables()
conn, cur = sql.get_connection_and_cursor()
formdef = FormDef()
formdef.name = 'test backoffice submission'
formdef.fields = []
formdef.store()
data_class = formdef.data_class(mode='sql')
formdata1 = data_class()
formdata1.submission_channel = 'mail'
formdata1.backoffice_submission = True
formdata1.store()
formdata2 = data_class()
formdata2.backoffice_submission = False
formdata2.store()
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE backoffice_submission IS TRUE''')
assert bool(cur.fetchone()[0] == 1)
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE backoffice_submission IS FALSE''')
assert bool(cur.fetchone()[0] == 1)
cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE submission_channel = %s''', ('mail',))
assert bool(cur.fetchone()[0] == 1)
@postgresql
def test_get_formdef_new_id():
test1_formdef = FormDef()
test1_formdef.name = 'new formdef'
test1_formdef.fields = []
test1_formdef.store()
test1_id = test1_formdef.id
test1_table_name = test1_formdef.table_name
test1_formdef.remove_self()
test2_formdef = FormDef()
test2_formdef.name = 'new formdef'
test2_formdef.fields = []
test2_formdef.store()
assert test1_id != test2_formdef.id
assert test1_table_name != test2_formdef.table_name
@postgresql
def test_criticality_levels():
drop_formdef_tables()
conn, cur = sql.get_connection_and_cursor()
workflow1 = Workflow(name='criticality1')
workflow1.criticality_levels = [
WorkflowCriticalityLevel(name='green'),
WorkflowCriticalityLevel(name='yellow'),
WorkflowCriticalityLevel(name='red'),
WorkflowCriticalityLevel(name='redder'),
WorkflowCriticalityLevel(name='reddest'),
]
workflow1.store()
workflow2 = Workflow(name='criticality2')
workflow2.criticality_levels = [
WorkflowCriticalityLevel(name='green'),
WorkflowCriticalityLevel(name='reddest'),
]
workflow2.store()
formdef1 = FormDef()
formdef1.name = 'test criticality levels 1'
formdef1.fields = []
formdef1.workflow_id = workflow1.id
formdef1.store()
formdef2 = FormDef()
formdef2.name = 'test criticality levels 2'
formdef2.fields = []
formdef2.workflow_id = workflow2.id
formdef2.store()
data_class = formdef1.data_class(mode='sql')
for i in range(5):
formdata = data_class()
formdata.set_criticality_level(i)
formdata.store()
data_class = formdef2.data_class(mode='sql')
for i in range(2):
formdata = data_class()
formdata.set_criticality_level(i)
formdata.store()
objects = sql.AnyFormData.select(order_by='-criticality_level')
# make sure first two formdata are the highest priority ones, and the last
# two formdata are the lowest priority ones.
assert objects[0].get_criticality_level_object().name == 'reddest'
assert objects[1].get_criticality_level_object().name == 'reddest'
assert objects[-1].get_criticality_level_object().name == 'green'
assert objects[-2].get_criticality_level_object().name == 'green'
@postgresql
def test_view_performances():
pytest.skip('takes too much time')
drop_formdef_tables()
conn, cur = sql.get_connection_and_cursor()
nb_users = 1000
nb_roles = 10
nb_workflows = 5
nb_formdefs = 10
nb_fields = 10
nb_formdatas = 1000
nb_users = 10
nb_formdatas = 10000
random.seed('foobar')
# create users
sql.SqlUser.wipe()
users = []
for i in range(nb_users):
user = sql.SqlUser()
user.name = 'user %s' % i
user.store()
users.append(user)
# create roles
roles = []
for i in range(nb_roles):
role = Role(name='role%s' % i)
role.store()
roles.append(role)
# create workflows
workflows = []
for i in range(nb_workflows):
workflow = Workflow(name='test perf wf %s' % i)
for j in range(5):
status = workflow.add_status('Status %d' % j, 'st%s' % j)
commentable = CommentableWorkflowStatusItem()
commentable.id = '_commentable%s' % j
commentable.by = [random.choice(roles).id, random.choice(roles).id]
status.items.append(commentable)
commentable.parent = status
if j != 4:
jump = JumpWorkflowStatusItem()
jump.id = '_jump%s' % j
jump.by = []
jump.timeout = 5
jump.status = 'st%s' % (j + 1)
status.items.append(jump)
jump.parent = status
workflow.store()
workflows.append(workflow)
# create formdefs
formdefs = []
for i in range(nb_formdefs):
formdef = FormDef()
formdef.name = 'test performance view %s' % i
formdef.fields = []
for j in range(nb_fields):
formdef.fields.append(fields.StringField(id=str(j + 1), label='string'))
formdef.workflow_id = workflows[i % 5].id
formdef.store()
formdefs.append(formdef)
print('create formdatas')
# create formdatas
for i in range(nb_formdatas):
data_class = random.choice(formdefs).data_class()
formdata = data_class()
formdata.data = {}
for j in range(10):
formdata.data[str(j + 1)] = ''.join(
[random.choice(string.letters) for x in range(random.randint(10, 30))]
)
formdata.user_id = random.choice(users).id
formdata.status = 'wf-st1'
formdata.just_created()
for j in range(5):
formdata.jump_status('st%s' % (j + 2))
if random.random() < 0.5:
break
print('done')
t0 = time.time()
user_roles = [random.choice(roles).id, random.choice(roles).id]
criterias = []
criterias.append(st.NotEqual('status', 'draft'))
criterias.append(st.Equal('is_at_endpoint', False))
criterias.append(st.Intersects('actions_roles_array', user_roles))
formdatas = sql.AnyFormData.select(criterias, order_by='receipt_time', limit=20, offset=0)
print(time.time() - t0)
assert (time.time() - t0) < 0.5
@postgresql
def test_migration_30_anonymize_evo_who():
formdef = FormDef()
formdef.name = 'tests migration 24'
formdef.fields = []
formdef.store()
user = sql.SqlUser()
user.name = 'JohnDoe'
user.store()
klass = formdef.data_class()
formdata = klass()
formdata.evolution = []
formdata.anonymised = datetime.datetime.now()
evo = Evolution(formdata)
evo.who = user.id
evo.time = time.localtime()
formdata.evolution.append(evo)
formdata.store()
conn, cur = sql.get_connection_and_cursor()
cur.execute('UPDATE wcs_meta SET value = 29 WHERE key = %s', ('sql_level',))
conn.commit()
cur.close()
conn, cur = sql.get_connection_and_cursor()
cur.execute('SELECT COUNT(*) FROM %s_evolutions WHERE who IS NULL' % formdef.table_name)
assert cur.fetchone() == (0,)
cur.execute('SELECT COUNT(*) FROM wcs_meta WHERE key = %s AND value::integer > 29', ('sql_level',))
assert cur.fetchone() == (0,)
conn.commit()
cur.close()
sql.migrate()
conn, cur = sql.get_connection_and_cursor()
cur.execute('SELECT COUNT(*) FROM %s_evolutions WHERE who IS NULL' % formdef.table_name)
assert cur.fetchone() == (1,)
cur.execute('SELECT COUNT(*) FROM wcs_meta WHERE key = %s AND value::integer > 29', ('sql_level',))
assert cur.fetchone() == (1,)
conn.commit()
cur.close()
@postgresql
def test_migration_31_user_label():
conn, cur = sql.get_connection_and_cursor()
cur.execute('UPDATE wcs_meta SET value = 30 WHERE key = %s', ('sql_level',))
cur.execute('DROP VIEW wcs_all_forms CASCADE')
cur.execute('DROP VIEW wcs_view_1_tests')
cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN user_label')
sql.drop_views(formdef, conn, cur)
assert not column_exists_in_table(cur, 'formdata_1_tests', 'user_label')
sql.migrate()
assert column_exists_in_table(cur, 'formdata_1_tests', 'user_label')
assert column_exists_in_table(cur, 'wcs_view_1_tests', 'user_label')
assert column_exists_in_table(cur, 'wcs_all_forms', 'user_label')
assert migration_level(cur) >= 31
conn.commit()
cur.close()
@postgresql
def test_migration_38_submission_agent_id():
for formdef in FormDef.select():
formdef.data_class().wipe()
data_class = formdef.data_class(mode='sql')
formdata = data_class()
formdata.data = {}
formdata.status = 'wf-0'
formdata.submission_context = {'agent_id': 12}
formdata.store()
conn, cur = sql.get_connection_and_cursor()
cur.execute('UPDATE wcs_meta SET value = 37 WHERE key = %s', ('sql_level',))
cur.execute('DROP VIEW wcs_all_forms CASCADE')
cur.execute('DROP VIEW wcs_view_1_tests')
cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN submission_agent_id')
sql.drop_views(formdef, conn, cur)
assert not column_exists_in_table(cur, 'formdata_1_tests', 'submission_agent_id')
sql.migrate()
assert sql.is_reindex_needed('formdata', conn=conn, cur=cur) is True
assert column_exists_in_table(cur, 'formdata_1_tests', 'submission_agent_id')
assert column_exists_in_table(cur, 'wcs_view_1_tests', 'submission_agent_id')
assert column_exists_in_table(cur, 'wcs_all_forms', 'submission_agent_id')
assert migration_level(cur) >= 38
sql.reindex()
cur.execute('''SELECT submission_agent_id FROM wcs_all_forms WHERE id = %s ''', (formdata.id,))
assert cur.fetchone()[0] == '12'
conn.commit()
cur.close()
@postgresql
def test_migration_40_user_is_active():
conn, cur = sql.get_connection_and_cursor()
cur.execute('UPDATE wcs_meta SET value = 39 WHERE key = %s', ('sql_level',))
conn.commit()
cur.close()
sql.SqlUser.wipe()
user = sql.SqlUser()
user.name = 'Jean Sénisme'
user.deleted_timestamp = datetime.datetime.now()
user.store()
user2 = sql.SqlUser()
user2.name = 'Jean II'
user2.store()
assert sql.SqlUser.count() == 2
conn, cur = sql.get_connection_and_cursor()
cur.execute('ALTER TABLE users DROP COLUMN is_active')
assert not column_exists_in_table(cur, 'users', 'is_active')
sql.migrate()
assert column_exists_in_table(cur, 'users', 'is_active')
assert migration_level(cur) >= 40
assert sql.SqlUser.count() == 2
assert sql.SqlUser.get(id=user.id).is_active is False
assert sql.SqlUser.get(id=user2.id).is_active is True