wcs/tests/test_sql.py

1858 lines
57 KiB
Python

# -*- coding: utf-8 -*-
from __future__ import print_function
import datetime
import os
import random
import shutil
import string
import sys
import time
from django.core.management import call_command
from quixote import cleanup
from wcs.qommon import force_str
from wcs import formdef, publisher, fields
from wcs.formdef import FormDef
from wcs.formdata import Evolution
from wcs.roles import Role
from wcs.workflows import Workflow, CommentableWorkflowStatusItem, WorkflowCriticalityLevel
from wcs.wf.jump import JumpWorkflowStatusItem
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
from wcs import sql
import wcs.qommon.storage as st
from utilities import create_temporary_pub, clean_temporary_pub
import pytest
# Shorthand for the ``postgresql`` pytest marker used to decorate the tests
# in this module.
postgresql = pytest.mark.postgresql

try:
    import psycopg2
except ImportError:
    # The import failure is deliberately ignored so the module can still be
    # imported.  NOTE(review): presumably the postgresql-marked tests are
    # deselected when psycopg2 is unavailable -- confirm.
    pass
def setup_module(module):
    """Create the temporary SQL-mode publisher and the shared ``tests`` formdef.

    Sets the module globals ``pub`` (the temporary publisher) and
    ``formdef`` (a stored formdef with one field per tested field type,
    ids '0' to '6').

    :param module: the module object passed in by pytest (unused).
    """
    global pub, formdef

    cleanup()
    pub = create_temporary_pub(sql_mode=True)

    # Use the directly imported FormDef class instead of ``formdef.FormDef``:
    # the global name ``formdef`` is rebound to the instance right here, so
    # going through the module name would fail if this function ran twice.
    formdef = FormDef()
    formdef.name = 'tests'
    formdef.fields = [
        fields.StringField(id='0', label='string'),
        fields.EmailField(id='1', label='email'),
        fields.TextField(id='2', label='text'),
        fields.BoolField(id='3', label='bool'),
        fields.ItemField(id='4', label='item', items=('apple', 'pear', 'peach', 'apricot')),
        fields.DateField(id='5', label='date'),
        fields.ItemsField(id='6', label='items', items=('apple', 'pear', 'peach', 'apricot')),
    ]
    formdef.store()
def teardown_module(module):
    """Remove the temporary publisher once the module's tests are done.

    :param module: the module object passed in by pytest (unused).
    """
    clean_temporary_pub()
@postgresql
def test_sql_table_name_invalid_chars():
    """A formdef named with ``-``, ``|``, ``;`` and ``/`` still gets a table."""
    awkward_formdef = FormDef()
    awkward_formdef.name = 'test-some|char;are better/ignored'
    awkward_formdef.fields = []
    awkward_formdef.store()
    assert awkward_formdef.table_name is not None

    # the freshly created table must be queryable and empty
    assert awkward_formdef.data_class(mode='sql').count() == 0
@postgresql
def test_sql_data_class():
    """Smoke test: building the SQL data class of the shared formdef must not raise."""
    # Only the call itself is exercised; the returned class is not inspected.
    formdef.data_class(mode='sql')
@postgresql
def test_sql_count():
    """The shared formdef starts out with no stored formdata."""
    nb_formdatas = formdef.data_class(mode='sql').count()
    assert nb_formdatas == 0
@postgresql
def test_sql_store():
    """Storing a formdata assigns it an id."""
    new_formdata = formdef.data_class(mode='sql')()
    new_formdata.status = 'wf-0'
    new_formdata.user_id = '5'
    new_formdata.store()
    assert new_formdata.id
@postgresql
def test_sql_get():
    """A stored formdata can be fetched by id with its user and status intact."""
    data_class = formdef.data_class(mode='sql')

    stored = data_class()
    stored.status = 'wf-0'
    stored.user_id = '5'
    stored.store()

    fetched = data_class.get(stored.id)
    assert fetched.user_id == '5'
    assert fetched.status == 'wf-0'
@postgresql
def test_sql_store_channel():
    """The submission channel is persisted, and can be reset to None."""
    data_class = formdef.data_class(mode='sql')

    record = data_class()
    record.status = 'wf-0'
    record.user_id = '5'
    record.submission_channel = 'mail'
    record.store()
    reloaded = data_class.get(record.id)
    assert reloaded.submission_channel == 'mail'

    # clearing the channel must be written back as well
    record.submission_channel = None
    record.store()
    reloaded = data_class.get(record.id)
    assert reloaded.submission_channel is None
@postgresql
def test_sql_get_missing():
    """Fetching an id that was never stored raises KeyError."""
    missing_id = 123456
    data_class = formdef.data_class(mode='sql')
    with pytest.raises(KeyError):
        data_class.get(missing_id)
@postgresql
def test_sql_get_missing_ignore_errors():
    """With ``ignore_errors=True`` a missing id yields None instead of raising."""
    data_class = formdef.data_class(mode='sql')
    result = data_class.get(123456, ignore_errors=True)
    assert result is None
def check_sql_field(no, value):
    """Store ``value`` under field id ``no`` and assert it reads back equal.

    :param no: field id (a string key into ``formdata.data``).
    :param value: value to store for that field.
    """
    data_class = formdef.data_class(mode='sql')

    written = data_class()
    written.data = {no: value}
    written.store()

    read_back = data_class.get(written.id)
    assert read_back.data.get(no) == value
@postgresql
def test_sql_field_string():
    """String field round-trips ASCII text, accented text and None."""
    for value in ('hello world', 'élo world', None):
        check_sql_field('0', value)
@postgresql
def test_sql_field_email():
    """Email field round-trips an address."""
    address = 'fred@example.com'
    check_sql_field('1', address)
@postgresql
def test_sql_field_text():
    """Text field round-trips plain and accented text."""
    for value in ('long text', 'long tèxt'):
        check_sql_field('2', value)
@postgresql
def test_sql_field_bool():
    """Bool field round-trips both False and True."""
    for flag in (False, True):
        check_sql_field('3', flag)
@postgresql
def test_sql_field_item():
    """Item field round-trips one of its configured items."""
    chosen_item = 'apricot'
    check_sql_field('4', chosen_item)
@postgresql
def test_sql_field_date():
    """Date field round-trips today's date given as a time tuple."""
    today = datetime.date.today()
    check_sql_field('5', today.timetuple())
@postgresql
def test_sql_field_items():
    """Items field round-trips lists, including values outside the configured items."""
    selections = (
        ['apricot'],
        ['apricot', 'pear'],
        ['pomme', 'poire', 'pêche'],
    )
    for selection in selections:
        check_sql_field('6', selection)
@postgresql
def test_sql_geoloc():
    """Geolocations are stored: absent, set to a point, then reset to empty."""
    geoloc_formdef = FormDef()
    geoloc_formdef.name = 'geoloc'
    geoloc_formdef.fields = []
    geoloc_formdef.geolocations = {'base': 'Plop'}
    geoloc_formdef.store()
    data_class = geoloc_formdef.data_class(mode='sql')

    formdata = data_class()
    formdata.data = {}
    formdata.store()  # NULL geolocation
    assert not data_class.get(formdata.id).geolocations

    # a real position first, then an empty mapping
    for geolocations in ({'base': {'lat': 12, 'lon': 21}}, {}):
        formdata.geolocations = geolocations
        formdata.store()
        reloaded = data_class.get(formdata.id)
        assert reloaded.geolocations == formdata.geolocations
@postgresql
def test_sql_multi_geoloc():
    """Adding then removing a second geolocation on the formdef is reflected in stored data."""
    base_point = {'lat': 12, 'lon': 21}

    geoloc_formdef = FormDef()
    geoloc_formdef.name = 'geoloc'
    geoloc_formdef.fields = []
    geoloc_formdef.geolocations = {'base': 'Plop'}
    geoloc_formdef.store()
    data_class = geoloc_formdef.data_class(mode='sql')

    formdata = data_class()
    formdata.data = {}
    formdata.geolocations = {'base': base_point}
    formdata.store()
    assert data_class.get(formdata.id).geolocations == formdata.geolocations

    # declare a second geolocation and store a value for it
    geoloc_formdef.geolocations = {'base': 'Plop', '2nd': 'XXX'}
    geoloc_formdef.store()
    formdata.geolocations = {'base': base_point, '2nd': {'lat': -34, 'lon': -12}}
    formdata.store()
    assert data_class.get(formdata.id).geolocations == formdata.geolocations

    # remove the second geolocation again: only the base one is read back
    geoloc_formdef.geolocations = {'base': 'Plop'}
    geoloc_formdef.store()
    assert data_class.get(formdata.id).geolocations == {'base': {'lat': 12, 'lon': 21}}
