422 lines
14 KiB
Python
422 lines
14 KiB
Python
# -*- coding: utf-8 -*-
|
|
import collections
|
|
import copy
|
|
import cPickle
|
|
import json
|
|
import os
|
|
import pytest
|
|
import shutil
|
|
import sys
|
|
import tempfile
|
|
import urllib2
|
|
|
|
import mock
|
|
|
|
from utilities import create_temporary_pub, clean_temporary_pub
|
|
|
|
from quixote import cleanup
|
|
|
|
from wcs.ctl.check_hobos import CmdCheckHobos
|
|
from wcs.publisher import WcsPublisher
|
|
from wcs import fields
|
|
from wcs import sql
|
|
|
|
# Sample hobo deployment description shared by every test in this module.
# It has the same top-level layout as the hobo.json file written by the
# deployment tests: 'services', 'profile', 'variables', 'users', 'timestamp'.
HOBO_JSON = {
    # Order matters: some tests address services by index
    # (1 is authentic, 2 is the wcs instance under test).
    'services': [
        {
            'title': 'Hobo',
            'slug': 'hobo',
            'service-id': 'hobo',
            'base_url': 'http://hobo.example.net/',
            'saml-sp-metadata-url': 'http://hobo.example.net/accounts/mellon/metadata/'
        },
        {
            'service-id': 'authentic',
            'saml-idp-metadata-url': 'http://authentic.example.net/idp/saml2/metadata',
            'template_name': '',
            'variables': {},
            'title': 'Authentic',
            'base_url': 'http://authentic.example.net/',
            'id': 3,
            # a dict literal keeps only the last value given for a key, so the
            # key is listed exactly once, with the value the tests rely on
            'secret_key': '12345',
            'slug': 'authentic',
        },
        {
            'service-id': 'wcs',
            'template_name': 'export-test.wcs',
            'variables': {'xxx': 'HELLO WORLD'},
            'title': 'Test wcs',
            'saml-sp-metadata-url': 'http://wcs.example.net/saml/metadata',
            'base_url': 'http://wcs.example.net/',
            'backoffice-menu-url': 'http://wcs.example.net/backoffice/menu.json',
            'id': 1,
            'secret_key': 'eiue7aa10nt6e9*#jg2bsfvdgl)cr%4(tafibfjx9i$pgnfj#v',
            'slug': 'test-wcs'
        },
        {
            'service-id': 'combo',
            'template_name': 'portal-agent',
            'title': 'Portal Agents',
            'base_url': 'http://agents.example.net/',
            'secret_key': 'aaa',
        },
        {
            'service-id': 'combo',
            'template_name': 'portal-user',
            'title': 'Portal',
            'base_url': 'http://portal.example.net/',
            'secret_key': 'bbb',
        },
    ],
    # User profile definition; the last two fields (country, birthdate) are
    # marked as disabled.
    'profile': {
        'fields': [
            {
                'kind': 'title',
                'description': '',
                'required': False,
                'user_visible': True,
                'label': u'Civilité',
                'disabled': False,
                'user_editable': True,
                'asked_on_registration': False,
                'name': 'title'
            },
            {
                'kind': 'string',
                'description': '',
                'required': True,
                'user_visible': True,
                'label': u'Prénom',
                'disabled': False,
                'user_editable': True,
                'asked_on_registration': True,
                'name': 'first_name'
            },
            {
                'kind': 'string',
                'description': '',
                'required': True,
                'user_visible': True,
                'label': 'Nom',
                'disabled': False,
                'user_editable': True,
                'asked_on_registration': True,
                'name': 'last_name'
            },
            {
                'kind': 'email',
                'description': '',
                'required': True,
                'user_visible': True,
                'label': u'Adresse électronique',
                'disabled': False,
                'user_editable': True,
                'asked_on_registration': False,
                'name': 'email'
            },
            {
                'kind': 'string',
                'description': '',
                'required': False,
                'user_visible': True,
                'label': 'Addresse',
                'disabled': False,
                'user_editable': True,
                'asked_on_registration': False,
                'name': 'address'
            },
            {
                'kind': 'string',
                'description': '',
                'required': False,
                'user_visible': True,
                'label': 'Code postal',
                'disabled': False,
                'user_editable': True,
                'asked_on_registration': False,
                'name': 'zipcode'
            },
            {
                'kind': 'string',
                'description': '',
                'required': False,
                'user_visible': True,
                'label': 'Commune',
                'disabled': False,
                'user_editable': True,
                'asked_on_registration': False,
                'name': 'city'
            },
            {
                'kind': 'string',
                'description': '',
                'required': False,
                'user_visible': True,
                'label': u'Téléphone',
                'disabled': False,
                'user_editable': True,
                'asked_on_registration': False,
                'name': 'phone'
            },
            {
                'kind': 'string',
                'description': '',
                'required': False,
                'user_visible': True,
                'label': 'Mobile',
                'disabled': False,
                'user_editable': True,
                'asked_on_registration': False,
                'name': 'mobile'
            },
            {
                'kind': 'string',
                'description': '',
                'required': False,
                'user_visible': True,
                'label': 'Pays',
                'disabled': True,
                'user_editable': True,
                'asked_on_registration': False,
                'name': 'country'
            },
            {
                'kind': 'string',
                'description': '',
                'required': False,
                'user_visible': True,
                'label': 'Date de naissance',
                'disabled': True,
                'user_editable': True,
                'asked_on_registration': False,
                'name': 'birthdate'
            }
        ]
    },
    # Global variables (distinct from the per-service 'variables' entries).
    'variables': {
        'foobar': 'http://example.net',
        'email_signature': 'Hello world.',
        'default_from_email': 'noreply@example.net',
        'theme': 'clapotis-les-canards',
    },
    'users': [
        {
            'username': 'admin',
            'first_name': 'AdminFoo',
            'last_name': 'AdminBar',
            'password': 'pbkdf2_sha256$15000$aXR4knesTiJJ$hubahjFVa4q9C5RTqY5ajSOcrCPc+RZM+Usf1CGYLmA=',
            'email': 'fpeters@entrouvert.com',
        }
    ],
    'timestamp': '1431420355.31'
}
|
|
|
|
def setup_module(module):
    """Build the module-level fixtures used by the tests below.

    Three globals are set: ``pub`` (the object returned by
    create_temporary_pub(), with its language configured to 'en'),
    ``hobo_cmd`` (a CmdCheckHobos whose ``all_services`` is HOBO_JSON) and
    ``alt_tempdir`` (a directory freshly created by tempfile.mkdtemp()).
    """
    global pub
    global hobo_cmd
    global alt_tempdir

    publisher = create_temporary_pub()
    publisher.cfg['language'] = {'language': 'en'}
    pub = publisher

    command = CmdCheckHobos()
    command.all_services = HOBO_JSON
    hobo_cmd = command

    alt_tempdir = tempfile.mkdtemp()
