1360 lines
59 KiB
Plaintext
1360 lines
59 KiB
Plaintext
import base64
|
|
import cPickle
|
|
import copy
|
|
import re
|
|
import os
|
|
import glob
|
|
import urllib
|
|
import urllib2
|
|
import urlparse
|
|
import tempfile
|
|
import zipfile
|
|
import qommon.x509utils as x509utils
|
|
import qommon.libertyutils as libertyutils
|
|
import qommon.saml2utils as saml2utils
|
|
import qommon.storage as storage
|
|
|
|
import lasso
|
|
|
|
from quixote import get_request, get_response, redirect, get_publisher, get_session
|
|
from quixote.directory import Directory
|
|
from quixote.html import htmltext
|
|
from quixote.util import dump_request
|
|
from quixote.errors import TraversalError
|
|
|
|
from qommon import get_cfg
|
|
|
|
from menu import *
|
|
|
|
from qommon import template
|
|
import qommon.template
|
|
|
|
from qommon.admin.emails import EmailsDirectory
|
|
from qommon.admin.texts import TextsDirectory
|
|
try:
|
|
from qommon.admin.translations import TranslationsDirectory
|
|
except ImportError:
|
|
TranslationsDirectory = None
|
|
|
|
from authentic.form import *
|
|
import authentic
|
|
from authentic import identities
|
|
from authentic import misc
|
|
import configuration
|
|
import authentic.schemas.schemagui as schemagui
|
|
|
|
# Keep a reference to the error_page helper brought in by the imports above:
# the error_page() function defined further down in this module rebinds the
# name and delegates to this alias.
template_error_page = error_page
|
|
|
|
def get_file_content(filename):
    '''Return the content of filename, or None if it cannot be read.

    This is a best-effort helper: a missing or unreadable file, or a
    filename that is not usable as a path, yields None instead of an
    exception.  The file is always closed before returning.
    '''
    try:
        fd = open(filename, 'r')
    except (EnvironmentError, TypeError, ValueError):
        # EnvironmentError covers IOError and OSError; TypeError and
        # ValueError cover filenames that cannot be used as a path.
        return None
    try:
        try:
            return fd.read()
        except (EnvironmentError, ValueError):
            return None
    finally:
        fd.close()
|
|
|
|
def get_text_file_preview(filename):
    '''Return an HTML preview of the content of filename.

    The content is placed inside a <pre> element.  None is returned when
    get_file_content() gives back nothing (file not accessible, or empty).
    '''
    text = get_file_content(str(filename))
    if not text:
        return None
    return htmltext("<pre>%s</pre>") % text
|
|
|
|
def error_page(error):
    '''Return the error page for error, rendered in the 'settings' section.

    Delegates to template_error_page, the module-level alias of the
    imported error page helper.
    '''
    section = 'settings'
    return template_error_page(section, error)
|
|
|
|
def cfg_submit(form, cfg_key, fields):
    '''Save the parsed value of some form widgets into the configuration.

    The configuration is reloaded first; every widget named in fields is
    then parsed and stored in the cfg_key section, which is written back.
    '''
    publisher = get_publisher()
    publisher.reload_cfg()

    section = str(cfg_key)
    values = get_cfg(section, {})
    for name in fields:
        values[str(name)] = form.get_widget(name).parse()

    publisher.cfg[section] = values
    publisher.write_cfg()
|
|
|
|
|
|
class LibertyProvidersDir(Directory):
    '''Directory listing the configured Liberty providers.

    It exports the listing itself plus the 'new' and 'new_remote' pages;
    any other path component is resolved by _q_lookup().
    '''
    _q_exports = ['', 'new', 'new_remote']
|
|
|
|
def _q_traverse(self, path):
    '''Add this directory to the breadcrumb, then traverse as usual.'''
    crumb = ('liberty_providers/', _('Liberty Providers'))
    get_response().breadcrumb.append(crumb)
    return Directory._q_traverse(self, path)
|
|
|
|
def _q_index [html] (self):
|
|
get_publisher().reload_cfg()
|
|
html_top('settings', title = _('Liberty Providers'))
|
|
"<h2>%s</h2>" % _('Liberty Providers')
|
|
'<ul id="nav-admin">\n'
|
|
' <li><a href="new">%s</a></li>\n' % _('New')
|
|
' <li><a href="new_remote">%s</a></li>\n' % _('Create new from remote URL')
|
|
'</ul>'
|
|
|
|
'<ul class="biglist">'
|
|
for klp, lp in get_cfg('providers', {}).items():
|
|
try:
|
|
p, label = misc.get_provider_and_label(klp)
|
|
except KeyError:
|
|
'<li>'
|
|
'<strong>%s (%s)</strong>' % (_('Broken'), klp)
|
|
'<p class="commands">'
|
|
klp = urllib.quote(str(klp))
|
|
command_icon('%s/delete' % klp, 'remove')
|
|
'</p>'
|
|
'</li>'
|
|
continue
|
|
|
|
klp = urllib.quote(str(klp))
|
|
'<li>'
|
|
'<strong>%s</strong>' % htmltext(label)
|
|
'<p class="details"><span class="data">%s</span></p>' % p.providerId # XXX: tell sp/idp
|
|
'<p class="commands">'
|
|
command_icon('%s/' % klp, 'view')
|
|
command_icon('%s/edit' % klp, 'edit')
|
|
command_icon('%s/delete' % klp, 'remove')
|
|
'</p></li>'
|
|
'</ul>'
|
|
|
|
def _q_lookup(self, component):
    '''Return the LibertyProviderUI for component, on a fresh configuration.'''
    publisher = get_publisher()
    publisher.reload_cfg()
    return LibertyProviderUI(component)
|
|
|
|
def new_form(self):
    '''Return the form used to declare a provider from uploaded files.

    The 'role' selector is only present when the 'idff_proxy' option of the
    'idp' configuration section is set.
    '''
    form = Form(enctype='multipart/form-data')

    if get_cfg('idp', {}).get('idff_proxy'):
        role_choices = [
            (lasso.PROVIDER_ROLE_SP, _('Service Provider')),
            (lasso.PROVIDER_ROLE_IDP, _('Identity Provider')),
            (lasso.PROVIDER_ROLE_NONE, _('Both')),
        ]
        form.add(SingleSelectWidget, 'role', title = _('Role'), required=True,
                value = lasso.PROVIDER_ROLE_SP, options = role_choices)

    # uploaded files: only the metadata are mandatory
    uploads = (
        ('metadata', _('Metadata'), True),
        ('publickey', _('Public Key'), False),
        ('cacertchain', _('CA Certificate Chain'), False),
    )
    for name, title, required in uploads:
        form.add(FileWidget, name, title = title, required = required)

    form.add(CheckboxWidget, 'idp_initiated_sso',
            title = _('Allow IdP initiated Single Sign On'), value = True)

    name_id_formats = [
        ('persistent', _('Persistent')),
        ('transient', _('Transient')),
        ('email', _('Email')),
    ]
    form.add(SingleSelectWidget, 'default_name_id_format',
            title = _('Default NameID Format'),
            required=False,
            value = 'persistent',
            options = name_id_formats)

    form.add(CheckboxWidget, 'encrypt_nameid',
            title = _('Encrypt NameID'),
            value = False)

    form.add(CheckboxWidget, 'encrypt_assertion',
            title = _('Encrypt Assertion'),
            value = False,
            hint = _('Only used for SAML 2.0.'))

    form.add(StringWidget, 'exported_attributes',
            title = _('Exported attributes'),
            value = '',
            hint = _('Space separated list of words'))

    for name, caption in (('submit', _('Submit')), ('cancel', _('Cancel'))):
        form.add_submit(name, caption)
    return form
