This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
expression/src/modules/libertyalliance.py

477 lines
20 KiB
Python

# -*- coding: UTF-8 -*-
# Expression
# By: Frederic Peters <fpeters@entrouvert.com>
# Emmanuel Raviart <eraviart@entrouvert.com>
#
# Copyright (C) 2004 Entr'ouvert, Frederic Peters & Emmanuel Raviart
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""Liberty Alliance Module"""
import httplib
import os
import urllib
import urlparse
# The lasso binding is optional at import time: when it is not installed the
# module still loads with ``lasso`` bound to None, and the library is only
# initialised when the import succeeded.
try:
    import lasso
except ImportError:
    lasso = None
else:
    lasso.init()
import libxml2
import expression.core.directories as directories
import expression.core.documents as documents
import expression.core.elements as elements
import expression.core.environs as environs
import expression.core.faults as faults
import expression.core.html as html
import expression.core.http as http
import expression.core.locations as locations
import expression.core.logs as logs
import expression.core.modules as modules
import expression.core.namespaces as namespaces
import expression.core.sessions as sessions
import expression.core.stations as stations
import expression.core.strings as strings
import identities
# Module-level flag; it is not read anywhere in the visible part of this module.
# NOTE(review): presumably consulted by logout handling elsewhere to decide
# whether the session is deleted on logout -- confirm against its users.
deleteSessionOnLogout = True
#symmetricKey = "Expression rules!" # FIXME
def _xpathStringLiteral(value):
    """Return ``value`` quoted as an XPath 1.0 string literal.

    XPath 1.0 has no escape sequence inside string literals (XML entities such
    as ``&apos;`` are not interpreted in an expression), so the quoting
    character is chosen to avoid the quotes present in ``value``. When the
    value contains both kinds of quotes, the literal is assembled with
    ``concat()``.
    """
    if "'" not in value:
        return "'%s'" % value
    if '"' not in value:
        return '"%s"' % value
    # Split on double quotes, wrap every piece in double quotes and glue the
    # pieces back together with a single-quoted double quote.
    return "concat(%s)" % ", '\"', ".join(
        ['"%s"' % part for part in value.split('"')])


class EntityDescriptor(elements.Element):
    """Liberty Alliance metadata ``EntityDescriptor`` element.

    Gives read access to the provider id attribute and to a few children of
    the ``md:SPDescriptor`` and ``md:IDPDescriptor`` sub-elements.
    """

    def _getFirstContent(self, xpath):
        """Return the content of the first node matching ``xpath``, or None."""
        nodes = self.evaluateXpath(xpath)
        if not nodes:
            return None
        return nodes[0].content

    def getAssertionConsumerServiceUrl(self, id):
        """Return the assertion consumer service URL of the service provider.

        ``id`` selects the ``md:AssertionConsumerServiceURL`` carrying that
        ``id`` attribute. When ``id`` is empty or matches nothing, the URL
        flagged ``isDefault = 'true'`` is used instead. Returns None when no
        URL is found.

        (The parameter name shadows the ``id`` builtin; it is kept because it
        is part of the method's signature.)
        """
        nodes = None
        if id:
            nodes = self.evaluateXpath(
                "md:SPDescriptor/md:AssertionConsumerServiceURL[@id = %s]"
                % _xpathStringLiteral(id))
        if not nodes:
            nodes = self.evaluateXpath(
                "md:SPDescriptor/md:AssertionConsumerServiceURL[@isDefault = 'true']")
        if not nodes:
            return None
        return nodes[0].content

    def getProviderId(self):
        """Return the ``providerID`` attribute of the descriptor, or None."""
        return self._getFirstContent("@providerID")

    def getSingleSignOnProtocolProfiles(self):
        """Return the contents of every identity provider
        ``md:SingleSignOnProtocolProfile`` node, as a (possibly empty) list.
        """
        nodes = self.evaluateXpath("md:IDPDescriptor/md:SingleSignOnProtocolProfile")
        # Accept a None result the same way the other getters do.
        return [node.content for node in nodes or []]

    def getSingleSignOnServiceUrl(self):
        """Return the identity provider single sign-on service URL, or None."""
        return self._getFirstContent("md:IDPDescriptor/md:SingleSignOnServiceURL")

    def getSoapEndpoint(self):
        """Return the identity provider SOAP endpoint, or None."""
        return self._getFirstContent("md:IDPDescriptor/md:SoapEndpoint")

    providerId = property(getProviderId)
    singleSignOnProtocolProfiles = property(getSingleSignOnProtocolProfiles)
    singleSignOnServiceUrl = property(getSingleSignOnServiceUrl)
    soapEndpoint = property(getSoapEndpoint)
class LibertyAlliance(directories.Directory):
    """Directory exposing Liberty Alliance single sign-on entry points.

    The same element implements the identity provider side (``singleSignOn``,
    ``afterAuthentication``) and the service provider side (``login``,
    ``assertionConsumer``). Key, certificate and metadata locations are read
    from the element's own ``yep:*`` configuration nodes and from documents
    stored below it.
    """

    # -- Private helpers ---------------------------------------------------

    def _getConfiguredFilePath(self, xpath):
        """Return the normalised path of the file named by the first node
        matching ``xpath``, resolved against this element's absolute path, or
        None when no such node exists.
        """
        nodes = self.evaluateXpath(xpath)
        if not nodes:
            return None
        return os.path.normpath(
            os.path.join(self.getAbsolutePath(), nodes[0].content.strip()))

    def _getMetadataRoot(self, subPath, notFoundMessage = None):
        """Return the root element of the document found at ``subPath``.

        Returns None when walking to the location raises PathNotFound; in
        that case ``notFoundMessage``, when given, is sent to the debug log.
        """
        try:
            metadataHolder = self.walkToLocation(self.getSubPathInternUri(subPath))
        except faults.PathNotFound:
            if notFoundMessage is not None:
                logs.debug(notFoundMessage)
            return None
        return metadataHolder.getRootElement()

    def _getOrCreateIdentification(self, user, serviceProviderId):
        """Return the identification of ``user`` for ``serviceProviderId``.

        When the user has none yet, a new identification is created (together
        with the ``identifications`` container if needed) and the user's
        document is saved.
        """
        identification = user.getIdentification(serviceProviderId)
        if identification is not None:
            return identification
        if user.identifications is None:
            user.node.newTextChild(None, "identifications", None)
        identifications = user.identifications
        identificationNode = identifications.node.newTextChild(
            None, "identification", None)
        identification = identifications.newElement(identificationNode)
        identification.providerId = serviceProviderId
        # FIXME: Hard-coded placeholder: every new federation receives the
        # same name identifier. It must be replaced by a freshly generated
        # one (the original code pointed at
        # lassoTools.generateNameIdentifier(serviceProviderId)).
        identification.localNameIdentifier \
            = 'CDSICDSC7SD89CDSDCDCDCDISCDUIDSI344343'
        user.identifications.append(identificationNode)
        user.getDocument().save()
        return identification

    def _outputAuthnResponseForm(self, authnResponse, serviceProviderMetadata):
        """Output an HTML page whose form posts the base64 exported
        ``authnResponse`` (field ``LARES``) to the service provider's
        assertion consumer URL.
        """
        lares = authnResponse.export_to_base64()
        # TODO: Read the AssertionConsumerServiceID child of the authentication
        # request instead of always falling back to the default consumer URL.
        assertionConsumerServiceId = None
        actionUrl = serviceProviderMetadata.getAssertionConsumerServiceUrl(
            assertionConsumerServiceId)
        layout = html.html(
            html.head(html.title(_("Authentication Succeeded"))),
            html.body(
                html.form(
                    html.input(
                        name = "LARES", type = "hidden", value = lares),
                    html.p(_("""\
You have been succesfully authenticated; click ok to go back to the service provider.\
""")),
                    html.div(
                        html.span(
                            html.input(
                                class_ = "button", name = "submit", type = "submit",
                                value = _("OK")),
                            class_ = "action-buttons-bar"),
                        class_ = "buttons-bar"),
                    action = actionUrl,
                    enctype = "multipart/form-data", method = "post")))
        htmlDocument = documents.newTemporaryHtmlDocument()
        htmlDocument.append(layout)
        stylesheet = self.getDataHolder().getSiteXslt()
        self.outputHttpHtmlDocument(htmlDocument, stylesheet)

    # -- Identity provider: after authentication ---------------------------

    def afterAuthentication(self, action = "afterAuthentication", authnResponse = None):
        """Liberty Alliance identity provider method which is called once the
        user authentication requested by singleSignOn has been attempted.

        Handles HTTP GET.

        ``action`` is part of the signature but is not used by this method.
        ``authnResponse`` is the pending lasso authentication response; when
        it is omitted, it is restored from the dump stored in the session.

        On success an assertion is added to the response and an HTML form
        posting it to the service provider is output. Raises PathNotFound for
        any HTTP command other than GET, PathForbidden when no authentication
        response is available, and Exception for an unknown authentication
        method or protocol profile.
        """
        if environs.getVar("httpCommand") != "GET":
            raise faults.PathNotFound(environs.getVar("currentStation"), "")

        session = environs.getVar("session")
        user = None
        if session is not None:
            if authnResponse is None:
                authnResponse = lasso.AuthnResponse.new_from_dump(
                    session.libertyAllianceAuthenticationResponse)
            user = environs.getVar("user")
        if authnResponse is None:
            # Nothing to answer to: refuse the request, as singleSignOn does,
            # rather than failing on an attribute access on None.
            raise faults.PathForbidden(self, "")

        success = session is not None and user is not None
        authenticationRequest = None
        serviceProviderId = None
        serviceProviderMetadata = None
        if success:
            authenticationRequest = authnResponse.request
            serviceProviderId = authenticationRequest.get_child(
                "ProviderID").get_content()
            serviceProviderMetadata = self.getServiceProviderMetadata(
                serviceProviderId)
            success = serviceProviderMetadata is not None
        authnResponse.process_authentication_result(success)
        if not success:
            # FIXME: The failed response is not sent back to the service
            # provider yet.
            # NOTE(review): the response is output on the success path only,
            # because it needs the request and metadata computed above --
            # confirm this matches the intended flow.
            return None

        assertion = lasso.Assertion(
            "issuer",
            authnResponse.get_attr_value("InResponseTo"))
        authenticationMethod = session.authenticationMethod
        if authenticationMethod != "password":
            raise Exception(
                "Unknown authentication method %s" % authenticationMethod)
        authenticationMethod = lasso.samlAuthenticationMethodPassword

        identification = self._getOrCreateIdentification(user, serviceProviderId)

        # TODO: Process the NameIDPolicy of the authentication request instead
        # of forcing "federated":
        #   - "none": no federation, return the peer (else local) identifier;
        #   - "onetime": generate a one time name identifier;
        #   - "federated": return the peer (else local) identifier;
        #   - "any": behave as "federated" when the principal gave consent,
        #     and handle the missing consent otherwise.
        nameIdentifierPolicy = "federated"
        nameIdentifier = identification.peerNameIdentifier
        if not nameIdentifier:
            nameIdentifier = identification.localNameIdentifier
        assert nameIdentifier

        # Name identifier formats follow the policy.
        if nameIdentifierPolicy == "onetime":
            nameIdentifierFormat = "onetime"
            idpNameIdentifierFormat = "onetime"
        else:
            nameIdentifierFormat = "federated"
            idpNameIdentifierFormat = "federated"

        authenticationStatement = lasso.AuthenticationStatement(
            authenticationMethod,
            # FIXME: reauthenticateOnOrAfter is a fixed date, not a computed one.
            "2005-05-03T16:12:00Z",
            nameIdentifier = nameIdentifier,
            nameQualifier = serviceProviderId,
            format = nameIdentifierFormat,
            idp_nameIdentifier = identification.localNameIdentifier,
            idp_nameQualifier = self.metadata.providerId,
            idp_format = idpNameIdentifierFormat)
        assertion.add_authenticationStatement(authenticationStatement)
        assertion.set_signature(1, self.privateKeyFilePath, self.certificateFilePath)
        authnResponse.add_assertion(assertion)

        protocolProfileNode = authenticationRequest.get_child("ProtocolProfile")
        if protocolProfileNode is None:
            protocolProfile = "artifact"
        else:
            protocolProfile = protocolProfileNode.get_content()
        # FIXME: To remove once the artifact profile is implemented. Until
        # then every request is answered with the POST profile. The lasso
        # constant is assigned (not the bare word "post") because it is what
        # the test below compares against.
        if protocolProfile != lasso.libProtocolProfilePost:
            logs.debug(
                "Using POST protocol profile instead of %s." % protocolProfile)
            protocolProfile = lasso.libProtocolProfilePost
        # TODO: Artifact profile: keep the assertion for later retrieval, send
        # an artifact (with the request's relay state) to the service provider
        # and redirect to it.
        if protocolProfile != lasso.libProtocolProfilePost:
            # Unknown protocol profile. We don't know what to do.
            raise Exception("FIXME: Unknow protocol profile")
        self._outputAuthnResponseForm(authnResponse, serviceProviderMetadata)
        return None

    # -- Service provider: assertion consumer ------------------------------

    def assertionConsumer(self):
        """Liberty Alliance service provider method which processes an
        authentication response coming (indirectly) from identity provider.

        Only HTTP POST is handled: the ``LARES`` field of the submission is
        parsed into a lasso authentication response, which is only logged for
        now. Any other HTTP command raises PathNotFound.
        """
        if environs.getVar("httpCommand") != "POST":
            raise faults.PathNotFound(environs.getVar("currentStation"), "")
        submission = environs.getVar("submission")
        lares = submission.getField("LARES")
        response = lasso.AuthnResponse.new_from_export(lares, type=1)
        logs.debug("Response = %s" % response)
        # TODO: Verify the signature of the response's Assertion child against
        # the identity provider certificate, then read the StatusCode value to
        # get the authentication result.

    # -- Configuration accessors -------------------------------------------

    def getCertificateFilePath(self):
        """Return the path named by ``yep:certificateFile``, or None."""
        return self._getConfiguredFilePath("yep:certificateFile")

    def getCertificationAuthorityCertificateFilePath(self):
        """Return the path named by ``yep:caCertificateFile``, or None."""
        return self._getConfiguredFilePath("yep:caCertificateFile")

    def getIdentityProviderMetadata(self):
        """Return the root element of the identity provider metadata document
        referenced by ``yep:identityProvider/@src``, or None when the
        reference is missing or its location is not found.
        """
        # FIXME: Temporary element name: yep:identityProviderMetadata
        nodes = self.evaluateXpath("yep:identityProvider/@src")
        if not nodes:
            return None
        return self._getMetadataRoot(nodes[0].content.strip())

    def getMetadata(self):
        """Return the root element of this provider's own ``metadata.xml``,
        or None when it is not found.
        """
        return self._getMetadataRoot("metadata.xml")

    def getPeerPublicKeyFilePath(self):
        """Return the path named by ``yep:peerPublicKeyFile``, or None."""
        return self._getConfiguredFilePath("yep:peerPublicKeyFile")

    def getPrivateKeyFilePath(self):
        """Return the path named by ``yep:privateKeyFile``, or None."""
        return self._getConfiguredFilePath("yep:privateKeyFile")

    def getServiceProviderMetadata(self, providerId):
        """Return the root element of the metadata stored under
        ``service-providers/`` for ``providerId``, or None (after a debug log)
        when it is not found.
        """
        return self._getMetadataRoot(
            "service-providers/%s" % strings.simplify(providerId),
            "Service provider not found: %s" % providerId)

    # -- Service provider: login -------------------------------------------

    def login(self):
        """Liberty Alliance service provider method that builds an
        authentication request and then redirects to the identity provider
        login page.

        Handles HTTP GET; any other HTTP command raises PathNotFound. The
        optional ``nextUrl`` submission field is passed along as relay state.
        """
        if environs.getVar("httpCommand") != "GET":
            raise faults.PathNotFound(environs.getVar("currentStation"), "")

        authnRequest = lasso.AuthnRequest(self.metadata.providerId)
        try:
            nextUrl = environs.getVar("submission").getField("nextUrl")
            # FIXME: Encrypt the relay state; for now nextUrl is sent as is
            # (an empty value becomes None).
            cryptedRelayState = nextUrl or None
            authnRequest.set_relayState(cryptedRelayState)
            authnRequest.set_forceAuthn(False)
            authnRequest.set_isPassive(False)
            authnRequest.set_protocolProfile(lasso.libProtocolProfilePost)
            authnRequest.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated)
            authnRequest.set_requestAuthnContext(
                [
                    lasso.samlAuthenticationMethodSmartcardPki,
                    lasso.samlAuthenticationMethodSoftwarePki,
                    lasso.samlAuthenticationMethodPassword,
                    ], # AuthContextClassRefs
                None, # AuthnContextStatementRefs
                lasso.libAuthnContextComparisonExact)
            # authnRequest.set_scoping(proxyCount=1)
            ssoQuery = authnRequest.export_to_query(
                1, # RSA/SHA1
                self.privateKeyFilePath)
        finally:
            # Should be done automatically. Done in a finally clause so that
            # the request is also released when building or signing it fails.
            authnRequest.destroy()

        identityProviderMetadata = self.identityProviderMetadata
        assert identityProviderMetadata is not None
        ssoUrl = "%s?%s" % (
            identityProviderMetadata.singleSignOnServiceUrl, ssoQuery)
        return environs.getVar("httpRequestHandler").outputRedirect(
            self, ssoUrl)

    # -- Identity provider: single sign-on ---------------------------------

    def singleSignOn(self):
        """Liberty Alliance identity provider method which processes an
        authentication request coming (indirectly) from a service provider.

        Handles HTTP GET. When the user must authenticate, the pending
        response is dumped into the session and the password login page is
        walked to; otherwise afterAuthentication is called directly.

        Raises PathNotFound for any HTTP command other than GET,
        PathForbidden when no response can be built from the query, and
        Exception for an invalid signature or a protocol profile other than
        POST.
        """
        if environs.getVar("httpCommand") != "GET":
            raise faults.PathNotFound(environs.getVar("currentStation"), "")
        ssoQuery = locations.cleanUpQuery(
            environs.getVar("httpQuery"), "sessionToken")
        protocolProfile = lasso.authn_request_get_protocolProfile(ssoQuery)
        if protocolProfile != lasso.libProtocolProfilePost:
            # FIXME: Handle the artifact profile.
            raise Exception("BOUM")

        authnResponse = lasso.AuthnResponse.new_from_request_query(
            ssoQuery, self.metadata.providerId)
        if authnResponse is None:
            raise faults.PathForbidden(self, "")
        signatureIsValid = authnResponse.verify_signature(
            # FIXME: Use a certificate instead & How to know which SP?
            self.peerPublicKeyFilePath,
            # Should be removed. Used only to generate a fake signature.
            self.privateKeyFilePath)
        # NOTE: The original author planned to change this test to a
        # comparison with 0.
        if not signatureIsValid:
            # FIXME: Return the error POST response.
            raise Exception("BOUM")

        user = environs.getVar("user")
        if not authnResponse.must_authenticate(
                is_authenticated = user is not None):
            return self.afterAuthentication("singleSignOn", authnResponse)
        # FIXME: Handle the passive case.
        # Save the authentication response. It will be used after
        # authentication.
        session = sessions.getOrCreateSession()
        session.publishToken = True
        session.libertyAllianceAuthenticationResponse = authnResponse.dump()
        return self.walkToLocation(
            "/accounts/passwords/login", "GET",
            nextUrl = self.getActionUri("afterAuthentication"))

    certificateFilePath = property(getCertificateFilePath)
    certificationAuthorityCertificateFilePath = property(
        getCertificationAuthorityCertificateFilePath)
    identityProviderMetadata = property(getIdentityProviderMetadata)
    metadata = property(getMetadata)
    peerPublicKeyFilePath = property(getPeerPublicKeyFilePath)
    privateKeyFilePath = property(getPrivateKeyFilePath)
# Register EntityDescriptor for the "EntityDescriptor" tag of the md namespace.
# NOTE(review): the URL arguments look like schema (and description)
# locations -- confirm against elements.registerElement.
elements.registerElement(
    namespaces.md.uri,
    "EntityDescriptor",
    EntityDescriptor,
    "http://www.entrouvert.org/expression/schemas/liberty-metadata-v1.0.xsd")

# Register LibertyAlliance for the "libertyAlliance" tag of the yep namespace.
elements.registerElement(
    namespaces.yep.uri,
    "libertyAlliance",
    LibertyAlliance,
    "http://www.entrouvert.org/expression/schemas/Directory.xsd",
    "http://www.entrouvert.org/expression/descriptions/Directory.xml")