wcs/tests/test_hobo.py

450 lines
16 KiB
Python

# -*- coding: utf-8 -*-
import collections
import copy
import json
import os
import pickle
import pytest
import shutil
import sys
import tempfile
import mock
from utilities import create_temporary_pub, clean_temporary_pub
from django.utils.six.moves.urllib import parse as urlparse
from quixote import cleanup
from wcs.qommon import force_str
from wcs.ctl.check_hobos import CmdCheckHobos
from wcs.publisher import WcsPublisher
from wcs import fields
from wcs import sql
# Sample hobo deployment description shared by the tests in this module:
# a list of services, the user profile field definitions, global variables,
# an initial user and a timestamp.
HOBO_JSON = {
    'services': [
        {
            'title': 'Hobo',
            'slug': 'hobo',
            'service-id': 'hobo',
            'base_url': 'http://hobo.example.net/',
            'saml-sp-metadata-url': 'http://hobo.example.net/accounts/mellon/metadata/',
        },
        {
            'service-id': 'authentic',
            'saml-idp-metadata-url': 'http://authentic.example.net/idp/saml2/metadata',
            'template_name': '',
            'variables': {},
            'title': 'Authentic',
            'base_url': 'http://authentic.example.net/',
            'id': 3,
            # This key used to be listed twice in the literal; only the last
            # value ('12345') ever took effect, so it is the one kept here.
            'secret_key': '12345',
            'slug': 'authentic',
        },
        {
            'service-id': 'wcs',
            'template_name': 'export-test.wcs',
            'variables': {'xxx': 'HELLO WORLD'},
            'title': 'Test wcs',
            'saml-sp-metadata-url': 'http://wcs.example.net/saml/metadata',
            'base_url': 'http://wcs.example.net/',
            'backoffice-menu-url': 'http://wcs.example.net/backoffice/menu.json',
            'id': 1,
            'secret_key': 'eiue7aa10nt6e9*#jg2bsfvdgl)cr%4(tafibfjx9i$pgnfj#v',
            'slug': 'test-wcs',
        },
        {
            'service-id': 'combo',
            'template_name': 'portal-agent',
            'title': 'Portal Agents',
            'base_url': 'http://agents.example.net/',
            'secret_key': 'aaa',
        },
        {
            'service-id': 'combo',
            'template_name': 'portal-user',
            'title': 'Portal',
            'base_url': 'http://portal.example.net/',
            'secret_key': 'bbb',
        },
    ],
    'profile': {
        'fields': [
            {
                'kind': 'title',
                'description': '',
                'required': False,
                'user_visible': True,
                'label': u'Civilité',
                'disabled': False,
                'user_editable': True,
                'asked_on_registration': False,
                'name': 'title',
            },
            {
                'kind': 'string',
                'description': '',
                'required': True,
                'user_visible': True,
                'label': u'Prénom',
                'disabled': False,
                'user_editable': True,
                'asked_on_registration': True,
                'name': 'first_name',
            },
            {
                'kind': 'string',
                'description': '',
                'required': True,
                'user_visible': True,
                'label': 'Nom',
                'disabled': False,
                'user_editable': True,
                'asked_on_registration': True,
                'name': 'last_name',
            },
            {
                'kind': 'email',
                'description': '',
                'required': True,
                'user_visible': True,
                'label': u'Adresse électronique',
                'disabled': False,
                'user_editable': True,
                'asked_on_registration': False,
                'name': 'email',
            },
            {
                'kind': 'string',
                'description': '',
                'required': False,
                'user_visible': True,
                'label': 'Addresse',
                'disabled': False,
                'user_editable': True,
                'asked_on_registration': False,
                'name': 'address',
            },
            {
                'kind': 'string',
                'description': '',
                'required': False,
                'user_visible': True,
                'label': 'Code postal',
                'disabled': False,
                'user_editable': True,
                'asked_on_registration': False,
                'name': 'zipcode',
            },
            {
                'kind': 'string',
                'description': '',
                'required': False,
                'user_visible': True,
                'label': 'Commune',
                'disabled': False,
                'user_editable': True,
                'asked_on_registration': False,
                'name': 'city',
            },
            {
                'kind': 'string',
                'description': '',
                'required': False,
                'user_visible': True,
                'label': u'Téléphone',
                'disabled': False,
                'user_editable': True,
                'asked_on_registration': False,
                'name': 'phone',
            },
            {
                'kind': 'string',
                'description': '',
                'required': False,
                'user_visible': True,
                'label': 'Mobile',
                'disabled': False,
                'user_editable': True,
                'asked_on_registration': False,
                'name': 'mobile',
            },
            {
                'kind': 'string',
                'description': '',
                'required': False,
                'user_visible': True,
                'label': 'Pays',
                'disabled': True,
                'user_editable': True,
                'asked_on_registration': False,
                'name': 'country',
            },
            {
                'kind': 'string',
                'description': '',
                'required': False,
                'user_visible': True,
                'label': 'Date de naissance',
                'disabled': True,
                'user_editable': True,
                'asked_on_registration': False,
                'name': 'birthdate',
            },
        ],
    },
    'variables': {
        'foobar': 'http://example.net',
        'email_signature': 'Hello world.',
        'default_from_email': 'noreply@example.net',
        'theme': 'clapotis-les-canards',
    },
    'users': [
        {
            'username': 'admin',
            'first_name': 'AdminFoo',
            'last_name': 'AdminBar',
            'password': 'pbkdf2_sha256$15000$aXR4knesTiJJ$hubahjFVa4q9C5RTqY5ajSOcrCPc+RZM+Usf1CGYLmA=',
            'email': 'fpeters@entrouvert.com',
        },
    ],
    'timestamp': '1431420355.31',
}
def setup_module(module):
    """Create the module-level fixtures used by the tests below.

    Sets three globals: ``pub`` (a temporary publisher whose language is
    set to English), ``hobo_cmd`` (a ``CmdCheckHobos`` instance loaded with
    ``HOBO_JSON``) and ``alt_tempdir`` (a fresh temporary directory).
    """
    global pub, hobo_cmd, alt_tempdir

    pub = create_temporary_pub()
    pub.cfg['language'] = dict(language='en')

    hobo_cmd = CmdCheckHobos()
    hobo_cmd.all_services = HOBO_JSON

    alt_tempdir = tempfile.mkdtemp()