|
|
|
|
def new [html] (self):
|
|
form = self.new_form()
|
|
|
|
if form.get_submit() == 'cancel':
|
|
return redirect('.')
|
|
|
|
if form.is_submitted() and not form.has_errors():
|
|
lpk, error = self.submit_new(form)
|
|
if error:
|
|
return error
|
|
return redirect('.')
|
|
|
|
get_response().breadcrumb.append(('new', _('New')))
|
|
html_top('settings', title = _('New Liberty Provider'))
|
|
'<h2>%s</h2>' % _('New Liberty Provider')
|
|
form.render()
|
|
|
|
def submit_new(self, form, key_provider_id = None):
    '''Create or update a provider from the values of an upload form.

    form is read through its 'role', 'metadata', 'publickey',
    'cacertchain', 'label' and option widgets (widgets that are absent are
    skipped).  key_provider_id is the configuration key of the provider
    being edited, or None when a provider is created.

    Returns a (provider key, None) tuple on success and a
    (None, error page) tuple on failure.

    NOTE(review): the indentation of this file was lost; the computation of
    the new provider key is assumed to belong to the "metadata were
    uploaded" branch, which is the only reading under which an edit without
    new metadata keeps its key -- please confirm.
    '''
    def provider_filenames(key):
        # names (relative to the application directory) of a provider files
        return {'metadata': 'provider-%s-metadata.xml' % key,
                'publickey': 'provider-%s-publickey.pem' % key,
                'cacertchain': 'provider-%s-cacertchain.pem' % key}

    def read_upload(name):
        # content of an uploaded file, None when nothing was uploaded
        upload = form.get_widget(name).parse()
        if upload:
            return upload.fp.read()
        return None

    def write_file(filename, content):
        # write then close, so the data is on disk before lasso reads it
        fd = open(misc.get_abs_path(filename), 'w')
        try:
            fd.write(content)
        finally:
            fd.close()

    get_publisher().reload_cfg()
    providers_cfg = get_cfg('providers', {})
    get_publisher().cfg['providers'] = providers_cfg

    if form.get_widget('role'):
        role = form.get_widget('role').parse()
    else:
        role = lasso.PROVIDER_ROLE_SP
    metadata = read_upload('metadata')
    publickey = read_upload('publickey')
    cacertchain = read_upload('cacertchain')
    uploaded = {'metadata': metadata, 'publickey': publickey,
            'cacertchain': cacertchain}

    old_label = None
    if key_provider_id:
        try:
            old_label = misc.get_provider_and_label(key_provider_id)[1]
        except KeyError:
            pass

    old_key = None  # former key, set when the provider id changes
    old_dict = {}
    provider_id = ''
    if metadata:
        try:
            provider_id = re.findall(r'(provider|entity)ID="(.*?)"', metadata)[0][1]
        except IndexError:
            return (None, error_page(_('Bad metadata')))

        new_key_provider_id = misc.get_provider_key(provider_id)

        if key_provider_id and new_key_provider_id != key_provider_id:
            # provider id changed: move the configuration to the new key;
            # files are only moved once the new settings are validated
            old_key = key_provider_id
            old_dict = providers_cfg[key_provider_id]
            providers_cfg[new_key_provider_id] = old_dict
            del providers_cfg[key_provider_id]

        key_provider_id = new_key_provider_id

    if not key_provider_id:
        # neither an existing provider nor metadata to derive a key from
        return (None, error_page(_('Bad metadata')))

    names = provider_filenames(key_provider_id)

    if key_provider_id not in providers_cfg:
        providers_cfg[key_provider_id] = old_dict
    lp = providers_cfg[key_provider_id]

    if provider_id == '':
        provider_id = lp.get('provider_id', '')
    lp['provider_id'] = provider_id
    lp['role'] = role
    lp['metadata'] = names['metadata']

    for k in ('idp_initiated_sso', 'default_name_id_format',
            'encrypt_nameid', 'encrypt_assertion', 'exported_attributes'):
        widget = form.get_widget(k)
        if widget:
            lp[k] = widget.parse()

    if form.get_widget('label'):
        label = form.get_widget('label').parse()
        if label != old_label:
            lp['label'] = label

    if metadata:
        write_file(names['metadata'], metadata)
    for slot in ('publickey', 'cacertchain'):
        if uploaded[slot]:
            write_file(names[slot], uploaded[slot])
            lp[slot] = names[slot]

    # absolute paths of the key material handed to lasso, None when absent
    publickey_path = None
    cacertchain_path = None
    if 'publickey' in lp and os.path.exists(misc.get_abs_path(lp['publickey'])):
        publickey_path = misc.get_abs_path(lp['publickey'])
    if 'cacertchain' in lp and os.path.exists(misc.get_abs_path(lp['cacertchain'])):
        cacertchain_path = misc.get_abs_path(lp['cacertchain'])

    try:
        p = lasso.Provider(lp['role'], misc.get_abs_path(lp['metadata']),
                publickey_path, cacertchain_path)
    except lasso.Error:
        # this happens when the public key is missing from both params
        # and metadata file
        if publickey_path:
            return (None, error_page(_('Bad metadata')))
        else:
            return (None, error_page(_('Bad metadata or missing public key')))

    try:
        p, l = misc.get_provider_and_label(key_provider_id)
    except (TypeError, KeyError):
        # unusable provider: drop the entry and the files written above
        del providers_cfg[key_provider_id]
        for slot in ('metadata', 'publickey', 'cacertchain'):
            if uploaded[slot]:
                os.unlink(misc.get_abs_path(names[slot]))
        return (None, error_page(_('Bad metadata')))

    if old_key:
        # The settings are valid: bring the files of the former key along.
        # Existence is tested on absolute paths; the original tested the
        # bare file names, so the files were never moved.
        old_names = provider_filenames(old_key)
        for slot in ('publickey', 'cacertchain'):
            old_path = misc.get_abs_path(old_names[slot])
            if not os.path.exists(old_path):
                continue
            if uploaded[slot]:
                # superseded by the file uploaded with this request
                os.unlink(old_path)
            elif lp.get(slot) == old_names[slot]:
                os.rename(old_path, misc.get_abs_path(names[slot]))
                lp[slot] = names[slot]
        old_metadata_path = misc.get_abs_path(old_names['metadata'])
        if os.path.exists(old_metadata_path):
            os.unlink(old_metadata_path)

    get_publisher().write_cfg()

    return (key_provider_id, None)
