154 lines
4.5 KiB
Python
154 lines
4.5 KiB
Python
import os
|
|
import random
|
|
|
|
import psycopg2
|
|
import pytest
|
|
from django.core.management import call_command, get_commands
|
|
from django.core.management.base import CommandError
|
|
from quixote import get_publisher
|
|
|
|
from wcs.fields import BoolField
|
|
from wcs.formdef import FormDef
|
|
from wcs.roles import Role
|
|
from wcs.sql import cleanup_connection
|
|
|
|
from .utilities import clean_temporary_pub, create_temporary_pub, force_connections_close
|
|
|
|
|
|
@pytest.fixture
|
|
def formdeffix():
|
|
FormDef.wipe()
|
|
formdef = FormDef()
|
|
formdef.name = 'testform'
|
|
formdef.description = 'plop'
|
|
formdef.fields = [BoolField(id='1')]
|
|
formdef.store()
|
|
|
|
data_class = formdef.data_class()
|
|
for value in (True, True, True, False):
|
|
formdata = data_class()
|
|
formdata.data = {'1': value}
|
|
formdata.store()
|
|
|
|
return formdef
|
|
|
|
|
|
@pytest.fixture(scope='module')
|
|
def cursor():
|
|
conn = psycopg2.connect(user=os.environ.get('USER'))
|
|
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
|
cur = conn.cursor()
|
|
yield cur
|
|
cur.close()
|
|
|
|
|
|
@pytest.fixture
|
|
def database(cursor):
|
|
i = 0
|
|
while True:
|
|
dbname = 'wcstests%d' % random.randint(0, 100000)
|
|
try:
|
|
cursor.execute('CREATE DATABASE %s' % dbname)
|
|
break
|
|
except psycopg2.Error:
|
|
if i < 5:
|
|
i += 1
|
|
continue
|
|
raise
|
|
yield dbname
|
|
cleanup_connection()
|
|
cursor.execute('DROP DATABASE %s' % dbname)
|
|
|
|
|
|
@pytest.fixture()
|
|
def pub(request):
|
|
pub = create_temporary_pub(sql_mode=False)
|
|
yield pub
|
|
clean_temporary_pub()
|
|
|
|
|
|
@pytest.fixture
|
|
def local_user():
|
|
get_publisher().user_class.wipe()
|
|
user = get_publisher().user_class()
|
|
user.name = 'Jean Darmette'
|
|
user.email = 'jean.darmette@triffouilis.fr'
|
|
user.name_identifiers = ['0123456789']
|
|
user.store()
|
|
return user
|
|
|
|
|
|
def test_command_exists():
|
|
assert 'convert_to_sql' in get_commands()
|
|
|
|
|
|
def test_unknown_publisher_fails(pub):
|
|
with pytest.raises(CommandError) as excinfo:
|
|
call_command('convert_to_sql', '-d', 'unknown.net', '--database', 'foobar')
|
|
assert str(excinfo.value) == 'unknown tenant'
|
|
|
|
|
|
def test_failing_connection(pub):
|
|
with pytest.raises(psycopg2.OperationalError) as excinfo:
|
|
call_command('convert_to_sql', '-d', 'example.net', '--database', 'foobar', '--port', '666')
|
|
assert 'could not connect' in str(excinfo.value)
|
|
|
|
|
|
def test_database_does_not_exist(pub):
|
|
new_database = 'test_%s' % random.randint(1000, 9999)
|
|
with pytest.raises(psycopg2.OperationalError) as excinfo:
|
|
call_command('convert_to_sql', '-d', 'example.net', '--database', new_database)
|
|
assert 'exist' in str(excinfo.value) # works for english + french postgresql
|
|
|
|
|
|
def test_already_migrated_fails():
|
|
create_temporary_pub(sql_mode=True)
|
|
with pytest.raises(CommandError) as excinfo:
|
|
call_command('convert_to_sql', '-d', 'example.net', '--database', 'foobar')
|
|
assert str(excinfo.value) == 'tenant already using postgresql'
|
|
cleanup_connection()
|
|
force_connections_close()
|
|
clean_temporary_pub()
|
|
|
|
|
|
def test_setup_database(pub, database):
|
|
call_command('convert_to_sql', '-d', 'example.net', '--database', database)
|
|
pub.set_config()
|
|
assert pub.cfg['postgresql'].get('database') == database
|
|
|
|
|
|
def test_migration(pub, database):
|
|
assert 'postgresql' not in pub.cfg
|
|
call_command('convert_to_sql', '-d', 'example.net', '--database', database)
|
|
pub.set_config()
|
|
assert 'postgresql' in pub.cfg
|
|
|
|
|
|
def test_data_is_migrated(pub, database, local_user, formdeffix):
|
|
pub.load_site_options()
|
|
assert not pub.site_options.has_option('options', 'postgresql')
|
|
call_command('convert_to_sql', '-d', 'example.net', '--database', database)
|
|
pub.load_site_options()
|
|
assert pub.site_options.has_option('options', 'postgresql')
|
|
pub.is_using_postgresql = lambda: True
|
|
pub.set_config()
|
|
formdefs = FormDef.select()
|
|
assert len(formdefs) == 1
|
|
data_class = formdefs[0].data_class(mode='sql')
|
|
assert len(data_class.keys()) == 4
|
|
|
|
|
|
def test_users_and_roles(pub, database, local_user):
|
|
role = Role(name='Test Role')
|
|
role.store()
|
|
|
|
pub.load_site_options()
|
|
assert not pub.site_options.has_option('options', 'postgresql')
|
|
call_command('convert_to_sql', '-d', 'example.net', '--database', database)
|
|
pub.load_site_options()
|
|
assert pub.site_options.has_option('options', 'postgresql')
|
|
pub.is_using_postgresql = lambda: True
|
|
pub.set_config()
|
|
assert len(pub.user_class.get_users_with_name_identifier('0123456789')) == 1
|
|
assert pub.role_class.count() == 1
|