@postgresql
def test_sql_change():
    """Updating a field value and storing again overwrites the previous value."""
    data_class = formdef.data_class(mode='sql')

    formdata = data_class()
    formdata.data = {'0': 'test'}
    formdata.store()
    formdata_id = formdata.id

    formdata = data_class.get(formdata_id)
    assert formdata.data.get('0') == 'test'

    formdata.data = {'0': 'test2'}
    formdata.store()
    assert data_class.get(formdata_id).data.get('0') == 'test2'
@postgresql
def test_sql_remove():
    """After ``remove_self()`` the formdata can no longer be fetched."""
    data_class = formdef.data_class(mode='sql')

    formdata = data_class()
    formdata.data = {'0': 'test'}
    formdata.store()
    formdata_id = formdata.id

    formdata = data_class.get(formdata_id)
    assert formdata.data.get('0') == 'test'

    formdata.remove_self()
    with pytest.raises(KeyError):
        data_class.get(formdata_id)
@postgresql
def test_sql_wipe():
    """``wipe()`` removes every stored formdata."""
    data_class = formdef.data_class(mode='sql')
    data_class().store()
    assert data_class.count() != 0

    data_class.wipe()
    assert data_class.count() == 0
@postgresql
def test_sql_evolution():
    """An evolution appended to a formdata is persisted along with it."""
    data_class = formdef.data_class(mode='sql')

    created = data_class()
    created.just_created()
    created.store()
    formdata_id = created.id

    formdata = data_class.get(formdata_id)
    assert len(formdata.evolution) == 1

    extra_step = Evolution()
    extra_step.time = time.localtime()
    extra_step.status = formdata.status
    extra_step.comment = 'hello world'
    formdata.evolution.append(extra_step)
    formdata.store()

    reloaded = data_class.get(formdata_id)
    assert len(reloaded.evolution) == 2
    assert reloaded.evolution[-1].comment == 'hello world'
@postgresql
def test_sql_evolution_change():
    """Editing the comment of an already stored evolution is persisted."""
    data_class = formdef.data_class(mode='sql')

    created = data_class()
    created.just_created()
    created.store()
    formdata_id = created.id

    formdata = data_class.get(formdata_id)
    assert len(formdata.evolution) == 1

    extra_step = Evolution()
    extra_step.time = time.localtime()
    extra_step.status = formdata.status
    extra_step.comment = 'hello world'
    formdata.evolution.append(extra_step)
    formdata.store()

    formdata = data_class.get(formdata_id)
    assert len(formdata.evolution) == 2
    assert formdata.evolution[-1].comment == 'hello world'

    # modify the last evolution in place; no new evolution must appear
    formdata.evolution[-1].comment = 'foobar'
    formdata.store()

    formdata = data_class.get(formdata_id)
    assert len(formdata.evolution) == 2
    assert formdata.evolution[-1].comment == 'foobar'
@postgresql
def test_sql_multiple_evolutions():
    """``load_all_evolutions`` can be run on a selection of 20+ formdatas."""
    data_class = formdef.data_class(mode='sql')

    for index in range(20):
        created = data_class()
        created.just_created()
        created.store()

        formdata = data_class.get(created.id)
        step = Evolution()
        step.time = time.localtime()
        step.status = formdata.status
        step.comment = 'hello world %d' % index
        formdata.evolution.append(step)
        formdata.store()

    selection = data_class.select()
    data_class.load_all_evolutions(selection)
    # reads the ``_evolution`` attribute of every selected formdata
    assert [formdata._evolution for formdata in selection]
@postgresql
def test_sql_get_ids_with_indexed_value():
    """Only the formdatas whose ``user_id`` is '2' are returned."""
    data_class = formdef.data_class(mode='sql')
    data_class.wipe()

    # one formdata without any user: it must not be part of the result
    data_class().store()

    expected_ids = []
    for _ in range(2):
        formdata = data_class()
        formdata.user_id = '2'
        formdata.store()
        expected_ids.append(formdata.id)

    ids = data_class.get_ids_with_indexed_value('user_id', '2')
    assert set(ids) == set(expected_ids)
@postgresql
def test_sql_get_ids_from_query():
    """Full-text queries return the ids of formdatas whose text field matches."""
    data_class = formdef.data_class(mode='sql')
    data_class.wipe()

    texts = (
        'this is some reasonably long text',
        'hello world is still a classical example',
        'you would think other ideas of text would emerge',
    )
    stored_ids = []
    for text in texts:
        formdata = data_class()
        formdata.data = {'2': text}
        formdata.store()
        stored_ids.append(formdata.id)
    id1, id2, id3 = stored_ids

    assert set(data_class.get_ids_from_query('text')) == {id1, id3}
    assert set(data_class.get_ids_from_query('classical')) == {id2}
@postgresql
def test_sql_rollback_on_error():
    """After a failed query the data class is still usable (``wipe()`` works)."""
    data_class = formdef.data_class(mode='sql')
    data_class.wipe()

    # there is no FOOBAR column in the table, so psycopg2 raises a
    # ProgrammingError (a subclass of psycopg2.Error).
    with pytest.raises(psycopg2.Error):
        data_class.get_ids_with_indexed_value('FOOBAR', '2')

    data_class.wipe()
@postgresql
def test_sql_get_ids_with_indexed_value_dict():
    """Indexed lookup also works on the dict-valued ``workflow_roles`` attribute."""
    data_class = formdef.data_class(mode='sql')
    data_class.wipe()

    # one formdata without workflow roles: it must not be part of the result
    data_class().store()

    expected_ids = []
    for _ in range(2):
        formdata = data_class()
        formdata.workflow_roles = {'plop': '2'}
        formdata.store()
        expected_ids.append(formdata.id)

    ids = data_class.get_ids_with_indexed_value('workflow_roles', '2')
    assert set(ids) == set(expected_ids)
@postgresql
def test_create_user():
    """Smoke test: a named SqlUser can be stored after wiping the table."""
    sql.SqlUser.wipe()

    pierre = sql.SqlUser()
    pierre.name = 'Pierre'
    pierre.store()
@postgresql
def test_get_user():
    """A stored user can be fetched back by id."""
    sql.SqlUser.wipe()

    pierre = sql.SqlUser()
    pierre.name = 'Pierre'
    pierre.store()

    fetched = sql.SqlUser.get(pierre.id)
    assert fetched is not None
@postgresql
def test_get_missing_user():
    """Fetching a user id from an empty table raises KeyError."""
    missing_id = 12345
    sql.SqlUser.wipe()
    with pytest.raises(KeyError):
        sql.SqlUser.get(missing_id)
@postgresql
def test_get_missing_user_ignore_errors():
    """With ``ignore_errors=True`` a missing user id yields None."""
    sql.SqlUser.wipe()
    result = sql.SqlUser.get(12345, ignore_errors=True)
    assert result is None
@postgresql
def test_user_formdef():
    """Custom user fields declared through UserFieldsFormDef are persisted."""
    from wcs.admin.settings import UserFieldsFormDef

    sql.SqlUser.wipe()

    user_formdef = UserFieldsFormDef(pub)
    user_formdef.fields = [fields.StringField(id='3', label='test', type='string')]
    user_formdef.store()

    pierre = sql.SqlUser()
    pierre.name = 'Pierre'
    pierre.form_data = {'3': 'Papier'}
    pierre.store()

    fetched = sql.SqlUser.get(pierre.id, ignore_errors=True)
    assert fetched.form_data['3'] == 'Papier'

    # drop the users formdef from the configuration again
    del pub.cfg['users']['formdef']
    pub.write_cfg()
@postgresql
def test_get_users_with_role():
    """Only the user holding role 1 is returned by ``get_users_with_role(1)``."""
    sql.SqlUser.wipe()

    with_role = sql.SqlUser()
    with_role.name = 'Pierre'
    with_role.roles = [1]
    with_role.store()

    without_role = sql.SqlUser()
    without_role.name = 'Papier'
    without_role.store()

    assert len(sql.SqlUser.get_users_with_role(1)) == 1
@postgresql
def test_get_users_with_name_identifier():
    """Lookup by name identifier returns exactly the user carrying it."""
    sql.SqlUser.wipe()

    identified = sql.SqlUser()
    identified.name = 'Pierre'
    identified.name_identifiers = ['foo']
    identified.store()

    anonymous = sql.SqlUser()
    anonymous.name = 'Papier'
    anonymous.store()

    lookup = sql.SqlUser.get_users_with_name_identifier
    assert len(lookup('foo')) == 1
    assert lookup('foo')[0].name == 'Pierre'
@postgresql
def test_get_users_fts():
    """Full-text search on users matches the name case-insensitively."""
    sql.SqlUser.wipe()

    pierre = sql.SqlUser()
    pierre.name = 'Pierre'
    pierre.name_identifiers = ['foo']
    pierre.store()
    pierre_id = pierre.id

    papier = sql.SqlUser()
    papier.name = 'Papier'
    papier.store()

    search = sql.SqlUser.get_ids_from_query
    assert len(search('pierre')) == 1
    assert sql.SqlUser.get(search('pierre')[0]).id == pierre_id
