# wcs/tests/test_hobo_sql.py

# 296 lines
# 11 KiB
# Python

import collections
import copy
import json
import os
import random
import shutil
import tempfile
import zipfile
import psycopg2
import pytest
from quixote import cleanup
from wcs.ctl.check_hobos import CmdCheckHobos
from wcs.publisher import WcsPublisher
from wcs.sql import cleanup_connection
from .utilities import clean_temporary_pub, create_temporary_pub
# Configuration shared by this module: the setuptest fixture serialises it as
# the skeleton's config.json, and setuptest/database_exists connect to
# PostgreSQL with its "createdb-connection-params".
CONFIG = {
    "postgresql": {
        # Connection to the "postgres" database as the current OS user ($USER must be set).
        "createdb-connection-params": {"database": "postgres", "user": os.environ['USER']},
        # NOTE(review): presumably filled in with the tenant-derived database name — confirm.
        "database-template-name": "%s",
        "user": os.environ['USER'],
    }
}
# Randomised names keep concurrent or repeated runs from clashing on the same
# tenant directory / database.  Both suffixes are drawn in a single
# random.sample() call so they are guaranteed to differ: two independent
# randint() draws could collide, which would give both tenants the same
# hostname and database name.
_TENANT_SUFFIX, _NEW_TENANT_SUFFIX = random.sample(range(100001), 2)

# Tenant deployed first by every test.
WCS_BASE_TENANT = 'wcsteststenant%d' % _TENANT_SUFFIX
WCS_TENANT = '%s.net' % WCS_BASE_TENANT
WCS_DB_NAME = '%s_net' % WCS_BASE_TENANT

# Second tenant / new hostname used by the deployment and URL-change tests.
NEW_WCS_BASE_TENANT = 'wcsteststenant%d' % _NEW_TENANT_SUFFIX
NEW_WCS_TENANT = '%s.net' % NEW_WCS_BASE_TENANT
NEW_WCS_DB_NAME = '%s_net' % NEW_WCS_BASE_TENANT
# Environment description written to hobo.json by the tests and handed to
# CmdCheckHobos.  The tests copy it and modify services[2] (the 'wcs' entry),
# so the position of that entry in the list matters.
HOBO_JSON = {
    'services': [
        {
            'title': 'Hobo',
            'slug': 'hobo',
            'service-id': 'hobo',
            'base_url': 'http://hobo.example.net/',
            'saml-sp-metadata-url': 'http://hobo.example.net/accounts/mellon/metadata/',
        },
        {
            'service-id': 'authentic',
            'saml-idp-metadata-url': 'http://authentic.example.net/idp/saml2/metadata',
            'template_name': '',
            'variables': {},
            'title': 'Authentic',
            'base_url': 'http://authentic.example.net/',
            'id': 3,
            'slug': 'authentic',
            'secret_key': '12345',
        },
        # Index 2: the service under test; its URLs all point at WCS_TENANT.
        {
            'service-id': 'wcs',
            # Same file name as the skeleton archive created by the setuptest fixture.
            'template_name': 'publik.zip',
            'variables': {'xxx': 'HELLO WORLD'},
            'title': 'Test wcs',
            'saml-sp-metadata-url': 'http://%s/saml/metadata' % WCS_TENANT,
            'base_url': 'http://%s/' % WCS_TENANT,
            'backoffice-menu-url': 'http://%s/backoffice/menu.json' % WCS_TENANT,
            'id': 1,
            'secret_key': 'eiue7aa10nt6e9*#jg2bsfvdgl)cr%4(tafibfjx9i$pgnfj#v',
            'slug': 'test-wcs',
        },
        {
            'service-id': 'combo',
            'template_name': 'portal-agent',
            'title': 'Portal Agents',
            'base_url': 'http://agents.example.net/',
            'secret_key': 'aaa',
        },
        {
            'service-id': 'combo',
            'template_name': 'portal-user',
            'title': 'Portal',
            'base_url': 'http://portal.example.net/',
            'secret_key': 'bbb',
        },
    ],
    'timestamp': '1431420355.31',
}
@pytest.fixture
def setuptest():
    """Prepare a temporary publisher and application directory for deployment tests.

    Setup drops any leftover test databases, creates a temporary publisher,
    points ``WcsPublisher.APP_DIR`` at a fresh temporary directory and writes
    a ``skeletons/publik.zip`` archive there (``config.json`` built from
    ``CONFIG`` plus a ``site-options.cfg`` enabling postgresql).

    Teardown calls ``clean_temporary_pub()``, removes the application
    directory and drops both test databases again.  The administrative
    connection is closed even if setup fails or a teardown step raises.

    Yields:
        tuple: ``(pub, hobo_cmd)`` where ``hobo_cmd`` is a ``CmdCheckHobos``
        whose ``all_services`` is ``HOBO_JSON``.
    """
    cleanup_connection()
    createdb_cfg = CONFIG['postgresql'].get('createdb-connection-params')
    conn = psycopg2.connect(**createdb_cfg)
    try:
        # Autocommit: DROP DATABASE cannot be executed inside a transaction block.
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        cursor = conn.cursor()

        def drop_test_databases():
            # Database names are generated by this module (not external input)
            # and identifiers cannot be passed as bound parameters.
            for dbname in (WCS_DB_NAME, NEW_WCS_DB_NAME):
                cursor.execute('DROP DATABASE IF EXISTS %s' % dbname)

        drop_test_databases()

        pub = create_temporary_pub()
        pub.cfg['language'] = {'language': 'en'}

        hobo_cmd = CmdCheckHobos()
        hobo_cmd.all_services = HOBO_JSON
        WcsPublisher.APP_DIR = tempfile.mkdtemp()

        skeleton_dir = os.path.join(WcsPublisher.APP_DIR, 'skeletons')
        os.mkdir(skeleton_dir)
        with open(os.path.join(skeleton_dir, 'publik.zip'), 'wb') as f:
            with zipfile.ZipFile(f, 'w') as z:
                z.writestr('config.json', json.dumps(CONFIG))
                z.writestr('site-options.cfg', '[options]\npostgresql = true')

        yield pub, hobo_cmd

        clean_temporary_pub()
        if os.path.exists(WcsPublisher.APP_DIR):
            shutil.rmtree(WcsPublisher.APP_DIR)
        cleanup_connection()
        drop_test_databases()
    finally:
        conn.close()