def teardown_module(module):
    """Release the fixtures created by ``setup_module``.

    Cleans the temporary publisher, then deletes the ``alt_tempdir``
    directory tree.
    """
    clean_temporary_pub()
    shutil.rmtree(alt_tempdir)
def test_configure_site_options():
    """Check the site options produced by ``configure_site_options``.

    Verifies the ``variables`` section (service URLs and hobo variables)
    and the secrets stored for the authentic domain and for the service's
    own domain.
    """
    wcs_services = [svc for svc in HOBO_JSON.get('services', [])
                    if svc.get('service-id') == 'wcs']
    service = wcs_services[0]
    hobo_cmd.configure_site_options(service, pub)
    pub.load_site_options()

    expected_variables = (
        ('hobo_url', 'http://hobo.example.net/'),
        ('foobar', 'http://example.net'),
        ('xxx', 'HELLO WORLD'),
        ('portal_agent_url', 'http://agents.example.net/'),
        ('portal_url', 'http://portal.example.net/'),
        ('test_wcs_url', 'http://wcs.example.net/'),
    )
    for option_name, option_value in expected_variables:
        assert pub.get_site_option(option_name, 'variables') == option_value

    # Both secret sections must hold the secret shared between the
    # authentic service (index 1) and the wcs service (index 2).
    for section in ('api-secrets', 'wscall-secrets'):
        assert (pub.get_site_option('authentic.example.net', section)
                == CmdCheckHobos.shared_secret(HOBO_JSON['services'][1]['secret_key'],
                                               HOBO_JSON['services'][2]['secret_key']))

    own_domain = urlparse.urlsplit(service.get('base_url')).netloc
    assert pub.get_site_option(own_domain, 'wscall-secrets') != '0'