@postgresql
def test_get_users_formdef_fts():
    """Full-text search on users also covers custom form data, in any word order."""
    from wcs.admin.settings import UserFieldsFormDef

    sql.SqlUser.wipe()

    user_formdef = UserFieldsFormDef(pub)
    user_formdef.fields = [fields.StringField(id='3', label='test', type='string')]
    user_formdef.store()

    pierre = sql.SqlUser()
    pierre.name = 'Pierre'
    pierre.form_data = {'3': 'Papier'}
    pierre.store()
    pierre_id = pierre.id

    search = sql.SqlUser.get_ids_from_query
    for query in ('pierre papier', 'papier pierre'):
        assert len(search(query)) == 1
        assert sql.SqlUser.get(search(query)[0]).id == pierre_id

    # drop the users formdef from the configuration again
    del pub.cfg['users']['formdef']
    pub.write_cfg()
@postgresql
def test_urlname_change():
    """Renaming the shared formdef never changes its ``url_name``.

    The formdef is renamed back and forth, with and without stored data;
    ``url_name`` must stay 'tests' throughout and the stored formdata must
    survive the last rename.  The shared formdef is left named 'tests2'.
    """
    # No ``global`` statement is needed: the module-level ``formdef`` is only
    # mutated through its attributes, never rebound.  (The original declared
    # ``global formef``, a misspelled name that does not exist.)
    data_class = formdef.data_class(mode='sql')
    data_class.wipe()

    assert formdef.url_name == 'tests'

    formdef.name = 'tests2'
    formdef.store()
    assert formdef.url_name == 'tests'

    formdef.name = 'tests'
    formdef.store()
    assert formdef.url_name == 'tests'

    data_class = formdef.data_class(mode='sql')
    formdata = data_class()
    formdata.status = 'wf-0'
    formdata.user_id = '5'
    formdata.store()

    formdef.name = 'tests2'
    formdef.store()
    assert formdef.url_name == 'tests'

    assert data_class.count() == 1
@postgresql
def test_sql_table_add_and_remove_fields():
    """The table stays selectable while fields are added, removed and re-added."""
    item_choices = ('apple', 'pear', 'peach', 'apricot')

    test_formdef = FormDef()
    test_formdef.name = 'tests and and remove fields'
    test_formdef.fields = []
    test_formdef.store()
    assert test_formdef.table_name is not None
    assert test_formdef.data_class(mode='sql').count() == 0

    def assign_new_id_to_last_field():
        test_formdef.fields[-1].id = test_formdef.get_new_field_id()

    def select_all():
        # a plain select fails if table and formdef columns disagree
        test_formdef.data_class(mode='sql').select()

    # two initial fields, created without ids
    test_formdef.fields = [
        fields.StringField(label='string'),
        fields.EmailField(label='email'),
    ]
    for new_field in test_formdef.fields:
        if new_field.id is None:
            new_field.id = test_formdef.get_new_field_id()
    test_formdef.store()

    # add an item field
    test_formdef.fields.append(fields.ItemField(label='item', items=item_choices))
    assign_new_id_to_last_field()
    test_formdef.store()
    select_all()

    # remove it again
    previous_id = test_formdef.fields[-1].id
    test_formdef.fields = test_formdef.fields[:-1]
    test_formdef.store()
    select_all()

    # add a string field with the same label: it must get a different id
    test_formdef.fields.append(fields.StringField(label='item'))
    assign_new_id_to_last_field()
    test_formdef.store()
    assert test_formdef.fields[-1].id != previous_id
    select_all()

    # swap that string field for an item field in a single store
    test_formdef.fields = test_formdef.fields[:-1]
    test_formdef.fields.append(fields.ItemField(label='item', items=item_choices))
    assign_new_id_to_last_field()
    test_formdef.store()
    select_all()
@postgresql
def test_sql_table_wipe_and_drop():
    """``wipe(drop=True)`` drops the tables and a later ``store()`` recreates them."""
    test_formdef = FormDef()
    test_formdef.name = 'tests wipe and drop'
    test_formdef.fields = []
    test_formdef.store()
    assert test_formdef.table_name is not None
    data_class = test_formdef.data_class(mode='sql')
    assert data_class.count() == 0

    conn, cur = sql.get_connection_and_cursor()
    assert table_exists(cur, test_formdef.table_name)
    conn.commit()
    cur.close()

    data_class.wipe(drop=True)
    conn, cur = sql.get_connection_and_cursor()
    assert not table_exists(cur, test_formdef.table_name)
    assert not table_exists(cur, test_formdef.table_name + '_evolutions')
    conn.commit()
    cur.close()

    # storing the formdef again must bring the table back
    test_formdef.store()
    conn, cur = sql.get_connection_and_cursor()
    assert table_exists(cur, test_formdef.table_name)
    # Finish this last transaction and release the cursor, as is done for
    # the two previous connection/cursor pairs (the original left it open).
    conn.commit()
    cur.close()
@postgresql
def test_sql_indexes():
    """The ``<table>_evolutions_fid`` index exists after store and is gone after drop."""
    test_formdef = FormDef()
    test_formdef.name = 'tests indexes'
    test_formdef.fields = []
    test_formdef.store()
    data_class = test_formdef.data_class(mode='sql')
    assert data_class.count() == 0

    index_name = test_formdef.table_name + '_evolutions_fid'

    conn, cur = sql.get_connection_and_cursor()
    assert index_exists(cur, index_name)
    conn.commit()
    cur.close()

    data_class.wipe(drop=True)

    conn, cur = sql.get_connection_and_cursor()
    assert not index_exists(cur, index_name)
    conn.commit()
    cur.close()
@postgresql
def test_sql_table_select():
    """``select()`` honours callables, SQL criteria and mixes of both."""
    test_formdef = FormDef()
    test_formdef.name = 'table select'
    test_formdef.fields = []
    test_formdef.store()
    data_class = test_formdef.data_class(mode='sql')
    assert data_class.count() == 0

    for _ in range(50):
        data_class().store()
    assert data_class.count() == 50
    assert len(data_class.select()) == 50

    # (clause, expected number of rows) -- ids run from 1 to 50
    cases = [
        (lambda x: x.id < 26, 25),
        ([st.Less('id', 26)], 25),
        ([st.Less('id', 25), st.GreaterOrEqual('id', 10)], 15),
        ([st.Less('id', 25), st.GreaterOrEqual('id', 10), lambda x: x.id >= 15], 10),
        ([st.NotEqual('id', 26)], 49),
        ([st.Contains('id', [])], 0),
        ([st.Contains('id', [24, 25, 26])], 3),
        ([st.Contains('id', [24, 25, 86])], 2),
        ([st.NotContains('id', [24, 25, 86])], 48),
        ([st.NotContains('id', [])], 50),
    ]
    for clause, expected_count in cases:
        assert len(data_class.select(clause)) == expected_count
@postgresql
def test_sql_table_select_iterator():
    """``select(iterator=True)`` has no ``len()`` and filters like the list form."""
    test_formdef = FormDef()
    test_formdef.name = 'table select'
    test_formdef.fields = []
    test_formdef.store()
    data_class = test_formdef.data_class(mode='sql')
    assert data_class.count() == 0

    for _ in range(50):
        data_class().store()
    assert data_class.count() == 50

    with pytest.raises(TypeError):
        # TypeError: object of type 'generator' has no len()
        assert len(data_class.select(iterator=True)) == 50

    assert len(list(data_class.select(iterator=True))) == 50

    # (clause, expected number of rows) -- ids run from 1 to 50
    cases = [
        (lambda x: True, 50),
        (lambda x: x.id < 26, 25),
        ([st.Less('id', 26)], 25),
        ([st.Less('id', 25), st.GreaterOrEqual('id', 10)], 15),
        ([st.Less('id', 25), st.GreaterOrEqual('id', 10), lambda x: x.id >= 15], 10),
    ]
    for clause, expected_count in cases:
        assert len(list(data_class.select(clause, iterator=True))) == expected_count
@postgresql
def test_sql_table_select_datetime():
    """Equal/Less/Greater criteria work on ``receipt_time`` time tuples."""
    test_formdef = FormDef()
    test_formdef.name = 'table select datetime'
    test_formdef.fields = []
    test_formdef.store()
    data_class = test_formdef.data_class(mode='sql')
    assert data_class.count() == 0

    # 50 formdatas received on 50 consecutive days starting 2014-01-01
    first_day = datetime.datetime(2014, 1, 1)
    for day_offset in range(50):
        formdata = data_class()
        received = first_day + datetime.timedelta(days=day_offset)
        formdata.receipt_time = received.timetuple()
        formdata.store()

    assert data_class.count() == 50
    assert len(data_class.select()) == 50

    assert len(data_class.select(lambda x: x.receipt_time == first_day.timetuple())) == 1
    assert len(data_class.select([st.Equal('receipt_time', first_day.timetuple())])) == 1

    twentieth = (first_day + datetime.timedelta(days=20)).timetuple()
    assert len(data_class.select([st.Less('receipt_time', twentieth)])) == 20
    twentieth = (first_day + datetime.timedelta(days=20)).timetuple()
    assert len(data_class.select([st.Greater('receipt_time', twentieth)])) == 29