def database_exists(database):
    """Return True if a PostgreSQL database named *database* exists.

    Calls ``cleanup_connection()`` first, then opens a short-lived connection
    using the ``createdb-connection-params`` from ``CONFIG`` and looks the
    name up in ``pg_database``.

    Args:
        database: name of the database to look for.

    Returns:
        bool: True when at least one matching row is found, False otherwise.
    """
    cleanup_connection()
    createdb_cfg = CONFIG['postgresql'].get('createdb-connection-params')
    conn = psycopg2.connect(**createdb_cfg)
    try:
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        cursor = conn.cursor()
        # Let the driver bind the value instead of interpolating it into the SQL text.
        cursor.execute('SELECT 1 AS result FROM pg_database WHERE datname = %s', (database,))
        return bool(cursor.fetchall())
    finally:
        # Close the connection even when the query raises.
        conn.close()
def test_deploy(setuptest):
    """Deploy two tenants one after the other.

    After the first deployment WCS_TENANT has a tenant directory and a
    database; after the second one NEW_WCS_TENANT has its own directory and
    database while the first tenant's are still present.
    """
    # The fixture is requested for its setup/teardown; the command object it
    # yields is not used here, a fresh one is created below.

    def tenant_dir(hostname):
        # Re-reads WcsPublisher.APP_DIR on every call.
        return os.path.join(WcsPublisher.APP_DIR, 'tenants', hostname)

    def hobo_json_path():
        return os.path.join(WcsPublisher.APP_DIR, 'hobo.json')

    def write_hobo_json(content):
        with open(hobo_json_path(), 'w') as fd:
            fd.write(json.dumps(content))

    # nothing is deployed yet
    for hostname, dbname in ((WCS_TENANT, WCS_DB_NAME), (NEW_WCS_TENANT, NEW_WCS_DB_NAME)):
        assert not os.path.exists(tenant_dir(hostname))
        assert not database_exists(dbname)

    cleanup()
    write_hobo_json(HOBO_JSON)

    hobo_cmd = CmdCheckHobos()
    base_options = {}
    options_type = collections.namedtuple('Options', ['ignore_timestamp', 'redeploy', 'extra'])
    sub_options = options_type(ignore_timestamp=True, redeploy=False, extra=None)

    def deploy(hostname):
        hobo_cmd.execute(base_options, sub_options, ['http://%s/' % hostname, hobo_json_path()])

    deploy(WCS_TENANT)
    assert os.path.exists(tenant_dir(WCS_TENANT))
    assert database_exists(WCS_DB_NAME)

    # deploy a new tenant
    hobo_json = copy.deepcopy(HOBO_JSON)
    wcs_service = hobo_json['services'][2]
    wcs_service.update(
        {
            # NOTE(review): the trailing comma makes this a 1-tuple (dumped as a
            # JSON list) and it still names WCS_TENANT; kept unchanged to
            # preserve behaviour — confirm whether a plain NEW_WCS_TENANT URL
            # string was intended.
            'saml-sp-metadata-url': ('http://%s/saml/metadata' % WCS_TENANT,),
            'base_url': 'http://%s/' % NEW_WCS_TENANT,
            'backoffice-menu-url': 'http://%s/backoffice/menu.json' % NEW_WCS_TENANT,
        }
    )
    write_hobo_json(hobo_json)

    deploy(NEW_WCS_TENANT)
    for hostname, dbname in ((WCS_TENANT, WCS_DB_NAME), (NEW_WCS_TENANT, NEW_WCS_DB_NAME)):
        assert os.path.exists(tenant_dir(hostname))
        assert database_exists(dbname)