def test_update_configuration():
    """Check the site name and email settings set by ``update_configuration``."""
    wcs_services = [svc for svc in HOBO_JSON.get('services', [])
                    if svc.get('service-id') == 'wcs']
    hobo_cmd.update_configuration(wcs_services[0], pub)

    assert pub.cfg['misc']['sitename'] == 'Test wcs'
    emails_cfg = pub.cfg['emails']
    assert emails_cfg['footer'] == 'Hello world.'
    assert emails_cfg['from'] == 'noreply@example.net'
def test_update_themes():
    """Check how ``update_configuration`` applies the ``theme`` variable.

    Expectations, in order: the branding theme stays ``default`` until
    ``THEMES_DIRECTORY`` points at the test themes; with ``foobar`` the
    branding becomes ``publik-base`` and ``static``/``templates``/``theme``
    symlinks appear in the application directory; with ``foobar2`` only the
    ``theme`` symlink remains.
    """
    def link_target(name):
        # Target of the symlink called *name* in the publisher app dir.
        return os.readlink(os.path.join(pub.app_dir, name))

    pub.cfg['branding'] = {'theme': 'default'}
    wcs_services = [svc for svc in HOBO_JSON.get('services', [])
                    if svc.get('service-id') == 'wcs']
    service = wcs_services[0]

    hobo_cmd.update_configuration(service, pub)
    assert pub.cfg['branding']['theme'] == 'default'

    # This mutates the shared HOBO_JSON service entry.
    service['variables']['theme'] = 'foobar'
    hobo_cmd.update_configuration(service, pub)
    assert pub.cfg['branding']['theme'] == 'default'

    hobo_cmd.THEMES_DIRECTORY = os.path.join(os.path.dirname(__file__), 'themes')
    hobo_cmd.update_configuration(service, pub)
    assert pub.cfg['branding']['theme'] == 'publik-base'
    themes_dir = hobo_cmd.THEMES_DIRECTORY
    assert link_target('static') == os.path.join(themes_dir, 'foobar/static')
    assert link_target('templates') == os.path.join(themes_dir, 'foobar/templates')
    assert link_target('theme') == os.path.join(themes_dir, 'publik-base')

    service['variables']['theme'] = 'foobar2'
    hobo_cmd.update_configuration(service, pub)
    for gone in ('static', 'templates'):
        assert not os.path.lexists(os.path.join(pub.app_dir, gone))
    assert link_target('theme') == os.path.join(hobo_cmd.THEMES_DIRECTORY, 'foobar')
def test_update_profile():
    """Exercise ``update_profile`` against the user fields form definition.

    Steps: initial load of the profile, persistence of a locally changed
    varname, field reordering, disabling a field, survival of a locally
    added field, and the attribute mapping written for a configured idp.
    """
    # Same object as HOBO_JSON['profile']: the in-place changes made to
    # HOBO_JSON further down are therefore seen through ``profile`` too.
    profile = HOBO_JSON.get('profile')

    # load in an empty site
    hobo_cmd.update_profile(profile, pub)
    from wcs.admin.settings import UserFieldsFormDef
    formdef = UserFieldsFormDef(pub)
    field_labels = [force_str(x.get('label')) for x in profile.get('fields')
                    if not x.get('disabled')]
    assert [x.label for x in formdef.fields] == field_labels
    for field_id in [pub.cfg['users']['field_email']] + pub.cfg['users']['field_name']:
        assert field_id in [x.id for x in formdef.fields]
    assert formdef.fields[0].id == '_title'
    assert formdef.fields[1].id == '_first_name'
    assert formdef.fields[2].id == '_last_name'

    # change a varname value
    formdef.fields[0].varname = 'civilite'
    formdef.store()

    # reload config, check the varname is kept
    hobo_cmd.update_profile(profile, pub)
    formdef = UserFieldsFormDef(pub)
    assert formdef.fields[0].id == '_title'
    assert formdef.fields[0].varname == 'civilite'

    # change first_name/last_name order
    HOBO_JSON['profile']['fields'][1], HOBO_JSON['profile']['fields'][2] = (
        HOBO_JSON['profile']['fields'][2], HOBO_JSON['profile']['fields'][1])
    hobo_cmd.update_profile(profile, pub)
    formdef = UserFieldsFormDef(pub)
    assert formdef.fields[1].id == '_last_name'
    assert formdef.fields[2].id == '_first_name'

    # disable mobile
    assert '_mobile' in [x.id for x in formdef.fields]
    HOBO_JSON['profile']['fields'][8]['disabled'] = True
    hobo_cmd.update_profile(profile, pub)
    formdef = UserFieldsFormDef(pub)
    assert '_mobile' not in [x.id for x in formdef.fields]

    # add a custom local field
    formdef = UserFieldsFormDef(pub)
    formdef.fields.append(fields.BoolField(id='3', label='bool', type='bool'))
    formdef.store()
    hobo_cmd.update_profile(profile, pub)
    formdef = UserFieldsFormDef(pub)
    assert 'bool' in [x.label for x in formdef.fields]

    # create a fake entry in idp to check attribute mapping
    pub.cfg['idp'] = {'xxx': {}}
    hobo_cmd.update_profile(profile, pub)
    attribute_mapping = pub.cfg['idp']['xxx']['attribute-mapping']
    for field in profile.get('fields'):
        attribute_name = str(field['name'])
        field_id = str('_' + attribute_name)
        if field.get('disabled'):
            assert attribute_name not in attribute_mapping
        else:
            assert attribute_mapping[attribute_name] == field_id