@postgresql
def test_select_limit_offset():
    """``limit``/``offset`` behave the same with or without a callable clause or iterator."""
    test_formdef = FormDef()
    test_formdef.name = 'table select limit offset'
    test_formdef.fields = []
    test_formdef.store()
    data_class = test_formdef.data_class(mode='sql')
    assert data_class.count() == 0

    for _ in range(50):
        data_class().store()
    assert len(data_class.select()) == 50

    # (extra select() keyword arguments, expected ids)
    windows = [
        ({}, list(range(1, 51))),
        ({'limit': 10}, list(range(1, 11))),
        ({'limit': 10, 'offset': 10}, list(range(11, 21))),
        ({'limit': 20, 'offset': 20}, list(range(21, 41))),
        ({'offset': 10}, list(range(11, 51))),
    ]

    for iterator in (False, True):
        for func_clause in (lambda x: True, None):
            for window, expected_ids in windows:
                selected = data_class.select(
                    func_clause, order_by='id', iterator=iterator, **window)
                assert [x.id for x in selected] == expected_ids
        limited = data_class.select(lambda x: x.id > 10, limit=10, iterator=iterator)
        assert len([x.id for x in limited]) == 10
@postgresql
def test_select_criteria_intersects():
    """``Intersects`` matches formdatas whose items field shares a value with the list."""
    data_class = formdef.data_class(mode='sql')
    data_class.wipe()

    for selected_items in (['apricot'], ['apricot', 'pear'], []):
        formdata = data_class()
        formdata.status = 'wf-0'
        formdata.user_id = '5'
        formdata.data = {'6': selected_items}
        formdata.store()

    # (searched item, number of formdatas containing it)
    for item, expected_count in (('apricot', 2), ('pear', 1), ('apple', 0)):
        assert len(data_class.select([st.Intersects('f6', [item])])) == expected_count
@postgresql
def test_count():
    """``count()`` works with and without criteria."""
    test_formdef = FormDef()
    test_formdef.name = 'table select count'
    test_formdef.fields = []
    test_formdef.store()
    data_class = test_formdef.data_class(mode='sql')
    assert data_class.count() == 0

    for _ in range(50):
        data_class().store()

    assert data_class.count() == 50
    assert data_class.count([st.Less('id', 26)]) == 25
@postgresql
def test_select_criteria_or_and():
    """``Or`` and ``And`` combine nested criteria as expected."""
    test_formdef = FormDef()
    test_formdef.name = 'table select criteria or and'
    test_formdef.fields = []
    test_formdef.store()
    data_class = test_formdef.data_class(mode='sql')
    assert data_class.count() == 0

    for _ in range(50):
        data_class().store()

    def selected_ids(criteria):
        return [x.id for x in data_class.select([criteria], order_by='id')]

    assert selected_ids(st.Or([st.Less('id', 10)])) == list(range(1, 10))
    assert selected_ids(
        st.Or([st.Less('id', 10), st.Equal('id', 15)])) == list(range(1, 10)) + [15]
    assert selected_ids(
        st.And([st.Less('id', 10), st.Greater('id', 5)])) == list(range(6, 10))
@postgresql
def test_select_criteria_null():
    """``Null``/``NotNull`` split formdatas on an unset submission channel."""
    test_formdef = FormDef()
    test_formdef.name = 'table select criteria null'
    test_formdef.fields = []
    test_formdef.store()
    data_class = test_formdef.data_class(mode='sql')
    assert data_class.count() == 0

    # every third formdata (indexes 0, 3, ..., 48 -> 17 of them) gets a channel
    for index in range(50):
        formdata = data_class()
        formdata.submission_channel = None if index % 3 else 'mail'
        formdata.store()

    assert len(data_class.select([st.Null('submission_channel')])) == 33
    assert len(data_class.select([st.NotNull('submission_channel')])) == 17
@postgresql
def test_sql_table_select_bool():
    """``Equal`` on a boolean column distinguishes True from False rows."""
    test_formdef = FormDef()
    test_formdef.name = 'table select bool'
    test_formdef.fields = [fields.BoolField(id='3', label='bool')]
    test_formdef.store()
    data_class = test_formdef.data_class(mode='sql')
    assert data_class.count() == 0

    for _ in range(50):
        formdata = data_class()
        formdata.data = {'3': False}
        formdata.store()
    # flip only the last stored formdata to True
    formdata.data = {'3': True}
    formdata.store()

    assert data_class.count() == 50
    assert len(data_class.select()) == 50
    assert len(data_class.select([st.Equal('f3', True)])) == 1
    assert len(data_class.select([st.Equal('f3', False)])) == 49
@postgresql
def test_sql_criteria_ilike():
    """``ILike`` matches a string column regardless of case."""
    test_formdef = FormDef()
    test_formdef.name = 'table select bool'
    test_formdef.fields = [fields.StringField(id='3', label='string')]
    test_formdef.store()
    data_class = test_formdef.data_class(mode='sql')
    assert data_class.count() == 0

    # first 20 formdatas hold 'foo', the remaining 30 hold 'bar'
    for index in range(50):
        formdata = data_class()
        formdata.data = {'3': 'foo' if index < 20 else 'bar'}
        formdata.store()
    # the last formdata is stored once more, unchanged
    formdata.store()

    assert data_class.count() == 50
    assert len(data_class.select()) == 50

    for pattern in ('bar', 'BAR'):
        matching = data_class.select([st.ILike('f3', pattern)], order_by='id')
        assert [x.id for x in matching] == list(range(21, 51))
@postgresql
def test_sql_criteria_fts():
    """``FtsMatch`` searches field data, history comments and the user name."""
    test_formdef = FormDef()
    test_formdef.name = 'table select fts'
    test_formdef.fields = [fields.StringField(id='3', label='string')]
    test_formdef.store()
    data_class = test_formdef.data_class(mode='sql')
    assert data_class.count() == 0

    # first 20 formdatas hold 'foo', the remaining 30 hold 'bar'
    for index in range(50):
        formdata = data_class()
        formdata.data = {'3': 'foo' if index < 20 else 'bar'}
        formdata.just_created()
        formdata.store()
    last_formdata = formdata

    assert data_class.count() == 50
    assert len(data_class.select()) == 50
    bar_matches = data_class.select([st.FtsMatch('BAR')], order_by='id')
    assert [x.id for x in bar_matches] == list(range(21, 51))

    def nb_matches(text):
        return len(data_class.select([st.FtsMatch(text)]))

    # check fts against data in history
    assert nb_matches('XXX') == 0
    formdata1 = data_class.select([st.FtsMatch('BAR')])[0]
    formdata1.evolution[0].comment = 'XXX'
    formdata1.store()
    assert nb_matches('XXX') == 1
    assert data_class.select([st.FtsMatch('XXX')])[0].id == formdata1.id

    # a comment registered by a workflow item: its text is found, its markup is not
    assert nb_matches('yyy') == 0
    comment_item = RegisterCommenterWorkflowStatusItem()
    comment_item.comment = '<span>ÿÿÿ</span>'
    comment_item.perform(formdata1)
    assert formdata1.evolution[-1].display_parts()[-1] == '<span>ÿÿÿ</span>'
    formdata1.store()
    assert nb_matches('yyy') == 1
    assert nb_matches('span') == 0

    # the name of the formdata's user is searchable too
    assert data_class.count([st.FtsMatch('Pierre')]) == 0
    sql.SqlUser.wipe()
    user = sql.SqlUser()
    user.name = 'Pierre'
    user.store()
    last_formdata.user_id = user.id
    last_formdata.store()
    assert data_class.count([st.FtsMatch('Pierre')]) == 1

    # check unaccent
    user = sql.SqlUser()
    user.name = force_str(u'Frédéric')
    user.store()
    last_formdata.user_id = user.id
    last_formdata.store()
    assert data_class.count([st.FtsMatch(user.name)]) == 1
    assert data_class.count([st.FtsMatch('Frederic')]) == 1
def table_exists(cur, table_name):
    """Return True when exactly one table named ``table_name`` is listed.

    :param cur: open database cursor (``execute`` / ``fetchone``).
    :param table_name: table name looked up in ``information_schema.tables``.
    """
    query = (
        'SELECT COUNT(*) FROM information_schema.tables\n'
        'WHERE table_name = %s'
    )
    cur.execute(query, (table_name,))
    (nb_tables,) = cur.fetchone()[:1]
    return bool(nb_tables == 1)