|
|
|
|
def teardown_module(module):
    """Undo setup_module(): clean the temporary publisher, then delete alt_tempdir."""
    # counterpart of create_temporary_pub()
    clean_temporary_pub()
    # directory created with tempfile.mkdtemp() in setup_module()
    shutil.rmtree(alt_tempdir)
|
|
|
|
def test_configure_site_options():
    """Check the site options written by configure_site_options().

    After the call (and a reload of the site options), the 'variables'
    section must expose the URLs of the other services plus the global and
    per-service variables, and the secrets sections must hold the secret
    shared between the authentic and wcs services.
    """
    wcs_services = [svc for svc in HOBO_JSON.get('services', [])
                    if svc.get('service-id') == 'wcs']
    service = wcs_services[0]
    hobo_cmd.configure_site_options(service, pub)
    pub.load_site_options()

    expected_variables = (
        ('hobo_url', 'http://hobo.example.net/'),
        ('foobar', 'http://example.net'),
        ('xxx', 'HELLO WORLD'),
        ('portal_agent_url', 'http://agents.example.net/'),
        ('portal_url', 'http://portal.example.net/'),
        ('test_wcs_url', 'http://wcs.example.net/'),
    )
    for option_name, option_value in expected_variables:
        assert pub.get_site_option(option_name, 'variables') == option_value

    # services[1] is authentic, services[2] is the wcs service itself
    authentic_key = HOBO_JSON['services'][1]['secret_key']
    wcs_key = HOBO_JSON['services'][2]['secret_key']
    for section in ('api-secrets', 'wscall-secrets'):
        stored_secret = pub.get_site_option('authentic.example.net', section)
        assert stored_secret == CmdCheckHobos.shared_secret(authentic_key, wcs_key)

    base_url = service.get('base_url')
    self_domain = urllib2.urlparse.urlsplit(base_url).netloc
    assert pub.get_site_option(self_domain, 'wscall-secrets') != '0'
