Added LECP support in Python simulator and unit tests. I think I have found

several bugs in Lasso LECP implementation.

My biggest problem is that I didn't find a way for IDP to set
userAuthenticated, authenticationMethod, reauthenticateOnOrAfter to lecp
before (or when) building response envelope with
lecp.build_authn_response_envelope_msg(). Did I overlook something?
This commit is contained in:
Emmanuel Raviart 2004-08-07 20:42:02 +00:00
parent 8d90adf21c
commit b46a6f8038
6 changed files with 437 additions and 124 deletions

View File

@ -21,8 +21,6 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# FIXME: Replace principal with client in most methods.
# FIXME: Rename webUser to userAccount.
import lasso
@ -40,49 +38,92 @@ class IdentityProvider(Provider):
def singleSignOn(self, httpRequest):
server = self.getServer()
login = lasso.Login.new(server)
webSession = self.getWebSession(httpRequest.client)
webUser = None
if webSession is not None:
if httpRequest.method == 'GET':
# Single sign-on using HTTP redirect.
login = lasso.Login.new(server)
webSession = self.getWebSession(httpRequest.client)
webUser = None
if webSession is not None:
if webSession.sessionDump is not None:
login.set_session_from_dump(webSession.sessionDump)
webUser = self.getWebUserFromWebSession(webSession)
if webUser is not None and webUser.identityDump is not None:
login.set_identity_from_dump(webUser.identityDump)
login.init_from_authn_request_msg(httpRequest.query, lasso.httpMethodRedirect)
if not login.must_authenticate():
userAuthenticated = webUser is not None
authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME
return self.singleSignOn_part2(
httpRequest, login, webSession, webUser, userAuthenticated,
authenticationMethod)
if webSession is None:
webSession = self.createWebSession(httpRequest.client)
webSession.loginDump = login.dump()
# A real identity provider using a HTML form to ask user's login & password would store
# idpLoginDump in a session variable and display the HTML login form.
webUserId = httpRequest.client.keyring.get(self.url, None)
userAuthenticated = webUserId in self.webUsers
if userAuthenticated:
webSession.webUserId = webUserId
authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME
server = self.getServer()
webSession = self.getWebSession(httpRequest.client)
loginDump = webSession.loginDump
del webSession.loginDump
login = lasso.Login.new_from_dump(server, loginDump)
# Set identity & session in login, because loginDump doesn't contain them.
if webSession.sessionDump is not None:
login.set_session_from_dump(webSession.sessionDump)
webUser = self.getWebUserFromWebSession(webSession)
if webUser is not None and webUser.identityDump is not None:
login.set_identity_from_dump(webUser.identityDump)
login.init_from_authn_request_msg(httpRequest.query, lasso.httpMethodRedirect)
if not login.must_authenticate():
userAuthenticated = webUser is not None
authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME
return self.singleSignOn_part2(
httpRequest, login, webSession, webUser, userAuthenticated, authenticationMethod)
if webSession is None:
webSession = self.createWebSession(httpRequest.client)
webSession.loginDump = login.dump()
# A real identity provider using a HTML form to ask user's login & password would store
# idpLoginDump in a session variable and display the HTML login form.
webUserId = httpRequest.client.keyring.get(self.url, None)
userAuthenticated = webUserId in self.webUsers
if userAuthenticated:
webSession.webUserId = webUserId
authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME
server = self.getServer()
webSession = self.getWebSession(httpRequest.client)
loginDump = webSession.loginDump
del webSession.loginDump
login = lasso.Login.new_from_dump(server, loginDump)
# Set identity & session in login, because loginDump doesn't contain them.
if webSession.sessionDump is not None:
login.set_session_from_dump(webSession.sessionDump)
webUser = self.getWebUserFromWebSession(webSession)
if webUser is not None and webUser.identityDump is not None:
login.set_identity_from_dump(webUser.identityDump)
return self.singleSignOn_part2(
httpRequest, login, webSession, webUser, userAuthenticated, authenticationMethod)
elif httpRequest.method == 'POST' \
and httpRequest.headers.get('Content-Type', None) == 'text/xml':
# SOAP request => LECP single sign-on.
lecp = lasso.Lecp.new(server)
webSession = self.getWebSession(httpRequest.client)
webUser = None
if webSession is not None:
if webSession.sessionDump is not None:
lecp.set_session_from_dump(webSession.sessionDump)
webUser = self.getWebUserFromWebSession(webSession)
if webUser is not None and webUser.identityDump is not None:
lecp.set_identity_from_dump(webUser.identityDump)
lecp.init_from_authn_request_msg(httpRequest.body, lasso.httpMethodSoap)
# FIXME: lecp.must_authenticate() should always return False.
# The other solution is that we are able to not call lecp.must_authenticate()
# self.failIf(lecp.must_authenticate())
userAuthenticated = webUser is not None
authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME
# FIXME: It is very very strange that we don't provide userAuthenticated,
# authenticationMethod, reauthenticateOnOrAfter' to lecp before or when build response.
lecp.build_authn_response_envelope_msg()
soapResponseMsg = lecp.msg_body
self.failUnless(soapResponseMsg)
# FIXME: Lasso should set a lecp.msg_content_type to
# "application/vnd.liberty-response+xml". This should also be done for SOAP, etc, with
# other profiles.
# contentType = lecp.msg_content_type
# self.failUnlessEqual(contentType, 'application/vnd.liberty-response+xml')
contentType = 'application/vnd.liberty-response+xml'
return self.newHttpResponse(
200,
headers = {
'Content-Type': contentType,
'Cache-Control': 'no-cache',
'Pragma': 'no-cache',
},
body = soapResponseMsg)
else:
return self.newHttpResponse(
400, 'Bad Request: Method %s not handled by singleSignOn' % httpRequest.method)
def singleSignOn_part2(self, httpRequest, login, webSession, webUser, userAuthenticated,
authenticationMethod):
@ -127,7 +168,8 @@ class IdentityProvider(Provider):
soapResponseMsg = self.soapResponseMsgs.get(artifact, None)
if soapResponseMsg is None:
raise Exception('FIXME: Handle the case when artifact is wrong')
return HttpResponse(200, body = soapResponseMsg)
return self.newHttpResponse(
200, headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg)
elif requestType == lasso.requestTypeLogout:
server = self.getServer()
logout = lasso.Logout.new(server, lasso.providerTypeIdp)
@ -183,7 +225,9 @@ class IdentityProvider(Provider):
self.failUnless(soapEndpoint)
soapRequestMsg = logout.msg_body
self.failUnless(soapRequestMsg)
httpResponse = HttpRequest(self, 'POST', soapEndpoint, body = soapRequestMsg).ask()
httpResponse = sendHttpRequest(
'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'},
body = soapRequestMsg)
self.failUnlessEqual(httpResponse.statusCode, 200)
logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap)
@ -192,6 +236,7 @@ class IdentityProvider(Provider):
logout.build_response_msg()
soapResponseMsg = logout.msg_body
self.failUnless(soapResponseMsg)
return HttpResponse(200, body = soapResponseMsg)
return self.newHttpResponse(
200, headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg)
else:
raise Exception('Unknown request type: %s' % requestType)