def column_exists_in_table(cur, table_name, column_name):
    """Return True when ``table_name`` has exactly one column ``column_name``.

    :param cur: open database cursor (``execute`` / ``fetchone``).
    :param table_name: table (or view) name looked up in
        ``information_schema.columns``.
    :param column_name: column name to look for.
    """
    query = (
        'SELECT COUNT(*) FROM information_schema.columns\n'
        'WHERE table_name = %s\n'
        'AND column_name = %s'
    )
    cur.execute(query, (table_name, column_name))
    (nb_columns,) = cur.fetchone()[:1]
    return bool(nb_columns == 1)
def index_exists(cur, index_name):
    """Return True when exactly one index ``index_name`` exists in schema ``public``.

    :param cur: open database cursor (``execute`` / ``fetchone``).
    :param index_name: index name looked up in ``pg_indexes``.
    """
    query = (
        'SELECT COUNT(*) FROM pg_indexes\n'
        "WHERE schemaname = 'public'\n"
        'AND indexname = %s'
    )
    cur.execute(query, (index_name,))
    (nb_indexes,) = cur.fetchone()[:1]
    return bool(nb_indexes == 1)
@postgresql
def test_sql_level():
    """``migrate()`` restores the SQL level from 0 and refuses a negative level."""
    conn, cur = sql.get_connection_and_cursor()

    # without the meta table the level is reported as 0
    cur.execute('DROP TABLE wcs_meta')
    assert sql.get_sql_level(conn, cur) == 0

    sql.migrate()
    assert sql.get_sql_level(conn, cur) == sql.SQL_LEVEL

    # insert negative SQL level, to trigger an error, and check it's not
    # changed.
    cur.execute(
        'UPDATE wcs_meta SET value = %s WHERE key = %s',
        ('-1', 'sql_level'))
    assert sql.get_sql_level(conn, cur) == -1
    with pytest.raises(RuntimeError):
        sql.migrate()
    assert sql.get_sql_level(conn, cur) == -1

    conn.commit()
    cur.close()
def migration_level(cur):
    """Return the ``sql_level`` value stored in ``wcs_meta``, as an int.

    :param cur: open database cursor (``execute`` / ``fetchone``).
    """
    cur.execute('SELECT value FROM wcs_meta WHERE key = %s', ('sql_level',))
    stored_value = cur.fetchone()[0]
    return int(stored_value)
@postgresql
def test_migration_1_tracking_code():
    """``migrate()`` recreates the dropped ``wcs_meta`` and ``tracking_codes`` tables."""
    conn, cur = sql.get_connection_and_cursor()
    for dropped_table in ('wcs_meta', 'tracking_codes'):
        cur.execute('DROP TABLE %s' % dropped_table)

    sql.migrate()

    assert table_exists(cur, 'tracking_codes')
    assert table_exists(cur, 'wcs_meta')
    assert migration_level(cur) >= 1

    conn.commit()
    cur.close()
@postgresql
def test_migration_2_formdef_id_in_views():
    """Migrating from level 1 rebuilds ``wcs_all_forms`` with a ``formdef_id`` column.

    The test resets the SQL level to 1, replaces the global view by one
    built without ``formdef_id``, runs ``sql.migrate()`` and checks the
    column is back.
    """
    conn, cur = sql.get_connection_and_cursor()
    cur.execute('UPDATE wcs_meta SET value = 1 WHERE key = %s', ('sql_level',))
    cur.execute('DROP VIEW wcs_all_forms CASCADE')

    # hack a formdef table the wrong way, to check it is reconstructed
    # properly before the views are created
    formdef.fields[4] = fields.StringField(id='4', label='item')
    cur.execute('DROP VIEW wcs_view_1_tests')
    cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN f4_display')
    sql.redo_views(conn, cur, formdef, rebuild_global_views=False)
    formdef.fields[4] = fields.ItemField(id='4', label='item', items=('apple', 'pear', 'peach', 'apricot'))

    assert table_exists(cur, 'wcs_view_1_tests')
    assert not column_exists_in_table(cur, 'wcs_view_1_tests', 'f4_display')

    # Raw string: ``\_`` is not a valid Python escape sequence; the backslash
    # must reach SQL so that LIKE treats the underscore literally.
    cur.execute(
        'SELECT table_name FROM information_schema.views\n'
        'WHERE table_name LIKE %s',
        (r'wcs\_view\_%',))
    view_names = [row[0] for row in cur.fetchall()]

    fake_formdef = FormDef()
    common_fields = sql.get_view_fields(fake_formdef)
    # remove formdef_id for the purpose of this test
    common_fields.remove([x for x in common_fields if x[1] == 'formdef_id'][0])

    column_list = ', '.join([y[1] for y in common_fields])
    union = ' UNION '.join(
        ['SELECT %s FROM %s' % (column_list, view_name) for view_name in view_names])
    assert 'formdef_id' not in union
    cur.execute('CREATE VIEW wcs_all_forms AS %s' % union)

    sql.migrate()

    assert column_exists_in_table(cur, 'wcs_all_forms', 'formdef_id')
    assert migration_level(cur) >= 2

    conn.commit()
    cur.close()
@postgresql
def test_migration_6_actions_roles():
    """Migrating from level 5 restores ``actions_roles_array`` in table and views."""
    item_choices = ('apple', 'pear', 'peach', 'apricot')

    conn, cur = sql.get_connection_and_cursor()
    cur.execute('UPDATE wcs_meta SET value = 5 WHERE key = %s', ('sql_level',))
    cur.execute('DROP VIEW wcs_all_forms CASCADE')

    # hack a formdef table the wrong way, to check it is reconstructed
    # properly before the views are created
    formdef.fields[4] = fields.StringField(id='4', label='item')
    cur.execute('DROP VIEW wcs_view_1_tests')
    cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN actions_roles_array')
    sql.drop_views(formdef, conn, cur)
    formdef.fields[4] = fields.ItemField(id='4', label='item', items=item_choices)
    assert not column_exists_in_table(cur, 'formdata_1_tests', 'actions_roles_array')

    sql.migrate()

    for relation in ('formdata_1_tests', 'wcs_view_1_tests', 'wcs_all_forms'):
        assert column_exists_in_table(cur, relation, 'actions_roles_array')
    assert migration_level(cur) >= 6

    conn.commit()
    cur.close()
@postgresql
def test_migration_10_submission_channel():
    """Migrating from level 9 restores ``submission_channel`` in table and views."""
    item_choices = ('apple', 'pear', 'peach', 'apricot')

    conn, cur = sql.get_connection_and_cursor()
    cur.execute('UPDATE wcs_meta SET value = 9 WHERE key = %s', ('sql_level',))
    cur.execute('DROP VIEW wcs_all_forms CASCADE')

    # hack a formdef table the wrong way, to check it is reconstructed
    # properly before the views are created
    formdef.fields[4] = fields.StringField(id='4', label='item')
    cur.execute('DROP VIEW wcs_view_1_tests')
    cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN submission_channel')
    sql.drop_views(formdef, conn, cur)
    formdef.fields[4] = fields.ItemField(id='4', label='item', items=item_choices)
    assert not column_exists_in_table(cur, 'formdata_1_tests', 'submission_channel')

    sql.migrate()

    for relation in ('formdata_1_tests', 'wcs_view_1_tests', 'wcs_all_forms'):
        assert column_exists_in_table(cur, relation, 'submission_channel')
    assert migration_level(cur) >= 10

    conn.commit()
    cur.close()
@postgresql
def test_migration_12_users_fts():
    """Migrating from level 11 re-adds ``users.fts``; the cron reindex fills it."""
    conn, cur = sql.get_connection_and_cursor()
    cur.execute('UPDATE wcs_meta SET value = 11 WHERE key = %s', ('sql_level',))

    sql.SqlUser.wipe()
    pierre = sql.SqlUser()
    pierre.name = 'Pierre'
    pierre.store()

    # remove the fts column
    cur.execute('ALTER TABLE users DROP COLUMN fts')
    assert not column_exists_in_table(cur, 'users', 'fts')

    sql.migrate()
    assert column_exists_in_table(cur, 'users', 'fts')
    assert migration_level(cur) >= 12

    def reindex_needed(kind):
        return sql.is_reindex_needed(kind, conn=conn, cur=cur)

    # no fts, migration only prepare re-index
    assert len(sql.SqlUser.get_ids_from_query('pierre')) == 0
    assert reindex_needed('user') is True
    assert reindex_needed('formdata') is True

    call_command('cron')  # first cron = reindex
    assert reindex_needed('user') is False
    assert reindex_needed('formdata') is False

    # make sure the fts is filled after the migration
    assert len(sql.SqlUser.get_ids_from_query('pierre')) == 1

    conn.commit()
    cur.close()