def test_configure_authentication_methods(http_requests):
    """Check the idp/sp configuration set by ``configure_authentication_methods``.

    Starts from an empty ``idp`` configuration and expects exactly one
    identity provider afterwards, plus the related SAML settings and the
    ``idp_account_url`` variable.
    """
    pub.cfg['idp'] = {}
    wcs_services = [svc for svc in HOBO_JSON.get('services', [])
                    if svc.get('service-id') == 'wcs']
    service = wcs_services[0]

    # with real metadata
    hobo_cmd.configure_authentication_methods(service, pub)
    idp_keys = pub.cfg['idp'].keys()
    assert len(idp_keys) == 1
    assert pub.cfg['saml_identities']['registration-url']
    sp_cfg = pub.cfg['sp']
    assert sp_cfg['idp-manage-user-attributes']
    assert sp_cfg['idp-manage-roles']
    account_url = pub.get_site_option('idp_account_url', 'variables')
    assert account_url.endswith('/accounts/')
def test_deploy():
    """Deploy a site from a hobo.json file, then run the command again.

    The first ``execute`` call must create the ``wcs.example.net`` site
    directory. The ``language`` setting is then removed from the stored
    configuration and a second ``execute`` call is expected to set it back
    to French.
    """
    cleanup()
    WcsPublisher.APP_DIR = alt_tempdir
    hobo_json_path = os.path.join(alt_tempdir, 'hobo.json')
    site_dir = os.path.join(alt_tempdir, 'wcs.example.net')
    config_path = os.path.join(site_dir, 'config.pck')

    # NOTE(review): the copy with the authentic service removed is built but
    # never written; the file receives the full HOBO_JSON. Behaviour is kept
    # as-is here -- confirm whether ``hobo_json`` was meant to be dumped.
    hobo_json = copy.deepcopy(HOBO_JSON)
    del hobo_json['services'][1]  # authentic
    with open(hobo_json_path, 'w') as fd:
        fd.write(json.dumps(HOBO_JSON))

    hobo_cmd = CmdCheckHobos()
    base_options = {}
    sub_options_class = collections.namedtuple('Options',
                                               ['ignore_timestamp', 'redeploy', 'extra'])
    sub_options = sub_options_class(True, False, None)
    sys.modules['publisher'] = sys.modules['wcs.publisher']

    hobo_cmd.execute(base_options, sub_options,
                     ['http://wcs.example.net/', hobo_json_path])
    assert os.path.exists(site_dir)

    # update
    cleanup()
    with open(config_path, 'rb') as fd:
        pub_cfg = pickle.load(fd)
    assert pub_cfg['language'] == {'language': 'fr'}
    del pub_cfg['language']
    # Close the file explicitly so the new configuration is flushed to disk
    # before the command reads it again.
    with open(config_path, 'wb') as fd:
        pickle.dump(pub_cfg, fd)

    hobo_cmd.execute(base_options, sub_options,
                     ['http://wcs.example.net/', hobo_json_path])
    with open(config_path, 'rb') as fd:
        pub_cfg = pickle.load(fd)
    assert pub_cfg['language'] == {'language': 'fr'}