|
|
|
|
def new_remote [html] (self):
|
|
form = Form(enctype='multipart/form-data')
|
|
form.add(StringWidget, 'metadata_url', title = _('URL to metadata'), required=True,
|
|
size = 60)
|
|
form.add_submit('submit', _('Submit'))
|
|
form.add_submit('cancel', _('Cancel'))
|
|
|
|
if form.get_submit() == 'cancel':
|
|
return redirect('.')
|
|
|
|
if form.is_submitted() and not form.has_errors():
|
|
metadata_pathname = None
|
|
publickey_pathname = None
|
|
metadata_url = form.get_widget('metadata_url').parse()
|
|
publickey_url = None
|
|
try:
|
|
rfd = urllib2.urlopen(metadata_url)
|
|
except urllib2.HTTPError, e:
|
|
if e.code == 404:
|
|
form.set_error('metadata_url', _('Resource not found'))
|
|
else:
|
|
form.set_error('metadata_url', _('HTTP error on retrieval: %s') % e.code)
|
|
except:
|
|
form.set_error('metadata_url', _('Failed to retrieve file'))
|
|
else:
|
|
s = rfd.read()
|
|
(bfd, metadata_pathname) = tempfile.mkstemp(str('.metadata'))
|
|
file(metadata_pathname, str('w')).write(s)
|
|
try:
|
|
p = lasso.Provider(lasso.PROVIDER_ROLE_SP, metadata_pathname, None, None)
|
|
except lasso.Error:
|
|
pass
|
|
else:
|
|
t = self.submit_new_remote(metadata_pathname, publickey_pathname,
|
|
metadata_url, publickey_url)
|
|
if t:
|
|
return t
|
|
form.get_widget('metadata_url').set_error(_('Bad metadata'))
|
|
|
|
publickey_url = get_request().form.get('publickey_url')
|
|
if publickey_url:
|
|
form.add(StringWidget, 'publickey_url', title = _('URL to public key'),
|
|
required=True, size = 60)
|
|
try:
|
|
rfd = urllib2.urlopen(publickey_url)
|
|
except urllib2.HTTPError, e:
|
|
if e.code == 404:
|
|
form.set_error('publickey_url', _('Resource not found'))
|
|
else:
|
|
form.set_error('publickey_url', _('HTTP error on retrieval: %s') % e.code)
|
|
except:
|
|
form.set_error('publickey_url', _('Failed to retrieve file'))
|
|
else:
|
|
s = rfd.read()
|
|
(bfd, publickey_pathname) = tempfile.mkstemp(str('.publickey'))
|
|
file(publickey_pathname, str('w')).write(s)
|
|
|
|
try:
|
|
p = lasso.Provider(lasso.PROVIDER_ROLE_SP, metadata_pathname,
|
|
publickey_pathname, None)
|
|
except lasso.Error:
|
|
form.get_widget('metadata_url').set_error(
|
|
_('Error in this metadata file'))
|
|
else:
|
|
t = self.submit_new_remote(metadata_pathname, publickey_pathname,
|
|
metadata_url, publickey_url)
|
|
if t:
|
|
return t
|
|
form.get_widget('metadata_url').set_error(_('Bad metadata'))
|
|
else:
|
|
# this will be the first time with a public key field; but
|
|
# perhaps it is not necessary, this is just the metadata
|
|
# being broken; test them with our our public key file
|
|
|
|
pubkey = misc.get_abs_path(get_cfg('idp')['publickey'])
|
|
try:
|
|
p = lasso.Provider(lasso.PROVIDER_ROLE_SP, metadata_pathname,
|
|
pubkey, None)
|
|
except lasso.Error:
|
|
# this was an error in the metadata file itself
|
|
form.get_widget('metadata_url').set_error(
|
|
_('File looks like a bad metadata file'))
|
|
else:
|
|
# ok when provided with a public key -> adding it for real
|
|
form.add(StringWidget, 'publickey_url', title = _('URL to public key'),
|
|
required=True, size = 60,
|
|
hint = _('The metadata file does not embed a public key, please provide it here.'))
|
|
|
|
|
|
if publickey_pathname and os.path.exists(publickey_pathname):
|
|
os.unlink(publickey_pathname)
|
|
if metadata_pathname and os.path.exists(metadata_pathname):
|
|
os.unlink(metadata_pathname)
|
|
|
|
get_response().breadcrumb.append(('new_remote', _('New')))
|
|
html_top('settings', title = _('New Liberty Provider'))
|
|
'<h2>%s</h2>' % _('New Liberty Provider')
|
|
form.render()
|
|
|
|
def submit_new_remote(self, metadata_pathname, publickey_pathname,
        metadata_url, publickey_url, key_provider_id = None):
    '''Register a provider from metadata that were already downloaded.

    metadata_pathname and publickey_pathname name the files holding the
    downloaded data (publickey_pathname may be None); both are read, then
    removed.  metadata_url and publickey_url are stored in the provider
    configuration so that the files can be updated later.
    key_provider_id is the key of an existing provider to replace, if any.

    The provider is always registered with the service provider role.

    Returns the result of redirect('.') on success, and None when no
    provider identifier is found in the metadata.
    '''
    def read_file(pathname):
        fd = open(pathname)
        try:
            return fd.read()
        finally:
            fd.close()

    def write_file(filename, content):
        fd = open(misc.get_abs_path(filename), 'w')
        try:
            fd.write(content)
        finally:
            fd.close()

    role = lasso.PROVIDER_ROLE_SP

    get_publisher().reload_cfg()
    providers_cfg = get_cfg('providers', {})
    get_publisher().cfg['providers'] = providers_cfg

    metadata = read_file(metadata_pathname)
    publickey = None
    if publickey_pathname:
        publickey = read_file(publickey_pathname)

    # the downloaded copies are no longer needed
    for pathname in (publickey_pathname, metadata_pathname):
        if pathname and os.path.exists(pathname):
            os.unlink(pathname)

    try:
        provider_id = re.findall(r'(provider|entity)ID="(.*?)"', metadata)[0][1]
    except IndexError:
        return None

    new_key_provider_id = misc.get_provider_key(provider_id)
    old_key = None
    if key_provider_id and new_key_provider_id != key_provider_id:
        # provider id changed: move the configuration to the new key
        providers_cfg[new_key_provider_id] = providers_cfg[key_provider_id]
        del providers_cfg[key_provider_id]
        old_key = key_provider_id

    key_provider_id = new_key_provider_id

    metadata_fn = 'provider-%s-metadata.xml' % key_provider_id
    publickey_fn = 'provider-%s-publickey.pem' % key_provider_id

    if key_provider_id not in providers_cfg:
        providers_cfg[key_provider_id] = {}
    lp = providers_cfg[key_provider_id]

    if old_key:
        # Bring the files of the former key along.  Existence is tested on
        # absolute paths; the original tested the bare public key file
        # name, so that file was never moved.
        old_metadata_fn = 'provider-%s-metadata.xml' % old_key
        old_publickey_fn = 'provider-%s-publickey.pem' % old_key
        if os.path.exists(misc.get_abs_path(old_metadata_fn)):
            os.rename(misc.get_abs_path(old_metadata_fn), misc.get_abs_path(metadata_fn))
        if os.path.exists(misc.get_abs_path(old_publickey_fn)):
            os.rename(misc.get_abs_path(old_publickey_fn), misc.get_abs_path(publickey_fn))
            if lp.get('publickey') == old_publickey_fn:
                lp['publickey'] = publickey_fn

    lp['role'] = role
    lp['metadata'] = metadata_fn

    # save URL so they can be automatically updated later
    lp['metadata_url'] = metadata_url
    lp['publickey_url'] = publickey_url

    write_file(metadata_fn, metadata)
    if publickey:
        write_file(publickey_fn, publickey)
        # record the key file, as submit_new() and
        # LibertyProviderUI.update_remote() do; without this entry the
        # downloaded key is never handed to lasso
        lp['publickey'] = publickey_fn

    get_publisher().write_cfg()

    return redirect('.')
|
|
|
|
|
|
|
|
class LibertyProviderUI(Directory):
    '''Pages about a single provider: details, edition, removal and update
    from the remote URL its metadata were downloaded from.
    '''
    _q_exports = ['', 'delete', 'edit', 'update_remote']
|
|
|
|
def __init__(self, component):
    '''Load the configuration of the provider stored under component.

    Raises TraversalError when the 'providers' configuration section has
    no such key.  The label falls back to a "[Broken]" marker when the
    provider cannot be loaded.
    '''
    try:
        provider_cfg = get_cfg('providers')[component]
    except KeyError:
        raise TraversalError('%s does not exist' % component)
    self.lp = provider_cfg
    self.lpk = component

    try:
        label = misc.get_provider_and_label(component)[1]
    except KeyError:
        label = _('[Broken]')
    self.label = label
|
|
|
|
def _q_traverse(self, path):
    '''Add the provider to the breadcrumb, then traverse as usual.'''
    crumb = (('%s/' % self.lpk), self.label)
    get_response().breadcrumb.append(crumb)
    return Directory._q_traverse(self, path)
|
|
|
|
def _q_index [html] (self):
|
|
p, label = misc.get_provider_and_label(self.lpk)
|
|
html_top('settings', title = _('Liberty Provider - %s') % label)
|
|
'<h2>%s</h2>' % _('Liberty Provider - %s') % label
|
|
|
|
get_session().display_message()
|
|
|
|
'<div class="form">'
|
|
|
|
'<div class="title">%s</div>' % _('Provider ID')
|
|
'<div class="StringWidget content">%s</div>' % p.providerId
|
|
|
|
'<div class="title">%s</div>' % _('Label')
|
|
'<div class="StringWidget content">%s</div>' % label
|
|
|
|
if self.lp.get('metadata_url'):
|
|
'<div class="title">%s</div>' % _('Original metadata URL')
|
|
'<div class="StringWidget content">%s</div>' % self.lp.get('metadata_url')
|
|
|
|
'<h3>%s</h3>' % _('Metadata')
|
|
'<pre>'
|
|
file(misc.get_abs_path(self.lp['metadata'])).read()
|
|
'</pre>'
|
|
'</div>'
|
|
|
|
'<p>'
|
|
'<a href="edit">%s</a> ' % _('Edit')
|
|
if self.lp.get('metadata_url'):
|
|
'<a href="update_remote">%s</a>' % _('Update from remote URL')
|
|
'</p>'
|
|
|
|
|
|
def edit_form(self):
    '''Return the form used to edit the provider.

    Widgets are preset from the provider configuration.  The 'role'
    selector is only present when the 'idff_proxy' option of the 'idp'
    configuration section is set, and the submit button only when the
    provider has no 'locked' entry.
    '''
    provider_cfg = self.lp
    label = misc.get_provider_and_label(self.lpk)[1]

    form = Form(enctype='multipart/form-data')
    form.add(StringWidget, 'label', title = _('Label'), size = 50, value = label)

    if get_cfg('idp').get('idff_proxy'):
        role_choices = [
            (lasso.PROVIDER_ROLE_SP, _('Service Provider')),
            (lasso.PROVIDER_ROLE_IDP, _('Identity Provider')),
            (lasso.PROVIDER_ROLE_NONE, _('Both')),
        ]
        form.add(SingleSelectWidget, 'role', title = _('Role'), required=True,
                value = provider_cfg['role'], options = role_choices)

    # uploads are all optional here: an empty one keeps the stored file
    uploads = (
        ('metadata', _('Metadata')),
        ('publickey', _('Public Key')),
        ('cacertchain', _('CA Certificate Chain')),
    )
    for name, title in uploads:
        form.add(FileWidget, name, title = title,
                hint = _('Leaving empty will keep existing data.'))

    form.add(CheckboxWidget, 'idp_initiated_sso',
            title = _('Allow IdP initiated Single Sign On'),
            value = provider_cfg.get('idp_initiated_sso', True))

    name_id_formats = [
        ('persistent', _('Persistent')),
        ('transient', _('Transient')),
        ('email', _('Email')),
    ]
    form.add(SingleSelectWidget, 'default_name_id_format',
            title = _('Default NameID Format'),
            required=False,
            value = provider_cfg.get('default_name_id_format'),
            options = name_id_formats)

    form.add(CheckboxWidget, 'encrypt_nameid',
            title = _('Encrypt NameID'),
            value = provider_cfg.get('encrypt_nameid', False))

    form.add(CheckboxWidget, 'encrypt_assertion',
            title = _('Encrypt Assertion'),
            value = provider_cfg.get('encrypt_assertion', False),
            hint = _('Only used for SAML 2.0.'))

    form.add(StringWidget, 'exported_attributes',
            title = _('Exported attributes'),
            value = provider_cfg.get('exported_attributes', ''),
            hint = _('Space separated list of words'))

    unlocked = provider_cfg.get('locked') is None
    if unlocked:
        form.add_submit('submit', _('Submit'))

    form.add_submit('cancel', _('Cancel'))
    return form