@postgresql
def test_migration_21_users_ascii_name():
    """Migration re-adds ``users.ascii_name``; the cron reindex fills it."""
    conn, cur = sql.get_connection_and_cursor()
    cur.execute('UPDATE wcs_meta SET value = 11 WHERE key = %s', ('sql_level',))

    sql.SqlUser.wipe()
    jean = sql.SqlUser()
    jean.name = 'Jean Sénisme'
    jean.store()

    # remove the ascii_name column
    cur.execute('ALTER TABLE users DROP COLUMN ascii_name')
    assert not column_exists_in_table(cur, 'users', 'ascii_name')

    sql.migrate()
    assert column_exists_in_table(cur, 'users', 'ascii_name')
    assert migration_level(cur) >= 21

    def reindex_needed(kind):
        return sql.is_reindex_needed(kind, conn=conn, cur=cur)

    def nb_users_with_ascii_name():
        return sql.SqlUser.count([st.Equal('ascii_name', 'jean senisme')])

    # ascii_name is still empty: the migration only schedules a re-index
    assert nb_users_with_ascii_name() == 0
    assert reindex_needed('user') is True
    assert reindex_needed('formdata') is True

    call_command('cron')  # first cron = reindex
    assert reindex_needed('user') is False
    assert reindex_needed('formdata') is False

    # make sure the ascii_name is filled after the migration
    assert nb_users_with_ascii_name() == 1

    conn.commit()
    cur.close()
@postgresql
def test_migration_24_evolution_index():
    """Check that sql.migrate() recreates a dropped <table>_evolutions_fid index."""
    migrated_formdef = FormDef()
    migrated_formdef.name = 'tests migration 24'
    migrated_formdef.fields = []
    migrated_formdef.store()

    # drop the index and rewind the recorded schema level
    conn, cur = sql.get_connection_and_cursor()
    cur.execute('DROP INDEX %s_evolutions_fid' % migrated_formdef.table_name)
    cur.execute('UPDATE wcs_meta SET value = 23 WHERE key = %s', ('sql_level',))
    conn.commit()
    cur.close()

    # the index must be gone before the migration...
    conn, cur = sql.get_connection_and_cursor()
    index_is_present = index_exists(cur, migrated_formdef.table_name + '_evolutions_fid')
    assert not index_is_present
    conn.commit()
    cur.close()

    sql.migrate()

    # ... and back afterwards
    conn, cur = sql.get_connection_and_cursor()
    index_is_present = index_exists(cur, migrated_formdef.table_name + '_evolutions_fid')
    assert index_is_present
    conn.commit()
    cur.close()
def drop_formdef_tables():
    """Drop every table whose name starts with 'formdata_' (with CASCADE)."""
    conn, cur = sql.get_connection_and_cursor()
    cur.execute('''SELECT table_name FROM information_schema.tables''')
    # read all names first: the same cursor is reused for the DROP statements
    # below, which would discard the pending result set
    all_tables = [row[0] for row in iter(cur.fetchone, None)]
    for name in all_tables:
        if not name.startswith('formdata_'):
            continue
        cur.execute('DROP TABLE %s CASCADE' % name)
@postgresql
def test_is_at_endpoint():
    """Check that wcs_all_forms.is_at_endpoint follows the workflow definition."""
    drop_formdef_tables()
    conn, cur = sql.get_connection_and_cursor()

    def fetch_count(query):
        # run a COUNT(*) query and return its single value
        cur.execute(query)
        return cur.fetchone()[0]

    workflow = Workflow(name='test endpoint')
    status1 = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')
    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = ['_submitter', '_receiver']
    status1.items.append(comment_action)
    comment_action.parent = status1
    workflow.store()
    assert [x.id for x in workflow.get_endpoint_status()] == ['st2']

    endpoint_formdef = FormDef()
    endpoint_formdef.name = 'test endpoint'
    endpoint_formdef.fields = []
    endpoint_formdef.workflow = workflow
    endpoint_formdef.store()

    # one formdata in each status
    data_class = endpoint_formdef.data_class(mode='sql')
    for status in ('wf-st1', 'wf-st2'):
        formdata = data_class()
        formdata.status = status
        formdata.store()

    assert fetch_count('''SELECT COUNT(*) FROM wcs_all_forms''') == 2
    assert fetch_count('''SELECT COUNT(*) FROM wcs_all_forms WHERE status = 'wf-st1' ''') == 1
    assert fetch_count('''SELECT COUNT(*) FROM wcs_all_forms WHERE is_at_endpoint = true''') == 1

    # check a change to workflow is reflected in the database
    status1.forced_endpoint = True
    workflow.store()
    assert [x.id for x in workflow.get_endpoint_status()] == ['st1', 'st2']
    assert fetch_count('''SELECT COUNT(*) FROM wcs_all_forms WHERE is_at_endpoint = true''') == 2