def test_configure_postgresql():
    """Check the database connections made by ``configure_sql``.

    A site is deployed, switched to postgresql through ``site-options.cfg``
    and given a ``postgresql`` configuration with a database name template.
    ``psycopg2.connect`` is mocked so that only the connection parameters
    are inspected, including the case where the templated database name has
    to fit in 63 characters.
    """
    cleanup()
    WcsPublisher.APP_DIR = alt_tempdir
    hobo_json_path = os.path.join(alt_tempdir, 'hobo.json')
    site_dir = os.path.join(alt_tempdir, 'wcs.example.net')

    # NOTE(review): the copy with the authentic service removed is built but
    # never written; the file receives the full HOBO_JSON. Behaviour is kept
    # as-is here -- confirm whether ``hobo_json`` was meant to be dumped.
    hobo_json = copy.deepcopy(HOBO_JSON)
    del hobo_json['services'][1]  # authentic
    with open(hobo_json_path, 'w') as fd:
        fd.write(json.dumps(HOBO_JSON))

    service = [x for x in HOBO_JSON.get('services', []) if x.get('service-id') == 'wcs'][0]

    hobo_cmd = CmdCheckHobos()
    base_options = collections.namedtuple('Options', ['configfile'])(configfile=None)
    sub_options_class = collections.namedtuple('Options',
                                               ['ignore_timestamp', 'redeploy', 'extra'])
    sub_options = sub_options_class(True, False, None)
    sys.modules['publisher'] = sys.modules['wcs.publisher']

    hobo_cmd.execute(base_options, sub_options,
                     ['http://wcs.example.net/', hobo_json_path])
    assert os.path.exists(site_dir)

    with open(os.path.join(site_dir, 'site-options.cfg'), 'w') as fd:
        fd.write('[options]\n')
        fd.write('postgresql = true\n')

    cleanup()

    pub = WcsPublisher.create_publisher(register_tld_names=False)
    pub.app_dir = site_dir
    pub.cfg['postgresql'] = {
        'createdb-connection-params': {
            'user': 'test',
            'database': 'postgres'
        },
        'database-template-name': 'tests_wcs_%s',
        'user': 'fred'
    }
    pub.write_cfg()
    pub.set_config(skip_sql=True)
    # This mutates the shared HOBO_JSON service entry.
    service['base_url'] = service['base_url'].strip('/')

    pub.initialize_sql = mock.Mock()
    with mock.patch('psycopg2.connect') as connect:
        hobo_cmd.configure_sql(service, pub)
        assert connect.call_args_list[0][1] == {'user': 'test', 'database': 'postgres'}
        assert connect.call_args_list[1][1] == {'user': 'fred', 'database': 'tests_wcs_wcs_example_net'}
    assert pub.initialize_sql.call_count == 1

    pub.reload_cfg()
    assert 'createdb-connection-params' in pub.cfg['postgresql']
    with mock.patch('psycopg2.connect') as connect:
        sql.get_connection(new=True)
        assert connect.call_args_list[0][1] == {'user': 'fred', 'database': 'tests_wcs_wcs_example_net'}

    pub.cfg['postgresql']['database-template-name'] = 'very_long_'*10 + '%s'
    with mock.patch('psycopg2.connect') as connect:
        hobo_cmd.configure_sql(service, pub)
        assert connect.call_args_list[0][1] == {'user': 'test', 'database': 'postgres'}
        assert connect.call_args_list[1][1] == {
            'user': 'fred',
            'database': 'very_long_very_long_very_long_c426_ng_very_long_wcs_example_net'}
        assert len(connect.call_args_list[1][1]['database']) == 63
    assert pub.initialize_sql.call_count == 2