View File

@ -0,0 +1,107 @@
# -*- coding: UTF-8 -*-
# Python Lasso Simulator
#
# Copyright (C) 2004 Entr'ouvert
# http://lasso.entrouvert.org
#
# Author: Emmanuel Raviart <eraviart@entrouvert.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import lasso
from websimulator import *
class LibertyEnabledClient(WebClient, Simulation):
# A service provider MAY provide a list of identity providers it recognizes by including the
# <lib:IDPList> element in the <lib:AuthnRequestEnvelope>. The format and processing rules for
# the identity provider list MUST be as defined in [LibertyProtSchema].
# The identity provider list can be used by the LECP to create a user identifier to be
# presented to the Principal. For example, the LECP could compare the list of the Principal's
# known identities (and the identities of the identity provider that provides those identities)
# against the list provided by the service provider and then only display the intersection.
# If the service provider does not support the LECP-advertised Liberty version, the service
# provider MUST return to the LECP an HTTP 501 response with the reason phrase "Unsupported
# Liberty Version."
#
# The responses in step 3 and step 6 SHOULD NOT be cached. To this end service providers and
# identity providers SHOULD place both "Cache-Control: no-cache" and "Pragma: no-cache" on
# their responses to ensure that the LECP and any intervening proxies will not cache the
# response.
# If the LECP discovers a syntax error due to the service provider or cannot proceed any
# further for other reasons (for example, cannot resolve identity provider, cannot reach the
# identity provider, etc.), the LECP MUST return to the service provider a <lib:AuthnResponse>
# with a <samlp:Status> indicating the desired error element as defined in
# [LibertyProtSchema]. The <lib:AuthnResponse> containing the error status MUST be sent using
# a POST to the service provider's assertion consumer service URL obtained from the
# <lib:AssertionConsumerServiceURL> element of the <lib:AuthnRequestEnvelope>. The POST MUST
# be a form that contains the field LARES with the value being the <lib:AuthnResponse>
# protocol message as defined in [LibertyProtSchema], containing the <samlp:Status>. The
# <lib:AuthnResponse> MUST be encoded by applying a base64 transformation (refer to
# [RFC2045]) to the <lib:AuthnResponse> and all its elements.
# FIXME: Lasso should provide a way for Liberty-enabled client to create a "server" without
# metadata, instead of using 'singleSignOnServiceUrl'.
idpSingleSignOnServiceUrl = None
requestHeaders = WebClient.requestHeaders.copy()
requestHeaders.update({
# FIXME: Is this the correct syntax for several URLs in LIBV?
'Liberty-Enabled': 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1',
'Liberty-Agent': 'LassoSimulator/0.0.0',
# FIXME: As an alternative to 'Liberty-Enabled' header, a user agent may use:
# 'User-Agent': ' '.join((
# requestHeaders['User-Agent'],
# 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1'))
'Accept': ','.join((requestHeaders['Accept'], 'application/vnd.liberty-request+xml'))
})
def __init__(self, test, internet):
Simulation.__init__(self, test)
WebClient.__init__(self, internet)
def login(self, principal, site, path):
httpResponse = self.sendHttpRequestToSite(site, 'GET', path)
self.failUnlessEqual(
httpResponse.headers['Content-Type'], 'application/vnd.liberty-request+xml')
lecp = lasso.Lecp.new(None)
authnRequestEnvelope = httpResponse.body
# FIXME: I don't understand why authnRequestEnvelopeMsg is base64 encoded. I think this
# is an error.
import base64
authnRequestEnvelope = base64.encodestring(authnRequestEnvelope)
lecp.process_authn_request_envelope_msg(authnRequestEnvelope)
lecp.build_authn_request_msg()
# FIXME: The service provider could return an IDPList in authnRequestEnvelope, so that
# we verify that self.idpSingleSignOnServiceUrl belongs to one of them
httpResponse = self.sendHttpRequest(
'POST', self.idpSingleSignOnServiceUrl, headers = {'Content-Type': 'text/xml'},
body = lecp.msg_body)
self.failUnlessEqual(
httpResponse.headers.get('Content-Type', None), 'application/vnd.liberty-response+xml')
lecp.process_authn_response_envelope_msg(httpResponse.body)
lecp.build_authn_response_msg()
self.failUnless(lecp.msg_url)
self.failUnless(lecp.msg_body)
# FIXME: Should we use 'multipart/form-data' for forms?
return self.sendHttpRequest(
'POST', lecp.msg_url, headers = {'Content-Type': 'multipart/form-data'},
form = {'LARES': lecp.msg_body})