|
|
|
|
def test_update_configuration():
    """Check the publisher settings set by update_configuration().

    The site name comes from the wcs service title, the email footer and
    sender from the global variables, and the theme is expected to be
    'publik-base'.
    """
    matching = [svc for svc in HOBO_JSON.get('services', [])
                if svc.get('service-id') == 'wcs']
    service = matching[0]
    hobo_cmd.update_configuration(service, pub)

    expected_settings = (
        ('misc', 'sitename', 'Test wcs'),
        ('emails', 'footer', 'Hello world.'),
        ('emails', 'from', 'noreply@example.net'),
        ('branding', 'theme', 'publik-base'),
    )
    for section, key, value in expected_settings:
        assert pub.cfg[section][key] == value
|
|
|
|
def test_update_profile():
    """Check that update_profile() keeps user fields in sync with the profile.

    Successive calls verify that: enabled profile fields are created with
    '_<name>' identifiers, a locally modified varname survives a new sync,
    field order follows the profile, a field that becomes disabled is
    removed, a locally added field is kept, and the attribute mapping of a
    configured idp lists exactly the enabled fields.
    """
    # Work on a private copy: the profile is reordered and altered below and
    # HOBO_JSON is shared with the other tests of this module.
    profile = copy.deepcopy(HOBO_JSON.get('profile'))
    profile_fields = profile['fields']

    # load in an empty site
    hobo_cmd.update_profile(profile, pub)
    from wcs.admin.settings import UserFieldsFormDef
    formdef = UserFieldsFormDef(pub)
    field_labels = [x.get('label').encode('utf-8')
                    for x in profile_fields if not x.get('disabled')]
    assert [x.label for x in formdef.fields] == field_labels
    for field_id in [pub.cfg['users']['field_email']] + pub.cfg['users']['field_name']:
        assert field_id in [x.id for x in formdef.fields]

    assert formdef.fields[0].id == '_title'
    assert formdef.fields[1].id == '_first_name'
    assert formdef.fields[2].id == '_last_name'

    # change a varname value
    formdef.fields[0].varname = 'civilite'
    formdef.store()

    # reload config, check the varname is kept
    hobo_cmd.update_profile(profile, pub)
    formdef = UserFieldsFormDef(pub)
    assert formdef.fields[0].id == '_title'
    assert formdef.fields[0].varname == 'civilite'

    # change first_name/last_name order
    profile_fields[1], profile_fields[2] = profile_fields[2], profile_fields[1]
    hobo_cmd.update_profile(profile, pub)
    formdef = UserFieldsFormDef(pub)
    assert formdef.fields[1].id == '_last_name'
    assert formdef.fields[2].id == '_first_name'

    # disable mobile (looked up by name rather than by position)
    assert '_mobile' in [x.id for x in formdef.fields]
    mobile_field = [x for x in profile_fields if x['name'] == 'mobile'][0]
    mobile_field['disabled'] = True
    hobo_cmd.update_profile(profile, pub)
    formdef = UserFieldsFormDef(pub)
    assert '_mobile' not in [x.id for x in formdef.fields]

    # add a custom local field
    formdef = UserFieldsFormDef(pub)
    formdef.fields.append(fields.BoolField(id='3', label='bool', type='bool'))
    formdef.store()
    hobo_cmd.update_profile(profile, pub)
    formdef = UserFieldsFormDef(pub)
    assert 'bool' in [x.label for x in formdef.fields]

    # create a fake entry in idp to check attribute mapping
    pub.cfg['idp'] = {'xxx': {}}
    hobo_cmd.update_profile(profile, pub)

    attribute_mapping = pub.cfg['idp']['xxx']['attribute-mapping']
    for field in profile_fields:
        attribute_name = str(field['name'])
        field_id = str('_' + attribute_name)
        if field.get('disabled'):
            assert attribute_name not in attribute_mapping
        else:
            assert attribute_mapping[attribute_name] == field_id
