# -*- coding: utf-8 -*- from __future__ import print_function import datetime import os import random import shutil import string import sys import time from django.core.management import call_command from quixote import cleanup from wcs.qommon import force_str from wcs import formdef, publisher, fields from wcs.formdef import FormDef from wcs.formdata import Evolution from wcs.roles import Role from wcs.workflows import Workflow, CommentableWorkflowStatusItem, WorkflowCriticalityLevel from wcs.wf.jump import JumpWorkflowStatusItem from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem from wcs import sql import wcs.qommon.storage as st from utilities import create_temporary_pub, clean_temporary_pub import pytest postgresql = pytest.mark.postgresql try: import psycopg2 except ImportError: pass def setup_module(module): global pub, formdef cleanup() pub = create_temporary_pub(sql_mode=True) formdef = formdef.FormDef() formdef.name = 'tests' formdef.fields = [ fields.StringField(id='0', label='string'), fields.EmailField(id='1', label='email'), fields.TextField(id='2', label='text'), fields.BoolField(id='3', label='bool'), fields.ItemField(id='4', label='item', items=('apple', 'pear', 'peach', 'apricot')), fields.DateField(id='5', label='date'), fields.ItemsField(id='6', label='items', items=('apple', 'pear', 'peach', 'apricot')), ] formdef.store() def teardown_module(module): clean_temporary_pub() @postgresql def test_sql_table_name_invalid_chars(): test_formdef = FormDef() test_formdef.name = 'test-some|char;are better/ignored' test_formdef.fields = [] test_formdef.store() assert test_formdef.table_name is not None data_class = test_formdef.data_class(mode='sql') assert data_class.count() == 0 @postgresql def test_sql_data_class(): data_class = formdef.data_class(mode='sql') @postgresql def test_sql_count(): data_class = formdef.data_class(mode='sql') assert data_class.count() == 0 @postgresql def test_sql_store(): data_class = formdef.data_class(mode='sql') formdata = data_class() formdata.status = 'wf-0' formdata.user_id = '5' formdata.store() assert formdata.id @postgresql def test_sql_get(): data_class = formdef.data_class(mode='sql') formdata = data_class() formdata.status = 'wf-0' formdata.user_id = '5' formdata.store() id = formdata.id formdata = data_class.get(id) assert formdata.user_id == '5' assert formdata.status == 'wf-0' @postgresql def test_sql_store_channel(): data_class = formdef.data_class(mode='sql') formdata = data_class() formdata.status = 'wf-0' formdata.user_id = '5' formdata.submission_channel = 'mail' formdata.store() assert data_class.get(formdata.id).submission_channel == 'mail' formdata.submission_channel = None formdata.store() assert data_class.get(formdata.id).submission_channel is None @postgresql def test_sql_get_missing(): data_class = formdef.data_class(mode='sql') with pytest.raises(KeyError): data_class.get(123456) @postgresql def test_sql_get_missing_ignore_errors(): data_class = formdef.data_class(mode='sql') assert data_class.get(123456, ignore_errors=True) is None def check_sql_field(no, value): data_class = formdef.data_class(mode='sql') formdata = data_class() formdata.data = {no: value} formdata.store() id = formdata.id formdata = data_class.get(id) assert formdata.data.get(no) == value @postgresql def test_sql_field_string(): check_sql_field('0', 'hello world') check_sql_field('0', 'élo world') check_sql_field('0', None) @postgresql def test_sql_field_email(): check_sql_field('1', 'fred@example.com') @postgresql def test_sql_field_text(): check_sql_field('2', 'long text') check_sql_field('2', 'long tèxt') @postgresql def test_sql_field_bool(): check_sql_field('3', False) check_sql_field('3', True) @postgresql def test_sql_field_item(): check_sql_field('4', 'apricot') @postgresql def test_sql_field_date(): check_sql_field('5', datetime.date.today().timetuple()) @postgresql def test_sql_field_items(): check_sql_field('6', ['apricot']) check_sql_field('6', ['apricot', 'pear']) check_sql_field('6', ['pomme', 'poire', 'pêche']) @postgresql def test_sql_geoloc(): test_formdef = FormDef() test_formdef.name = 'geoloc' test_formdef.fields = [] test_formdef.geolocations = {'base': 'Plop'} test_formdef.store() data_class = test_formdef.data_class(mode='sql') formdata = data_class() formdata.data = {} formdata.store() # NULL geolocation formdata2 = data_class.get(formdata.id) assert not formdata2.geolocations formdata.geolocations = {'base': {'lat': 12, 'lon': 21}} formdata.store() formdata2 = data_class.get(formdata.id) assert formdata2.geolocations == formdata.geolocations formdata.geolocations = {} formdata.store() formdata2 = data_class.get(formdata.id) assert formdata2.geolocations == formdata.geolocations @postgresql def test_sql_multi_geoloc(): test_formdef = FormDef() test_formdef.name = 'geoloc' test_formdef.fields = [] test_formdef.geolocations = {'base': 'Plop'} test_formdef.store() data_class = test_formdef.data_class(mode='sql') formdata = data_class() formdata.data = {} formdata.geolocations = {'base': {'lat': 12, 'lon': 21}} formdata.store() formdata2 = data_class.get(formdata.id) assert formdata2.geolocations == formdata.geolocations test_formdef.geolocations = {'base': 'Plop', '2nd': 'XXX'} test_formdef.store() formdata.geolocations = {'base': {'lat': 12, 'lon': 21}, '2nd': {'lat': -34, 'lon': -12}} formdata.store() formdata2 = data_class.get(formdata.id) assert formdata2.geolocations == formdata.geolocations test_formdef.geolocations = {'base': 'Plop'} test_formdef.store() formdata2 = data_class.get(formdata.id) assert formdata2.geolocations == {'base': {'lat': 12, 'lon': 21}} @postgresql def test_sql_change(): data_class = formdef.data_class(mode='sql') formdata = data_class() formdata.data = {'0': 'test'} formdata.store() id = formdata.id formdata = data_class.get(id) assert formdata.data.get('0') == 'test' formdata.data = {'0': 'test2'} formdata.store() formdata = data_class.get(id) assert formdata.data.get('0') == 'test2' @postgresql def test_sql_remove(): data_class = formdef.data_class(mode='sql') formdata = data_class() formdata.data = {'0': 'test'} formdata.store() id = formdata.id formdata = data_class.get(id) assert formdata.data.get('0') == 'test' formdata.remove_self() with pytest.raises(KeyError): data_class.get(id) @postgresql def test_sql_wipe(): data_class = formdef.data_class(mode='sql') formdata = data_class() formdata.store() assert data_class.count() != 0 data_class.wipe() assert data_class.count() == 0 @postgresql def test_sql_evolution(): data_class = formdef.data_class(mode='sql') formdata = data_class() formdata.just_created() formdata.store() id = formdata.id formdata = data_class.get(id) assert len(formdata.evolution) == 1 evo = Evolution() evo.time = time.localtime() evo.status = formdata.status evo.comment = 'hello world' formdata.evolution.append(evo) formdata.store() formdata = data_class.get(id) assert len(formdata.evolution) == 2 assert formdata.evolution[-1].comment == 'hello world' @postgresql def test_sql_evolution_change(): data_class = formdef.data_class(mode='sql') formdata = data_class() formdata.just_created() formdata.store() id = formdata.id formdata = data_class.get(id) assert len(formdata.evolution) == 1 evo = Evolution() evo.time = time.localtime() evo.status = formdata.status evo.comment = 'hello world' formdata.evolution.append(evo) formdata.store() formdata = data_class.get(id) assert len(formdata.evolution) == 2 assert formdata.evolution[-1].comment == 'hello world' formdata.evolution[-1].comment = 'foobar' formdata.store() formdata = data_class.get(id) assert len(formdata.evolution) == 2 assert formdata.evolution[-1].comment == 'foobar' @postgresql def test_sql_multiple_evolutions(): data_class = formdef.data_class(mode='sql') for i in range(20): formdata = data_class() formdata.just_created() formdata.store() id = formdata.id formdata = data_class.get(id) evo = Evolution() evo.time = time.localtime() evo.status = formdata.status evo.comment = 'hello world %d' % i formdata.evolution.append(evo) formdata.store() values = data_class.select() data_class.load_all_evolutions(values) assert [x._evolution for x in values] @postgresql def test_sql_get_ids_with_indexed_value(): data_class = formdef.data_class(mode='sql') data_class.wipe() formdata = data_class() formdata.store() id1 = formdata.id formdata = data_class() formdata.user_id = '2' formdata.store() id2 = formdata.id formdata = data_class() formdata.user_id = '2' formdata.store() id3 = formdata.id ids = data_class.get_ids_with_indexed_value('user_id', '2') assert set(ids) == set([id2, id3]) @postgresql def test_sql_get_ids_from_query(): data_class = formdef.data_class(mode='sql') data_class.wipe() formdata = data_class() formdata.data = {'2': 'this is some reasonably long text'} formdata.store() id1 = formdata.id formdata = data_class() formdata.data = {'2': 'hello world is still a classical example'} formdata.store() id2 = formdata.id formdata = data_class() formdata.data = {'2': 'you would think other ideas of text would emerge'} formdata.store() id3 = formdata.id ids = data_class.get_ids_from_query('text') assert set(ids) == set([id1, id3]) ids = data_class.get_ids_from_query('classical') assert set(ids) == set([id2]) @postgresql def test_sql_rollback_on_error(): data_class = formdef.data_class(mode='sql') data_class.wipe() with pytest.raises(psycopg2.Error): # this will raise a psycopg2.ProgrammingError as there's no FOOBAR # column in the table. data_class.get_ids_with_indexed_value('FOOBAR', '2') data_class.wipe() @postgresql def test_sql_get_ids_with_indexed_value_dict(): data_class = formdef.data_class(mode='sql') data_class.wipe() formdata = data_class() formdata.store() id1 = formdata.id formdata = data_class() formdata.workflow_roles = {'plop': '2'} formdata.store() id2 = formdata.id formdata = data_class() formdata.workflow_roles = {'plop': '2'} formdata.store() id3 = formdata.id ids = data_class.get_ids_with_indexed_value('workflow_roles', '2') assert set(ids) == set([id2, id3]) @postgresql def test_create_user(): sql.SqlUser.wipe() user = sql.SqlUser() user.name = 'Pierre' user.store() @postgresql def test_get_user(): sql.SqlUser.wipe() user = sql.SqlUser() user.name = 'Pierre' user.store() assert sql.SqlUser.get(user.id) is not None @postgresql def test_get_missing_user(): sql.SqlUser.wipe() with pytest.raises(KeyError): sql.SqlUser.get(12345) @postgresql def test_get_missing_user_ignore_errors(): sql.SqlUser.wipe() assert sql.SqlUser.get(12345, ignore_errors=True) is None @postgresql def test_user_formdef(): sql.SqlUser.wipe() from wcs.admin.settings import UserFieldsFormDef formdef = UserFieldsFormDef(pub) formdef.fields = [fields.StringField(id='3', label='test', type='string')] formdef.store() user = sql.SqlUser() user.name = 'Pierre' user.form_data = {'3': 'Papier'} user.store() assert sql.SqlUser.get(user.id, ignore_errors=True).form_data['3'] == 'Papier' del pub.cfg['users']['formdef'] pub.write_cfg() @postgresql def test_get_users_with_role(): sql.SqlUser.wipe() user = sql.SqlUser() user.name = 'Pierre' user.roles = [1] user.store() user_id = user.id user = sql.SqlUser() user.name = 'Papier' user.store() assert len(sql.SqlUser.get_users_with_role(1)) == 1 @postgresql def test_get_users_with_name_identifier(): sql.SqlUser.wipe() user = sql.SqlUser() user.name = 'Pierre' user.name_identifiers = ['foo'] user.store() user_id = user.id user = sql.SqlUser() user.name = 'Papier' user.store() assert len(sql.SqlUser.get_users_with_name_identifier('foo')) == 1 assert sql.SqlUser.get_users_with_name_identifier('foo')[0].name == 'Pierre' @postgresql def test_get_users_fts(): sql.SqlUser.wipe() user = sql.SqlUser() user.name = 'Pierre' user.name_identifiers = ['foo'] user.store() user_id = user.id user = sql.SqlUser() user.name = 'Papier' user.store() assert len(sql.SqlUser.get_ids_from_query('pierre')) == 1 assert sql.SqlUser.get(sql.SqlUser.get_ids_from_query('pierre')[0]).id == user_id @postgresql def test_get_users_formdef_fts(): sql.SqlUser.wipe() from wcs.admin.settings import UserFieldsFormDef formdef = UserFieldsFormDef(pub) formdef.fields = [fields.StringField(id='3', label='test', type='string')] formdef.store() user = sql.SqlUser() user.name = 'Pierre' user.form_data = {'3': 'Papier'} user.store() user_id = user.id assert len(sql.SqlUser.get_ids_from_query('pierre papier')) == 1 assert sql.SqlUser.get(sql.SqlUser.get_ids_from_query('pierre papier')[0]).id == user_id assert len(sql.SqlUser.get_ids_from_query('papier pierre')) == 1 assert sql.SqlUser.get(sql.SqlUser.get_ids_from_query('papier pierre')[0]).id == user_id del pub.cfg['users']['formdef'] pub.write_cfg() @postgresql def test_urlname_change(): global formef data_class = formdef.data_class(mode='sql') data_class.wipe() assert formdef.url_name == 'tests' formdef.name = 'tests2' formdef.store() assert formdef.url_name == 'tests' formdef.name = 'tests' formdef.store() assert formdef.url_name == 'tests' data_class = formdef.data_class(mode='sql') formdata = data_class() formdata.status = 'wf-0' formdata.user_id = '5' formdata.store() formdef.name = 'tests2' formdef.store() assert formdef.url_name == 'tests' assert data_class.count() == 1 @postgresql def test_sql_table_add_and_remove_fields(): test_formdef = FormDef() test_formdef.name = 'tests and and remove fields' test_formdef.fields = [] test_formdef.store() assert test_formdef.table_name is not None data_class = test_formdef.data_class(mode='sql') assert data_class.count() == 0 test_formdef.fields = [ fields.StringField(label='string'), fields.EmailField(label='email'), ] for field in test_formdef.fields: if field.id is None: field.id = test_formdef.get_new_field_id() test_formdef.store() test_formdef.fields.append( fields.ItemField(label='item', items=('apple', 'pear', 'peach', 'apricot'))) test_formdef.fields[-1].id = test_formdef.get_new_field_id() test_formdef.store() data_class = test_formdef.data_class(mode='sql') data_class.select() previous_id = test_formdef.fields[-1].id test_formdef.fields = test_formdef.fields[:-1] test_formdef.store() data_class = test_formdef.data_class(mode='sql') data_class.select() test_formdef.fields.append(fields.StringField(label='item')) test_formdef.fields[-1].id = test_formdef.get_new_field_id() test_formdef.store() assert test_formdef.fields[-1].id != previous_id data_class = test_formdef.data_class(mode='sql') data_class.select() test_formdef.fields = test_formdef.fields[:-1] test_formdef.fields.append( fields.ItemField(label='item', items=('apple', 'pear', 'peach', 'apricot'))) test_formdef.fields[-1].id = test_formdef.get_new_field_id() test_formdef.store() data_class = test_formdef.data_class(mode='sql') data_class.select() @postgresql def test_sql_table_wipe_and_drop(): test_formdef = FormDef() test_formdef.name = 'tests wipe and drop' test_formdef.fields = [] test_formdef.store() assert test_formdef.table_name is not None data_class = test_formdef.data_class(mode='sql') assert data_class.count() == 0 conn, cur = sql.get_connection_and_cursor() assert table_exists(cur, test_formdef.table_name) conn.commit() cur.close() data_class.wipe(drop=True) conn, cur = sql.get_connection_and_cursor() assert not table_exists(cur, test_formdef.table_name) assert not table_exists(cur, test_formdef.table_name + '_evolutions') conn.commit() cur.close() test_formdef.store() conn, cur = sql.get_connection_and_cursor() assert table_exists(cur, test_formdef.table_name) @postgresql def test_sql_indexes(): test_formdef = FormDef() test_formdef.name = 'tests indexes' test_formdef.fields = [] test_formdef.store() data_class = test_formdef.data_class(mode='sql') assert data_class.count() == 0 conn, cur = sql.get_connection_and_cursor() assert index_exists(cur, test_formdef.table_name + '_evolutions_fid') conn.commit() cur.close() data_class.wipe(drop=True) conn, cur = sql.get_connection_and_cursor() assert not index_exists(cur, test_formdef.table_name + '_evolutions_fid') conn.commit() cur.close() @postgresql def test_sql_table_select(): test_formdef = FormDef() test_formdef.name = 'table select' test_formdef.fields = [] test_formdef.store() data_class = test_formdef.data_class(mode='sql') assert data_class.count() == 0 for i in range(50): t = data_class() t.store() assert data_class.count() == 50 assert len(data_class.select()) == 50 assert len(data_class.select(lambda x: x.id < 26)) == 25 assert len(data_class.select([st.Less('id', 26)])) == 25 assert len(data_class.select([st.Less('id', 25), st.GreaterOrEqual('id', 10)])) == 15 assert len(data_class.select([st.Less('id', 25), st.GreaterOrEqual('id', 10), lambda x: x.id >= 15])) == 10 assert len(data_class.select([st.NotEqual('id', 26)])) == 49 assert len(data_class.select([st.Contains('id', [])])) == 0 assert len(data_class.select([st.Contains('id', [24, 25, 26])])) == 3 assert len(data_class.select([st.Contains('id', [24, 25, 86])])) == 2 assert len(data_class.select([st.NotContains('id', [24, 25, 86])])) == 48 assert len(data_class.select([st.NotContains('id', [])])) == 50 @postgresql def test_sql_table_select_iterator(): test_formdef = FormDef() test_formdef.name = 'table select' test_formdef.fields = [] test_formdef.store() data_class = test_formdef.data_class(mode='sql') assert data_class.count() == 0 for i in range(50): t = data_class() t.store() assert data_class.count() == 50 with pytest.raises(TypeError): assert len(data_class.select(iterator=True)) == 50 # TypeError: object of type 'generator' has no len() assert len(list(data_class.select(iterator=True))) == 50 assert len(list(data_class.select(lambda x: True, iterator=True))) == 50 assert len(list(data_class.select(lambda x: x.id < 26, iterator=True))) == 25 assert len(list(data_class.select([st.Less('id', 26)], iterator=True))) == 25 assert len(list(data_class.select([st.Less('id', 25), st.GreaterOrEqual('id', 10)], iterator=True))) == 15 assert len(list(data_class.select([st.Less('id', 25), st.GreaterOrEqual('id', 10), lambda x: x.id >= 15], iterator=True))) == 10 @postgresql def test_sql_table_select_datetime(): test_formdef = FormDef() test_formdef.name = 'table select datetime' test_formdef.fields = [] test_formdef.store() data_class = test_formdef.data_class(mode='sql') assert data_class.count() == 0 d = datetime.datetime(2014, 1, 1) for i in range(50): t = data_class() t.receipt_time = (d + datetime.timedelta(days=i)).timetuple() t.store() assert data_class.count() == 50 assert len(data_class.select()) == 50 assert len(data_class.select(lambda x: x.receipt_time == d.timetuple())) == 1 assert len(data_class.select([st.Equal('receipt_time', d.timetuple())])) == 1 assert len(data_class.select([ st.Less('receipt_time', (d + datetime.timedelta(days=20)).timetuple())])) == 20 assert len(data_class.select([ st.Greater('receipt_time', (d + datetime.timedelta(days=20)).timetuple())])) == 29 @postgresql def test_select_limit_offset(): test_formdef = FormDef() test_formdef.name = 'table select limit offset' test_formdef.fields = [] test_formdef.store() data_class = test_formdef.data_class(mode='sql') assert data_class.count() == 0 for i in range(50): t = data_class() t.store() assert len(data_class.select()) == 50 for iterator in (False, True): for func_clause in (lambda x: True, None): assert [x.id for x in data_class.select(func_clause, order_by='id', iterator=iterator)] == list(range(1, 51)) assert [x.id for x in data_class.select(func_clause, order_by='id', limit=10, iterator=iterator)] == list(range(1, 11)) assert [x.id for x in data_class.select(func_clause, order_by='id', limit=10, offset=10, iterator=iterator)] == list(range(11, 21)) assert [x.id for x in data_class.select(func_clause, order_by='id', limit=20, offset=20, iterator=iterator)] == list(range(21, 41)) assert [x.id for x in data_class.select(func_clause, order_by='id', offset=10, iterator=iterator)] == list(range(11, 51)) assert len([x.id for x in data_class.select(lambda x: x.id > 10, limit=10, iterator=iterator)]) == 10 @postgresql def test_select_criteria_intersects(): data_class = formdef.data_class(mode='sql') data_class.wipe() formdata = data_class() formdata.status = 'wf-0' formdata.user_id = '5' formdata.data = {'6': ['apricot']} formdata.store() formdata = data_class() formdata.status = 'wf-0' formdata.user_id = '5' formdata.data = {'6': ['apricot', 'pear']} formdata.store() formdata = data_class() formdata.status = 'wf-0' formdata.user_id = '5' formdata.data = {'6': []} formdata.store() assert len(data_class.select([st.Intersects('f6', ['apricot'])])) == 2 assert len(data_class.select([st.Intersects('f6', ['pear'])])) == 1 assert len(data_class.select([st.Intersects('f6', ['apple'])])) == 0 @postgresql def test_count(): test_formdef = FormDef() test_formdef.name = 'table select count' test_formdef.fields = [] test_formdef.store() data_class = test_formdef.data_class(mode='sql') assert data_class.count() == 0 for i in range(50): t = data_class() t.store() assert data_class.count() == 50 assert data_class.count([st.Less('id', 26)]) == 25 @postgresql def test_select_criteria_or_and(): test_formdef = FormDef() test_formdef.name = 'table select criteria or and' test_formdef.fields = [] test_formdef.store() data_class = test_formdef.data_class(mode='sql') assert data_class.count() == 0 for i in range(50): t = data_class() t.store() assert [x.id for x in data_class.select([st.Or([st.Less('id', 10)])], order_by='id')] == list(range(1, 10)) assert [x.id for x in data_class.select([st.Or([ st.Less('id', 10), st.Equal('id', 15)])], order_by='id')] == list(range(1, 10)) + [15] assert [x.id for x in data_class.select([st.And([ st.Less('id', 10), st.Greater('id', 5)])], order_by='id')] == list(range(6, 10)) @postgresql def test_select_criteria_null(): test_formdef = FormDef() test_formdef.name = 'table select criteria null' test_formdef.fields = [] test_formdef.store() data_class = test_formdef.data_class(mode='sql') assert data_class.count() == 0 for x in range(50): t = data_class() if x % 3: t.submission_channel = None else: t.submission_channel = 'mail' t.store() assert len(data_class.select([st.Null('submission_channel')])) == 33 assert len(data_class.select([st.NotNull('submission_channel')])) == 17 @postgresql def test_sql_table_select_bool(): test_formdef = FormDef() test_formdef.name = 'table select bool' test_formdef.fields = [fields.BoolField(id='3', label='bool')] test_formdef.store() data_class = test_formdef.data_class(mode='sql') assert data_class.count() == 0 for i in range(50): t = data_class() t.data = {'3': False} t.store() t.data = {'3': True} t.store() assert data_class.count() == 50 assert len(data_class.select()) == 50 assert len(data_class.select([st.Equal('f3', True)])) == 1 assert len(data_class.select([st.Equal('f3', False)])) == 49 @postgresql def test_sql_criteria_ilike(): test_formdef = FormDef() test_formdef.name = 'table select bool' test_formdef.fields = [fields.StringField(id='3', label='string')] test_formdef.store() data_class = test_formdef.data_class(mode='sql') assert data_class.count() == 0 for i in range(50): t = data_class() if i < 20: t.data = {'3': 'foo'} else: t.data = {'3': 'bar'} t.store() t.store() assert data_class.count() == 50 assert len(data_class.select()) == 50 assert [x.id for x in data_class.select([st.ILike('f3', 'bar')], order_by='id')] == list(range(21, 51)) assert [x.id for x in data_class.select([st.ILike('f3', 'BAR')], order_by='id')] == list(range(21, 51)) @postgresql def test_sql_criteria_fts(): test_formdef = FormDef() test_formdef.name = 'table select fts' test_formdef.fields = [fields.StringField(id='3', label='string')] test_formdef.store() data_class = test_formdef.data_class(mode='sql') assert data_class.count() == 0 for i in range(50): t = data_class() if i < 20: t.data = {'3': 'foo'} else: t.data = {'3': 'bar'} t.just_created() t.store() assert data_class.count() == 50 assert len(data_class.select()) == 50 assert [x.id for x in data_class.select([st.FtsMatch('BAR')], order_by='id')] == list(range(21, 51)) # check fts against data in history assert len(data_class.select([st.FtsMatch('XXX')])) == 0 formdata1 = data_class.select([st.FtsMatch('BAR')])[0] formdata1.evolution[0].comment = 'XXX' formdata1.store() assert len(data_class.select([st.FtsMatch('XXX')])) == 1 assert data_class.select([st.FtsMatch('XXX')])[0].id == formdata1.id assert len(data_class.select([st.FtsMatch('yyy')])) == 0 item = RegisterCommenterWorkflowStatusItem() item.comment = 'ÿÿÿ' item.perform(formdata1) assert formdata1.evolution[-1].display_parts()[-1] == 'ÿÿÿ' formdata1.store() assert len(data_class.select([st.FtsMatch('yyy')])) == 1 assert len(data_class.select([st.FtsMatch('span')])) == 0 assert data_class.count([st.FtsMatch('Pierre')]) == 0 sql.SqlUser.wipe() user = sql.SqlUser() user.name = 'Pierre' user.store() t.user_id = user.id t.store() assert data_class.count([st.FtsMatch('Pierre')]) == 1 # check unaccent user = sql.SqlUser() user.name = force_str(u'Frédéric') user.store() t.user_id = user.id t.store() assert data_class.count([st.FtsMatch(user.name)]) == 1 assert data_class.count([st.FtsMatch('Frederic')]) == 1 def table_exists(cur, table_name): cur.execute('''SELECT COUNT(*) FROM information_schema.tables WHERE table_name = %s''', (table_name,)) return bool(cur.fetchone()[0] == 1) def column_exists_in_table(cur, table_name, column_name): cur.execute('''SELECT COUNT(*) FROM information_schema.columns WHERE table_name = %s AND column_name = %s''', (table_name, column_name)) return bool(cur.fetchone()[0] == 1) def index_exists(cur, index_name): cur.execute('''SELECT COUNT(*) FROM pg_indexes WHERE schemaname = 'public' AND indexname = %s''', (index_name,)) return bool(cur.fetchone()[0] == 1) @postgresql def test_sql_level(): conn, cur = sql.get_connection_and_cursor() cur.execute('DROP TABLE wcs_meta') assert sql.get_sql_level(conn, cur) == 0 sql.migrate() assert sql.get_sql_level(conn, cur) == sql.SQL_LEVEL # insert negative SQL level, to trigger an error, and check it's not # changed. cur.execute('''UPDATE wcs_meta SET value = %s WHERE key = %s''', (str(-1), 'sql_level')) assert sql.get_sql_level(conn, cur) == -1 with pytest.raises(RuntimeError): sql.migrate() assert sql.get_sql_level(conn, cur) == -1 conn.commit() cur.close() def migration_level(cur): cur.execute('SELECT value FROM wcs_meta WHERE key = %s', ('sql_level',)) row = cur.fetchone() return int(row[0]) @postgresql def test_migration_1_tracking_code(): conn, cur = sql.get_connection_and_cursor() cur.execute('DROP TABLE wcs_meta') cur.execute('DROP TABLE tracking_codes') sql.migrate() assert table_exists(cur, 'tracking_codes') assert table_exists(cur, 'wcs_meta') assert migration_level(cur) >= 1 conn.commit() cur.close() @postgresql def test_migration_2_formdef_id_in_views(): conn, cur = sql.get_connection_and_cursor() cur.execute('UPDATE wcs_meta SET value = 1 WHERE key = %s', ('sql_level',)) cur.execute('DROP VIEW wcs_all_forms CASCADE') # hack a formdef table the wrong way, to check it is reconstructed # properly before the views are created formdef.fields[4] = fields.StringField(id='4', label='item') cur.execute('DROP VIEW wcs_view_1_tests') cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN f4_display') sql.redo_views(conn, cur, formdef, rebuild_global_views=False) formdef.fields[4] = fields.ItemField(id='4', label='item', items=('apple', 'pear', 'peach', 'apricot')) assert table_exists(cur, 'wcs_view_1_tests') assert not column_exists_in_table(cur, 'wcs_view_1_tests', 'f4_display') view_names = [] cur.execute('''SELECT table_name FROM information_schema.views WHERE table_name LIKE %s''', ('wcs\_view\_%',)) while True: row = cur.fetchone() if row is None: break view_names.append(row[0]) fake_formdef = FormDef() common_fields = sql.get_view_fields(fake_formdef) # remove formdef_id for the purpose of this test common_fields.remove([x for x in common_fields if x[1] == 'formdef_id'][0]) union = ' UNION '.join(['''SELECT %s FROM %s''' % ( ', '.join([y[1] for y in common_fields]), x) for x in view_names]) assert not 'formdef_id' in union cur.execute('''CREATE VIEW wcs_all_forms AS %s''' % union) sql.migrate() assert column_exists_in_table(cur, 'wcs_all_forms', 'formdef_id') assert migration_level(cur) >= 2 conn.commit() cur.close() @postgresql def test_migration_6_actions_roles(): conn, cur = sql.get_connection_and_cursor() cur.execute('UPDATE wcs_meta SET value = 5 WHERE key = %s', ('sql_level',)) cur.execute('DROP VIEW wcs_all_forms CASCADE') # hack a formdef table the wrong way, to check it is reconstructed # properly before the views are created formdef.fields[4] = fields.StringField(id='4', label='item') cur.execute('DROP VIEW wcs_view_1_tests') cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN actions_roles_array') sql.drop_views(formdef, conn, cur) formdef.fields[4] = fields.ItemField(id='4', label='item', items=('apple', 'pear', 'peach', 'apricot')) assert not column_exists_in_table(cur, 'formdata_1_tests', 'actions_roles_array') sql.migrate() assert column_exists_in_table(cur, 'formdata_1_tests', 'actions_roles_array') assert column_exists_in_table(cur, 'wcs_view_1_tests', 'actions_roles_array') assert column_exists_in_table(cur, 'wcs_all_forms', 'actions_roles_array') assert migration_level(cur) >= 6 conn.commit() cur.close() @postgresql def test_migration_10_submission_channel(): conn, cur = sql.get_connection_and_cursor() cur.execute('UPDATE wcs_meta SET value = 9 WHERE key = %s', ('sql_level',)) cur.execute('DROP VIEW wcs_all_forms CASCADE') # hack a formdef table the wrong way, to check it is reconstructed # properly before the views are created formdef.fields[4] = fields.StringField(id='4', label='item') cur.execute('DROP VIEW wcs_view_1_tests') cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN submission_channel') sql.drop_views(formdef, conn, cur) formdef.fields[4] = fields.ItemField(id='4', label='item', items=('apple', 'pear', 'peach', 'apricot')) assert not column_exists_in_table(cur, 'formdata_1_tests', 'submission_channel') sql.migrate() assert column_exists_in_table(cur, 'formdata_1_tests', 'submission_channel') assert column_exists_in_table(cur, 'wcs_view_1_tests', 'submission_channel') assert column_exists_in_table(cur, 'wcs_all_forms', 'submission_channel') assert migration_level(cur) >= 10 conn.commit() cur.close() @postgresql def test_migration_12_users_fts(): conn, cur = sql.get_connection_and_cursor() cur.execute('UPDATE wcs_meta SET value = 11 WHERE key = %s', ('sql_level',)) sql.SqlUser.wipe() user = sql.SqlUser() user.name = 'Pierre' user.store() # remove the fts column cur.execute('ALTER TABLE users DROP COLUMN fts') assert not column_exists_in_table(cur, 'users', 'fts') sql.migrate() assert column_exists_in_table(cur, 'users', 'fts') assert migration_level(cur) >= 12 # no fts, migration only prepare re-index assert len(sql.SqlUser.get_ids_from_query('pierre')) == 0 assert sql.is_reindex_needed('user', conn=conn, cur=cur) is True assert sql.is_reindex_needed('formdata', conn=conn, cur=cur) is True call_command('cron') # first cron = reindex assert sql.is_reindex_needed('user', conn=conn, cur=cur) is False assert sql.is_reindex_needed('formdata', conn=conn, cur=cur) is False # make sure the fts is filled after the migration assert len(sql.SqlUser.get_ids_from_query('pierre')) == 1 conn.commit() cur.close() @postgresql def test_migration_21_users_ascii_name(): conn, cur = sql.get_connection_and_cursor() cur.execute('UPDATE wcs_meta SET value = 11 WHERE key = %s', ('sql_level',)) sql.SqlUser.wipe() user = sql.SqlUser() user.name = 'Jean Sénisme' user.store() # remove the ascii_name column cur.execute('ALTER TABLE users DROP COLUMN ascii_name') assert not column_exists_in_table(cur, 'users', 'ascii_name') sql.migrate() assert column_exists_in_table(cur, 'users', 'ascii_name') assert migration_level(cur) >= 21 # no fts, migration only prepare re-index assert sql.SqlUser.count([st.Equal('ascii_name', 'jean senisme')]) == 0 assert sql.is_reindex_needed('user', conn=conn, cur=cur) is True assert sql.is_reindex_needed('formdata', conn=conn, cur=cur) is True call_command('cron') # first cron = reindex assert sql.is_reindex_needed('user', conn=conn, cur=cur) is False assert sql.is_reindex_needed('formdata', conn=conn, cur=cur) is False # make sure the ascii_name is filled after the migration assert sql.SqlUser.count([st.Equal('ascii_name', 'jean senisme')]) == 1 conn.commit() cur.close() @postgresql def test_migration_24_evolution_index(): formdef = FormDef() formdef.name = 'tests migration 24' formdef.fields = [] formdef.store() conn, cur = sql.get_connection_and_cursor() cur.execute('DROP INDEX %s_evolutions_fid' % formdef.table_name) cur.execute('UPDATE wcs_meta SET value = 23 WHERE key = %s', ('sql_level',)) conn.commit() cur.close() conn, cur = sql.get_connection_and_cursor() assert not index_exists(cur, formdef.table_name + '_evolutions_fid') conn.commit() cur.close() sql.migrate() conn, cur = sql.get_connection_and_cursor() assert index_exists(cur, formdef.table_name + '_evolutions_fid') conn.commit() cur.close() def drop_formdef_tables(): conn, cur = sql.get_connection_and_cursor() cur.execute('''SELECT table_name FROM information_schema.tables''') table_names = [] while True: row = cur.fetchone() if row is None: break table_names.append(row[0]) for table_name in table_names: if table_name.startswith('formdata_'): cur.execute('DROP TABLE %s CASCADE' % table_name) @postgresql def test_is_at_endpoint(): drop_formdef_tables() conn, cur = sql.get_connection_and_cursor() wf = Workflow(name='test endpoint') st1 = wf.add_status('Status1', 'st1') st2 = wf.add_status('Status2', 'st2') commentable = CommentableWorkflowStatusItem() commentable.id = '_commentable' commentable.by = ['_submitter', '_receiver'] st1.items.append(commentable) commentable.parent = st1 wf.store() assert [x.id for x in wf.get_endpoint_status()] == ['st2'] formdef = FormDef() formdef.name = 'test endpoint' formdef.fields = [] formdef.workflow = wf formdef.store() data_class = formdef.data_class(mode='sql') formdata = data_class() formdata.status = 'wf-st1' formdata.store() formdata = data_class() formdata.status = 'wf-st2' formdata.store() cur.execute('''SELECT COUNT(*) FROM wcs_all_forms''') assert bool(cur.fetchone()[0] == 2) cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE status = 'wf-st1' ''') assert bool(cur.fetchone()[0] == 1) cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE is_at_endpoint = true''') assert bool(cur.fetchone()[0] == 1) # check a change to workflow is reflected in the database st1.forced_endpoint = True wf.store() assert [x.id for x in wf.get_endpoint_status()] == ['st1', 'st2'] cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE is_at_endpoint = true''') assert bool(cur.fetchone()[0] == 2) @postgresql def test_views_fts(): drop_formdef_tables() conn, cur = sql.get_connection_and_cursor() formdef = FormDef() formdef.name = 'test fts' formdef.fields = [ fields.StringField(id='0', label='string'), ] formdef.store() data_class = formdef.data_class(mode='sql') formdata1 = data_class() formdata1.data = {'0': 'foo bar'} formdata1.store() formdata2 = data_class() formdata2.data = {'0': 'foo'} formdata2.store() cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE fts @@ plainto_tsquery(%s)''', ('foo',)) assert bool(cur.fetchone()[0] == 2) cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE fts @@ plainto_tsquery(%s)''', ('bar',)) assert bool(cur.fetchone()[0] == 1) @postgresql def test_select_any_formdata(): drop_formdef_tables() conn, cur = sql.get_connection_and_cursor() now = datetime.datetime.now() cnt = 0 for i in range(5): formdef = FormDef() formdef.name = 'test any %d' % i formdef.fields = [] formdef.store() data_class = formdef.data_class(mode='sql') for j in range(20): formdata = data_class() formdata.just_created() formdata.user_id = '%s' % ((i+j)%11) # set receipt_time to make sure all entries are unique. formdata.receipt_time = (now + datetime.timedelta(seconds=cnt)).timetuple() formdata.status = ['wf-new', 'wf-accepted', 'wf-rejected', 'wf-finished'][(i+j)%4] if j < 5: formdata.submission_channel = 'mail' formdata.store() cnt += 1 # test generic select objects = sql.AnyFormData.select() assert len(objects) == 100 # make sure valid formdefs are used assert len([x for x in objects if x.formdef.name == 'test any 0']) == 20 assert len([x for x in objects if x.formdef.name == 'test any 1']) == 20 # test sorting objects = sql.AnyFormData.select(order_by='receipt_time') assert len(objects) == 100 objects2 = sql.AnyFormData.select(order_by='-receipt_time') assert [(x.formdef_id, x.id) for x in objects2] == list(reversed( [(x.formdef_id, x.id) for x in objects])) # test clauses objects2 = sql.AnyFormData.select([st.Equal('user_id', '0')]) assert len(objects2) == len([x for x in objects if x.user_id == '0']) objects2 = sql.AnyFormData.select([st.Equal('is_at_endpoint', True)]) assert len(objects2) == len([x for x in objects if x.status in ('wf-rejected', 'wf-finished')]) objects2 = sql.AnyFormData.select([st.Equal('submission_channel', 'mail')]) assert len(objects2) == len([x for x in objects if x.submission_channel == 'mail']) assert objects2[0].submission_channel == 'mail' # test offset/limit objects2 = sql.AnyFormData.select(order_by='receipt_time', limit=10, offset=0) assert [(x.formdef_id, x.id) for x in objects2] == [(x.formdef_id, x.id) for x in objects][:10] objects2 = sql.AnyFormData.select(order_by='receipt_time', limit=10, offset=20) assert [(x.formdef_id, x.id) for x in objects2] == [(x.formdef_id, x.id) for x in objects][20:30] @postgresql def test_geoloc_in_global_view(): drop_formdef_tables() conn, cur = sql.get_connection_and_cursor() now = datetime.datetime.now() formdef = FormDef() formdef.name = 'test no geoloc' formdef.fields = [] formdef.store() formdef2 = FormDef() formdef2.name = 'test with geoloc' formdef2.fields = [] formdef2.geolocations = {'base': 'Plop'} formdef2.store() data_class = formdef.data_class(mode='sql') formdata = data_class() formdata.just_created() formdata.store() data_class = formdef2.data_class(mode='sql') formdata = data_class() formdata.just_created() formdata.geolocations = {'base': {'lat': 12, 'lon': 21}} formdata.store() # test generic select objects = sql.AnyFormData.select() assert len(objects) == 2 # test clauses objects2 = sql.AnyFormData.select([st.Null('geoloc_base_x')]) assert len(objects2) == 1 assert not objects2[0].geolocations objects2 = sql.AnyFormData.select([st.NotNull('geoloc_base_x')]) assert len(objects2) == 1 assert int(objects2[0].geolocations['base']['lat']) == formdata.geolocations['base']['lat'] assert int(objects2[0].geolocations['base']['lon']) == formdata.geolocations['base']['lon'] @postgresql def test_actions_roles(): drop_formdef_tables() conn, cur = sql.get_connection_and_cursor() wf = Workflow(name='test endpoint') st1 = wf.add_status('Status1', 'st1') st2 = wf.add_status('Status2', 'st2') commentable = CommentableWorkflowStatusItem() commentable.id = '_commentable' commentable.by = ['_submitter', '1'] st1.items.append(commentable) commentable.parent = st1 wf.store() assert [x.id for x in wf.get_endpoint_status()] == ['st2'] formdef = FormDef() formdef.name = 'test actions roles' formdef.fields = [] formdef.workflow = wf formdef.store() data_class = formdef.data_class(mode='sql') formdata = data_class() formdata.status = 'wf-st1' formdata.store() formdata_id = formdata.id formdata = data_class() formdata.status = 'wf-st2' formdata.store() cur.execute('''SELECT COUNT(*) FROM wcs_all_forms''') assert bool(cur.fetchone()[0] == 2) cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE actions_roles_array && ARRAY['5', '1', '4']''') assert bool(cur.fetchone()[0] == 1) # check a change to workflow is reflected in the database st1.items[0].by = ['2', '3'] wf.store() cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE actions_roles_array && ARRAY['5', '1', '4']''') assert bool(cur.fetchone()[0] == 0) cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE actions_roles_array && ARRAY['2', '3']''') assert bool(cur.fetchone()[0] == 1) # using criterias criterias = [st.Intersects('actions_roles_array', ['2', '3'])] total_count = sql.AnyFormData.count(criterias) formdatas = sql.AnyFormData.select(criterias) assert total_count == 1 assert formdatas[0].id == formdata_id @postgresql def test_last_update_time(): drop_formdef_tables() conn, cur = sql.get_connection_and_cursor() wf = Workflow(name='test last update time') st1 = wf.add_status('Status1', 'st1') commentable = CommentableWorkflowStatusItem() commentable.id = '_commentable' commentable.by = ['_submitter', '_receiver'] st1.items.append(commentable) commentable.parent = st1 wf.store() formdef = FormDef() formdef.name = 'test last update time' formdef.fields = [] formdef.workflow = wf formdef.store() data_class = formdef.data_class(mode='sql') formdata1 = data_class() formdata1.status = 'wf-st1' formdata1.just_created() formdata1.evolution[0].comment = 'comment' formdata1.jump_status('st1') # will add another evolution entry formdata1.evolution[0].time = datetime.datetime(2015, 1, 1, 0, 0, 0).timetuple() formdata1.evolution[1].time = datetime.datetime(2015, 1, 2, 0, 0, 0).timetuple() formdata1.store() formdata2 = data_class() formdata2.status = 'wf-st1' formdata2.just_created() formdata2.evolution[0].comment = 'comment' formdata2.jump_status('st1') # will add another evolution entry formdata2.evolution[0].time = datetime.datetime(2015, 1, 3, 0, 0, 0).timetuple() formdata2.evolution[1].time = datetime.datetime(2015, 1, 4, 0, 0, 0).timetuple() formdata2.store() cur.execute('''SELECT COUNT(*) FROM wcs_all_forms''') assert bool(cur.fetchone()[0] == 2) cur.execute('''SELECT id FROM wcs_all_forms WHERE last_update_time = '2015-01-02 00:00' ''') assert bool(cur.fetchone()[0] == formdata1.id) cur.execute('''SELECT id FROM wcs_all_forms WHERE last_update_time = '2015-01-04 00:00' ''') assert bool(cur.fetchone()[0] == formdata2.id) @postgresql def test_view_formdef_name(): drop_formdef_tables() conn, cur = sql.get_connection_and_cursor() formdef1 = FormDef() formdef1.name = 'test formdef name 1' formdef1.fields = [] formdef1.store() data_class = formdef1.data_class() formdata1 = data_class() formdata1.just_created() formdata1.store() formdef2 = FormDef() formdef2.name = 'test formdef name 2' formdef2.fields = [] formdef2.store() data_class = formdef2.data_class() formdata2 = data_class() formdata2.just_created() formdata2.store() cur.execute('''SELECT COUNT(*) FROM wcs_all_forms''') assert bool(cur.fetchone()[0] == 2) cur.execute('''SELECT formdef_id FROM wcs_all_forms WHERE formdef_name = 'test formdef name 1' ''') assert bool(str(cur.fetchone()[0]) == str(formdef1.id)) cur.execute('''SELECT formdef_id FROM wcs_all_forms WHERE formdef_name = 'test formdef name 2' ''') assert bool(str(cur.fetchone()[0]) == str(formdef2.id)) @postgresql def test_view_user_name(): drop_formdef_tables() conn, cur = sql.get_connection_and_cursor() formdef1 = FormDef() formdef1.name = 'test user name' formdef1.fields = [] formdef1.store() sql.SqlUser.wipe() user = sql.SqlUser() user.name = 'Foobar' user.store() data_class = formdef1.data_class() formdata1 = data_class() formdata1.just_created() formdata1.store() data_class = formdef1.data_class() formdata2 = data_class() formdata2.user_id = user.id formdata2.just_created() formdata2.store() cur.execute('''SELECT user_name FROM wcs_all_forms WHERE id = %s ''', (formdata1.id,)) assert bool(cur.fetchone()[0] is None) cur.execute('''SELECT user_name FROM wcs_all_forms WHERE id = %s ''', (formdata2.id,)) assert bool(cur.fetchone()[0] == user.name) @postgresql def test_select_formdata_after_formdef_removal(): drop_formdef_tables() conn, cur = sql.get_connection_and_cursor() now = datetime.datetime.now() for i in range(2): formdef = FormDef() formdef.name = 'test formdef removal' formdef.fields = [] formdef.store() data_class = formdef.data_class(mode='sql') formdata = data_class() formdata.just_created() formdata.store() # test generic select objects = sql.AnyFormData.select() assert len(objects) == 2 formdef.remove_self() objects = sql.AnyFormData.select() assert len(objects) == 1 @postgresql def test_views_submission_info(): drop_formdef_tables() conn, cur = sql.get_connection_and_cursor() formdef = FormDef() formdef.name = 'test backoffice submission' formdef.fields = [] formdef.store() data_class = formdef.data_class(mode='sql') formdata1 = data_class() formdata1.submission_channel = 'mail' formdata1.backoffice_submission = True formdata1.store() formdata2 = data_class() formdata2.backoffice_submission = False formdata2.store() cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE backoffice_submission IS TRUE''') assert bool(cur.fetchone()[0] == 1) cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE backoffice_submission IS FALSE''') assert bool(cur.fetchone()[0] == 1) cur.execute('''SELECT COUNT(*) FROM wcs_all_forms WHERE submission_channel = %s''', ('mail',)) assert bool(cur.fetchone()[0] == 1) @postgresql def test_get_formdef_new_id(): test1_formdef = FormDef() test1_formdef.name = 'new formdef' test1_formdef.fields = [] test1_formdef.store() test1_id = test1_formdef.id test1_table_name = test1_formdef.table_name test1_formdef.remove_self() test2_formdef = FormDef() test2_formdef.name = 'new formdef' test2_formdef.fields = [] test2_formdef.store() assert test1_id != test2_formdef.id assert test1_table_name != test2_formdef.table_name @postgresql def test_criticality_levels(): drop_formdef_tables() conn, cur = sql.get_connection_and_cursor() workflow1 = Workflow(name='criticality1') workflow1.criticality_levels = [ WorkflowCriticalityLevel(name='green'), WorkflowCriticalityLevel(name='yellow'), WorkflowCriticalityLevel(name='red'), WorkflowCriticalityLevel(name='redder'), WorkflowCriticalityLevel(name='reddest'), ] workflow1.store() workflow2 = Workflow(name='criticality2') workflow2.criticality_levels = [ WorkflowCriticalityLevel(name='green'), WorkflowCriticalityLevel(name='reddest'), ] workflow2.store() formdef1 = FormDef() formdef1.name = 'test criticality levels 1' formdef1.fields = [] formdef1.workflow_id = workflow1.id formdef1.store() formdef2 = FormDef() formdef2.name = 'test criticality levels 2' formdef2.fields = [] formdef2.workflow_id = workflow2.id formdef2.store() data_class = formdef1.data_class(mode='sql') for i in range(5): formdata = data_class() formdata.set_criticality_level(i) formdata.store() data_class = formdef2.data_class(mode='sql') for i in range(2): formdata = data_class() formdata.set_criticality_level(i) formdata.store() objects = sql.AnyFormData.select(order_by='-criticality_level') # make sure first two formdata are the highest priority ones, and the last # two formdata are the lowest priority ones. assert objects[0].get_criticality_level_object().name == 'reddest' assert objects[1].get_criticality_level_object().name == 'reddest' assert objects[-1].get_criticality_level_object().name == 'green' assert objects[-2].get_criticality_level_object().name == 'green' @postgresql def test_view_performances(): pytest.skip('takes too much time') drop_formdef_tables() conn, cur = sql.get_connection_and_cursor() nb_users = 1000 nb_roles = 10 nb_workflows = 5 nb_formdefs = 10 nb_fields = 10 nb_formdatas = 1000 nb_users = 10 nb_formdatas = 10000 random.seed('foobar') # create users sql.SqlUser.wipe() users = [] for i in range(nb_users): user = sql.SqlUser() user.name = 'user %s' % i user.store() users.append(user) # create roles roles = [] for i in range(nb_roles): role = Role(name='role%s' % i) role.store() roles.append(role) # create workflows workflows = [] for i in range(nb_workflows): workflow = Workflow(name='test perf wf %s' % i) for j in range(5): status = workflow.add_status('Status %d' % j, 'st%s' % j) commentable = CommentableWorkflowStatusItem() commentable.id = '_commentable%s' % j commentable.by = [random.choice(roles).id, random.choice(roles).id] status.items.append(commentable) commentable.parent = status if j != 4: jump = JumpWorkflowStatusItem() jump.id = '_jump%s' % j jump.by = [] jump.timeout = 5 jump.status = 'st%s' % (j+1) status.items.append(jump) jump.parent = status workflow.store() workflows.append(workflow) # create formdefs formdefs = [] for i in range(nb_formdefs): formdef = FormDef() formdef.name = 'test performance view %s' % i formdef.fields = [] for j in range(nb_fields): formdef.fields.append(fields.StringField(id=str(j+1), label='string')) formdef.workflow_id = workflows[i%5].id formdef.store() formdefs.append(formdef) print('create formdatas') # create formdatas for i in range(nb_formdatas): data_class = random.choice(formdefs).data_class() formdata = data_class() formdata.data = {} for j in range(10): formdata.data[str(j+1)] = ''.join([random.choice(string.letters) for x in range(random.randint(10, 30))]) formdata.user_id = random.choice(users).id formdata.status = 'wf-st1' formdata.just_created() for j in range(5): formdata.jump_status('st%s' % (j+2)) if random.random() < 0.5: break print('done') t0 = time.time() user_roles = [random.choice(roles).id, random.choice(roles).id] criterias = [] criterias.append(st.NotEqual('status', 'draft')) criterias.append(st.Equal('is_at_endpoint', False)) criterias.append(st.Intersects('actions_roles_array', user_roles)) formdatas = sql.AnyFormData.select(criterias, order_by='receipt_time', limit=20, offset=0) print(time.time() - t0) assert (time.time() - t0) < 0.5 @postgresql def test_migration_30_anonymize_evo_who(): formdef = FormDef() formdef.name = 'tests migration 24' formdef.fields = [] formdef.store() user = sql.SqlUser() user.name = 'JohnDoe' user.store() klass = formdef.data_class() formdata = klass() formdata.evolution = [] formdata.anonymised = datetime.datetime.now() evo = Evolution(formdata) evo.who = user.id evo.time = time.localtime() formdata.evolution.append(evo) formdata.store() conn, cur = sql.get_connection_and_cursor() cur.execute('UPDATE wcs_meta SET value = 29 WHERE key = %s', ('sql_level',)) conn.commit() cur.close() conn, cur = sql.get_connection_and_cursor() cur.execute('SELECT COUNT(*) FROM %s_evolutions WHERE who IS NULL' % formdef.table_name) assert cur.fetchone() == (0,) cur.execute('SELECT COUNT(*) FROM wcs_meta WHERE key = %s AND value::integer > 29', ('sql_level',)) assert cur.fetchone() == (0,) conn.commit() cur.close() sql.migrate() conn, cur = sql.get_connection_and_cursor() cur.execute('SELECT COUNT(*) FROM %s_evolutions WHERE who IS NULL' % formdef.table_name) assert cur.fetchone() == (1,) cur.execute('SELECT COUNT(*) FROM wcs_meta WHERE key = %s AND value::integer > 29', ('sql_level',)) assert cur.fetchone() == (1,) conn.commit() cur.close() @postgresql def test_migration_31_user_label(): conn, cur = sql.get_connection_and_cursor() cur.execute('UPDATE wcs_meta SET value = 30 WHERE key = %s', ('sql_level',)) cur.execute('DROP VIEW wcs_all_forms CASCADE') cur.execute('DROP VIEW wcs_view_1_tests') cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN user_label') sql.drop_views(formdef, conn, cur) assert not column_exists_in_table(cur, 'formdata_1_tests', 'user_label') sql.migrate() assert column_exists_in_table(cur, 'formdata_1_tests', 'user_label') assert column_exists_in_table(cur, 'wcs_view_1_tests', 'user_label') assert column_exists_in_table(cur, 'wcs_all_forms', 'user_label') assert migration_level(cur) >= 31 conn.commit() cur.close()