@postgresql
def test_views_fts():
    """Check full-text search through the fts column of wcs_all_forms."""
    drop_formdef_tables()
    conn, cur = sql.get_connection_and_cursor()

    fts_formdef = FormDef()
    fts_formdef.name = 'test fts'
    fts_formdef.fields = [
        fields.StringField(id='0', label='string'),
    ]
    fts_formdef.store()

    data_class = fts_formdef.data_class(mode='sql')
    for text in ('foo bar', 'foo'):
        formdata = data_class()
        formdata.data = {'0': text}
        formdata.store()

    # 'foo' is in both formdata, 'bar' only in the first one
    for term, expected_matches in (('foo', 2), ('bar', 1)):
        cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE fts @@ plainto_tsquery(%s)''', (term,))
        assert cur.fetchone()[0] == expected_matches
@postgresql
def test_select_any_formdata():
    """Exercise sql.AnyFormData.select(): plain, ordered, filtered and paginated."""
    drop_formdef_tables()
    conn, cur = sql.get_connection_and_cursor()

    def identities(formdatas):
        # (formdef_id, id) pairs, used to compare result lists
        return [(x.formdef_id, x.id) for x in formdatas]

    start = datetime.datetime.now()
    statuses = ['wf-new', 'wf-accepted', 'wf-rejected', 'wf-finished']
    for i in range(5):
        any_formdef = FormDef()
        any_formdef.name = 'test any %d' % i
        any_formdef.fields = []
        any_formdef.store()

        data_class = any_formdef.data_class(mode='sql')
        for j in range(20):
            formdata = data_class()
            formdata.just_created()
            formdata.user_id = str((i + j) % 11)
            # set receipt_time to make sure all entries are unique: each
            # formdata gets its own offset in seconds from the start time
            offset = datetime.timedelta(seconds=i * 20 + j)
            formdata.receipt_time = (start + offset).timetuple()
            formdata.status = statuses[(i + j) % 4]
            if j < 5:
                formdata.submission_channel = 'mail'
            formdata.store()

    # test generic select
    everything = sql.AnyFormData.select()
    assert len(everything) == 100

    # make sure valid formdefs are used
    for formdef_name in ('test any 0', 'test any 1'):
        assert len([x for x in everything if x.formdef.name == formdef_name]) == 20

    # test sorting
    ascending = sql.AnyFormData.select(order_by='receipt_time')
    assert len(ascending) == 100
    descending = sql.AnyFormData.select(order_by='-receipt_time')
    assert identities(descending) == identities(ascending)[::-1]

    # test clauses
    selected = sql.AnyFormData.select([st.Equal('user_id', '0')])
    assert len(selected) == len([x for x in ascending if x.user_id == '0'])

    selected = sql.AnyFormData.select([st.Equal('is_at_endpoint', True)])
    assert len(selected) == len([x for x in ascending if x.status in ('wf-rejected', 'wf-finished')])

    selected = sql.AnyFormData.select([st.Equal('submission_channel', 'mail')])
    assert len(selected) == len([x for x in ascending if x.submission_channel == 'mail'])
    assert selected[0].submission_channel == 'mail'

    # test offset/limit
    page = sql.AnyFormData.select(order_by='receipt_time', limit=10, offset=0)
    assert identities(page) == identities(ascending)[:10]

    page = sql.AnyFormData.select(order_by='receipt_time', limit=10, offset=20)
    assert identities(page) == identities(ascending)[20:30]
@postgresql
def test_geoloc_in_global_view():
    """Check geolocation data can be filtered and read via sql.AnyFormData."""
    drop_formdef_tables()
    conn, cur = sql.get_connection_and_cursor()
    now = datetime.datetime.now()

    plain_formdef = FormDef()
    plain_formdef.name = 'test no geoloc'
    plain_formdef.fields = []
    plain_formdef.store()

    geoloc_formdef = FormDef()
    geoloc_formdef.name = 'test with geoloc'
    geoloc_formdef.fields = []
    geoloc_formdef.geolocations = {'base': 'Plop'}
    geoloc_formdef.store()

    # one formdata without position...
    plain_formdata = plain_formdef.data_class(mode='sql')()
    plain_formdata.just_created()
    plain_formdata.store()

    # ... and one with a position
    located_formdata = geoloc_formdef.data_class(mode='sql')()
    located_formdata.just_created()
    located_formdata.geolocations = {'base': {'lat': 12, 'lon': 21}}
    located_formdata.store()

    # test generic select
    assert len(sql.AnyFormData.select()) == 2

    # test clauses
    without_position = sql.AnyFormData.select([st.Null('geoloc_base_x')])
    assert len(without_position) == 1
    assert not without_position[0].geolocations

    with_position = sql.AnyFormData.select([st.NotNull('geoloc_base_x')])
    assert len(with_position) == 1
    for axis in ('lat', 'lon'):
        stored_value = with_position[0].geolocations['base'][axis]
        assert int(stored_value) == located_formdata.geolocations['base'][axis]
@postgresql
def test_actions_roles():
    """Check that wcs_all_forms.actions_roles_array follows the workflow roles."""
    drop_formdef_tables()
    conn, cur = sql.get_connection_and_cursor()

    def fetch_count(query):
        # run a COUNT(*) query and return its single value
        cur.execute(query)
        return cur.fetchone()[0]

    workflow = Workflow(name='test endpoint')
    status1 = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')
    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = ['_submitter', '1']
    status1.items.append(comment_action)
    comment_action.parent = status1
    workflow.store()
    assert [x.id for x in workflow.get_endpoint_status()] == ['st2']

    roles_formdef = FormDef()
    roles_formdef.name = 'test actions roles'
    roles_formdef.fields = []
    roles_formdef.workflow = workflow
    roles_formdef.store()

    # one formdata per status; only the first one sits on the commentable status
    data_class = roles_formdef.data_class(mode='sql')
    stored_ids = []
    for status in ('wf-st1', 'wf-st2'):
        formdata = data_class()
        formdata.status = status
        formdata.store()
        stored_ids.append(formdata.id)
    commentable_formdata_id = stored_ids[0]

    old_roles_query = ('SELECT COUNT(*) FROM wcs_all_forms\n'
                       "WHERE actions_roles_array && ARRAY['5', '1', '4']")
    new_roles_query = ('SELECT COUNT(*) FROM wcs_all_forms\n'
                       "WHERE actions_roles_array && ARRAY['2', '3']")

    assert fetch_count('''SELECT COUNT(*) FROM wcs_all_forms''') == 2
    assert fetch_count(old_roles_query) == 1

    # check a change to workflow is reflected in the database
    status1.items[0].by = ['2', '3']
    workflow.store()
    assert fetch_count(old_roles_query) == 0
    assert fetch_count(new_roles_query) == 1

    # using criterias
    criterias = [st.Intersects('actions_roles_array', ['2', '3'])]
    total_count = sql.AnyFormData.count(criterias)
    matching = sql.AnyFormData.select(criterias)
    assert total_count == 1
    assert matching[0].id == commentable_formdata_id
@postgresql
def test_last_update_time():
    """Check wcs_all_forms.last_update_time matches the latest evolution time."""
    drop_formdef_tables()
    conn, cur = sql.get_connection_and_cursor()

    def fetch_value(query):
        # run a query and return the first column of its first row
        cur.execute(query)
        return cur.fetchone()[0]

    workflow = Workflow(name='test last update time')
    status1 = workflow.add_status('Status1', 'st1')
    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = ['_submitter', '_receiver']
    status1.items.append(comment_action)
    comment_action.parent = status1
    workflow.store()

    update_formdef = FormDef()
    update_formdef.name = 'test last update time'
    update_formdef.fields = []
    update_formdef.workflow = workflow
    update_formdef.store()

    data_class = update_formdef.data_class(mode='sql')
    stored = []
    # each formdata gets two evolution entries, dated on consecutive days
    for first_day, second_day in ((1, 2), (3, 4)):
        formdata = data_class()
        formdata.status = 'wf-st1'
        formdata.just_created()
        formdata.evolution[0].comment = 'comment'
        formdata.jump_status('st1')  # will add another evolution entry
        formdata.evolution[0].time = datetime.datetime(2015, 1, first_day, 0, 0, 0).timetuple()
        formdata.evolution[1].time = datetime.datetime(2015, 1, second_day, 0, 0, 0).timetuple()
        formdata.store()
        stored.append(formdata)
    earlier_formdata, later_formdata = stored

    assert fetch_value('''SELECT COUNT(*) FROM wcs_all_forms''') == 2
    assert fetch_value(
        '''SELECT id FROM wcs_all_forms WHERE last_update_time = '2015-01-02 00:00' '''
    ) == earlier_formdata.id
    assert fetch_value(
        '''SELECT id FROM wcs_all_forms WHERE last_update_time = '2015-01-04 00:00' '''
    ) == later_formdata.id
@postgresql
def test_view_formdef_name():
    """Check that wcs_all_forms exposes the name of each formdata's formdef."""
    drop_formdef_tables()
    conn, cur = sql.get_connection_and_cursor()

    def create_formdef_with_formdata(name):
        # store a field-less formdef and a single formdata attached to it
        new_formdef = FormDef()
        new_formdef.name = name
        new_formdef.fields = []
        new_formdef.store()
        formdata = new_formdef.data_class()()
        formdata.just_created()
        formdata.store()
        return new_formdef

    def fetch_value(query):
        # run a query and return the first column of its first row
        cur.execute(query)
        return cur.fetchone()[0]

    first_formdef = create_formdef_with_formdata('test formdef name 1')
    second_formdef = create_formdef_with_formdata('test formdef name 2')

    assert fetch_value('''SELECT COUNT(*) FROM wcs_all_forms''') == 2

    found_id = fetch_value(
        '''SELECT formdef_id FROM wcs_all_forms WHERE formdef_name = 'test formdef name 1' ''')
    assert str(found_id) == str(first_formdef.id)

    found_id = fetch_value(
        '''SELECT formdef_id FROM wcs_all_forms WHERE formdef_name = 'test formdef name 2' ''')
    assert str(found_id) == str(second_formdef.id)
@postgresql
def test_view_user_name():
    """Check that wcs_all_forms.user_name is filled from the formdata's user."""
    drop_formdef_tables()
    conn, cur = sql.get_connection_and_cursor()

    def fetch_user_name(formdata_id):
        # user_name column of the view row matching the given formdata id
        cur.execute('''SELECT user_name FROM wcs_all_forms WHERE id = %s ''', (formdata_id,))
        return cur.fetchone()[0]

    named_formdef = FormDef()
    named_formdef.name = 'test user name'
    named_formdef.fields = []
    named_formdef.store()

    sql.SqlUser.wipe()
    owner = sql.SqlUser()
    owner.name = 'Foobar'
    owner.store()

    # a formdata without any user...
    anonymous_formdata = named_formdef.data_class()()
    anonymous_formdata.just_created()
    anonymous_formdata.store()

    # ... and one attached to the user
    owned_formdata = named_formdef.data_class()()
    owned_formdata.user_id = owner.id
    owned_formdata.just_created()
    owned_formdata.store()

    assert fetch_user_name(anonymous_formdata.id) is None
    assert fetch_user_name(owned_formdata.id) == owner.name
@postgresql
def test_select_formdata_after_formdef_removal():
    """Check sql.AnyFormData.select() no longer returns data of a removed formdef."""
    drop_formdef_tables()
    conn, cur = sql.get_connection_and_cursor()
    now = datetime.datetime.now()

    # two formdefs sharing the same name, each with a single formdata
    created_formdefs = []
    for _ in range(2):
        removal_formdef = FormDef()
        removal_formdef.name = 'test formdef removal'
        removal_formdef.fields = []
        removal_formdef.store()
        formdata = removal_formdef.data_class(mode='sql')()
        formdata.just_created()
        formdata.store()
        created_formdefs.append(removal_formdef)

    # test generic select
    assert len(sql.AnyFormData.select()) == 2

    # removing the last created formdef leaves only the other formdata
    created_formdefs[-1].remove_self()
    assert len(sql.AnyFormData.select()) == 1
@postgresql
def test_views_submission_info():
    """Check submission channel and backoffice flag are exposed by wcs_all_forms."""
    drop_formdef_tables()
    conn, cur = sql.get_connection_and_cursor()

    submission_formdef = FormDef()
    submission_formdef.name = 'test backoffice submission'
    submission_formdef.fields = []
    submission_formdef.store()

    data_class = submission_formdef.data_class(mode='sql')

    backoffice_formdata = data_class()
    backoffice_formdata.submission_channel = 'mail'
    backoffice_formdata.backoffice_submission = True
    backoffice_formdata.store()

    frontoffice_formdata = data_class()
    frontoffice_formdata.backoffice_submission = False
    frontoffice_formdata.store()

    # exactly one row is expected for each of the two flag values
    for query in (
            '''SELECT COUNT(*) FROM wcs_all_forms WHERE backoffice_submission IS TRUE''',
            '''SELECT COUNT(*) FROM wcs_all_forms WHERE backoffice_submission IS FALSE'''):
        cur.execute(query)
        assert cur.fetchone()[0] == 1

    cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE submission_channel = %s''', ('mail',))
    assert cur.fetchone()[0] == 1