View File

@ -21,8 +21,6 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# FIXME: Replace principal with client in most methods.
# FIXME: Rename webUser to userAccount.
import lasso
@ -31,5 +29,13 @@ from websimulator import *
class Provider(WebSite):
responseHeaders = WebSite.responseHeaders.copy()
responseHeaders.update({
'Liberty-Enabled': 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1',
})
serverDump = None
webUserIdsByNameIdentifier = None
webSessionIdsByNameIdentifier = None
def getServer(self):
return lasso.Server.new_from_dump(self.serverDump)

View File

@ -21,8 +21,6 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# FIXME: Replace principal with client in most methods.
# FIXME: Rename webUser to userAccount.
import lasso
@ -37,23 +35,46 @@ class ServiceProvider(Provider):
def assertionConsumer(self, httpRequest):
server = self.getServer()
login = lasso.Login.new(server)
login.init_request(httpRequest.query, lasso.httpMethodRedirect)
login.build_request_msg()
soapEndpoint = login.msg_url
self.failUnless(soapEndpoint)
soapRequestMsg = login.msg_body
self.failUnless(soapRequestMsg)
httpResponse = HttpRequest(self, 'POST', soapEndpoint, body = soapRequestMsg).ask()
self.failUnlessEqual(httpResponse.statusCode, 200)
try:
login.process_response_msg(httpResponse.body)
except lasso.Error, error:
if error.code == -7: # FIXME: This will change, he said.
return HttpResponse(
401, 'Access Unauthorized: User authentication failed on identity provider.')
else:
raise
if httpRequest.method == 'GET':
login.init_request(httpRequest.query, lasso.httpMethodRedirect)
login.build_request_msg()
soapEndpoint = login.msg_url
self.failUnless(soapEndpoint)
soapRequestMsg = login.msg_body
self.failUnless(soapRequestMsg)
httpResponse = self.sendHttpRequest(
'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'},
body = soapRequestMsg)
self.failUnlessEqual(httpResponse.statusCode, 200)
try:
login.process_response_msg(httpResponse.body)
except lasso.Error, error:
if error.code == -7: # FIXME: This will change, he said.
return self.newHttpResponse(
401,
'Access Unauthorized: User authentication failed on identity provider.')
else:
raise
elif httpRequest.method == 'POST':
authnResponseMsg = httpRequest.getFormField('LARES', None)
self.failUnless(authnResponseMsg)
# FIXME: Should we do an init before process_authn_response_msg?
try:
login.process_authn_response_msg(authnResponseMsg)
except lasso.Error, error:
if error.code == -7: # FIXME: This will change, he said.
return self.newHttpResponse(
401,
'Access Unauthorized: User authentication failed on identity provider.')
else:
raise
else:
return self.newHttpResponse(
400,
'Bad Request: Method %s not handled by assertionConsumer' % httpRequest.method)
nameIdentifier = login.nameIdentifier
self.failUnless(nameIdentifier)
@ -100,7 +121,7 @@ class ServiceProvider(Provider):
webUserId = httpRequest.client.keyring.get(self.url, None)
userAuthenticated = webUserId in self.webUsers
if not userAuthenticated:
return HttpResponse(401, 'Access Unauthorized: User has no account.')
return self.newHttpResponse(401, 'Access Unauthorized: User has no account.')
webUser = self.webUsers[webUserId]
webSession.webUserId = webUser.uniqueId
@ -113,35 +134,91 @@ class ServiceProvider(Provider):
self.webUserIdsByNameIdentifier[nameIdentifier] = webUser.uniqueId
self.webSessionIdsByNameIdentifier[nameIdentifier] = webSession.uniqueId
return HttpResponse(200)
return self.newHttpResponse(200)
def login(self, httpRequest):
libertyEnabled = httpRequest.headers.get('Liberty-Enabled', None)
userAgent = httpRequest.headers.get('User-Agent', None)
# FIXME: Lasso should have a function to compute useLecp.
# Or this should be done in lasso.Login.new(server, libertyEnabled, userAgent)
useLecp = False
if libertyEnabled:
useLecp = 'urn:liberty:iff:2003-08' in libertyEnabled
if not useLecp:
return self.newHttpResponse(501, 'Unsupported Liberty Version.')
elif userAgent:
useLecp = 'urn:liberty:iff:2003-08' in userAgent
if not useLecp and "LIBV=" in userAgent:
return self.newHttpResponse(501, 'Unsupported Liberty Version.')
else:
useLecp = False
def loginUsingRedirect(self, httpRequest):
server = self.getServer()
login = lasso.Login.new(server)
login.init_authn_request(self.idpSite.providerId)
self.failUnlessEqual(login.request_type, lasso.messageTypeAuthnRequest)
forceAuthn = httpRequest.getQueryBoolean('forceAuthn', False)
if forceAuthn:
login.request.set_forceAuthn(forceAuthn)
isPassive = httpRequest.getQueryBoolean('isPassive', False)
if not isPassive:
login.request.set_isPassive(isPassive)
login.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated)
login.request.set_consent(lasso.libConsentObtained)
relayState = 'fake'
login.request.set_relayState(relayState)
login.build_authn_request_msg()
authnRequestUrl = login.msg_url
self.failUnless(authnRequestUrl)
return httpRequest.client.redirect(authnRequestUrl)
server = self.getServer()
if useLecp:
lecp = lasso.Lecp.new(server)
lecp.init_authn_request(self.idpSite.providerId) # FIXME: The argument should be None.
self.failUnlessEqual(lecp.request_type, lasso.messageTypeAuthnRequest)
# FIXME: This protocol profile should be set by default by Lasso.
lecp.request.set_protocolProfile(lasso.libProtocolProfileBrwsPost)
# Same treatement as for non LECP login.
if forceAuthn:
lecp.request.set_forceAuthn(forceAuthn)
if not isPassive:
lecp.request.set_isPassive(isPassive)
lecp.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated)
lecp.request.set_consent(lasso.libConsentObtained)
relayState = 'fake'
lecp.request.set_relayState(relayState)
# FIXME: In my opinion, this method should be the renamed to build_authn_request_msg.
lecp.build_authn_request_envelope_msg()
authnRequestEnvelopeMsg = lecp.msg_body
# FIXME: I don't understand why authnRequestEnvelopeMsg is base64 encoded.
import base64
authnRequestEnvelopeMsg = base64.decodestring(authnRequestEnvelopeMsg)
self.failUnless(authnRequestEnvelopeMsg)
# FIXME: Lasso should set a lecp.msg_content_type to
# "application/vnd.liberty-request+xml". This should also be done for SOAP, etc, with
# other profiles.
# contentType = lecp.msg_content_type
# self.failUnlessEqual(contentType, 'application/vnd.liberty-request+xml')
contentType = 'application/vnd.liberty-request+xml'
return self.newHttpResponse(
200,
headers = {
'Content-Type': contentType,
'Cache-Control': 'no-cache',
'Pragma': 'no-cache',
},
body = authnRequestEnvelopeMsg)
else:
login = lasso.Login.new(server)
login.init_authn_request(self.idpSite.providerId)
self.failUnlessEqual(login.request_type, lasso.messageTypeAuthnRequest)
if forceAuthn:
login.request.set_forceAuthn(forceAuthn)
if not isPassive:
login.request.set_isPassive(isPassive)
login.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated)
login.request.set_consent(lasso.libConsentObtained)
relayState = 'fake'
login.request.set_relayState(relayState)
login.build_authn_request_msg()
authnRequestUrl = login.msg_url
self.failUnless(authnRequestUrl)
return httpRequest.client.redirect(authnRequestUrl)
def logoutUsingSoap(self, httpRequest):
webSession = self.getWebSession(httpRequest.client)
if webSession is None:
return HttpResponse(401, 'Access Unauthorized: User has no session opened.')
return self.newHttpResponse(401, 'Access Unauthorized: User has no session opened.')
webUser = self.getWebUserFromWebSession(webSession)
if webUser is None:
return HttpResponse(401, 'Access Unauthorized: User is not logged in.')
return self.newHttpResponse(401, 'Access Unauthorized: User is not logged in.')
server = self.getServer()
logout = lasso.Logout.new(server, lasso.providerTypeSp)
@ -158,7 +235,8 @@ class ServiceProvider(Provider):
self.failUnless(soapEndpoint)
soapRequestMsg = logout.msg_body
self.failUnless(soapRequestMsg)
httpResponse = HttpRequest(self, 'POST', soapEndpoint, body = soapRequestMsg).ask()
httpResponse = self.sendHttpRequest(
'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, body = soapRequestMsg)
self.failUnlessEqual(httpResponse.statusCode, 200)
logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap)
@ -185,4 +263,4 @@ class ServiceProvider(Provider):
self.failUnless(nameIdentifier)
del self.webSessionIdsByNameIdentifier[nameIdentifier]
return HttpResponse(200)
return self.newHttpResponse(200)

