wcs/tests/test_convert_to_sql.py

182 lines
5.2 KiB
Python

import os
import random
import psycopg2
import pytest
from django.core.management import call_command, get_commands
from django.core.management.base import CommandError
from quixote import get_publisher
from wcs.carddef import CardDef
from wcs.fields import BoolField
from wcs.formdef import FormDef
from wcs.roles import Role
from wcs.sql import AnyFormData, cleanup_connection
from .utilities import clean_temporary_pub, create_temporary_pub, force_connections_close
@pytest.fixture
def formdeffix():
for formdef in FormDef.select():
formdef.remove_self()
formdef = FormDef()
formdef.id = 123
formdef.name = 'testform'
formdef.description = 'plop'
formdef.fields = [BoolField(id='1')]
formdef.store()
data_class = formdef.data_class(mode='files')
for value in (True, True, True, False):
formdata = data_class()
formdata.data = {'1': value}
formdata.store()
return formdef
@pytest.fixture
def carddeffix():
for carddef in CardDef.select():
carddef.remove_self()
carddef = CardDef()
carddef.id = 456
carddef.name = 'testcard'
carddef.description = 'plop'
carddef.fields = [BoolField(id='1')]
carddef.store()
data_class = carddef.data_class(mode='files')
for value in (True, True, True, False):
formdata = data_class()
formdata.data = {'1': value}
formdata.store()
return carddef
@pytest.fixture(scope='module')
def cursor():
conn = psycopg2.connect(user=os.environ.get('USER'), dbname='postgres')
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = conn.cursor()
yield cur
cur.close()
@pytest.fixture
def database(cursor):
i = 0
while True:
dbname = 'wcstests%d' % random.randint(0, 100000)
try:
cursor.execute('CREATE DATABASE %s' % dbname)
break
except psycopg2.Error:
if i < 5:
i += 1
continue
raise
yield dbname
cleanup_connection()
cursor.execute('DROP DATABASE %s' % dbname)
@pytest.fixture()
def pub(request):
pub = create_temporary_pub(pickle_mode=True)
yield pub
clean_temporary_pub()
@pytest.fixture
def local_user():
get_publisher().user_class.wipe()
user = get_publisher().user_class()
user.name = 'Jean Darmette'
user.email = 'jean.darmette@triffouilis.fr'
user.name_identifiers = ['0123456789']
user.store()
return user
def test_command_exists():
assert 'convert_to_sql' in get_commands()
def test_unknown_publisher_fails(pub):
with pytest.raises(CommandError) as excinfo:
call_command('convert_to_sql', '-d', 'unknown.net', '--database', 'foobar')
assert str(excinfo.value) == 'unknown tenant'
def test_failing_connection(pub):
with pytest.raises(psycopg2.OperationalError) as excinfo:
call_command('convert_to_sql', '-d', 'example.net', '--database', 'foobar', '--port', '666')
assert 'could not connect' in str(excinfo.value) or 'connection to server on socket' in str(excinfo.value)
def test_database_does_not_exist(pub):
new_database = 'test_%s' % random.randint(1000, 9999)
with pytest.raises(psycopg2.OperationalError) as excinfo:
call_command('convert_to_sql', '-d', 'example.net', '--database', new_database)
assert 'exist' in str(excinfo.value) # works for english + french postgresql
def test_already_migrated_fails():
create_temporary_pub()
with pytest.raises(CommandError) as excinfo:
call_command('convert_to_sql', '-d', 'example.net', '--database', 'foobar')
assert str(excinfo.value) == 'tenant already using postgresql'
cleanup_connection()
force_connections_close()
clean_temporary_pub()
def test_setup_database(pub, database):
call_command('convert_to_sql', '-d', 'example.net', '--database', database)
pub.set_config()
assert pub.cfg['postgresql'].get('database') == database
def test_migration(pub, database):
assert 'postgresql' not in pub.cfg
call_command('convert_to_sql', '-d', 'example.net', '--database', database)
pub.set_config()
assert 'postgresql' in pub.cfg
def test_data_is_migrated(pub, database, local_user, formdeffix, carddeffix):
call_command('convert_to_sql', '-d', 'example.net', '--database', database)
pub.set_config()
formdefs = FormDef.select()
assert len(formdefs) == 1
data_class = formdefs[0].data_class(mode='sql')
assert len(data_class.keys()) == 4
carddefs = CardDef.select()
assert len(carddefs) == 1
data_class = carddefs[0].data_class(mode='sql')
assert len(data_class.keys()) == 4
# check global table is created
objects = AnyFormData.select()
assert len(objects) == 4
# check triggers were created
formdata = formdefs[0].data_class(mode='sql')()
formdata.just_created()
formdata.store()
objects = AnyFormData.select()
assert len(objects) == 5
def test_users_and_roles(pub, database, local_user):
role = Role(name='Test Role')
role.store()
call_command('convert_to_sql', '-d', 'example.net', '--database', database)
assert len(pub.user_class.get_users_with_name_identifier('0123456789')) == 1
assert pub.role_class.count() == 1