@postgresql
def test_get_formdef_new_id():
    """Check a formdef created after a removal gets a new id and table name."""

    def store_new_formdef():
        # both formdefs are deliberately created with the same name
        new_formdef = FormDef()
        new_formdef.name = 'new formdef'
        new_formdef.fields = []
        new_formdef.store()
        return new_formdef

    removed_formdef = store_new_formdef()
    # remember the identifiers before the formdef goes away
    removed_id, removed_table_name = removed_formdef.id, removed_formdef.table_name
    removed_formdef.remove_self()

    replacement_formdef = store_new_formdef()
    assert removed_id != replacement_formdef.id
    assert removed_table_name != replacement_formdef.table_name
@postgresql
def test_criticality_levels():
    """Check ordering by criticality level across workflows of different depth."""
    drop_formdef_tables()
    conn, cur = sql.get_connection_and_cursor()

    # a five-level workflow and a two-level one sharing their extreme names
    five_levels = Workflow(name='criticality1')
    five_levels.criticality_levels = [
        WorkflowCriticalityLevel(name=level_name)
        for level_name in ('green', 'yellow', 'red', 'redder', 'reddest')
    ]
    five_levels.store()

    two_levels = Workflow(name='criticality2')
    two_levels.criticality_levels = [
        WorkflowCriticalityLevel(name=level_name)
        for level_name in ('green', 'reddest')
    ]
    two_levels.store()

    first_formdef = FormDef()
    first_formdef.name = 'test criticality levels 1'
    first_formdef.fields = []
    first_formdef.workflow_id = five_levels.id
    first_formdef.store()

    second_formdef = FormDef()
    second_formdef.name = 'test criticality levels 2'
    second_formdef.fields = []
    second_formdef.workflow_id = two_levels.id
    second_formdef.store()

    # one formdata per available level of each workflow
    for levelled_formdef, nb_levels in ((first_formdef, 5), (second_formdef, 2)):
        data_class = levelled_formdef.data_class(mode='sql')
        for level in range(nb_levels):
            formdata = data_class()
            formdata.set_criticality_level(level)
            formdata.store()

    by_criticality = sql.AnyFormData.select(order_by='-criticality_level')

    # make sure first two formdata are the highest priority ones, and the last
    # two formdata are the lowest priority ones.
    for position in (0, 1):
        assert by_criticality[position].get_criticality_level_object().name == 'reddest'
    for position in (-1, -2):
        assert by_criticality[position].get_criticality_level_object().name == 'green'
@postgresql
def test_view_performances():
    """Time a typical listing query on sql.AnyFormData over a large dataset.

    The test is skipped unconditionally (building the dataset takes too much
    time); the body is kept so it can be run by hand when needed.
    """
    pytest.skip('takes too much time')

    drop_formdef_tables()
    conn, cur = sql.get_connection_and_cursor()

    # dataset dimensions
    nb_users = 10
    nb_roles = 10
    nb_workflows = 5
    nb_formdefs = 10
    nb_fields = 10
    nb_formdatas = 10000

    # fixed seed so the generated dataset is the same from one run to the other
    random.seed('foobar')

    # create users
    sql.SqlUser.wipe()
    users = []
    for i in range(nb_users):
        user = sql.SqlUser()
        user.name = 'user %s' % i
        user.store()
        users.append(user)

    # create roles
    roles = []
    for i in range(nb_roles):
        role = Role(name='role%s' % i)
        role.store()
        roles.append(role)

    # create workflows: five statuses each, all commentable by two random
    # roles, and all but the last one jumping to the next status on timeout
    workflows = []
    for i in range(nb_workflows):
        workflow = Workflow(name='test perf wf %s' % i)
        for j in range(5):
            status = workflow.add_status('Status %d' % j, 'st%s' % j)
            commentable = CommentableWorkflowStatusItem()
            commentable.id = '_commentable%s' % j
            commentable.by = [random.choice(roles).id, random.choice(roles).id]
            status.items.append(commentable)
            commentable.parent = status
            if j != 4:
                jump = JumpWorkflowStatusItem()
                jump.id = '_jump%s' % j
                jump.by = []
                jump.timeout = 5
                jump.status = 'st%s' % (j + 1)
                status.items.append(jump)
                jump.parent = status
        workflow.store()
        workflows.append(workflow)

    # create formdefs, spread evenly over the workflows
    formdefs = []
    for i in range(nb_formdefs):
        formdef = FormDef()
        formdef.name = 'test performance view %s' % i
        formdef.fields = [
            fields.StringField(id=str(j + 1), label='string')
            for j in range(nb_fields)
        ]
        formdef.workflow_id = workflows[i % nb_workflows].id
        formdef.store()
        formdefs.append(formdef)

    print('create formdatas')
    # create formdatas
    for i in range(nb_formdatas):
        data_class = random.choice(formdefs).data_class()
        formdata = data_class()
        formdata.data = {}
        for j in range(nb_fields):
            # string.ascii_letters exists on both Python 2 and Python 3,
            # string.letters is Python 2 only
            formdata.data[str(j + 1)] = ''.join(
                random.choice(string.ascii_letters)
                for x in range(random.randint(10, 30)))
        formdata.user_id = random.choice(users).id
        formdata.status = 'wf-st1'
        formdata.just_created()
        # NOTE(review): this can request 'st5' and 'st6' while the workflows
        # only define st0..st4 -- confirm this is intended
        for j in range(5):
            formdata.jump_status('st%s' % (j + 2))
            if random.random() < 0.5:
                break
    print('done')

    t0 = time.time()
    user_roles = [random.choice(roles).id, random.choice(roles).id]
    criterias = [
        st.NotEqual('status', 'draft'),
        st.Equal('is_at_endpoint', False),
        st.Intersects('actions_roles_array', user_roles),
    ]
    sql.AnyFormData.select(criterias, order_by='receipt_time', limit=20, offset=0)
    # measure once so the printed and the asserted durations are the same
    elapsed = time.time() - t0
    print(elapsed)
    assert elapsed < 0.5
@postgresql
def test_migration_30_anonymize_evo_who():
    """Check that sql.migrate() from level 29 empties `who` on an anonymised formdata.

    An anonymised formdata is stored with an evolution entry still pointing
    to a user; after the migration that entry is expected to have a NULL
    `who` and the recorded schema level is expected to be above 29.
    """
    formdef = FormDef()
    formdef.name = 'tests migration 30'
    formdef.fields = []
    formdef.store()

    user = sql.SqlUser()
    user.name = 'JohnDoe'
    user.store()

    formdata = formdef.data_class()()
    formdata.evolution = []
    formdata.anonymised = datetime.datetime.now()
    evo = Evolution(formdata)
    evo.who = user.id
    evo.time = time.localtime()
    formdata.evolution.append(evo)
    formdata.store()

    def assert_migration_state(expected):
        # `expected` is both the number of evolution rows without `who` and
        # the number of wcs_meta rows recording a schema level above 29
        conn, cur = sql.get_connection_and_cursor()
        cur.execute('SELECT COUNT(*) FROM %s_evolutions WHERE who IS NULL' % formdef.table_name)
        assert cur.fetchone() == (expected,)
        cur.execute('SELECT COUNT(*) FROM wcs_meta WHERE key = %s AND value::integer > 29', ('sql_level',))
        assert cur.fetchone() == (expected,)
        conn.commit()
        cur.close()

    # rewind the recorded schema level to just before migration 30
    conn, cur = sql.get_connection_and_cursor()
    cur.execute('UPDATE wcs_meta SET value = 29 WHERE key = %s', ('sql_level',))
    conn.commit()
    cur.close()

    assert_migration_state(0)
    sql.migrate()
    assert_migration_state(1)
@postgresql
def test_migration_31_user_label():
    """Check that sql.migrate() restores user_label on the table and its views."""
    conn, cur = sql.get_connection_and_cursor()
    # rewind the recorded schema level, then remove the views and the column
    cur.execute('UPDATE wcs_meta SET value = 30 WHERE key = %s', ('sql_level',))
    for statement in (
            'DROP VIEW wcs_all_forms CASCADE',
            'DROP VIEW wcs_view_1_tests',
            'ALTER TABLE formdata_1_tests DROP COLUMN user_label'):
        cur.execute(statement)
    # `formdef` is the module-level formdef, not a local one
    sql.drop_views(formdef, conn, cur)
    assert not column_exists_in_table(cur, 'formdata_1_tests', 'user_label')

    sql.migrate()

    # the column must be back on the table and on both views
    for relation in ('formdata_1_tests', 'wcs_view_1_tests', 'wcs_all_forms'):
        assert column_exists_in_table(cur, relation, 'user_label')
    assert migration_level(cur) >= 31

    conn.commit()
    cur.close()