View File

@ -33,6 +33,7 @@ sys.path.insert(0, '../.libs')
import lasso
from IdentityProvider import IdentityProvider
from LibertyEnabledClient import LibertyEnabledClient
from ServiceProvider import ServiceProvider
from websimulator import *
@ -63,6 +64,24 @@ class LoginTestCase(unittest.TestCase):
# Frederic Peters has no account on identity provider.
return site
def generateLibertyEnabledClient(self, internet):
client = LibertyEnabledClient(self, internet)
# FIXME: Lasso should provide a way for Liberty-enabled client to create a "server" without
# metadata, instead of using 'singleSignOnServiceUrl'.
## server = lasso.Server.new(
## None, # A LECP has no metadata.
## '../../examples/data/client-public-key.pem',
## '../../examples/data/client-private-key.pem',
## '../../examples/data/client-crt.pem',
## lasso.signatureMethodRsaSha1)
## server.add_provider(
## '../../examples/data/idp-metadata.xml',
## '../../examples/data/idp-public-key.pem',
## '../../examples/data/ca-crt.pem')
## client.server = server
client.idpSingleSignOnServiceUrl = 'https://identity-provider/singleSignOn'
return client
def generateSpSite(self, internet):
site = ServiceProvider(self, internet, 'https://service-provider/')
site.providerId = 'https://service-provider/metadata'
@ -94,6 +113,19 @@ class LoginTestCase(unittest.TestCase):
## def tearDown(self):
## pass
def test00(self):
"""LECP login."""
internet = Internet()
idpSite = self.generateIdpSite(internet)
spSite = self.generateSpSite(internet)
spSite.idpSite = idpSite
lec = self.generateLibertyEnabledClient(internet)
principal = Principal(internet, 'Romain Chantereau')
principal.keyring[idpSite.url] = 'Chantereau'
principal.keyring[spSite.url] = 'Romain'
httpResponse = lec.login(principal, spSite, '/login')
raise str((httpResponse.headers['Content-Type'], httpResponse.body))
def test01(self):
"""Service provider initiated login using HTTP redirect and service provider initiated logout using SOAP."""
@ -105,9 +137,9 @@ class LoginTestCase(unittest.TestCase):
principal.keyring[idpSite.url] = 'Chantereau'
principal.keyring[spSite.url] = 'Romain'
httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/loginUsingRedirect'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
self.failUnlessEqual(httpResponse.statusCode, 200)
httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap')
self.failUnlessEqual(httpResponse.statusCode, 200)
self.failIf(spSite.webSessions)
self.failIf(idpSite.webSessions)
@ -123,30 +155,28 @@ class LoginTestCase(unittest.TestCase):
principal.keyring[idpSite.url] = 'Chantereau'
principal.keyring[spSite.url] = 'Romain'
httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/loginUsingRedirect'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
self.failUnlessEqual(httpResponse.statusCode, 200)
httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap')
self.failUnlessEqual(httpResponse.statusCode, 200)
# Once again. Now the principal already has a federation between spSite and idpSite.
httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/loginUsingRedirect'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
self.failUnlessEqual(httpResponse.statusCode, 200)
httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap')
self.failUnlessEqual(httpResponse.statusCode, 200)
# Once again. Do a new passive login between normal login and logout.
httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/loginUsingRedirect'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
self.failUnlessEqual(httpResponse.statusCode, 200)
del principal.keyring[idpSite.url] # Ensure identity provider will be really passive.
httpResponse = spSite.doHttpRequest(HttpRequest(
principal, 'GET', '/loginUsingRedirect?isPassive=1'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?isPassive=1')
self.failUnlessEqual(httpResponse.statusCode, 200)
httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap')
self.failUnlessEqual(httpResponse.statusCode, 200)
# Once again, with isPassive and the user having no web session.
httpResponse = spSite.doHttpRequest(HttpRequest(
principal, 'GET', '/loginUsingRedirect?isPassive=1'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?isPassive=1')
self.failUnlessEqual(httpResponse.statusCode, 401)
def test03(self):
@ -160,9 +190,9 @@ class LoginTestCase(unittest.TestCase):
# Frederic Peters has no account on identity provider.
principal.keyring[spSite.url] = 'Frederic'
httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/loginUsingRedirect'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
self.failUnlessEqual(httpResponse.statusCode, 401)
httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap')
self.failUnlessEqual(httpResponse.statusCode, 401)
def test04(self):
@ -177,9 +207,9 @@ class LoginTestCase(unittest.TestCase):
principal.keyring[idpSite.url] = 'Nowicki'
# Christophe Nowicki has no account on service provider.
httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/loginUsingRedirect'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login')
self.failUnlessEqual(httpResponse.statusCode, 401)
httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap')
self.failUnlessEqual(httpResponse.statusCode, 401)
def test05(self):
@ -193,8 +223,7 @@ class LoginTestCase(unittest.TestCase):
principal.keyring[idpSite.url] = 'Chantereau'
principal.keyring[spSite.url] = 'Romain'
httpResponse = spSite.doHttpRequest(HttpRequest(
principal, 'GET', '/loginUsingRedirect?isPassive=1'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?isPassive=1')
self.failUnlessEqual(httpResponse.statusCode, 401)
def test06(self):
@ -208,28 +237,24 @@ class LoginTestCase(unittest.TestCase):
principal.keyring[idpSite.url] = 'Chantereau'
principal.keyring[spSite.url] = 'Romain'
httpResponse = spSite.doHttpRequest(HttpRequest(
principal, 'GET', '/loginUsingRedirect?forceAuthn=1'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1')
self.failUnlessEqual(httpResponse.statusCode, 200)
httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap')
self.failUnlessEqual(httpResponse.statusCode, 200)
# Ask user to reauthenticate while he is already logged.
httpResponse = spSite.doHttpRequest(HttpRequest(
principal, 'GET', '/loginUsingRedirect?forceAuthn=1'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1')
self.failUnlessEqual(httpResponse.statusCode, 200)
del principal.keyring[idpSite.url] # Ensure user can't authenticate.
httpResponse = spSite.doHttpRequest(HttpRequest(
principal, 'GET', '/loginUsingRedirect?forceAuthn=1'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1')
self.failUnlessEqual(httpResponse.statusCode, 401)
httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap')
self.failUnlessEqual(httpResponse.statusCode, 200)
# Force authentication, but user won't authenticate.
httpResponse = spSite.doHttpRequest(HttpRequest(
principal, 'GET', '/loginUsingRedirect?forceAuthn=1'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1')
self.failUnlessEqual(httpResponse.statusCode, 401)
httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap'))
httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap')
self.failUnlessEqual(httpResponse.statusCode, 401)
## def test06(self):

View File

@ -21,6 +21,8 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# FIXME: Replace principal with client in most methods.
# FIXME: Rename webUser to userAccount.
@ -28,21 +30,33 @@
class HttpRequest(object):
client = None # Principal or web site sending the request.
body = None
header = None
form = None
headers = None
method = None # 'GET' or 'POST' or 'PUT' or...
url = None
def __init__(self, client, method, url, body = None):
def __init__(self, client, method, url, headers = None, body = None, form = None):
self.client = client
self.method = method
self.url = url
if headers:
self.headers = headers
if body:
self.body = body
if form:
self.form = form
def ask(self):
def send(self):
webSite = self.client.internet.getWebSite(self.url)
return webSite.doHttpRequest(self)
def getFormField(self, name, default = 'none'):
if self.form and name in self.form:
return self.form[name]
if default == 'none':
raise KeyError(name)
return default
def getQueryBoolean(self, name, default = 'none'):
try:
fieldValue = self.getQueryField(name)
@ -60,8 +74,7 @@ class HttpRequest(object):
return ''
def getQueryField(self, name, default = 'none'):
query = self.query
if query:
if self.query:
for field in self.query.split('&'):
fieldName, fieldValue = field.split('=')
if name == fieldName:
@ -75,14 +88,16 @@ class HttpRequest(object):
class HttpResponse(object):
body = None
header = None
headers = None
statusCode = None # 200 or...
statusMessage = None
def __init__(self, statusCode, statusMessage = None, body = None):
def __init__(self, statusCode, statusMessage = None, headers = None, body = None):
self.statusCode = statusCode
if statusMessage:
self.statusMessage = statusMessage
if headers:
self.headers = headers
if body:
self.body = body
@ -137,6 +152,10 @@ class Simulation(object):
class WebClient(object):
internet = None
keyring = None
requestHeaders = {
'User-Agent': 'LassoSimulator/0.0.0',
'Accept': 'text/xml,application/xml,application/xhtml+xml,text/html',
}
webSessionIds = None # Simulate the cookies, stored in user's navigator, and containing the
# IDs of sessions already opened by the user.
@ -146,8 +165,30 @@ class WebClient(object):
self.webSessionIds = {}
def redirect(self, url):
webSite = self.internet.getWebSite(url)
return webSite.doHttpRequest(HttpRequest(self, 'GET', url))
return self.sendHttpRequest('GET', url)
def sendHttpRequest(self, method, url, headers = None, body = None, form = None):
requestHeaders = self.requestHeaders
if headers:
requestHeaders = self.requestHeaders.copy()
for name, value in headers.iteritems():
requestHeaders[name] = value
else:
requestHeaders = self.requestHeaders
return HttpRequest(
self, method, url, headers = requestHeaders, body = body, form = form).send()
def sendHttpRequestToSite(self, webSite, method, path, headers = None, body = None,
form = None):
url = webSite.url
if path:
if path[0] == '/':
while url[-1] == '/':
url = url[:-1]
elif url[-1] != '/':
url += '/'
url += path
return self.sendHttpRequest(method, url, headers = headers, body = body, form = form)
class Principal(WebClient):
@ -189,11 +230,11 @@ class WebSite(WebClient, Simulation):
lastWebSessionId = 0
providerId = None # The Liberty providerID of this web site
serverDump = None
responseHeaders = {
'Server': 'Lasso Simulator Web Server',
}
url = None # The main URL of web site
webUserIdsByNameIdentifier = None
webUsers = None
webSessionIdsByNameIdentifier = None
webSessions = None
def __init__(self, test, internet, url):
@ -267,3 +308,14 @@ class WebSite(WebClient, Simulation):
# The user has no account on this site.
return None
return self.webUsers.get(webUserId, None)
def newHttpResponse(self, statusCode, statusMessage = None, headers = None, body = None):
responseHeaders = self.responseHeaders
if headers:
responseHeaders = self.responseHeaders.copy()
for name, value in headers.iteritems():
responseHeaders[name] = value
else:
responseHeaders = self.responseHeaders
return HttpResponse(
statusCode, statusMessage = statusMessage, headers = headers, body = body)