general: remove id-ff 1.2 support (#6229)

This commit is contained in:
Frédéric Péters 2015-01-04 12:16:16 +01:00
parent 734cc50abb
commit 7e90b505d7
13 changed files with 170 additions and 882 deletions

View File

@ -40,8 +40,6 @@ def setup_module(module):
def setup_environment(idp_number=1):
pub.cfg = {}
pub.cfg['sp'] = {
'base_url': 'http://example.net/liberty',
'providerid': 'http://example.net/liberty/metadata',
'saml2_metadata': 'saml2-metadata.xml',
'saml2_base_url': 'http://example.net/saml',
'saml2_providerid': 'http://example.net/saml/metadata'

View File

@ -63,7 +63,7 @@ class IdentificationDirectory(Directory):
methods = [ ('password', _('Simple local username / password')), ]
if lasso is not None:
methods.insert(0,
('idp', _('Delegated to Liberty/SAML2 identity provider')))
('idp', _('Delegated to SAML identity provider')))
form.add(CheckboxesWidget, 'methods', title = _('Methods'),
value = identification_cfg.get('methods'),
elements = methods,
@ -97,7 +97,7 @@ class IdentificationDirectory(Directory):
cls = 'infonotice'
if identification_cfg.get('methods') and 'idp' in identification_cfg.get('methods'):
cls = 'errornotice'
r += htmltext('<p class="%s">%s</p>') % (cls, _('Delegated to Liberty/SAML2 identity provider \
r += htmltext('<p class="%s">%s</p>') % (cls, _('Delegated to SAML identity provider \
authentication is unavailable. Lasso must be installed to use it.'))
r += form.render()
return r.getvalue()

View File

@ -312,7 +312,7 @@ class UsersDirectory(Directory):
if not ident_methods:
r += htmltext('<p>%s</p>') % _('An authentification system must be configured before creating users.')
elif ident_methods == ['idp'] and len(get_cfg('idp', {}).items()) == 0:
r += htmltext('<p>%s</p>') % _('Liberty support must be setup before creating users.')
r += htmltext('<p>%s</p>') % _('SAML support must be setup before creating users.')
else:
get_response().filter['sidebar'] = self.get_sidebar(offset, limit)
@ -469,7 +469,7 @@ class UsersDirectory(Directory):
_('An authentification system must be configured before creating users.'))
if ident_methods == ['idp'] and len(get_cfg('idp', {}).items()) == 0:
return error_page('users',
_('Liberty support must be setup before creating users.'))
_('SAML support must be setup before creating users.'))
if get_cfg('sp', {}).get('idp-manage-user-attributes', False):
raise errors.TraversalError()

View File

@ -107,9 +107,7 @@ class CmdCheckHobos(Command):
if not pub.cfg.get('sp'):
pub.cfg['sp'] = {}
spconfig = pub.cfg['sp']
spconfig['base_url'] = str(service.get('base_url')) + '/liberty'
spconfig['saml2_base_url'] = str(service.get('base_url')) + '/saml'
spconfig['providerid'] = spconfig['base_url'] + '/metadata'
spconfig['saml2_providerid'] = spconfig['saml2_base_url'] + '/metadata'
MethodAdminDirectory().generate_rsa_keypair()

View File

@ -1,102 +0,0 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
from quixote import get_publisher
from qommon import get_cfg
import qommon.liberty
class LibertyDirectory(qommon.liberty.LibertyDirectory):
def lookup_user(self, session, login):
import libxml2, lasso
ni = login.nameIdentifier.content
session.name_identifier = ni
nis = list(get_publisher().user_class.get_users_with_name_identifier(ni))
if nis:
user = nis[0]
else:
compatibility_id_wsf_user = get_cfg('misc', {}).get('grab-user-with-wsf')
id_wsf_user = get_cfg('saml_identities', {}).get('grab_user_with_wsf')
if lasso.WSF_SUPPORT and (compatibility_id_wsf_user or id_wsf_user):
disco = lasso.Discovery(login.server)
disco.setSessionFromDump(session.lasso_session_dump)
try:
disco.initQuery()
except lasso.Error, error:
# there is no defined error code on lasso side :/
service = None
else:
disco.addRequestedServiceType(lasso.PP_HREF)
disco.buildRequestMsg()
soap_answer = qommon.liberty.soap_call(disco.msgUrl, disco.msgBody)
disco.processQueryResponseMsg(soap_answer)
service = disco.getService()
if not service:
return None
service.initQuery('/pp:PP/pp:InformalName', 'name')
service.addQueryItem('/pp:PP/pp:MsgContact', 'email')
service.buildRequestMsg()
try:
soap_answer = qommon.liberty.soap_call(service.msgUrl, service.msgBody)
except qommon.liberty.SOAPException:
# it was advertised, it didn't work, too bad.
return None
service.processQueryResponseMsg(soap_answer)
email, name = None, None
emailNode = service.getAnswer('/pp:PP/pp:MsgContact')
if emailNode:
# horrible <MsgContact>; rebuild email
doc = libxml2.parseDoc(emailNode)
node = doc.children.children
account, provider = None, None
while node:
if node.name == 'MsgAccount':
account = node.getContent()
if node.name == 'MsgProvider':
provider = node.getContent()
node = node.next
if account and provider:
email = '%s@%s' % (account, provider)
else:
email = ''
nameNode = service.getAnswer('/pp:PP/pp:InformalName')
if nameNode:
doc = libxml2.parseDoc(nameNode)
name = unicode(doc.getContent(), 'utf-8').encode('iso-8859-1')
if email and name:
user = get_publisher().user_class()
user.email = email
user.name = name
user.name_identifiers.append(login.nameIdentifier.content)
user.lasso_dump = login.identity.dump()
user.store()
return user
return None
user.lasso_dump = login.identity.dump()
user.store()
return user

View File

@ -46,9 +46,8 @@ from qommon.storage import atomic_write
import qommon.x509utils as x509utils
import qommon.saml2utils as saml2utils
import qommon.libertyutils as libertyutils
ADMIN_TITLE = N_('Liberty/SAML2')
ADMIN_TITLE = N_('SAML2')
def get_file_content(filename):
try:
@ -108,18 +107,8 @@ class MethodDirectory(Directory):
misc.get_abs_path(idp.get('publickey')), None)
providers[p.providerId] = p
include_protocol = True
if len([x for x in providers.values() if
x.getProtocolConformance() == lasso.PROTOCOL_SAML_2_0]) in (0, len(providers)):
include_protocol = False
for p in providers.values():
label = misc.get_provider_label(p)
if include_protocol:
if p.getProtocolConformance() == lasso.PROTOCOL_SAML_2_0:
label = '%s (SAML 2.0)' % label
else:
label = '%s (Liberty ID-FF 1.2)' % label
options.append((p.providerId, label, p.providerId))
if not value:
value = p.providerId
@ -133,13 +122,8 @@ class MethodDirectory(Directory):
if form.is_submitted() and not form.has_errors():
idp = form.get_widget('idp').parse()
p = providers[form.get_widget('idp').parse()]
if p.getProtocolConformance() == lasso.PROTOCOL_SAML_2_0:
saml = get_publisher().root_directory_class.saml
return saml.perform_login(idp)
else:
liberty = get_publisher().root_directory_class.liberty
return liberty.perform_login(idp)
saml = get_publisher().root_directory_class.saml
return saml.perform_login(idp)
template.html_top(_('Login'))
r = TemplateIO(html=True)
@ -777,7 +761,7 @@ class AdminIDPUI(Directory):
class MethodAdminDirectory(Directory):
title = ADMIN_TITLE
label = N_('Configure Liberty/SAML identification method')
label = N_('Configure SAML identification method')
_q_exports = ['', 'sp', 'idp', 'identities']
@ -790,25 +774,13 @@ class MethodAdminDirectory(Directory):
def _q_index(self):
admin_html_top('settings', title = _(self.title))
r = TemplateIO(html=True)
if lasso.SAML2_SUPPORT:
r += htmltext('<h2>Liberty Alliance & SAML 2.0</h2>')
r += htmltext('<dl> <dt><a href="sp">%s</a></dt> <dd>%s</dd>') % (
_('Service Provider'), _('Configure Liberty / SAML 2.0 parameters'))
else:
r += htmltext('<h2>Liberty Alliance</h2>')
r += htmltext('<dl> <dt><a href="sp">%s</a></dt> <dd>%s</dd>') % (
_('Service Provider'), _('Configure Liberty parameters'))
r += htmltext('<h2>SAML 2.0</h2>')
r += htmltext('<dl> <dt><a href="sp">%s</a></dt> <dd>%s</dd>') % (
_('Service Provider'), _('Configure SAML 2.0 parameters'))
if get_cfg(str('sp'), {}).get(str('providerid')) and (
hasattr(get_publisher().root_directory_class, str('liberty'))):
metadata_url = '%s/metadata.xml' % get_cfg(str('sp'))[str('base_url')]
r += htmltext('<dt><a href="%s">%s</a></dt> <dd>%s</dd>') % (
metadata_url,
_('ID-FF 1.2 Service Provider Metadata'),
_('Download Service Provider ID-FF 1.2 Metadata file'))
if get_cfg(str('sp'), {}).get(str('saml2_providerid')) and (
hasattr(get_publisher().root_directory_class, str('saml'))):
metadata_url = '%s/metadata.xml' % get_cfg(str('sp'))[str('saml2_base_url')]
if get_cfg('sp', {}).get('saml2_providerid') and (
hasattr(get_publisher().root_directory_class, 'saml')):
metadata_url = '%s/metadata.xml' % get_cfg('sp')['saml2_base_url']
r += htmltext('<dt><a href="%s">%s</a></dt> <dd>%s</dd>') % (
metadata_url,
_('SAML 2.0 Service Provider Metadata'),
@ -826,40 +798,24 @@ class MethodAdminDirectory(Directory):
publickey, privatekey = x509utils.generate_rsa_keypair()
encryptionpublickey, encryptionprivatekey = x509utils.generate_rsa_keypair()
cfg_sp = get_cfg(branch, {})
self.configure_sp_metadatas(cfg_sp, publickey, privatekey, encryptionpublickey, encryptionprivatekey, True, True)
self.configure_sp_metadatas(cfg_sp, publickey, privatekey, encryptionpublickey, encryptionprivatekey)
def sp(self):
get_response().breadcrumb.append( ('sp', _('Service Provider')))
base_url = get_cfg('sp', {}).get('base_url', None)
saml2_base_url = get_cfg('sp', {}).get('saml2_base_url', None)
req = get_request()
if not base_url:
base_url = '%s://%s%sliberty' % (req.get_scheme(), req.get_server(),
get_publisher().get_root_url())
if lasso.SAML2_SUPPORT and not saml2_base_url:
if not saml2_base_url:
saml2_base_url = '%s://%s%ssaml' % (req.get_scheme(), req.get_server(),
get_publisher().get_root_url())
form = Form(enctype='multipart/form-data')
if lasso.SAML2_SUPPORT:
form.add(StringWidget, 'providerid', title=_('Liberty Provider ID'),
size=50, required=True,
value = get_cfg('sp', {}).get('providerid', base_url + '/metadata'))
form.add(StringWidget, 'base_url', title=_('Liberty Base URL'), size=50, required=True,
value = base_url)
form.add(StringWidget, 'saml2_providerid', title=_('SAML 2.0 Provider ID'),
size=50, required=False,
value = get_cfg('sp', {}).get(
'saml2_providerid', saml2_base_url + '/metadata'))
form.add(StringWidget, 'saml2_base_url', title=_('SAML 2.0 Base URL'),
size=50, required=False, value = saml2_base_url)
else:
form.add(StringWidget, 'providerid', title=_('Provider ID'), size=50, required=True,
value = get_cfg('sp', {}).get('providerid', base_url + '/metadata'))
form.add(StringWidget, 'base_url', title=_('Base URL'), size=50, required=True,
value = base_url)
form.add(StringWidget, 'saml2_providerid', title=_('SAML 2.0 Provider ID'),
size=50, required=False,
value = get_cfg('sp', {}).get(
'saml2_providerid', saml2_base_url + '/metadata'))
form.add(StringWidget, 'saml2_base_url', title=_('SAML 2.0 Base URL'),
size=50, required=False, value=saml2_base_url)
form.add(StringWidget, 'organization_name', title=_('Organisation Name'), size=50,
value = get_cfg('sp', {}).get('organization_name', None))
@ -923,8 +879,7 @@ class MethodAdminDirectory(Directory):
return r.getvalue()
def write_sp_metadatas(self, signing_pem_key, private_signing_pem_key,
encryption_pem_key, private_encryption_pem_key, metadata,
saml2_metadata):
encryption_pem_key, private_encryption_pem_key, saml2_metadata):
'''Write SP metadatas, that key files and metadata files'''
dir = get_publisher().app_dir
if signing_pem_key:
@ -937,16 +892,12 @@ class MethodAdminDirectory(Directory):
encryption_publickey_fn = os.path.join(dir, 'encryption-public-key.pem')
atomic_write(encryption_publickey_fn, encryption_pem_key)
atomic_write(encryption_privatekey_fn, private_encryption_pem_key)
if metadata:
metadata_fn = os.path.join(dir, 'metadata.xml')
atomic_write(metadata_fn, metadata)
if saml2_metadata:
saml2_metadata_fn = os.path.join(dir, 'saml2-metadata.xml')
atomic_write(saml2_metadata_fn, saml2_metadata)
saml2_metadata_fn = os.path.join(dir, 'saml2-metadata.xml')
atomic_write(saml2_metadata_fn, saml2_metadata)
def configure_sp_metadatas(self, cfg_sp, signing_pem_key, private_signing_pem_key,
encryption_pem_key, private_encryption_pem_key, liberty, saml2):
encryption_pem_key, private_encryption_pem_key):
if x509utils.can_generate_rsa_key_pair():
if signing_pem_key and not x509utils.check_key_pair_consistency(signing_pem_key, private_signing_pem_key):
return ('publickey', _('Signing key pair is invalid'))
@ -959,18 +910,12 @@ class MethodAdminDirectory(Directory):
cfg_sp['encryption_privatekey'] = 'encryption-private-key.pem'
cfg_sp['encryption_publickey'] = 'encryption-public-key.pem'
metadata = saml2_metadata = None
if liberty:
cfg_sp['metadata'] = 'metadata.xml'
metadata = self.get_metadata(cfg_sp, signing_pem_key, encryption_pem_key)
if saml2:
cfg_sp['saml2_metadata'] = 'saml2-metadata.xml'
saml2_metadata = self.get_saml2_metadata(cfg_sp, signing_pem_key, encryption_pem_key)
cfg_sp['saml2_metadata'] = 'saml2-metadata.xml'
saml2_metadata = self.get_saml2_metadata(cfg_sp, signing_pem_key, encryption_pem_key)
self.write_sp_metadatas(signing_pem_key, private_signing_pem_key,
encryption_pem_key, private_encryption_pem_key,
metadata, saml2_metadata)
saml2_metadata)
get_publisher().write_cfg()
return None
@ -979,7 +924,7 @@ class MethodAdminDirectory(Directory):
cfg_sp = get_cfg('sp', {})
get_publisher().cfg['sp'] = cfg_sp
old_common_domain_getter_url = cfg_sp.get('common_domain_getter_url')
for k in ('providerid', 'base_url', 'organization_name', 'common_domain',
for k in ('organization_name', 'common_domain',
'saml2_providerid', 'saml2_base_url', 'common_domain_getter_url',
'grab_user_with_id_wsf', 'identity-creation',
'authn-request-signed', 'want-assertion-signed',
@ -998,7 +943,6 @@ class MethodAdminDirectory(Directory):
encryption_pem_key = get_key('encryption_publickey')
private_encryption_pem_key = get_key('encryption_privatekey')
liberty = cfg_sp.has_key('providerid')
saml2 = cfg_sp.has_key('saml2_providerid')
new_common_domain_getter_url = cfg_sp.get('common_domain_getter_url')
@ -1028,12 +972,7 @@ class MethodAdminDirectory(Directory):
pass
fn = os.path.join(new_domain_dir, 'common_cookie')
atomic_write(fn, get_publisher().app_dir)
return self.configure_sp_metadatas(cfg_sp, signing_pem_key, private_signing_pem_key, encryption_pem_key, private_encryption_pem_key, liberty, saml2)
def get_metadata(self, sp_config, signing_pem_key, encryption_pem_key):
meta = libertyutils.Metadata(publisher = get_publisher(), config =
sp_config, provider_id = sp_config['providerid'])
return meta.get_metadata(signing_pem_key, encryption_pem_key, do_sp = True)
return self.configure_sp_metadatas(cfg_sp, signing_pem_key, private_signing_pem_key, encryption_pem_key, private_encryption_pem_key)
def get_saml2_metadata(self, sp_config, signing_pem_key, encryption_pem_key):
meta = saml2utils.Metadata(publisher = get_publisher(), config =
@ -1173,7 +1112,7 @@ class MethodUserDirectory(Directory):
class IdPAuthMethod(AuthMethod):
key = 'idp'
description = N_('Liberty/SAML2 identity provider')
description = N_('SAML identity provider')
method_directory = MethodDirectory
method_admin_directory = MethodAdminDirectory
method_user_directory = MethodUserDirectory
@ -1198,14 +1137,7 @@ class IdPAuthMethod(AuthMethod):
# there is only one visible IdP, perform login automatically on
# this one.
server = misc.get_lasso_server('liberty')
for x in server.providerIds:
key_provider_id = misc.get_provider_key(x)
if not idps.get(key_provider_id, {}).get('hide', False):
liberty = get_publisher().root_directory_class.liberty
return liberty.perform_login(x)
server = misc.get_lasso_server('saml2')
server = misc.get_lasso_server()
for x in server.providerIds:
key_provider_id = misc.get_provider_key(x)
if not idps.get(key_provider_id, {}).get('hide', False):

View File

@ -1,401 +0,0 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import os
import urllib
import urlparse
import httplib
from quixote import get_field, get_request, get_response, get_session, \
get_session_manager, redirect, get_publisher
from quixote.directory import Directory
from quixote.http_request import parse_header
from publisher import get_cfg, get_logger
try:
import lasso
except ImportError:
lasso = None
import misc
from form import *
from template import error_page
import errors
def does_idp_authentication():
methods = get_cfg('identification', {}).get('methods')
return 'idp' in methods
class LibertyDirectory(Directory):
_q_exports = ['login', 'assertionConsumer', 'soapEndpoint',
'singleLogout', 'singleLogoutReturn',
'federationTermination', 'federationTerminationReturn',
('metadata.xml', 'metadata'), 'public_key']
def _q_traverse(self, path):
# if lasso is not installed, hide the liberty endpoints
if lasso is None:
if does_idp_authentication():
rel_path = os.path.join('/liberty', *path)
get_logger().error(
'%s unavailable - lasso is not installed' % rel_path)
raise errors.TraversalError()
return Directory._q_traverse(self, path)
def perform_login(self, idp = None):
server = misc.get_lasso_server()
login = lasso.Login(server)
login.initAuthnRequest(idp, lasso.HTTP_METHOD_REDIRECT)
login.request.nameIdPolicy = 'federated'
login.request.forceAuthn = False
login.request.isPassive = False
login.request.consent = 'urn:liberty:consent:obtained'
login.buildAuthnRequestMsg()
return redirect(login.msgUrl)
def assertionConsumer(self):
server = misc.get_lasso_server()
if not server:
return error_page(_('Liberty support is not yet configured'))
login = lasso.Login(server)
request = get_request()
if request.get_method() == 'GET' or get_field('LAREQ'):
if request.get_method() == 'GET':
try:
login.initRequest(request.get_query(), lasso.HTTP_METHOD_REDIRECT)
except lasso.Error, error:
return error_page(_('Failed to parse query'))
else:
login.initRequest(get_field('LAREQ'), lasso.HTTP_METHOD_POST)
login.buildRequestMsg()
remote_provider_cfg = get_cfg('idp', {}).get(misc.get_provider_key(login.remoteProviderId))
client_cert = remote_provider_cfg.get('clientcertificate')
try:
soap_answer = soap_call(login.msgUrl, login.msgBody, client_cert = client_cert)
except SOAPException:
return error_page(_('Failure to communicate with identity provider'))
try:
login.processResponseMsg(soap_answer)
except lasso.Error, error:
if error[0] == lasso.LOGIN_ERROR_STATUS_NOT_SUCCESS:
return error_page(_('Unknown authentication failure'))
if hasattr(lasso, 'LOGIN_ERROR_UNKNOWN_PRINCIPAL'):
if error[0] == lasso.LOGIN_ERROR_UNKNOWN_PRINCIPAL:
return error_page(_('Authentication failure; unknown principal'))
if error[0] == lasso.LOGIN_ERROR_FEDERATION_NOT_FOUND:
return error_page('there was no federation')
return error_page(_("Identity Provider didn't accept artifact transaction."))
else:
login.processAuthnResponseMsg(get_field('LARES'))
login.acceptSso()
session = get_session()
if login.isSessionDirty:
if login.session:
session.lasso_session_dump = login.session.dump()
else:
session.lasso_session_dump = None
user = self.lookup_user(session, login)
if user:
session.set_user(user.id)
else:
session.set_user('anonymous-%s' % login.nameIdentifier.content)
session.lasso_anonymous_identity_dump = login.identity.dump()
session.lasso_identity_provider_id = login.remoteProviderId
response = get_response()
if session.after_url:
after_url = session.after_url
session.after_url = None
return redirect(after_url)
response.set_status(303)
response.headers['location'] = urlparse.urljoin(request.get_url(), str('..'))
response.content_type = 'text/plain'
return 'Your browser should redirect you'
def lookup_user(self, session, login):
ni = login.nameIdentifier.content
session.name_identifier = ni
nis = list(get_publisher().user_class.get_users_with_name_identifier(ni))
if nis:
user = nis[0]
else:
return None
user.lasso_dump = login.identity.dump()
user.store()
return user
def singleLogout(self):
request = get_request()
logout = lasso.Logout(misc.get_lasso_server())
if lasso.isLibertyQuery(request.get_query()):
request = get_request()
try:
logout.processRequestMsg(request.get_query())
except lasso.Error, error:
if error[0] == lasso.DS_ERROR_INVALID_SIGNATURE:
return error_page(_('Failed to check single logout request signature.'))
raise
session = get_session()
if not session.id:
# session has not been found, this may be because the user has
# its browser configured so that cookies are not sent for
# remote queries and IdP is using image-based SLO.
# so we look up a session with the appropriate name identifier
name_identifier = logout.nameIdentifier.content
sessions = get_session_manager().get_session_for_liberty(
name_identifier, logout.request.sessionIndex)
if sessions:
session = session[0]
else:
session = get_session()
return self.slo_idp(logout, session)
else:
return self.slo_sp(logout, get_session())
def singleLogoutReturn(self):
logout = lasso.Logout(misc.get_lasso_server())
try:
logout.processResponseMsg(get_request().get_query())
except lasso.Error, error:
if error[0] == lasso.PROFILE_ERROR_INVALID_QUERY:
raise AccessError()
if error[0] == lasso.DS_ERROR_INVALID_SIGNATURE:
return error_page(_('Failed to check single logout request signature.'))
if hasattr(lasso, 'LOGOUT_ERROR_REQUEST_DENIED') and \
error[0] == lasso.LOGOUT_ERROR_REQUEST_DENIED:
# ignore silently
return redirect(get_publisher().get_root_url())
elif error[0] == lasso.ERROR_UNDEFINED:
# XXX: unknown status; ignoring for now.
return redirect(get_publisher().get_root_url())
return redirect(get_publisher().get_root_url())
def slo_idp(self, logout, session):
# Single Logout initiated by IdP
if session and session.lasso_session_dump:
logout.setSessionFromDump(session.lasso_session_dump)
user = None
if session:
user = session.get_user()
if user and user.lasso_dump:
logout.setIdentityFromDump(user.lasso_dump)
if session and logout.nameIdentifier.content != session.name_identifier:
raise Exception("no appropriate name identifier in session (%s and %s)" % (
logout.nameIdentifier.content, session.name_identifier))
try:
logout.validateRequest()
except lasso.Error, error:
if error[0] == lasso.PROFILE_ERROR_SESSION_NOT_FOUND:
pass
elif error[0] == lasso.PROFILE_ERROR_IDENTITY_NOT_FOUND:
pass
else:
raise
else:
get_session_manager().expire_session()
# Expire all linked sessions
sessions = get_session_manager().get_session_for_liberty(
logout.nameIdentifier.content, logout.request.sessionIndex)
for session in sessions:
try:
del get_session_manager()[session.id]
except KeyError:
pass
logout.buildResponseMsg()
if logout.msgBody: # soap answer
return logout.msgBody
else:
return redirect(logout.msgUrl)
def slo_sp(self, logout, session):
if not session.user:
get_session_manager().expire_session()
return redirect(get_publisher().get_root_url())
if session.lasso_session_dump:
logout.setSessionFromDump(session.lasso_session_dump)
user = get_request().user
if user and user.lasso_dump:
logout.setIdentityFromDump(user.lasso_dump)
return self.slo_sp_redirect(logout)
def slo_sp_redirect(self, logout):
try:
logout.initRequest(None, lasso.HTTP_METHOD_REDIRECT)
except lasso.Error, error:
if error[0] == lasso.PROFILE_ERROR_NAME_IDENTIFIER_NOT_FOUND:
get_session_manager().expire_session()
return redirect(get_publisher().get_root_url())
if error[0] == lasso.PROFILE_ERROR_SESSION_NOT_FOUND:
get_session_manager().expire_session()
return redirect(get_publisher().get_root_url())
raise
logout.buildRequestMsg()
get_session_manager().expire_session()
return redirect(logout.msgUrl)
def soapEndpoint(self):
request = get_request()
ctype = request.environ.get('CONTENT_TYPE')
if not ctype:
return # XXX: error code
ctype, ctype_params = parse_header(ctype)
if ctype != 'text/xml':
return # XXX: error code
response = get_response()
response.set_content_type('text/xml')
length = int(request.environ.get('CONTENT_LENGTH'))
soap_message = request.stdin.read(length)
request_type = lasso.getRequestTypeFromSoapMsg(soap_message)
if request_type == lasso.REQUEST_TYPE_LOGOUT:
logout = lasso.Logout(misc.get_lasso_server())
logout.processRequestMsg(soap_message)
name_identifier = logout.nameIdentifier.content
sessions = get_session_manager().get_session_for_liberty(
name_identifier, logout.request.sessionIndex)
if sessions:
session = sessions[0]
else:
session = None
return self.slo_idp(logout, session)
if request_type == lasso.REQUEST_TYPE_DEFEDERATION:
defederation = lasso.Defederation(misc.get_lasso_server())
defederation.processNotificationMsg(soap_message)
name_identifier = defederation.nameIdentifier.content
user = None
for session in get_session_manager().values():
if name_identifier == session.name_identifier:
break
else:
nis = list(get_publisher().user_class.get_users_with_name_identifier(
name_identifier))
if not nis:
raise Exception('federation not found')
user = nis[0]
session = None
return self.fedterm(defederation, session, user)
response.set_status(501)
request.soap_request_type = request_type
request.soap_message = soap_message
return 'Unimplemented SOAP method'
def federationTermination(self):
request = get_request()
if not lasso.isLibertyQuery(request.get_query()):
return redirect('.')
defederation = lasso.Defederation(misc.get_lasso_server())
defederation.processNotificationMsg(request.get_query())
session = get_session()
return self.fedterm(defederation, session)
def fedterm(self, defederation, session, user = None):
if session:
defederation.setSessionFromDump(session.lasso_session_dump)
if not user:
user = session.get_user()
if user and user.lasso_dump:
defederation.setIdentityFromDump(user.lasso_dump)
try:
defederation.validateNotification()
except lasso.Error, error:
pass # ignore failure (?)
else:
if not defederation.identity:
# if it was the last federation the whole identity dump collapsed
user.lasso_dump = None
else:
user.lasso_dump = defederation.identity.dump()
user.store()
if defederation.isSessionDirty:
if defederation.session:
session.lasso_session_dump = defederation.session.dump()
else:
session.lasso_session_dump = None
if defederation.msgUrl:
return redirect(defederation.msgUrl)
else:
get_session_manager().commit_changes(session)
response = get_response()
response.set_status(204)
return ''
def federationTerminationReturn(self):
return redirect(get_publisher().get_root_url())
def metadata(self):
try:
metadata = unicode(open(misc.get_abs_path(
get_cfg('sp')['metadata'])).read(), 'utf-8')
except KeyError:
raise errors.TraversalError()
response = get_response()
response.set_content_type('text/xml', 'utf-8')
return metadata.encode('utf-8')
def public_key(self):
response = get_response()
response.set_content_type('application/octet-stream')
publickey = open(misc.get_abs_path(get_cfg('sp')['publickey'])).read()
return publickey
class SOAPException(Exception):
url = None
def __init__(self, url):
self.url = url
def soap_call(url, msg, client_cert = None):
if url.startswith('http://'):
host, query = urllib.splithost(url[5:])
conn = httplib.HTTPConnection(host)
else:
host, query = urllib.splithost(url[6:])
conn = httplib.HTTPSConnection(host,
key_file = client_cert, cert_file = client_cert)
try:
conn.request('POST', query, msg, {'Content-Type': 'text/xml'})
response = conn.getresponse()
except Exception, err:
# exception could be raised by request
get_logger().warn('SOAP error (on %s): %s' % (url, err))
raise SOAPException(url)
data = response.read()
conn.close()
if response.status not in (200, 204): # 204 ok for federation termination
get_logger().warn('SOAP error (%s) (on %s)' % (response.status, url))
raise SOAPException(url)
return data

View File

@ -1,208 +0,0 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import x509utils
import os.path
def bool2xs(boolean):
'''Convert a boolean value to XSchema boolean representation'''
if boolean is True:
return 'true'
if boolean is False:
return 'false'
raise TypeError()
class Metadata(object):
__endpoints = {
'slo' : 'singleLogout',
'fedterm' : 'federationTermination',
'rni' : 'registerNameIdentifier',
'ac' : 'assertionConsumer',
'se' : 'soapEndpoint' }
def __init__(self, publisher, provider_id, config):
self.publisher = publisher
self.provider_id = provider_id
self.config = config
def get_key_descriptor(self, keytype, key):
'''Format key as an XML Dsig KeyNode content'''
if keytype:
prologue = ' <KeyDescriptor use="%s">' % keytype
else:
prologue = ' <KeyDescriptor>'
if key and 'CERTIF' in key:
naked = x509utils.decapsulate_pem_file(key)
return prologue + '''
<ds:KeyInfo xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
<ds:X509Data><ds:X509Certificate>%s</ds:X509Certificate></ds:X509Data>
</ds:KeyInfo>
</KeyDescriptor>
''' % naked
# FIXME: generate proper RSAKeyValue, but wait for support in Lasso
elif key and 'KEY' in key:
naked = x509utils.get_xmldsig_rsa_key_value(key)
return prologue + '''
<ds:KeyInfo xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
%s
</ds:KeyInfo>
</KeyDescriptor>
''' % naked
else:
return ''
def get_new_or_old_keys(self, signing_pem_key, encryption_pem_key):
'''Return new or earlier version of PEM keys'''
dir = self.publisher.app_dir
if not signing_pem_key and self.config.get('publickey'):
signing_pem_key = file(os.path.join(dir, 'public-key.pem')).read()
if not encryption_pem_key and self.config.get('encryption_publickey'):
encryption_pem_key = file(os.path.join(dir, 'encryption-public-key.pem')).read()
return (signing_pem_key, encryption_pem_key)
def get_key_descriptors(self, signing_pem_key, encryption_pem_key):
signing_pem_key, encryption_pem_key = \
self.get_new_or_old_keys(signing_pem_key, encryption_pem_key)
sp_key = {
'signing': self.get_key_descriptor('signing', signing_pem_key),
'encryption': self.get_key_descriptor('encryption', encryption_pem_key) }
if not sp_key['signing'] and sp_key['encryption']:
sp_key = {
'signing': '',
'encryption': self.get_key_descriptor('', encryption_pem_key) }
if sp_key['signing'] and not sp_key['encryption']:
sp_key = {
'signing': self.get_key_descriptor('', signing_pem_key),
'encryption': ''
}
return sp_key
def get_spsso_descriptor(self, signing_pem_key, encryption_pem_key, endpoints):
authnrequestsigned = bool2xs(self.config.get('authn-request-signed', True))
sp_key = self.get_key_descriptors(signing_pem_key, encryption_pem_key)
prologue = ''' <SPDescriptor protocolSupportEnumeration="urn:liberty:iff:2003-08">
'''
config = dict(self.config)
config.update({ 'authn-requests-signed': authnrequestsigned })
config.update(self.__endpoints)
config.update(endpoints)
return prologue + sp_key['signing'] + sp_key['encryption'] + '''
<SoapEndpoint>%(base_url)s/%(se)s</SoapEndpoint>
<SingleLogoutServiceURL>%(base_url)s/%(slo)s</SingleLogoutServiceURL>
<SingleLogoutServiceReturnURL>%(base_url)s/%(slo)sReturn</SingleLogoutServiceReturnURL>
<FederationTerminationServiceURL>%(base_url)s/%(fedterm)s</FederationTerminationServiceURL>
<FederationTerminationServiceReturnURL>%(base_url)s/%(fedterm)sReturn</FederationTerminationServiceReturnURL>
<FederationTerminationNotificationProtocolProfile>http://projectliberty.org/profiles/fedterm-idp-soap</FederationTerminationNotificationProtocolProfile>
<FederationTerminationNotificationProtocolProfile>http://projectliberty.org/profiles/fedterm-idp-http</FederationTerminationNotificationProtocolProfile>
<FederationTerminationNotificationProtocolProfile>http://projectliberty.org/profiles/fedterm-sp-soap</FederationTerminationNotificationProtocolProfile>
<FederationTerminationNotificationProtocolProfile>http://projectliberty.org/profiles/fedterm-sp-http</FederationTerminationNotificationProtocolProfile>
<SingleLogoutProtocolProfile>http://projectliberty.org/profiles/slo-idp-soap</SingleLogoutProtocolProfile>
<SingleLogoutProtocolProfile>http://projectliberty.org/profiles/slo-idp-http</SingleLogoutProtocolProfile>
<SingleLogoutProtocolProfile>http://projectliberty.org/profiles/slo-sp-soap</SingleLogoutProtocolProfile>
<SingleLogoutProtocolProfile>http://projectliberty.org/profiles/slo-sp-http</SingleLogoutProtocolProfile>
<RegisterNameIdentifierServiceURL>%(base_url)s/%(rni)s</RegisterNameIdentifierServiceURL>
<RegisterNameIdentifierServiceReturnURL>%(base_url)s/%(rni)sReturn</RegisterNameIdentifierServiceReturnURL>
<RegisterNameIdentifierProtocolProfile>http://projectliberty.org/profiles/rni-idp-soap</RegisterNameIdentifierProtocolProfile>
<RegisterNameIdentifierProtocolProfile>http://projectliberty.org/profiles/rni-idp-http</RegisterNameIdentifierProtocolProfile>
<RegisterNameIdentifierProtocolProfile>http://projectliberty.org/profiles/rni-sp-soap</RegisterNameIdentifierProtocolProfile>
<RegisterNameIdentifierProtocolProfile>http://projectliberty.org/profiles/rni-sp-http</RegisterNameIdentifierProtocolProfile>
<AuthnRequestsSigned>%(authn-requests-signed)s</AuthnRequestsSigned>
<AssertionConsumerServiceURL id="AssertionConsumerServiceURL1" isDefault="true">%(base_url)s/%(ac)s</AssertionConsumerServiceURL>
</SPDescriptor>''' % config
def get_idpsso_descriptor(self, signing_pem_key, encryption_pem_key):
idp_key = self.get_key_descriptors(signing_pem_key, encryption_pem_key)
idp_head = """ <IDPDescriptor protocolSupportEnumeration="urn:liberty:iff:2003-08">
"""
idp_body = """
<SoapEndpoint>%(base_soap_url)s/soapEndpoint</SoapEndpoint>
<SingleLogoutServiceURL>%(base_url)s/singleLogout</SingleLogoutServiceURL>
<SingleLogoutServiceReturnURL>%(base_url)s/singleLogoutReturn</SingleLogoutServiceReturnURL>
<FederationTerminationServiceURL>%(base_url)s/federationTermination</FederationTerminationServiceURL>
<FederationTerminationServiceReturnURL>%(base_url)s/federationTerminationReturn</FederationTerminationServiceReturnURL>
<FederationTerminationNotificationProtocolProfile>http://projectliberty.org/profiles/fedterm-idp-soap</FederationTerminationNotificationProtocolProfile>
<FederationTerminationNotificationProtocolProfile>http://projectliberty.org/profiles/fedterm-idp-http</FederationTerminationNotificationProtocolProfile>
<FederationTerminationNotificationProtocolProfile>http://projectliberty.org/profiles/fedterm-sp-soap</FederationTerminationNotificationProtocolProfile>
<FederationTerminationNotificationProtocolProfile>http://projectliberty.org/profiles/fedterm-sp-http</FederationTerminationNotificationProtocolProfile>
<SingleLogoutProtocolProfile>http://projectliberty.org/profiles/slo-idp-soap</SingleLogoutProtocolProfile>
<SingleLogoutProtocolProfile>http://projectliberty.org/profiles/slo-idp-http</SingleLogoutProtocolProfile>
<SingleLogoutProtocolProfile>http://projectliberty.org/profiles/slo-sp-soap</SingleLogoutProtocolProfile>
<SingleLogoutProtocolProfile>http://projectliberty.org/profiles/slo-sp-http</SingleLogoutProtocolProfile>
<RegisterNameIdentifierProtocolProfile>http://projectliberty.org/profiles/rni-idp-soap</RegisterNameIdentifierProtocolProfile>
<RegisterNameIdentifierProtocolProfile>http://projectliberty.org/profiles/rni-idp-http</RegisterNameIdentifierProtocolProfile>
<RegisterNameIdentifierProtocolProfile>http://projectliberty.org/profiles/rni-sp-soap</RegisterNameIdentifierProtocolProfile>
<RegisterNameIdentifierProtocolProfile>http://projectliberty.org/profiles/rni-sp-http</RegisterNameIdentifierProtocolProfile>
<RegisterNameIdentifierServiceURL>%(base_url)s/registerNameIdentifier</RegisterNameIdentifierServiceURL>
<RegisterNameIdentifierServiceReturnURL>%(base_url)s/registerNameIdentifierReturn</RegisterNameIdentifierServiceReturnURL>
<SingleSignOnServiceURL>%(base_url)s/singleSignOn</SingleSignOnServiceURL>
<SingleSignOnProtocolProfile>http://projectliberty.org/profiles/brws-art</SingleSignOnProtocolProfile>
<SingleSignOnProtocolProfile>http://projectliberty.org/profiles/brws-post</SingleSignOnProtocolProfile>
</IDPDescriptor>""" % self.config
return idp_head + idp_key['signing'] + idp_key['encryption'] + idp_body
def get_metadata(self, signing_pem_key = '', encryption_pem_key = '', do_idp = False, do_sp = False, endpoints = {}):
prologue = '''<?xml version="1.0"?>
<EntityDescriptor
providerID="%s"
xmlns="urn:liberty:metadata:2003-08">\n''' % self.provider_id
sp_descriptor = ''
if do_sp:
sp_descriptor = self.get_spsso_descriptor(signing_pem_key, encryption_pem_key, endpoints)
idp_descriptor = ''
if do_idp:
idp_descriptor = self.get_idpsso_descriptor(signing_pem_key, encryption_pem_key)
orga = ''
if self.config.get('organization_name'):
orga = '''<Organization>
<OrganizationName xml:lang="en">%s</OrganizationName>
</Organization>''' % self.publisher.sitecharset2utf8(self.config['organization_name'])
epilogue = '</EntityDescriptor>'
return '\n'.join([prologue, sp_descriptor, idp_descriptor, orga, epilogue])
if __name__ == '__main__':
print 'Testing metadata generation'
class NoPublisher(object):
pass
publisher = NoPublisher()
publisher.app_dir = '/dir'
pkey, _ = x509utils.generate_rsa_keypair()
meta = Metadata(publisher = publisher, config = { 'base_url': 'base_ulr', 'base_soap_url': 'base_soap_url' }, provider_id = 'provider_id_1')
assert meta != None
content = meta.get_saml2_metadata(pkey,'',True,True)
assert isinstance(content, str) and content != ''
print content

View File

@ -43,31 +43,22 @@ def get_abs_path(s):
return s
return os.path.join(get_publisher().app_dir, s)
def get_lasso_server(protocol = 'liberty'):
def get_lasso_server():
if not get_cfg('sp'):
return None
import lasso
if protocol == 'liberty':
server = lasso.Server(
get_abs_path(get_cfg('sp')['metadata']),
get_abs_path(get_cfg('sp')['privatekey']),
None, None)
elif protocol == 'saml2' and get_cfg('sp').has_key('saml2_metadata'):
server = lasso.Server(
get_abs_path(get_cfg('sp')['saml2_metadata']),
get_abs_path(get_cfg('sp')['privatekey']),
None, None)
else:
raise Exception('unknown protocol')
server = lasso.Server(
get_abs_path(get_cfg('sp')['saml2_metadata']),
get_abs_path(get_cfg('sp')['privatekey']),
None, None)
# Set encryption private key
if protocol in ('liberty', 'saml2'):
encryption_privatekey = get_abs_path(get_cfg('sp').get('encryption_privatekey'))
if encryption_privatekey and os.path.exists(encryption_privatekey):
try:
server.setEncryptionPrivateKey(encryption_privatekey)
except lasso.Error, error:
get_logger().warn('Failed to set encryption private key')
encryption_privatekey = get_abs_path(get_cfg('sp').get('encryption_privatekey'))
if encryption_privatekey and os.path.exists(encryption_privatekey):
try:
server.setEncryptionPrivateKey(encryption_privatekey)
except lasso.Error, error:
get_logger().warn('Failed to set encryption private key')
for klp, idp in get_cfg('idp', {}).items():
try:
@ -134,13 +125,6 @@ def get_provider(provider_key):
def get_provider_key(provider_id):
return provider_id.replace('://', '-').replace('/', '-').replace('?', '-').replace(':', '-')
def get_current_protocol():
if not get_session().lasso_identity_provider_id:
return None
provider = get_provider(get_provider_key(get_session().lasso_identity_provider_id))
return provider.getProtocolConformance()
xlate = {
u'\N{ACUTE ACCENT}': "'",

View File

@ -34,10 +34,43 @@ from publisher import get_cfg, get_logger
from qommon import template
from template import error_page
from liberty import SOAPException, soap_call, does_idp_authentication
import errors
class SOAPException(Exception):
url = None
def __init__(self, url):
self.url = url
def does_idp_authentication():
methods = get_cfg('identification', {}).get('methods')
return 'idp' in methods
def soap_call(url, msg, client_cert = None):
if url.startswith('http://'):
host, query = urllib.splithost(url[5:])
conn = httplib.HTTPConnection(host)
else:
host, query = urllib.splithost(url[6:])
conn = httplib.HTTPSConnection(host,
key_file = client_cert, cert_file = client_cert)
try:
conn.request('POST', query, msg, {'Content-Type': 'text/xml'})
response = conn.getresponse()
except Exception, err:
# exception could be raised by request
get_logger().warn('SOAP error (on %s): %s' % (url, err))
raise SOAPException(url)
data = response.read()
conn.close()
if response.status not in (200, 204): # 204 ok for federation termination
get_logger().warn('SOAP error (%s) (on %s)' % (response.status, url))
raise SOAPException(url)
return data
def soap_endpoint(method):
def f(*args, **kwargs):
if get_request().get_method() != 'POST':
@ -107,7 +140,7 @@ class Saml2Directory(Directory):
'metadata', ('metadata.xml', 'metadata'), 'public_key']
def _q_traverse(self, path):
# if lasso is not installed, hide the liberty endpoints
# if lasso is not installed, hide the saml endpoints
if lasso is None:
if does_idp_authentication():
rel_path = os.path.join('/saml', *path)
@ -149,7 +182,7 @@ class Saml2Directory(Directory):
@soap_endpoint
def assertionConsumerSOAP(self):
request = get_request()
server = misc.get_lasso_server(protocol = 'saml2')
server = misc.get_lasso_server()
if not server:
return error_page(_('SAML 2.0 support not yet configured.'))
login = lasso.Login(server)
@ -174,7 +207,7 @@ class Saml2Directory(Directory):
return self.perform_login()
def perform_login(self, idp = None):
server = misc.get_lasso_server(protocol = 'saml2')
server = misc.get_lasso_server()
if not server:
return error_page(_('SAML 2.0 support not yet configured.'))
login = lasso.Login(server)
@ -188,7 +221,7 @@ class Saml2Directory(Directory):
return redirect(login.msgUrl)
def assertionConsumerArtifact(self):
server = misc.get_lasso_server(protocol = 'saml2')
server = misc.get_lasso_server()
if not server:
return error_page(_('SAML 2.0 support not yet configured.'))
login = lasso.Login(server)
@ -361,7 +394,7 @@ class Saml2Directory(Directory):
return self.assertion_consumer_process(query_string)
def assertion_consumer_process(self, message):
server = misc.get_lasso_server(protocol = 'saml2')
server = misc.get_lasso_server()
if not server:
return error_page(_('SAML 2.0 support not yet configured.'))
login = lasso.Login(server)
@ -459,7 +492,7 @@ class Saml2Directory(Directory):
if method is None:
method = lasso.HTTP_METHOD_REDIRECT
logout = lasso.Logout(misc.get_lasso_server(protocol = 'saml2'))
logout = lasso.Logout(misc.get_lasso_server())
session = get_session()
if session.lasso_session_dump:
@ -491,7 +524,7 @@ class Saml2Directory(Directory):
def singleLogoutReturn(self):
logout = lasso.Logout(misc.get_lasso_server(protocol = 'saml2'))
logout = lasso.Logout(misc.get_lasso_server())
if get_session().lasso_session_dump:
logout.setSessionFromDump(get_session().lasso_session_dump)
message = get_request().get_query()
@ -545,7 +578,7 @@ class Saml2Directory(Directory):
if method is None:
method = lasso.HTTP_METHOD_REDIRECT
manage = lasso.NameIdManagement(misc.get_lasso_server(protocol = 'saml2'))
manage = lasso.NameIdManagement(misc.get_lasso_server())
session = get_session()
if session.lasso_session_dump:
@ -589,7 +622,7 @@ class Saml2Directory(Directory):
# remote queries and IdP is using image-based SLO.
# so we look up a session with the appropriate name identifier
name_identifier = nameid.content
sessions = get_session_manager().get_sessions_for_liberty(nameid.content,
sessions = get_session_manager().get_sessions_for_saml(nameid.content,
session_indexes)
session_manager = get_session_manager()
for session in sessions:
@ -617,7 +650,7 @@ class Saml2Directory(Directory):
return self.slo_idp(get_request().get_query())
def slo_idp(self, message, soap = False):
logout = lasso.Logout(misc.get_lasso_server(protocol = 'saml2'))
logout = lasso.Logout(misc.get_lasso_server())
try:
logout.processRequestMsg(message)
except lasso.Error, error:
@ -626,7 +659,7 @@ class Saml2Directory(Directory):
return self.slo_idp_finish(logout, soap)
try:
sessions = get_session_manager().get_sessions_for_liberty(
sessions = get_session_manager().get_sessions_for_saml(
logout.nameIdentifier.content, logout.request.sessionIndexes)
sessions = list(sessions)
if sessions:
@ -697,7 +730,7 @@ class Saml2Directory(Directory):
return self.manage_name_id_return(manage, soap_answer)
def manageNameId(self):
manage = lasso.NameIdManagement(misc.get_lasso_server(protocol = 'saml2'))
manage = lasso.NameIdManagement(misc.get_lasso_server())
try:
manage.processRequestMsg(get_request().get_query())
except lasso.Error, error:
@ -721,7 +754,7 @@ class Saml2Directory(Directory):
except:
return
manage = lasso.NameIdManagement(misc.get_lasso_server(protocol = 'saml2'))
manage = lasso.NameIdManagement(misc.get_lasso_server())
manage.processRequestMsg(soap_message)
ni = manage.nameIdentifier.content
@ -774,11 +807,11 @@ class Saml2Directory(Directory):
def manageNameIdReturn(self):
if get_session().lasso_manage_name_id_dump:
manage = lasso.NameIdManagement.newFromDump(
misc.get_lasso_server(protocol = 'saml2'),
misc.get_lasso_server(),
get_session().lasso_manage_name_id_dump)
get_session().lasso_manage_name_id_dump = None
else:
manage = lasso.NameIdManagement(misc.get_lasso_server(protocol = 'saml2'))
manage = lasso.NameIdManagement(misc.get_lasso_server())
message = get_request().get_query()
return self.manage_name_id_return(manage, message)
@ -817,6 +850,3 @@ class Saml2Directory(Directory):
singleSignOnPost = assertionConsumerPost
singleSignOnSOAP = assertionConsumerSOAP
singleSignOnRedirect = assertionConsumerRedirect

View File

@ -14,10 +14,20 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import x509utils
import libertyutils
import os
class Metadata(libertyutils.Metadata):
import x509utils
def bool2xs(boolean):
'''Convert a boolean value to XSchema boolean representation'''
if boolean is True:
return 'true'
if boolean is False:
return 'false'
raise TypeError()
class Metadata(object):
__endpoints = {
'slo' : 'singleLogout',
'mni' : 'manageNameId',
@ -28,13 +38,66 @@ class Metadata(libertyutils.Metadata):
self.provider_id = provider_id
self.config = config
def get_key_descriptor(self, keytype, key):
'''Format key as an XML Dsig KeyNode content'''
if keytype:
prologue = ' <KeyDescriptor use="%s">' % keytype
else:
prologue = ' <KeyDescriptor>'
if key and 'CERTIF' in key:
naked = x509utils.decapsulate_pem_file(key)
return prologue + '''
<ds:KeyInfo xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
<ds:X509Data><ds:X509Certificate>%s</ds:X509Certificate></ds:X509Data>
</ds:KeyInfo>
</KeyDescriptor>
''' % naked
# FIXME: generate proper RSAKeyValue, but wait for support in Lasso
elif key and 'KEY' in key:
naked = x509utils.get_xmldsig_rsa_key_value(key)
return prologue + '''
<ds:KeyInfo xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
%s
</ds:KeyInfo>
</KeyDescriptor>
''' % naked
else:
return ''
def get_key_descriptors(self, signing_pem_key, encryption_pem_key):
signing_pem_key, encryption_pem_key = \
self.get_new_or_old_keys(signing_pem_key, encryption_pem_key)
sp_key = {
'signing': self.get_key_descriptor('signing', signing_pem_key),
'encryption': self.get_key_descriptor('encryption', encryption_pem_key) }
if not sp_key['signing'] and sp_key['encryption']:
sp_key = {
'signing': '',
'encryption': self.get_key_descriptor('', encryption_pem_key) }
if sp_key['signing'] and not sp_key['encryption']:
sp_key = {
'signing': self.get_key_descriptor('', signing_pem_key),
'encryption': ''
}
return sp_key
def get_new_or_old_keys(self, signing_pem_key, encryption_pem_key):
'''Return new or earlier version of PEM keys'''
dir = self.publisher.app_dir
if not signing_pem_key and self.config.get('publickey'):
signing_pem_key = file(os.path.join(dir, 'public-key.pem')).read()
if not encryption_pem_key and self.config.get('encryption_publickey'):
encryption_pem_key = file(os.path.join(dir, 'encryption-public-key.pem')).read()
return (signing_pem_key, encryption_pem_key)
def get_spsso_descriptor(self, signing_pem_key, encryption_pem_key,
endpoints):
signing_pem_key, encryption_pem_key = \
self.get_new_or_old_keys(signing_pem_key, encryption_pem_key)
authnrequestsigned = libertyutils.bool2xs(self.config.get('authn-request-signed', True))
wantassertionsigned = libertyutils.bool2xs(self.config.get('want-assertion-signed', True))
authnrequestsigned = bool2xs(self.config.get('authn-request-signed', True))
wantassertionsigned = bool2xs(self.config.get('want-assertion-signed', True))
prologue = ''' <SPSSODescriptor
AuthnRequestsSigned="%s" WantAssertionsSigned="%s"
protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
@ -82,7 +145,7 @@ class Metadata(libertyutils.Metadata):
<IDPSSODescriptor
WantAuthnRequestsSigned="%s"
protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
""" % libertyutils.bool2xs(self.config.get('want-authn-request-signed', True))
""" % bool2xs(self.config.get('want-authn-request-signed', True))
idp_body = """
<ArtifactResolutionService isDefault="true" index="0"
Binding="urn:oasis:names:tc:SAML:2.0:bindings:SOAP"

View File

@ -277,7 +277,7 @@ class StorageSessionManager(QommonSessionManager):
if session:
session.clean_tempfiles()
def get_sessions_for_liberty(self, name_identifier = Ellipsis, \
def get_sessions_for_saml(self, name_identifier = Ellipsis, \
session_indexes = ()):
ret = (x for x in self.values() \
if (not session_indexes \
@ -285,12 +285,12 @@ class StorageSessionManager(QommonSessionManager):
and name_identifier in (x.name_identifier or []))
return ret
def get_session_for_liberty(self, name_identifier = None, session_index = None):
def get_session_for_saml(self, name_identifier = None, session_index = None):
if session_index:
session_indexes = (session_index,)
else:
session_indexes = ()
for session in self.get_sessions_for_liberty(name_identifier,
for session in self.get_sessions_for_saml(name_identifier,
session_indexes):
return session
return None

View File

@ -29,7 +29,6 @@ from quixote.html import htmltext, TemplateIO
from quixote.util import StaticDirectory
import forms.root
import liberty
from qommon import saml2
from qommon import errors
@ -81,7 +80,7 @@ class LoginDirectory(Directory):
idps = get_cfg('idp', {})
if len(idps) == 0:
return template.error_page(_('Authentication subsystem is not yet configured.'))
ident_methods = ['idp'] # fallback to old behaviour; liberty.
ident_methods = ['idp'] # fallback to old behaviour; saml.
if 'IsPassive' in get_request().form and 'idp' in ident_methods:
# if isPassive is given in query parameters, we restrict ourselves
@ -146,7 +145,7 @@ class RegisterDirectory(Directory):
idps = get_cfg('idp', {})
if len(idps) == 0:
return template.error_page(_('Authentication subsystem is not yet configured.'))
ident_methods = ['idp'] # fallback to old behaviour; liberty.
ident_methods = ['idp'] # fallback to old behaviour; saml.
if len(ident_methods) == 1:
method = ident_methods[0]
@ -188,7 +187,7 @@ class RegisterDirectory(Directory):
class RootDirectory(Directory):
_q_exports = ['admin', 'backoffice', 'forms', 'login', 'logout', 'liberty', 'token', 'saml',
_q_exports = ['admin', 'backoffice', 'forms', 'login', 'logout', 'token', 'saml',
'ident', 'register', 'afterjobs', 'themes', 'myspace', 'user', 'roles',
'pages', ('tmp-upload', 'tmp_upload'), 'api', '__version__']
@ -219,11 +218,7 @@ class RootDirectory(Directory):
# add settings to disable single logout?
# (and to set it as none/get/soap?)
import lasso
if misc.get_current_protocol() == lasso.PROTOCOL_SAML_2_0:
return self.saml.slo_sp()
else:
return self.liberty.singleLogout()
return self.saml.slo_sp()
def token(self):
if not get_request().user:
@ -379,7 +374,6 @@ class RootDirectory(Directory):
backoffice = None
saml = saml2.Saml2Directory()
liberty = liberty.LibertyDirectory()
forms = CompatibilityDirectory()
login = LoginDirectory()
register = RegisterDirectory()