|
|
|
|
def edit [html] (self):
|
|
form = self.edit_form()
|
|
|
|
if form.get_submit() == 'cancel':
|
|
return redirect('..')
|
|
|
|
if form.is_submitted() and not form.has_errors():
|
|
t = LibertyProvidersDir()
|
|
if self.lp.get('locked') is None:
|
|
pk, error = t.submit_new(form, self.lpk)
|
|
if error:
|
|
return error
|
|
return redirect('../%s/' % pk)
|
|
else:
|
|
return redirect('.')
|
|
|
|
get_response().breadcrumb.append(('edit', _('Edit')))
|
|
label = misc.get_provider_and_label(self.lpk)[1]
|
|
html_top('settings', title = _('Edit Liberty Provider - %s') % label)
|
|
'<h2>%s</h2>' % _('Edit Liberty Provider - %s') % label
|
|
if self.lp.get('locked') is not None:
|
|
'<div class="infonotice">'
|
|
self.lp.get('locked')
|
|
'</div>'
|
|
form.render()
|
|
|
|
def delete [html] (self):
|
|
try:
|
|
p, label = misc.get_provider_and_label(self.lpk)
|
|
except KeyError:
|
|
p, label = (None, None)
|
|
|
|
form = Form(enctype='multipart/form-data')
|
|
if self.lp.get('locked') is None:
|
|
if label:
|
|
form.widgets.append(HtmlWidget('<p>%s</p>' % _(
|
|
'You are about to irrevocably remove this Liberty provider: %s') % label))
|
|
else:
|
|
form.widgets.append(HtmlWidget('<p>%s</p>' % _(
|
|
'You are about to irrevocably remove this Liberty provider.')))
|
|
|
|
form.widgets.append(HtmlWidget('<p>%s</p>' % _('Are you sure ?')))
|
|
|
|
form.add_submit('submit', _('Remove provider'))
|
|
else:
|
|
form.widgets.append(HtmlWidget('<p>%s</p>' % _(
|
|
'This provider is currently locked.')))
|
|
form.add_submit('cancel', _('Cancel'))
|
|
if form.get_submit() == 'cancel':
|
|
return redirect('..')
|
|
if form.is_submitted() and not form.has_errors():
|
|
if self.lp.get('locked') is None:
|
|
self.delete_submitted()
|
|
return redirect('..')
|
|
|
|
get_response().breadcrumb.append(('delete', _('Delete')))
|
|
html_top('settings', title = _('Liberty Provider'))
|
|
if label:
|
|
'<h2>%s</h2>' % _('Deleting %s') % label
|
|
else:
|
|
'<h2>%s</h2>' % _('Deleting Liberty Provider')
|
|
form.render()
|
|
|
|
def delete_submitted(self):
    '''Remove the provider from the configuration, with its files.

    The metadata, public key and CA certificate chain files named after
    the provider key are deleted from the application directory when they
    exist; the configuration is then written.
    '''
    del get_publisher().cfg['providers'][self.lpk]

    app_dir = get_publisher().app_dir
    patterns = ('provider-%s-metadata.xml',
            'provider-%s-publickey.pem',
            'provider-%s-cacertchain.pem')
    paths = [os.path.join(app_dir, pattern % self.lpk) for pattern in patterns]
    for path in paths:
        if not os.path.exists(path):
            continue
        os.unlink(path)

    get_publisher().write_cfg()
|
|
|
|
def update_remote(self):
    '''Download again the metadata (and public key) of the provider.

    The URLs are read from the 'metadata_url' and 'publickey_url' entries
    of the provider configuration.  The downloaded files replace the stored
    ones, the provider is moved to a new configuration key when its
    identifier changed, and the configuration is written once lasso accepts
    the new settings.

    Returns an error page when a download fails or the metadata are not
    usable, otherwise a redirect to the page of the provider.
    '''
    def provider_filenames(key):
        # names (relative to the application directory) of a provider files
        return {'metadata': 'provider-%s-metadata.xml' % key,
                'publickey': 'provider-%s-publickey.pem' % key,
                'cacertchain': 'provider-%s-cacertchain.pem' % key}

    def download(url):
        fd = urllib2.urlopen(url)
        try:
            return fd.read()
        finally:
            fd.close()

    def write_file(filename, content):
        # write then close, so the data is on disk before lasso reads it
        fd = open(misc.get_abs_path(filename), 'w')
        try:
            fd.write(content)
        finally:
            fd.close()

    metadata_url = self.lp.get('metadata_url')
    if not metadata_url:
        # provider not created from a remote URL: nothing to download
        return error_page('failed to download')
    try:
        # URLError also covers HTTPError, its subclass
        metadata = download(metadata_url)
    except urllib2.URLError:
        return error_page('failed to download')

    publickey = None
    publickey_url = self.lp.get('publickey_url')
    if publickey_url:
        try:
            publickey = download(publickey_url)
        except urllib2.URLError:
            return error_page('failed to download')

    # no CA certificate chain is downloaded; a stored one is kept
    cacertchain = None
    uploaded = {'metadata': metadata, 'publickey': publickey,
            'cacertchain': cacertchain}

    try:
        provider_id = re.findall(r'(provider|entity)ID="(.*?)"', metadata)[0][1]
    except IndexError:
        return error_page(_('Bad metadata'))

    providers_cfg = get_publisher().cfg['providers']
    new_key_provider_id = misc.get_provider_key(provider_id)
    key_provider_id = self.lpk
    old_key = None  # former key, set when the provider id changes
    if key_provider_id and new_key_provider_id != key_provider_id:
        # provider id changed: move the configuration to the new key;
        # files are only moved once the new settings are validated
        providers_cfg[new_key_provider_id] = providers_cfg[key_provider_id]
        del providers_cfg[key_provider_id]
        old_key = key_provider_id

    key_provider_id = new_key_provider_id
    names = provider_filenames(key_provider_id)

    if key_provider_id not in providers_cfg:
        providers_cfg[key_provider_id] = {}
    lp = providers_cfg[key_provider_id]

    lp['metadata'] = names['metadata']

    for slot in ('metadata', 'publickey', 'cacertchain'):
        if uploaded[slot]:
            write_file(names[slot], uploaded[slot])
            lp[slot] = names[slot]

    # absolute paths of the key material handed to lasso, None when absent
    publickey_path = None
    cacertchain_path = None
    if 'publickey' in lp and os.path.exists(misc.get_abs_path(lp['publickey'])):
        publickey_path = misc.get_abs_path(lp['publickey'])
    if 'cacertchain' in lp and os.path.exists(misc.get_abs_path(lp['cacertchain'])):
        cacertchain_path = misc.get_abs_path(lp['cacertchain'])
    try:
        p = lasso.Provider(lp['role'], misc.get_abs_path(lp['metadata']),
                publickey_path, cacertchain_path)
    except lasso.Error:
        # this happens when the public key is missing from both params
        # and metadata file; this method is a page handler, so the error
        # page itself is returned (not a tuple as in submit_new)
        if publickey_path:
            return error_page(_('Bad metadata'))
        else:
            return error_page(_('Bad metadata or missing public key'))

    try:
        p, l = misc.get_provider_and_label(key_provider_id)
    except (TypeError, KeyError):
        # unusable provider: drop the entry and the files written above
        del providers_cfg[key_provider_id]
        for slot in ('metadata', 'publickey', 'cacertchain'):
            if uploaded[slot]:
                os.unlink(misc.get_abs_path(names[slot]))
        return error_page(_('Bad metadata'))

    if old_key:
        # The settings are valid: bring the files of the former key along.
        # Existence is tested on absolute paths; the original tested the
        # bare file names, so the files were never moved.
        old_names = provider_filenames(old_key)
        for slot in ('publickey', 'cacertchain'):
            old_path = misc.get_abs_path(old_names[slot])
            if not os.path.exists(old_path):
                continue
            if uploaded[slot]:
                # superseded by the file downloaded with this request
                os.unlink(old_path)
            elif lp.get(slot) == old_names[slot]:
                os.rename(old_path, misc.get_abs_path(names[slot]))
                lp[slot] = names[slot]
        old_metadata_path = misc.get_abs_path(old_names['metadata'])
        if os.path.exists(old_metadata_path):
            os.unlink(old_metadata_path)

    get_publisher().write_cfg()

    get_session().message = ('info',
            _('Provider Metadata have been updated from their remote location.'))

    return redirect('../%s/' % key_provider_id)