|
|
|
|
def test_configure_authentication_methods():
    """Check the SAML settings set by configure_authentication_methods().

    The method is first called without any patching (the case labelled
    "error retrieving metadata"), then again with urllib2.urlopen patched to
    serve the idp_metadata.xml file stored next to this module. Afterwards a
    single idp must be configured, together with the related SAML options.
    """
    pub.cfg['idp'] = {}
    candidates = [svc for svc in HOBO_JSON.get('services', [])
                  if svc.get('service-id') == 'wcs']
    service = candidates[0]

    # with an error retrieving metadata
    hobo_cmd.configure_authentication_methods(service, pub)

    # with real metadata
    def open_local_metadata(*args):
        # whatever is requested, answer with the local metadata file
        return open(idp_metadata_filename)

    with mock.patch('urllib2.urlopen') as urlopen:
        idp_metadata_filename = os.path.join(os.path.dirname(__file__), 'idp_metadata.xml')
        urlopen.side_effect = open_local_metadata
        hobo_cmd.configure_authentication_methods(service, pub)

    assert len(pub.cfg['idp']) == 1
    truthy_settings = (
        ('saml_identities', 'registration-url'),
        ('sp', 'idp-manage-user-attributes'),
        ('sp', 'idp-manage-roles'),
    )
    for section, key in truthy_settings:
        assert pub.cfg[section][key]
    account_url = pub.get_site_option('idp_account_url', 'variables')
    assert account_url.endswith('/accounts/')
|
|
|
|
def test_deploy():
    """Deploy the wcs site described by HOBO_JSON, then run the deployment again.

    The first execute() must create the 'wcs.example.net' directory under
    alt_tempdir with the language configured to 'fr'. The language entry is
    then removed from the stored configuration and a second execute() must
    put it back.
    """
    cleanup()
    WcsPublisher.APP_DIR = alt_tempdir
    hobo_json_path = os.path.join(alt_tempdir, 'hobo.json')
    # NOTE(review): this test used to build a copy of HOBO_JSON without the
    # authentic service but never wrote it; the complete HOBO_JSON has always
    # been what gets deployed, and this is kept as is.
    with open(hobo_json_path, 'w') as fd:
        fd.write(json.dumps(HOBO_JSON))

    hobo_cmd = CmdCheckHobos()
    base_options = {}
    sub_options_class = collections.namedtuple('Options',
            ['ignore_timestamp', 'redeploy', 'extra'])
    sub_options = sub_options_class(True, False, None)
    # make wcs.publisher also reachable under the top-level name 'publisher'
    sys.modules['publisher'] = sys.modules['wcs.publisher']
    hobo_cmd.execute(base_options, sub_options,
            ['http://wcs.example.net/', hobo_json_path])
    site_dir = os.path.join(alt_tempdir, 'wcs.example.net')
    assert os.path.exists(site_dir)

    # update
    cleanup()
    config_path = os.path.join(site_dir, 'config.pck')
    with open(config_path) as fd:
        pub_cfg = cPickle.load(fd)
    assert pub_cfg['language'] == {'language': 'fr'}
    del pub_cfg['language']
    # close (and thus flush) the file before execute() reads it again
    with open(config_path, 'w') as fd:
        cPickle.dump(pub_cfg, fd)
    hobo_cmd.execute(base_options, sub_options,
            ['http://wcs.example.net/', hobo_json_path])
    with open(config_path) as fd:
        pub_cfg = cPickle.load(fd)
    assert pub_cfg['language'] == {'language': 'fr'}