def test_deploy_url_change(setuptest):
    """Deploy a tenant, then redeploy it under a new hostname via ``legacy_urls``.

    Checks that the old tenant directory is gone, that the new one exists,
    that no database was created for the new name, and that the new tenant's
    configuration references the original database and a provider id built
    from the new hostname.
    """
    # The fixture is requested for its setup/teardown; the command object it
    # yields is not used here, a fresh one is created below.

    def tenant_dir(hostname):
        # Re-reads WcsPublisher.APP_DIR on every call.
        return os.path.join(WcsPublisher.APP_DIR, 'tenants', hostname)

    def hobo_json_path():
        return os.path.join(WcsPublisher.APP_DIR, 'hobo.json')

    def write_hobo_json(content):
        with open(hobo_json_path(), 'w') as fd:
            fd.write(json.dumps(content))

    # nothing is deployed yet
    for hostname, dbname in ((WCS_TENANT, WCS_DB_NAME), (NEW_WCS_TENANT, NEW_WCS_DB_NAME)):
        assert not os.path.exists(tenant_dir(hostname))
        assert not database_exists(dbname)

    cleanup()
    write_hobo_json(HOBO_JSON)

    hobo_cmd = CmdCheckHobos()
    base_options = {}
    options_type = collections.namedtuple('Options', ['ignore_timestamp', 'redeploy', 'extra'])
    sub_options = options_type(ignore_timestamp=True, redeploy=False, extra=None)

    def deploy(hostname):
        hobo_cmd.execute(base_options, sub_options, ['http://%s/' % hostname, hobo_json_path()])

    deploy(WCS_TENANT)
    assert os.path.exists(tenant_dir(WCS_TENANT))
    assert database_exists(WCS_DB_NAME)

    # domain change request
    hobo_json = copy.deepcopy(HOBO_JSON)
    wcs_service = hobo_json['services'][2]
    # remember the current URLs before they are overwritten
    url_keys = ('saml-sp-metadata-url', 'base_url', 'backoffice-menu-url')
    wcs_service['legacy_urls'] = [{key: wcs_service[key] for key in url_keys}]
    wcs_service.update(
        {
            # NOTE(review): the trailing comma makes this a 1-tuple (dumped as a
            # JSON list) and it still names WCS_TENANT; kept unchanged to
            # preserve behaviour — confirm whether a plain NEW_WCS_TENANT URL
            # string was intended.
            'saml-sp-metadata-url': ('http://%s/saml/metadata' % WCS_TENANT,),
            'base_url': 'http://%s/' % NEW_WCS_TENANT,
            'backoffice-menu-url': 'http://%s/backoffice/menu.json' % NEW_WCS_TENANT,
        }
    )
    write_hobo_json(hobo_json)

    deploy(NEW_WCS_TENANT)
    assert not os.path.exists(tenant_dir(WCS_TENANT))
    assert database_exists(WCS_DB_NAME)
    assert os.path.exists(tenant_dir(NEW_WCS_TENANT))
    assert not database_exists(NEW_WCS_DB_NAME)

    publisher = WcsPublisher.create_publisher()
    publisher.set_tenant_by_hostname(NEW_WCS_TENANT)
    new_cfg = publisher.cfg
    # check that WCS_DB_NAME is used by NEW_WCS_TENANT
    assert new_cfg['postgresql']['database'] == WCS_DB_NAME
    # check that sp configuration is updated
    assert new_cfg['sp']['saml2_providerid'] == 'http://%s/saml/metadata' % NEW_WCS_TENANT