|
|
|
|
|
|
|
|
class SettingsDirectory(schemagui.SettingDirectory):
    '''Root of the settings pages.

    Besides the pages defined as methods, it mounts the emails, texts,
    Liberty providers and (when the import succeeded) translations
    directories as class attributes.
    '''
    # NOTE(review): 'debug' is listed twice below; the second entry looks
    # redundant.
    _q_exports = ['', 'idp', 'liberty_providers',
            'identity_storage', 'identity_options', 'themes', 'passwords',
            'debug', 'debug_options', 'emails', 'language', 'login', 'template', 'branding', 'texts',
            'misc', 'sitename', 'ssl', 'upload_theme', 'debug', 'identities', 'cas', 'homepage', 'translations']

    emails = EmailsDirectory()
    texts = TextsDirectory()
    # TranslationsDirectory is None when its module could not be imported
    if TranslationsDirectory:
        translations = TranslationsDirectory()

    liberty_providers = LibertyProvidersDir()
    # presumably maps short endpoint codes to proxy endpoint names; not
    # used in the part of the class reviewed here -- TODO confirm
    endpoints_prefixes = { 'slo': 'proxySingleLogout',
            'ac': 'proxyAssertionConsumer', 'se': 'proxySoapEndpoint' }
|
|
|
|
def get_configuration(self, path):
    '''Return what configuration.get_configuration() gives for path.'''
    settings = configuration.get_configuration(path)
    return settings
|
|
|
|
def get_schema(self):
    '''Return the 'configuration' object of the configuration module.'''
    schema = configuration.configuration
    return schema
|
|
|
|
def _q_index [html] (self):
    # Settings home page: four groups of links (Liberty / SAML, identities,
    # customisation, debug) towards the pages of this directory.
    get_publisher().reload_cfg()
    html_top('settings', title = _('Settings'))

    # the heading and description mention SAML 2.0 only when lasso
    # reports support for it
    if lasso.SAML2_SUPPORT:
        '<h2>Liberty Alliance & SAML 2.0</h2>'
        '<dl> <dt><a href="idp">%s</a></dt> <dd>%s</dd>' % (
                _('Identity Provider'), _('Configure Liberty / SAML 2.0 Parameters'))
    else:
        '<h2>Liberty Alliance</h2>'
        '<dl> <dt><a href="idp">%s</a></dt> <dd>%s</dd>' % (
                _('Identity Provider'), _('Configure Liberty Parameters'))

    # metadata download links, shown once the matching provider id is set
    if get_cfg(str('idp'), {}).get(str('providerid')):
        metadata_url = '%s/metadata.xml' % get_cfg('idp')['base_url']
        '<dt><a href="%s">%s</a></dt> <dd>%s</dd>' % (metadata_url,
                _('Liberty Identity Provider Metadata'),
                _('Download Identity Provider ID-FF 1.2 Metadata file'))
    if get_cfg(str('idp'), {}).get(str('saml2_providerid')):
        metadata_url = '%s/metadata.xml' % get_cfg('idp')['saml2_base_url']
        '<dt><a href="%s">%s</a></dt> <dd>%s</dd>' % (metadata_url,
                _('SAML Identity Provider Metadata'),
                _('Download Identity Provider SAML 2.0 Metadata file'))

    '<dt><a href="liberty_providers/">%s</a></dt> <dd>%s</dd> </dl>' % (
            _('Liberty Providers'), _('Add and remove liberty providers'))

    '<h2>%s</h2>' % _('Identities')

    '<dl>'
    '<dt><a href="identity_options">%s</a></dt> <dd>%s</dd>' % (
            _('Identity Options'), '...')
    '<dt><a href="identity_storage">%s</a></dt> <dd>%s</dd>' % (
            _('Identity Storage'), _('Configure identities data source'))
    '<dt><a href="passwords">%s</a></dt> <dd>%s</dd>' % (
            _('Passwords'), _('Configure all password things'))
    '<dt><a href="ssl">%s</a></dt> <dd>%s</dd>' % (
            _('SSL'), _('Configure all SSL things'))
    '<dt><a href="cas">%s</a></dt> <dd>%s</dd>' % (
            _('CAS'), _('Configure CAS server'))
    '</dl>'

    '<h2>%s</h2>' % _('Customisation')

    '<dl>'
    '<dt><a href="sitename">%s</a></dt> <dd>%s</dd>' % (
            _('Site Name'), _('Configure site name'))
    '<dt><a href="homepage">%s</a></dt> <dd>%s</dd>' % (
            _('Homepage'), _('Configure the user homepage'))
    '<dt><a href="language">%s</a></dt> <dd>%s</dd>' % (
            _('Language'), _('Configure site language'))
    '<dt><a href="themes">%s</a></dt> <dd>%s</dd>' % (
            _('Theme'), _('Configure theme'))
    '<dt><a href="template">%s</a></dt> <dd>%s</dd>' % (
            _('Template'), _('Configure template'))
    '<dt><a href="emails/">%s</a></dt> <dd>%s</dd>' % (
            _('Emails'), _('Configure email settings'))
    '<dt><a href="login">%s</a></dt> <dd>%s</dd>' % (
            _('Login Screen'), _('Configure login screen'))
    '<dt><a href="texts/">%s</a></dt> <dd>%s</dd>' % (
            _('Texts'), _('Configure public pages texts'))
    # only linked when the translations directory could be imported
    if TranslationsDirectory:
        '<dt><a href="translations/">%s</a></dt> <dd>%s</dd>' % (
                _('Custom Translations'), _('Define custom translations'))
    '</dl>'

    '<h2>%s</h2>' % _('Debug')

    '<dl>'
    '<dt><a href="debug_options">%s</a></dt> <dd>%s</dd>' % (
            _('Debug Options'), _('...'))
    '</dl>'
|
|
|
|
def cas [html] (self):
|
|
schemagui.SettingDirectory.__getattr__(self, str('cas'))()
|
|
|
|
if self.get_configuration('cas').get('enable'):
|
|
'<p>Server endpoints are :'
|
|
'<ul>'
|
|
'<li> /cas/login </li>'
|
|
'<li> /cas/logout </li>'
|
|
'<li> /cas/validate </li>'
|
|
'<li> /cas/proxy </li>'
|
|
'<li> /cas/serviceValidate </li>'
|
|
'<li> /cas/proxyValidate </li>'
|
|
'</ul>'
|
|
'</p>'
|
|
|
|
# keep old name for this configuration panel
|
|
def identity_options(self):
    '''Return the 'identities' page, under its former name.'''
    page = self.identities()
    return page