|
|
|
|
def test_configure_postgresql():
    """Check the database setup performed by configure_sql().

    The site is deployed first, then switched to postgresql through
    site-options.cfg and given connection settings. With psycopg2.connect
    and subprocess.call mocked, configure_sql() must connect with the
    'createdb-connection-params' settings, then to the database named after
    'database-template-name', and run the 'convert-to-sql' command. The
    stored configuration must still contain 'createdb-connection-params',
    while a new SQL connection must not use them.
    """
    cleanup()
    WcsPublisher.APP_DIR = alt_tempdir
    hobo_json_path = os.path.join(alt_tempdir, 'hobo.json')
    # NOTE(review): this test used to build a copy of HOBO_JSON without the
    # authentic service but never wrote it; the complete HOBO_JSON has always
    # been what gets deployed, and this is kept as is.
    with open(hobo_json_path, 'w') as fd:
        fd.write(json.dumps(HOBO_JSON))

    # shallow copy: 'base_url' is rewritten below and the shared HOBO_JSON
    # must not be modified
    service = dict([x for x in HOBO_JSON.get('services', [])
                    if x.get('service-id') == 'wcs'][0])

    hobo_cmd = CmdCheckHobos()
    base_options = collections.namedtuple('Options', ['configfile'])(configfile=None)
    sub_options_class = collections.namedtuple('Options',
            ['ignore_timestamp', 'redeploy', 'extra'])
    sub_options = sub_options_class(True, False, None)
    # make wcs.publisher also reachable under the top-level name 'publisher'
    sys.modules['publisher'] = sys.modules['wcs.publisher']
    hobo_cmd.execute(base_options, sub_options,
            ['http://wcs.example.net/', hobo_json_path])
    site_dir = os.path.join(alt_tempdir, 'wcs.example.net')
    assert os.path.exists(site_dir)

    with open(os.path.join(site_dir, 'site-options.cfg'), 'w') as fd:
        fd.write('[options]\n')
        fd.write('postgresql = true\n')

    cleanup()

    pub = WcsPublisher.create_publisher(register_cron=False, register_tld_names=False)
    pub.app_dir = site_dir
    pub.cfg['postgresql'] = {
        'createdb-connection-params': {
            'user': 'test',
            'database': 'postgres'
        },
        'database-template-name': 'tests_wcs_%s',
        'user': 'fred'
    }
    pub.write_cfg()
    pub.set_config(skip_sql=True)
    service['base_url'] = service['base_url'].strip('/')

    with mock.patch('psycopg2.connect') as connect:
        with mock.patch('subprocess.call') as call:
            hobo_cmd.configure_sql(service, pub)
            # first connection: the one using the database creation settings
            assert connect.call_args_list[0][1] == {'user': 'test', 'database': 'postgres'}
            # second connection: the site database itself
            assert connect.call_args_list[1][1] == {'user': 'fred', 'database': 'tests_wcs_wcs_example_net'}
            # the first item of the command line is skipped
            assert call.call_args[0][0][1:] == ['convert-to-sql',
                    '--dbname', 'tests_wcs_wcs_example_net',
                    '--user', 'fred', 'wcs.example.net']

    pub.reload_cfg()
    assert 'createdb-connection-params' in pub.cfg['postgresql']
    with mock.patch('psycopg2.connect') as connect:
        sql.get_connection(new=True)
        assert connect.call_args_list[0][1] == {'user': 'fred', 'database': 'tests_wcs_wcs_example_net'}
|