def test_deploy_url_change_old_tenant_dir(setuptest):
    """Hostname change for a tenant stored directly under ``APP_DIR``.

    Same scenario as the plain URL change, except that after the first
    deployment the tenant directory is moved from ``APP_DIR/tenants`` to
    ``APP_DIR`` before the domain change is requested.  Afterwards the
    directory directly under ``APP_DIR`` must be gone and the new tenant must
    exist under ``tenants``, still using the original database.
    """
    # The fixture is requested for its setup/teardown; the command object it
    # yields is not used here, a fresh one is created below.

    def tenant_dir(hostname):
        # Re-reads WcsPublisher.APP_DIR on every call.
        return os.path.join(WcsPublisher.APP_DIR, 'tenants', hostname)

    def hobo_json_path():
        return os.path.join(WcsPublisher.APP_DIR, 'hobo.json')

    def write_hobo_json(content):
        with open(hobo_json_path(), 'w') as fd:
            fd.write(json.dumps(content))

    # nothing is deployed yet
    for hostname, dbname in ((WCS_TENANT, WCS_DB_NAME), (NEW_WCS_TENANT, NEW_WCS_DB_NAME)):
        assert not os.path.exists(tenant_dir(hostname))
        assert not database_exists(dbname)

    cleanup()
    write_hobo_json(HOBO_JSON)

    hobo_cmd = CmdCheckHobos()
    base_options = {}
    options_type = collections.namedtuple('Options', ['ignore_timestamp', 'redeploy', 'extra'])
    sub_options = options_type(ignore_timestamp=True, redeploy=False, extra=None)

    def deploy(hostname):
        hobo_cmd.execute(base_options, sub_options, ['http://%s/' % hostname, hobo_json_path()])

    deploy(WCS_TENANT)
    assert os.path.exists(tenant_dir(WCS_TENANT))
    assert database_exists(WCS_DB_NAME)

    # move tenant to APP_DIR (legacy way)
    os.replace(tenant_dir(WCS_TENANT), os.path.join(WcsPublisher.APP_DIR, WCS_TENANT))
    assert not os.path.exists(tenant_dir(WCS_TENANT))

    # domain change request
    hobo_json = copy.deepcopy(HOBO_JSON)
    wcs_service = hobo_json['services'][2]
    # remember the current URLs before they are overwritten
    url_keys = ('saml-sp-metadata-url', 'base_url', 'backoffice-menu-url')
    wcs_service['legacy_urls'] = [{key: wcs_service[key] for key in url_keys}]
    wcs_service.update(
        {
            # NOTE(review): the trailing comma makes this a 1-tuple (dumped as a
            # JSON list) and it still names WCS_TENANT; kept unchanged to
            # preserve behaviour — confirm whether a plain NEW_WCS_TENANT URL
            # string was intended.
            'saml-sp-metadata-url': ('http://%s/saml/metadata' % WCS_TENANT,),
            'base_url': 'http://%s/' % NEW_WCS_TENANT,
            'backoffice-menu-url': 'http://%s/backoffice/menu.json' % NEW_WCS_TENANT,
        }
    )
    write_hobo_json(hobo_json)

    deploy(NEW_WCS_TENANT)
    assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, WCS_TENANT))
    assert database_exists(WCS_DB_NAME)
    assert os.path.exists(tenant_dir(NEW_WCS_TENANT))
    assert not database_exists(NEW_WCS_DB_NAME)

    publisher = WcsPublisher.create_publisher()
    publisher.set_tenant_by_hostname(NEW_WCS_TENANT)
    new_cfg = publisher.cfg
    # check that WCS_DB_NAME is used by NEW_WCS_TENANT
    assert new_cfg['postgresql']['database'] == WCS_DB_NAME
    # check that sp configuration is updated
    assert new_cfg['sp']['saml2_providerid'] == 'http://%s/saml/metadata' % NEW_WCS_TENANT