|
|
|
|
def themes [html] (self):
|
|
request = get_request()
|
|
|
|
if not request.form.has_key('theme'):
|
|
current_theme = get_cfg('branding', {}).get('theme', 'default')
|
|
|
|
get_response().breadcrumb.append(('themes', _('Themes')))
|
|
html_top('settings', title = _('Themes'))
|
|
"<h2>%s</h2>" % _('Themes')
|
|
|
|
get_session().display_message()
|
|
|
|
'<a rel="popup" href="upload_theme">%s</a>' % _('Upload New Theme')
|
|
|
|
'<form action="themes" enctype="multipart/form-data" method="post">'
|
|
themes = qommon.template.get_themes()
|
|
'<ul class="biglist themes">'
|
|
for theme, (label, desc, author, icon) in sorted(themes.items()):
|
|
if current_theme == theme:
|
|
checked = ' checked="checked"'
|
|
else:
|
|
checked = ''
|
|
'<li>'
|
|
'<strong class="label">'
|
|
' <input name="theme" value="%s" type="radio"%s>%s</input></strong>' % (
|
|
theme, checked, label)
|
|
if icon:
|
|
'<img src="/themes/%s/icon.png" alt="" class="theme-icon" />' % theme
|
|
'<p class="details">%s' % desc
|
|
if author:
|
|
'<br/>by %s</p>' % author
|
|
'</li>'
|
|
'</ul>'
|
|
'<div class="buttons">'
|
|
'<input type="submit" name="submit" value="%s" />' % _('Submit')
|
|
'</div>'
|
|
'</form>'
|
|
else:
|
|
themes = qommon.template.get_themes()
|
|
if themes.has_key(str(request.form['theme'])):
|
|
branding_cfg = get_cfg('branding', {})
|
|
branding_cfg[str('theme')] = str(request.form['theme'])
|
|
get_publisher().cfg[str('branding')] = branding_cfg
|
|
get_publisher().write_cfg()
|
|
return redirect('.')
|
|
|
|
def upload_theme [html] (self):
|
|
form = Form(enctype='multipart/form-data')
|
|
form.add(FileWidget, 'file', title = _('Theme'), required = True)
|
|
form.add_submit('submit', _('Upload'))
|
|
form.add_submit('cancel', _('Cancel'))
|
|
|
|
if form.get_submit() == 'cancel':
|
|
return redirect('.')
|
|
|
|
if form.is_submitted() and not form.has_errors():
|
|
try:
|
|
return self.upload_theme_submit(form)
|
|
except ValueError:
|
|
form.get_widget('file').set_error(_('Invalid Theme'))
|
|
|
|
get_response().breadcrumb.append( ('upload_theme', _('Upload Theme')) )
|
|
html_top('forms', title = _('Upload Theme'))
|
|
'<h2>%s</h2>' % _('Upload Theme')
|
|
form.render()
|
|
|
|
def upload_theme_submit(self, form):
    '''Unpack the uploaded theme archive into the "themes" directory.

    The 'file' widget of form must hold a zip archive.  Every member that
    is not a directory entry is written below <app_dir>/themes, missing
    directories being created on the way.

    An unreadable archive is reported through the session message.  An
    archive with a member that would be written outside of the themes
    directory ("../" components, absolute names) is refused as a whole
    with a ValueError, before anything is written.

    Returns the result of a redirection to the themes page.
    '''
    try:
        z = zipfile.ZipFile(form.get_widget('file').parse().fp, 'r')
    except Exception:
        get_session().message = ('error', _('Failed to read theme file.'))
        return redirect('themes')

    theme_dir = os.path.abspath(os.path.join(get_publisher().app_dir, 'themes'))
    # TODO: should check and read desc.xml first; and if theme already
    # exists, it should remove the whole directory
    try:
        # first pass: resolve and check every destination, so that a
        # malicious archive is rejected before any file gets written
        members = []
        for name in z.namelist():
            if name.endswith('/'):
                # directory entry, created on demand below
                continue
            path = os.path.abspath(os.path.join(theme_dir, name))
            if not path.startswith(theme_dir + os.sep):
                raise ValueError('unsafe path in theme archive: %r' % name)
            members.append((name, path))

        # second pass: extraction
        for name, path in members:
            data = z.read(name)
            parent_dir = os.path.dirname(path)
            if not os.path.exists(parent_dir):
                os.makedirs(parent_dir)
            # binary mode, themes contain images
            fd = open(path, 'wb')
            try:
                fd.write(data)
            finally:
                fd.close()
    finally:
        z.close()
    return redirect('themes')
|
|
|
|
def identity_storage [html] (self):
|
|
form = Form(enctype='multipart/form-data')
|
|
data_source = get_cfg('identity_storage', {})
|
|
options = [(k, _(identities.stores[k].label)) for k in identities.stores.keys()]
|
|
|
|
# force common storage in first position; others will be placed
|
|
# randomly
|
|
options.remove(('common', _(identities.stores['common'].label)))
|
|
options.insert(0, ('common', _(identities.stores['common'].label)))
|
|
|
|
if data_source.get('source') != 'file':
|
|
options.remove(('file', _(identities.stores['file'].label)))
|
|
|
|
form.add(SingleSelectWidget, 'source', title = _('Data Source'),
|
|
required = True, value = data_source.get('source', 'common'),
|
|
options = options)
|
|
redo = False
|
|
try:
|
|
temp_store = identities.stores.get(form.get_widget('source').parse())()
|
|
except TypeError: # TypeError: 'NoneType' object is not callable
|
|
# this happens if source is no longer available, we fall back to
|
|
# common
|
|
form.set_error('source',
|
|
_('Storage was set to "%s" but it is no longer available.') % \
|
|
data_source.get('source', 'common'))
|
|
temp_store = identities.stores.get('common')
|
|
|
|
if hasattr(temp_store, str('fill_admin_form')):
|
|
temp_store.fill_admin_form(form, data_source)
|
|
if not get_request().form.has_key(temp_store.admin_keys[0]):
|
|
# if the submitted form didn't have the first admin key, it
|
|
# means the user changes storage method and we must redisplay
|
|
# the form with the new options
|
|
form.clear_errors()
|
|
redo = True
|
|
else:
|
|
# if it previously had more than one field, user changed storage
|
|
# method and we let it see the change
|
|
# (counted form.keys() are 'source', 'submit'; excluding _* will
|
|
# exclude _form_id which may be added by Quixote)
|
|
if len([x for x in get_request().form.keys() if x[0] != '_']) > 2:
|
|
redo = True
|
|
|
|
if data_source.get('locked') is None:
|
|
form.add_submit('submit', _('Submit'))
|
|
form.add_submit('cancel', _('Cancel'))
|
|
|
|
if form.get_submit() == 'cancel':
|
|
return redirect('.')
|
|
|
|
form_error = None
|
|
if form.is_submitted() and not (form.has_errors() or redo):
|
|
try:
|
|
self.identity_storage_submit(form)
|
|
except identities.IdentityStoreException, e:
|
|
form_error = e
|
|
else:
|
|
return redirect('.')
|
|
|
|
get_response().breadcrumb.append(('identity_storage', _('Identity Storage')))
|
|
html_top('settings', title = _('Identity Storage'))
|
|
'<h2>%s</h2>' % _('Identity Storage')
|
|
if form_error:
|
|
'<div class="errornotice">%s</div>' % e
|
|
if data_source.get('locked') is not None:
|
|
'<div class="infonotice">'
|
|
data_source.get('locked')
|
|
'</div>'
|
|
form.render()
|
|
|
|
def identity_storage_submit(self, form):
|
|
get_publisher().reload_cfg()
|
|
if not get_publisher().cfg.has_key('identity_storage'):
|
|
get_publisher().cfg['identity_storage'] = {}
|
|
get_publisher().cfg['identity_storage']['source'] = str(form.get_widget('source').parse())
|
|
temp_store = identities.stores.get(get_publisher().cfg['identity_storage']['source'])()
|
|
if hasattr(temp_store, 'admin_keys'):
|
|
for k in temp_store.admin_keys:
|
|
if form.get_widget(k):
|
|
get_publisher().cfg['identity_storage'][k] = form.get_widget(k).parse()
|
|
store = identities.load_store()
|
|
try:
|
|
store.load_identities()
|
|
except identities.IdentityStoreException, e:
|
|
# Rollback configuration changes
|
|
get_publisher().reload_cfg()
|
|
get_publisher().store = store.load_identities()
|
|
# Propagate the Exception which has to be caught by the caller
|
|
raise e
|
|
else:
|
|
# Commit configuration changes
|
|
get_publisher().write_cfg()
|
|
|
|
|
|
def idp [html] (self):
|
|
get_publisher().reload_cfg()
|
|
base_url = get_cfg('idp', {}).get('base_url', None)
|
|
base_soap_url = get_cfg('idp', {}).get('base_soap_url', base_url)
|
|
saml2_base_url = get_cfg('idp', {}).get('saml2_base_url', None)
|
|
saml2_base_soap_url = get_cfg('idp', {}).get('saml2_base_soap_url', saml2_base_url)
|
|
req = get_request()
|
|
|
|
if not base_url:
|
|
base_url = '%s://%s%s' % (req.get_scheme(), req.get_server(),
|
|
req.environ['SCRIPT_NAME'] + '/liberty')
|
|
base_soap_url = '%s://%s%s' % (req.get_scheme(), req.get_server(),
|
|
req.environ['SCRIPT_NAME'] + '/liberty')
|
|
|
|
if lasso.SAML2_SUPPORT and not saml2_base_url:
|
|
saml2_base_url = '%s://%s%s' % (req.get_scheme(), req.get_server(),
|
|
req.environ['SCRIPT_NAME'] + '/saml')
|
|
saml2_base_soap_url = '%s://%s%s' % (req.get_scheme(), req.get_server(),
|
|
req.environ['SCRIPT_NAME'] + '/saml')
|
|
|
|
form = Form(enctype='multipart/form-data')
|
|
if lasso.SAML2_SUPPORT:
|
|
form.add(StringWidget, 'providerid', title=_('Liberty Provider ID'),
|
|
size=50, required=True,
|
|
value = get_cfg('idp', {}).get('providerid', base_url + '/metadata'))
|
|
form.add(StringWidget, 'base_url', title=_('Liberty Base URL'), size=50, required=True,
|
|
value = base_url)
|
|
form.add(StringWidget, 'base_soap_url', title=_('Liberty SOAP Base URL'),
|
|
size=50, required=True, value = base_soap_url)
|
|
form.add(StringWidget, 'saml2_providerid', title=_('SAML 2.0 Provider ID'),
|
|
size=50, required=True,
|
|
value = get_cfg('idp', {}).get(
|
|
'saml2_providerid', saml2_base_url + '/metadata'))
|
|
form.add(StringWidget, 'saml2_base_url', title=_('SAML 2.0 Base URL'),
|
|
size=50, required=True, value = saml2_base_url)
|
|
form.add(StringWidget, 'saml2_base_soap_url', title=_('SAML 2.0 SOAP Base URL'),
|
|
size=50, required=True, value = saml2_base_soap_url)
|
|
else:
|
|
form.add(StringWidget, 'providerid', title=_('Provider ID'), size=50, required=True,
|
|
value = get_cfg('idp', {}).get('providerid', base_url + '/metadata'))
|
|
form.add(StringWidget, 'base_url', title=_('Base URL'), size=50, required=True,
|
|
value = base_url)
|
|
form.add(StringWidget, 'base_soap_url', title=_('SOAP Base URL'), size=50, required=True,
|
|
value = base_soap_url)
|
|
|
|
form.add(StringWidget, 'organization_name', title=_('Organisation Name'), size=50,
|
|
value = get_cfg('idp', {}).get('organization_name', None))
|
|
|
|
dir = get_publisher().app_dir
|
|
privatekey_fn = os.path.join(dir, 'private-key.pem')
|
|
publickey_fn = os.path.join(dir, 'public-key.pem')
|
|
encryption_privatekey_fn = os.path.join(dir, 'encryption-private-key.pem')
|
|
encryption_publickey_fn = os.path.join(dir, 'encryption-public-key.pem')
|
|
|
|
form.add(FileWidget, 'privatekey', title = _('Signing Private Key'))
|
|
form.add(FileWidget, 'publickey', title = _('Signing Public Key'), hint = get_text_file_preview(publickey_fn))
|
|
form.add(FileWidget, 'encryption_privatekey', title = _('Encryption Private Key'))
|
|
form.add(FileWidget, 'encryption_publickey', title = _('Encryption Public Key'), hint = get_text_file_preview(encryption_publickey_fn))
|
|
|
|
form.add(CheckboxWidget, 'defederation', title=_('Allow users to \
|
|
manipulate their federations'),
|
|
hint=_('It gives user the possibility to kill their federation with \
|
|
a service, which can lead to information loss for them'),
|
|
value=get_cfg('idp',{}).get('defederation'))
|
|
|
|
form.add(StringWidget, 'common_domain',
|
|
title = _('Identity Provider Introduction, Common Domain'),
|
|
hint = _('Disabled if empty'),
|
|
value = get_cfg('idp', {}).get('common_domain'))
|
|
|
|
form.add(StringWidget, 'common_domain_setter_url',
|
|
title = _('Identity Provider Introduction, URL of Cookie Setter'),
|
|
hint = _('Disabled if empty'),
|
|
value = get_cfg('idp', {}).get('common_domain_setter_url'))
|
|
|
|
form.add(CheckboxWidget, 'idff_proxy', title = _('ID-FF & SAMLv2 Proxy Support'),
|
|
value = get_cfg('idp', {}).get('idff_proxy', False))
|
|
if not hasattr(lasso.Server(), str('role')):
|
|
widget = form.get_widget('idff_proxy')
|
|
widget.hint = _('Lasso version is too old for this support.')
|
|
widget.value = False
|
|
widget.attrs[str('disabled')] = str('disabled')
|
|
form.add(CheckboxWidget, 'direct_proxy', title = _('Direct Proxy'),
|
|
hint = _('Totally bypass local authentication'),
|
|
value = get_cfg('idp', {}).get('direct_proxy', False))
|
|
|
|
form.add(CheckboxWidget, 'idsis_pp',
|
|
title = _('Share attributes through ID-SIS Personal Profile'),
|
|
value = get_cfg('idp', {}).get('idsis_pp', False))
|
|
if not lasso.WSF_SUPPORT:
|
|
widget= form.get_widget('idsis_pp')
|
|
widget.hint = _('Lasso version is not built with ID-WSF support.')
|
|
widget.value = False
|
|
widget.attrs[str('disabled')] = str('disabled')
|
|
|
|
if get_cfg('idp',{}).get('locked') is None:
|
|
form.add_submit('submit', _('Submit'))
|
|
form.add_submit('cancel', _('Cancel'))
|
|
if x509utils.can_generate_rsa_key_pair():
|
|
form.add_submit('generate_rsa', _('Generate a simple RSA key pair'))
|
|
|
|
if form.get_submit() == 'cancel':
|
|
return redirect('.')
|
|
|
|
if form.get_widget('generate_rsa') and form.get_widget('generate_rsa').parse():
|
|
result = self.idp_save(form)
|
|
if result:
|
|
form.set_error(*result)
|
|
else:
|
|
self.generate_rsa_keypair()
|
|
return redirect('')
|
|
|
|
if form.is_submitted() and not form.has_errors():
|
|
try:
|
|
old_cfg = copy.deepcopy(get_publisher().cfg)
|
|
if get_cfg('idp',{}).get('locked') is None:
|
|
self.idp_save(form)
|
|
if not form.has_errors():
|
|
return redirect('.')
|
|
get_publisher().cfg = old_cfg
|
|
except:
|
|
get_publisher().cfg = old_cfg
|
|
|
|
get_response().breadcrumb.append(('idp', _('Identity Provider')))
|
|
html_top('settings', title = _('Identity Provider Configuration'))
|
|
'<h2>%s</h2>' % _('Identity Provider Configuration')
|
|
if get_cfg('idp',{}).get('locked') is not None:
|
|
'<div class="infonotice">'
|
|
get_cfg('idp',{}).get('locked')
|
|
'</div>'
|
|
form.render()
|
|
|
|
def generate_rsa_keypair(self, branch = 'sp'):
    '''Generate signing and encryption RSA key pairs for the provider.

    Two pairs are requested from x509utils and handed, along with the
    'idp' configuration, to configure_idp_metadatas() with both the
    Liberty and the SAML 2.0 metadata enabled.  The branch parameter is
    not used.
    '''
    signing_public, signing_private = x509utils.generate_rsa_keypair()
    encryption_public, encryption_private = x509utils.generate_rsa_keypair()
    idp_cfg = get_cfg('idp' ,{})
    self.configure_idp_metadatas(idp_cfg, signing_public, signing_private,
            encryption_public, encryption_private, True, True)
|
|
|
|
def write_idp_metadatas(self, signing_pem_key, private_signing_pem_key,
        encryption_pem_key, private_encryption_pem_key, metadata,
        saml2_metadata):
    '''Write the key files and metadata files of the identity provider.

    Files go to the application directory through storage.atomic_write().
    Each key pair is written when its public key is given, each metadata
    file when its content is given; anything else is left untouched.
    '''
    app_dir = get_publisher().app_dir

    # (file name, content) couples, in the order they get written
    pending = []
    if signing_pem_key:
        pending.append(('public-key.pem', signing_pem_key))
        pending.append(('private-key.pem', private_signing_pem_key))
    if encryption_pem_key:
        pending.append(('encryption-public-key.pem', encryption_pem_key))
        pending.append(('encryption-private-key.pem', private_encryption_pem_key))
    if metadata:
        pending.append(('metadata.xml', metadata))
    if saml2_metadata:
        pending.append(('saml2-metadata.xml', saml2_metadata))

    for filename, content in pending:
        storage.atomic_write(os.path.join(app_dir, filename), content)
|
|
|
|
def configure_idp_metadatas(self, cfg_sp, signing_pem_key, private_signing_pem_key,
        encryption_pem_key, private_encryption_pem_key, liberty, saml2):
    '''Record keys and metadata in cfg_sp and write the matching files.

    cfg_sp is the provider configuration dictionary, it is updated in
    place.  liberty and saml2 tell which metadata files to generate.

    When x509utils is able to check them, key pairs that are not
    consistent are refused: a (widget name, error message) tuple is
    returned and nothing is written.  Otherwise files are written with
    write_idp_metadatas(), the configuration is saved and None is
    returned.
    '''
    if x509utils.can_generate_rsa_key_pair():
        signing_ok = not signing_pem_key or \
                x509utils.check_key_pair_consistency(signing_pem_key, private_signing_pem_key)
        if not signing_ok:
            return ('publickey', _('Signing key pair is invalid'))
        encryption_ok = not encryption_pem_key or \
                x509utils.check_key_pair_consistency(encryption_pem_key, private_encryption_pem_key)
        if not encryption_ok:
            return ('encryption_publickey', _('Encryption key pair is invalid'))

    # the configuration holds file names, not key contents
    if signing_pem_key:
        cfg_sp['publickey'] = 'public-key.pem'
        cfg_sp['privatekey'] = 'private-key.pem'
    if encryption_pem_key:
        cfg_sp['encryption_privatekey'] = 'encryption-private-key.pem'
        cfg_sp['encryption_publickey'] = 'encryption-public-key.pem'

    metadata = None
    saml2_metadata = None

    if liberty:
        cfg_sp['metadata'] = 'metadata.xml'
        liberty_generator = libertyutils.Metadata(config = cfg_sp,
                provider_id = cfg_sp['providerid'],
                publisher = get_publisher())
        metadata = liberty_generator.get_metadata(signing_pem_key,
                encryption_pem_key, do_idp = True,
                do_sp = bool(cfg_sp.get('idff_proxy', False)),
                endpoints = self.endpoints_prefixes)

    if saml2:
        cfg_sp['saml2_metadata'] = 'saml2-metadata.xml'
        saml2_generator = saml2utils.Metadata(config = cfg_sp,
                provider_id = cfg_sp['saml2_providerid'],
                publisher = get_publisher())
        saml2_metadata = saml2_generator.get_saml2_metadata(signing_pem_key,
                encryption_pem_key, do_idp = True,
                do_sp = bool(cfg_sp.get('idff_proxy', False)),
                endpoints = self.endpoints_prefixes)

    self.write_idp_metadatas(signing_pem_key, private_signing_pem_key,
            encryption_pem_key, private_encryption_pem_key,
            metadata, saml2_metadata)
    get_publisher().write_cfg()
    return None
|
|
|
|
def idp_save(self, form):
    '''Save the identity provider settings submitted in form.

    The 'idp' section of the reloaded configuration receives the value of
    every settings widget present in the form.  Uploaded key pairs are
    validated; metadata files are regenerated, key pairs are written and
    the common domain cookie marker is maintained, then the configuration
    is written.

    Problems are reported as errors on the form, in which case nothing is
    written to disk and the configuration is not saved (it is left
    modified in memory, restoring it is up to the caller).

    Always returns None.
    '''
    app_dir = get_publisher().app_dir
    get_publisher().reload_cfg()
    config = get_publisher().cfg.get('idp', {})
    get_publisher().cfg['idp'] = config

    def extract_file_value(name):
        # content of the file uploaded in widget name, None if there is none
        value = form.get_widget(name).parse()
        if value:
            return value.fp.read()
        return None

    def get_key_pair(prefix):
        # (public key, private key) uploaded in the prefix* widgets; None
        # when nothing was uploaded, and also when the pair is invalid, a
        # form error being set in that case
        try:
            privatekey_content = extract_file_value(prefix+'privatekey')
            publickey_content = extract_file_value(prefix+'publickey')
            if not privatekey_content and not publickey_content:
                return None
            if x509utils.can_generate_rsa_key_pair():
                valid = x509utils.check_key_pair_consistency(
                        publickey_content, privatekey_content)
            else:
                # no way to check the keys match, look at the PEM headers
                valid = (('BEGIN RSA PRIVATE' in privatekey_content or
                            'BEGIN DSA PRIVATE' in privatekey_content) and
                        ('BEGIN PUBLIC KEY' in publickey_content or
                            'BEGIN CERTIFICATE' in publickey_content))
        except Exception:
            # half a pair (None content) or unparsable keys end up here
            valid = False
        if not valid:
            form.set_error(prefix+'publickey', _('The key pair is invalid.'))
            return None
        return (publickey_content, privatekey_content)

    def write_key_pair(prefix1, prefix2, keypair):
        # write keypair to the prefix1* files and record the file names
        # under the prefix2* configuration keys
        if not keypair:
            return
        privatekey_fn = os.path.join(app_dir, prefix1+'private-key.pem')
        publickey_fn = os.path.join(app_dir, prefix1+'public-key.pem')
        storage.atomic_write(publickey_fn, keypair[0])
        storage.atomic_write(privatekey_fn, keypair[1])
        config[prefix2+'publickey'] = prefix1+'public-key.pem'
        config[prefix2+'privatekey'] = prefix1+'private-key.pem'

    old_common_domain_setter_url = config.get('common_domain_setter_url')
    for k in ('providerid', 'base_url', 'organization_name', 'common_domain',
            'idff_proxy', 'idsis_pp', 'saml2_providerid', 'saml2_base_url',
            'base_soap_url', 'saml2_base_soap_url', 'direct_proxy',
            'common_domain_setter_url','defederation'):
        w = form.get_widget(k)
        if w:
            config[k] = w.parse()

    signing_keypair = get_key_pair('')
    encryption_keypair = get_key_pair('encryption_')

    # checked before anything is written, so that an invalid submission
    # leaves the files on disk untouched
    if form.has_errors():
        return

    do_sp = bool(config.get('idff_proxy', False))
    public_signing_key = signing_keypair and signing_keypair[0]
    public_encryption_key = encryption_keypair and encryption_keypair[0]
    # FIXME: is it really useful to permit manual metadatas ?
    metadata_fn = os.path.join(app_dir, 'metadata.xml')
    metadata = libertyutils.Metadata(config = config, provider_id = config['providerid'], publisher = get_publisher())
    storage.atomic_write(metadata_fn, metadata.get_metadata(public_signing_key,
            public_encryption_key, do_idp = True, do_sp = do_sp,
            endpoints = self.endpoints_prefixes))
    config['metadata'] = 'metadata.xml'

    if config.has_key('saml2_providerid'):
        saml2_metadata_fn = os.path.join(app_dir, 'saml2-metadata.xml')
        saml2_metadata = saml2utils.Metadata(config = config, provider_id = config['saml2_providerid'], publisher = get_publisher())
        storage.atomic_write(saml2_metadata_fn,
                saml2_metadata.get_saml2_metadata(public_signing_key,
                    public_encryption_key, do_idp = True, do_sp = do_sp,
                    endpoints = self.endpoints_prefixes))
        config['saml2_metadata'] = 'saml2-metadata.xml'

    write_key_pair('','',signing_keypair)
    write_key_pair('encryption-','encryption_',encryption_keypair)

    # the common domain is served from a sibling directory of the
    # application directory, holding a 'common_cookie' file which contains
    # the path of the application directory
    new_common_domain_setter_url = config.get('common_domain_setter_url')
    if new_common_domain_setter_url != old_common_domain_setter_url:
        if old_common_domain_setter_url:
            # host name, without the port
            old_domain = urlparse.urlparse(old_common_domain_setter_url)[1].split(':')[0]
            # an URL without host name would designate the parent directory
            if old_domain:
                old_domain_dir = os.path.normpath(os.path.join(app_dir, '..', old_domain))
                try:
                    os.unlink(os.path.join(old_domain_dir, 'common_cookie'))
                    os.rmdir(old_domain_dir)
                except OSError:
                    # bad luck, but ignore this
                    pass
        if new_common_domain_setter_url:
            new_domain = urlparse.urlparse(new_common_domain_setter_url)[1].split(':')[0]
            if new_domain:
                new_domain_dir = os.path.normpath(os.path.join(app_dir, '..', new_domain))
                try:
                    os.mkdir(new_domain_dir)
                except OSError:
                    # most likely it already exists
                    pass
                cookie_file = open(os.path.join(new_domain_dir, 'common_cookie'), 'w')
                try:
                    cookie_file.write(app_dir)
                finally:
                    cookie_file.close()

    get_publisher().write_cfg()
|
|
|
|
# keep support for the old panel URL entry points
|
|
def debug_options(self):
    '''Former URL of the debug panel; delegates to debug().'''
    current_panel = self.debug
    return current_panel()
|
|
|
|
def sitename(self):
    '''Former URL of the site name panel; delegates to misc().'''
    current_panel = self.misc
    return current_panel()
|
|
|
|
def template (self):
    '''Former URL of the template panel; delegates to branding().'''
    current_panel = self.branding
    return current_panel()
|
|
|