From b46a6f80382d309a4e0c4ebdca346c296b66a789 Mon Sep 17 00:00:00 2001 From: Emmanuel Raviart Date: Sat, 7 Aug 2004 20:42:02 +0000 Subject: [PATCH] Added LECP support in Python simulator and unit tests. I think I have found several bugs in Lasso LECP implementation. My biggest problem is that I didn't find a way for IDP to set userAuthenticated, authenticationMethod, reauthenticateOnOrAfter to lecp before (or when) building response envelope with lecp.build_authn_response_envelope_msg(). Did I overlook something? --- python/tests/IdentityProvider.py | 125 ++++++++++++++------- python/tests/LibertyEnabledClient.py | 107 ++++++++++++++++++ python/tests/Provider.py | 10 +- python/tests/ServiceProvider.py | 160 ++++++++++++++++++++------- python/tests/login_tests.py | 83 +++++++++----- python/tests/websimulator.py | 76 +++++++++++-- 6 files changed, 437 insertions(+), 124 deletions(-) create mode 100644 python/tests/LibertyEnabledClient.py diff --git a/python/tests/IdentityProvider.py b/python/tests/IdentityProvider.py index e1cfef5b..2715c42a 100644 --- a/python/tests/IdentityProvider.py +++ b/python/tests/IdentityProvider.py @@ -21,8 +21,6 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# FIXME: Replace principal with client in most methods. -# FIXME: Rename webUser to userAccount. import lasso @@ -40,49 +38,92 @@ class IdentityProvider(Provider): def singleSignOn(self, httpRequest): server = self.getServer() - login = lasso.Login.new(server) - webSession = self.getWebSession(httpRequest.client) - webUser = None - if webSession is not None: + if httpRequest.method == 'GET': + # Single sign-on using HTTP redirect. + login = lasso.Login.new(server) + webSession = self.getWebSession(httpRequest.client) + webUser = None + if webSession is not None: + if webSession.sessionDump is not None: + login.set_session_from_dump(webSession.sessionDump) + webUser = self.getWebUserFromWebSession(webSession) + if webUser is not None and webUser.identityDump is not None: + login.set_identity_from_dump(webUser.identityDump) + login.init_from_authn_request_msg(httpRequest.query, lasso.httpMethodRedirect) + + if not login.must_authenticate(): + userAuthenticated = webUser is not None + authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME + return self.singleSignOn_part2( + httpRequest, login, webSession, webUser, userAuthenticated, + authenticationMethod) + + if webSession is None: + webSession = self.createWebSession(httpRequest.client) + webSession.loginDump = login.dump() + + # A real identity provider using a HTML form to ask user's login & password would store + # idpLoginDump in a session variable and display the HTML login form. + webUserId = httpRequest.client.keyring.get(self.url, None) + userAuthenticated = webUserId in self.webUsers + if userAuthenticated: + webSession.webUserId = webUserId + authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME + + server = self.getServer() + webSession = self.getWebSession(httpRequest.client) + loginDump = webSession.loginDump + del webSession.loginDump + login = lasso.Login.new_from_dump(server, loginDump) + # Set identity & session in login, because loginDump doesn't contain them. if webSession.sessionDump is not None: login.set_session_from_dump(webSession.sessionDump) webUser = self.getWebUserFromWebSession(webSession) if webUser is not None and webUser.identityDump is not None: login.set_identity_from_dump(webUser.identityDump) - login.init_from_authn_request_msg(httpRequest.query, lasso.httpMethodRedirect) - if not login.must_authenticate(): - userAuthenticated = webUser is not None - authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME return self.singleSignOn_part2( httpRequest, login, webSession, webUser, userAuthenticated, authenticationMethod) - - if webSession is None: - webSession = self.createWebSession(httpRequest.client) - webSession.loginDump = login.dump() - - # A real identity provider using a HTML form to ask user's login & password would store - # idpLoginDump in a session variable and display the HTML login form. - webUserId = httpRequest.client.keyring.get(self.url, None) - userAuthenticated = webUserId in self.webUsers - if userAuthenticated: - webSession.webUserId = webUserId - authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME - - server = self.getServer() - webSession = self.getWebSession(httpRequest.client) - loginDump = webSession.loginDump - del webSession.loginDump - login = lasso.Login.new_from_dump(server, loginDump) - # Set identity & session in login, because loginDump doesn't contain them. - if webSession.sessionDump is not None: - login.set_session_from_dump(webSession.sessionDump) - webUser = self.getWebUserFromWebSession(webSession) - if webUser is not None and webUser.identityDump is not None: - login.set_identity_from_dump(webUser.identityDump) - - return self.singleSignOn_part2( - httpRequest, login, webSession, webUser, userAuthenticated, authenticationMethod) + elif httpRequest.method == 'POST' \ + and httpRequest.headers.get('Content-Type', None) == 'text/xml': + # SOAP request => LECP single sign-on. + lecp = lasso.Lecp.new(server) + webSession = self.getWebSession(httpRequest.client) + webUser = None + if webSession is not None: + if webSession.sessionDump is not None: + lecp.set_session_from_dump(webSession.sessionDump) + webUser = self.getWebUserFromWebSession(webSession) + if webUser is not None and webUser.identityDump is not None: + lecp.set_identity_from_dump(webUser.identityDump) + lecp.init_from_authn_request_msg(httpRequest.body, lasso.httpMethodSoap) + # FIXME: lecp.must_authenticate() should always return False. + # The other solution is that we are able to not call lecp.must_authenticate() + # self.failIf(lecp.must_authenticate()) + userAuthenticated = webUser is not None + authenticationMethod = lasso.samlAuthenticationMethodPassword # FIXME + # FIXME: It is very very strange that we don't provide userAuthenticated, + # authenticationMethod, reauthenticateOnOrAfter' to lecp before or when build response. + lecp.build_authn_response_envelope_msg() + soapResponseMsg = lecp.msg_body + self.failUnless(soapResponseMsg) + # FIXME: Lasso should set a lecp.msg_content_type to + # "application/vnd.liberty-response+xml". This should also be done for SOAP, etc, with + # other profiles. + # contentType = lecp.msg_content_type + # self.failUnlessEqual(contentType, 'application/vnd.liberty-response+xml') + contentType = 'application/vnd.liberty-response+xml' + return self.newHttpResponse( + 200, + headers = { + 'Content-Type': contentType, + 'Cache-Control': 'no-cache', + 'Pragma': 'no-cache', + }, + body = soapResponseMsg) + else: + return self.newHttpResponse( + 400, 'Bad Request: Method %s not handled by singleSignOn' % httpRequest.method) def singleSignOn_part2(self, httpRequest, login, webSession, webUser, userAuthenticated, authenticationMethod): @@ -127,7 +168,8 @@ class IdentityProvider(Provider): soapResponseMsg = self.soapResponseMsgs.get(artifact, None) if soapResponseMsg is None: raise Exception('FIXME: Handle the case when artifact is wrong') - return HttpResponse(200, body = soapResponseMsg) + return self.newHttpResponse( + 200, headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg) elif requestType == lasso.requestTypeLogout: server = self.getServer() logout = lasso.Logout.new(server, lasso.providerTypeIdp) @@ -183,7 +225,9 @@ class IdentityProvider(Provider): self.failUnless(soapEndpoint) soapRequestMsg = logout.msg_body self.failUnless(soapRequestMsg) - httpResponse = HttpRequest(self, 'POST', soapEndpoint, body = soapRequestMsg).ask() + httpResponse = sendHttpRequest( + 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, + body = soapRequestMsg) self.failUnlessEqual(httpResponse.statusCode, 200) logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap) @@ -192,6 +236,7 @@ class IdentityProvider(Provider): logout.build_response_msg() soapResponseMsg = logout.msg_body self.failUnless(soapResponseMsg) - return HttpResponse(200, body = soapResponseMsg) + return self.newHttpResponse( + 200, headers = {'Content-Type': 'text/xml'}, body = soapResponseMsg) else: raise Exception('Unknown request type: %s' % requestType) diff --git a/python/tests/LibertyEnabledClient.py b/python/tests/LibertyEnabledClient.py new file mode 100644 index 00000000..397999bf --- /dev/null +++ b/python/tests/LibertyEnabledClient.py @@ -0,0 +1,107 @@ +# -*- coding: UTF-8 -*- + + +# Python Lasso Simulator +# +# Copyright (C) 2004 Entr'ouvert +# http://lasso.entrouvert.org +# +# Author: Emmanuel Raviart +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + +import lasso + +from websimulator import * + + +class LibertyEnabledClient(WebClient, Simulation): + # A service provider MAY provide a list of identity providers it recognizes by including the + # element in the . The format and processing rules for + # the identity provider list MUST be as defined in [LibertyProtSchema]. + + # The identity provider list can be used by the LECP to create a user identifier to be + # presented to the Principal. For example, the LECP could compare the list of the Principal's + # known identities (and the identities of the identity provider that provides those identities) + # against the list provided by the service provider and then only display the intersection. + + # If the service provider does not support the LECP-advertised Liberty version, the service + # provider MUST return to the LECP an HTTP 501 response with the reason phrase "Unsupported + # Liberty Version." + # + # The responses in step 3 and step 6 SHOULD NOT be cached. To this end service providers and + # identity providers SHOULD place both "Cache-Control: no-cache" and "Pragma: no-cache" on + # their responses to ensure that the LECP and any intervening proxies will not cache the + # response. + + # If the LECP discovers a syntax error due to the service provider or cannot proceed any + # further for other reasons (for example, cannot resolve identity provider, cannot reach the + # identity provider, etc.), the LECP MUST return to the service provider a + # with a indicating the desired error element as defined in + # [LibertyProtSchema]. The containing the error status MUST be sent using + # a POST to the service provider's assertion consumer service URL obtained from the + # element of the . The POST MUST + # be a form that contains the field LARES with the value being the + # protocol message as defined in [LibertyProtSchema], containing the . The + # MUST be encoded by applying a base64 transformation (refer to + # [RFC2045]) to the and all its elements. + + # FIXME: Lasso should provide a way for Liberty-enabled client to create a "server" without + # metadata, instead of using 'singleSignOnServiceUrl'. + idpSingleSignOnServiceUrl = None + requestHeaders = WebClient.requestHeaders.copy() + requestHeaders.update({ + # FIXME: Is this the correct syntax for several URLs in LIBV? + 'Liberty-Enabled': 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1', + 'Liberty-Agent': 'LassoSimulator/0.0.0', + # FIXME: As an alternative to 'Liberty-Enabled' header, a user agent may use: + # 'User-Agent': ' '.join(( + # requestHeaders['User-Agent'], + # 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1')) + 'Accept': ','.join((requestHeaders['Accept'], 'application/vnd.liberty-request+xml')) + }) + + def __init__(self, test, internet): + Simulation.__init__(self, test) + WebClient.__init__(self, internet) + + def login(self, principal, site, path): + httpResponse = self.sendHttpRequestToSite(site, 'GET', path) + self.failUnlessEqual( + httpResponse.headers['Content-Type'], 'application/vnd.liberty-request+xml') + lecp = lasso.Lecp.new(None) + authnRequestEnvelope = httpResponse.body + # FIXME: I don't understand why authnRequestEnvelopeMsg is base64 encoded. I think this + # is an error. + import base64 + authnRequestEnvelope = base64.encodestring(authnRequestEnvelope) + lecp.process_authn_request_envelope_msg(authnRequestEnvelope) + lecp.build_authn_request_msg() + # FIXME: The service provider could return an IDPList in authnRequestEnvelope, so that + # we verify that self.idpSingleSignOnServiceUrl belongs to one of them + httpResponse = self.sendHttpRequest( + 'POST', self.idpSingleSignOnServiceUrl, headers = {'Content-Type': 'text/xml'}, + body = lecp.msg_body) + self.failUnlessEqual( + httpResponse.headers.get('Content-Type', None), 'application/vnd.liberty-response+xml') + lecp.process_authn_response_envelope_msg(httpResponse.body) + lecp.build_authn_response_msg() + self.failUnless(lecp.msg_url) + self.failUnless(lecp.msg_body) + # FIXME: Should we use 'multipart/form-data' for forms? + return self.sendHttpRequest( + 'POST', lecp.msg_url, headers = {'Content-Type': 'multipart/form-data'}, + form = {'LARES': lecp.msg_body}) diff --git a/python/tests/Provider.py b/python/tests/Provider.py index 809376d4..88406814 100644 --- a/python/tests/Provider.py +++ b/python/tests/Provider.py @@ -21,8 +21,6 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# FIXME: Replace principal with client in most methods. -# FIXME: Rename webUser to userAccount. import lasso @@ -31,5 +29,13 @@ from websimulator import * class Provider(WebSite): + responseHeaders = WebSite.responseHeaders.copy() + responseHeaders.update({ + 'Liberty-Enabled': 'LIBV=urn:liberty:iff:2003-08,http://projectliberty.org/specs/v1', + }) + serverDump = None + webUserIdsByNameIdentifier = None + webSessionIdsByNameIdentifier = None + def getServer(self): return lasso.Server.new_from_dump(self.serverDump) diff --git a/python/tests/ServiceProvider.py b/python/tests/ServiceProvider.py index 4ecff2df..c548655b 100644 --- a/python/tests/ServiceProvider.py +++ b/python/tests/ServiceProvider.py @@ -21,8 +21,6 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# FIXME: Replace principal with client in most methods. -# FIXME: Rename webUser to userAccount. import lasso @@ -37,23 +35,46 @@ class ServiceProvider(Provider): def assertionConsumer(self, httpRequest): server = self.getServer() login = lasso.Login.new(server) - login.init_request(httpRequest.query, lasso.httpMethodRedirect) - login.build_request_msg() - soapEndpoint = login.msg_url - self.failUnless(soapEndpoint) - soapRequestMsg = login.msg_body - self.failUnless(soapRequestMsg) - httpResponse = HttpRequest(self, 'POST', soapEndpoint, body = soapRequestMsg).ask() - self.failUnlessEqual(httpResponse.statusCode, 200) - try: - login.process_response_msg(httpResponse.body) - except lasso.Error, error: - if error.code == -7: # FIXME: This will change, he said. - return HttpResponse( - 401, 'Access Unauthorized: User authentication failed on identity provider.') - else: - raise + if httpRequest.method == 'GET': + login.init_request(httpRequest.query, lasso.httpMethodRedirect) + login.build_request_msg() + + soapEndpoint = login.msg_url + self.failUnless(soapEndpoint) + soapRequestMsg = login.msg_body + self.failUnless(soapRequestMsg) + httpResponse = self.sendHttpRequest( + 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, + body = soapRequestMsg) + self.failUnlessEqual(httpResponse.statusCode, 200) + try: + login.process_response_msg(httpResponse.body) + except lasso.Error, error: + if error.code == -7: # FIXME: This will change, he said. + return self.newHttpResponse( + 401, + 'Access Unauthorized: User authentication failed on identity provider.') + else: + raise + elif httpRequest.method == 'POST': + authnResponseMsg = httpRequest.getFormField('LARES', None) + self.failUnless(authnResponseMsg) + # FIXME: Should we do an init before process_authn_response_msg? + try: + login.process_authn_response_msg(authnResponseMsg) + except lasso.Error, error: + if error.code == -7: # FIXME: This will change, he said. + return self.newHttpResponse( + 401, + 'Access Unauthorized: User authentication failed on identity provider.') + else: + raise + else: + return self.newHttpResponse( + 400, + 'Bad Request: Method %s not handled by assertionConsumer' % httpRequest.method) + nameIdentifier = login.nameIdentifier self.failUnless(nameIdentifier) @@ -100,7 +121,7 @@ class ServiceProvider(Provider): webUserId = httpRequest.client.keyring.get(self.url, None) userAuthenticated = webUserId in self.webUsers if not userAuthenticated: - return HttpResponse(401, 'Access Unauthorized: User has no account.') + return self.newHttpResponse(401, 'Access Unauthorized: User has no account.') webUser = self.webUsers[webUserId] webSession.webUserId = webUser.uniqueId @@ -113,35 +134,91 @@ class ServiceProvider(Provider): self.webUserIdsByNameIdentifier[nameIdentifier] = webUser.uniqueId self.webSessionIdsByNameIdentifier[nameIdentifier] = webSession.uniqueId - return HttpResponse(200) + return self.newHttpResponse(200) + + def login(self, httpRequest): + libertyEnabled = httpRequest.headers.get('Liberty-Enabled', None) + userAgent = httpRequest.headers.get('User-Agent', None) + # FIXME: Lasso should have a function to compute useLecp. + # Or this should be done in lasso.Login.new(server, libertyEnabled, userAgent) + useLecp = False + if libertyEnabled: + useLecp = 'urn:liberty:iff:2003-08' in libertyEnabled + if not useLecp: + return self.newHttpResponse(501, 'Unsupported Liberty Version.') + elif userAgent: + useLecp = 'urn:liberty:iff:2003-08' in userAgent + if not useLecp and "LIBV=" in userAgent: + return self.newHttpResponse(501, 'Unsupported Liberty Version.') + else: + useLecp = False - def loginUsingRedirect(self, httpRequest): - server = self.getServer() - login = lasso.Login.new(server) - login.init_authn_request(self.idpSite.providerId) - self.failUnlessEqual(login.request_type, lasso.messageTypeAuthnRequest) forceAuthn = httpRequest.getQueryBoolean('forceAuthn', False) - if forceAuthn: - login.request.set_forceAuthn(forceAuthn) isPassive = httpRequest.getQueryBoolean('isPassive', False) - if not isPassive: - login.request.set_isPassive(isPassive) - login.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated) - login.request.set_consent(lasso.libConsentObtained) - relayState = 'fake' - login.request.set_relayState(relayState) - login.build_authn_request_msg() - authnRequestUrl = login.msg_url - self.failUnless(authnRequestUrl) - return httpRequest.client.redirect(authnRequestUrl) + server = self.getServer() + if useLecp: + lecp = lasso.Lecp.new(server) + lecp.init_authn_request(self.idpSite.providerId) # FIXME: The argument should be None. + self.failUnlessEqual(lecp.request_type, lasso.messageTypeAuthnRequest) + + # FIXME: This protocol profile should be set by default by Lasso. + lecp.request.set_protocolProfile(lasso.libProtocolProfileBrwsPost) + + # Same treatement as for non LECP login. + if forceAuthn: + lecp.request.set_forceAuthn(forceAuthn) + if not isPassive: + lecp.request.set_isPassive(isPassive) + lecp.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated) + lecp.request.set_consent(lasso.libConsentObtained) + relayState = 'fake' + lecp.request.set_relayState(relayState) + + # FIXME: In my opinion, this method should be the renamed to build_authn_request_msg. + lecp.build_authn_request_envelope_msg() + authnRequestEnvelopeMsg = lecp.msg_body + # FIXME: I don't understand why authnRequestEnvelopeMsg is base64 encoded. + import base64 + authnRequestEnvelopeMsg = base64.decodestring(authnRequestEnvelopeMsg) + self.failUnless(authnRequestEnvelopeMsg) + # FIXME: Lasso should set a lecp.msg_content_type to + # "application/vnd.liberty-request+xml". This should also be done for SOAP, etc, with + # other profiles. + # contentType = lecp.msg_content_type + # self.failUnlessEqual(contentType, 'application/vnd.liberty-request+xml') + contentType = 'application/vnd.liberty-request+xml' + return self.newHttpResponse( + 200, + headers = { + 'Content-Type': contentType, + 'Cache-Control': 'no-cache', + 'Pragma': 'no-cache', + }, + body = authnRequestEnvelopeMsg) + else: + login = lasso.Login.new(server) + login.init_authn_request(self.idpSite.providerId) + self.failUnlessEqual(login.request_type, lasso.messageTypeAuthnRequest) + if forceAuthn: + login.request.set_forceAuthn(forceAuthn) + if not isPassive: + login.request.set_isPassive(isPassive) + login.request.set_nameIDPolicy(lasso.libNameIDPolicyTypeFederated) + login.request.set_consent(lasso.libConsentObtained) + relayState = 'fake' + login.request.set_relayState(relayState) + login.build_authn_request_msg() + authnRequestUrl = login.msg_url + self.failUnless(authnRequestUrl) + return httpRequest.client.redirect(authnRequestUrl) def logoutUsingSoap(self, httpRequest): webSession = self.getWebSession(httpRequest.client) if webSession is None: - return HttpResponse(401, 'Access Unauthorized: User has no session opened.') + return self.newHttpResponse(401, 'Access Unauthorized: User has no session opened.') webUser = self.getWebUserFromWebSession(webSession) if webUser is None: - return HttpResponse(401, 'Access Unauthorized: User is not logged in.') + return self.newHttpResponse(401, 'Access Unauthorized: User is not logged in.') server = self.getServer() logout = lasso.Logout.new(server, lasso.providerTypeSp) @@ -158,7 +235,8 @@ class ServiceProvider(Provider): self.failUnless(soapEndpoint) soapRequestMsg = logout.msg_body self.failUnless(soapRequestMsg) - httpResponse = HttpRequest(self, 'POST', soapEndpoint, body = soapRequestMsg).ask() + httpResponse = self.sendHttpRequest( + 'POST', soapEndpoint, headers = {'Content-Type': 'text/xml'}, body = soapRequestMsg) self.failUnlessEqual(httpResponse.statusCode, 200) logout.process_response_msg(httpResponse.body, lasso.httpMethodSoap) @@ -185,4 +263,4 @@ class ServiceProvider(Provider): self.failUnless(nameIdentifier) del self.webSessionIdsByNameIdentifier[nameIdentifier] - return HttpResponse(200) + return self.newHttpResponse(200) diff --git a/python/tests/login_tests.py b/python/tests/login_tests.py index 75b00aad..c3a1c46b 100644 --- a/python/tests/login_tests.py +++ b/python/tests/login_tests.py @@ -33,6 +33,7 @@ sys.path.insert(0, '../.libs') import lasso from IdentityProvider import IdentityProvider +from LibertyEnabledClient import LibertyEnabledClient from ServiceProvider import ServiceProvider from websimulator import * @@ -63,6 +64,24 @@ class LoginTestCase(unittest.TestCase): # Frederic Peters has no account on identity provider. return site + def generateLibertyEnabledClient(self, internet): + client = LibertyEnabledClient(self, internet) + # FIXME: Lasso should provide a way for Liberty-enabled client to create a "server" without + # metadata, instead of using 'singleSignOnServiceUrl'. +## server = lasso.Server.new( +## None, # A LECP has no metadata. +## '../../examples/data/client-public-key.pem', +## '../../examples/data/client-private-key.pem', +## '../../examples/data/client-crt.pem', +## lasso.signatureMethodRsaSha1) +## server.add_provider( +## '../../examples/data/idp-metadata.xml', +## '../../examples/data/idp-public-key.pem', +## '../../examples/data/ca-crt.pem') +## client.server = server + client.idpSingleSignOnServiceUrl = 'https://identity-provider/singleSignOn' + return client + def generateSpSite(self, internet): site = ServiceProvider(self, internet, 'https://service-provider/') site.providerId = 'https://service-provider/metadata' @@ -94,6 +113,19 @@ class LoginTestCase(unittest.TestCase): ## def tearDown(self): ## pass + def test00(self): + """LECP login.""" + internet = Internet() + idpSite = self.generateIdpSite(internet) + spSite = self.generateSpSite(internet) + spSite.idpSite = idpSite + lec = self.generateLibertyEnabledClient(internet) + principal = Principal(internet, 'Romain Chantereau') + principal.keyring[idpSite.url] = 'Chantereau' + principal.keyring[spSite.url] = 'Romain' + httpResponse = lec.login(principal, spSite, '/login') + raise str((httpResponse.headers['Content-Type'], httpResponse.body)) + def test01(self): """Service provider initiated login using HTTP redirect and service provider initiated logout using SOAP.""" @@ -105,9 +137,9 @@ class LoginTestCase(unittest.TestCase): principal.keyring[idpSite.url] = 'Chantereau' principal.keyring[spSite.url] = 'Romain' - httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/loginUsingRedirect')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') self.failUnlessEqual(httpResponse.statusCode, 200) - httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap') self.failUnlessEqual(httpResponse.statusCode, 200) self.failIf(spSite.webSessions) self.failIf(idpSite.webSessions) @@ -123,30 +155,28 @@ class LoginTestCase(unittest.TestCase): principal.keyring[idpSite.url] = 'Chantereau' principal.keyring[spSite.url] = 'Romain' - httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/loginUsingRedirect')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') self.failUnlessEqual(httpResponse.statusCode, 200) - httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap') self.failUnlessEqual(httpResponse.statusCode, 200) # Once again. Now the principal already has a federation between spSite and idpSite. - httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/loginUsingRedirect')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') self.failUnlessEqual(httpResponse.statusCode, 200) - httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap') self.failUnlessEqual(httpResponse.statusCode, 200) # Once again. Do a new passive login between normal login and logout. - httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/loginUsingRedirect')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') self.failUnlessEqual(httpResponse.statusCode, 200) del principal.keyring[idpSite.url] # Ensure identity provider will be really passive. - httpResponse = spSite.doHttpRequest(HttpRequest( - principal, 'GET', '/loginUsingRedirect?isPassive=1')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?isPassive=1') self.failUnlessEqual(httpResponse.statusCode, 200) - httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap') self.failUnlessEqual(httpResponse.statusCode, 200) # Once again, with isPassive and the user having no web session. - httpResponse = spSite.doHttpRequest(HttpRequest( - principal, 'GET', '/loginUsingRedirect?isPassive=1')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?isPassive=1') self.failUnlessEqual(httpResponse.statusCode, 401) def test03(self): @@ -160,9 +190,9 @@ class LoginTestCase(unittest.TestCase): # Frederic Peters has no account on identity provider. principal.keyring[spSite.url] = 'Frederic' - httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/loginUsingRedirect')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') self.failUnlessEqual(httpResponse.statusCode, 401) - httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap') self.failUnlessEqual(httpResponse.statusCode, 401) def test04(self): @@ -177,9 +207,9 @@ class LoginTestCase(unittest.TestCase): principal.keyring[idpSite.url] = 'Nowicki' # Christophe Nowicki has no account on service provider. - httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/loginUsingRedirect')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login') self.failUnlessEqual(httpResponse.statusCode, 401) - httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap') self.failUnlessEqual(httpResponse.statusCode, 401) def test05(self): @@ -193,8 +223,7 @@ class LoginTestCase(unittest.TestCase): principal.keyring[idpSite.url] = 'Chantereau' principal.keyring[spSite.url] = 'Romain' - httpResponse = spSite.doHttpRequest(HttpRequest( - principal, 'GET', '/loginUsingRedirect?isPassive=1')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?isPassive=1') self.failUnlessEqual(httpResponse.statusCode, 401) def test06(self): @@ -208,28 +237,24 @@ class LoginTestCase(unittest.TestCase): principal.keyring[idpSite.url] = 'Chantereau' principal.keyring[spSite.url] = 'Romain' - httpResponse = spSite.doHttpRequest(HttpRequest( - principal, 'GET', '/loginUsingRedirect?forceAuthn=1')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1') self.failUnlessEqual(httpResponse.statusCode, 200) - httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap') self.failUnlessEqual(httpResponse.statusCode, 200) # Ask user to reauthenticate while he is already logged. - httpResponse = spSite.doHttpRequest(HttpRequest( - principal, 'GET', '/loginUsingRedirect?forceAuthn=1')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1') self.failUnlessEqual(httpResponse.statusCode, 200) del principal.keyring[idpSite.url] # Ensure user can't authenticate. - httpResponse = spSite.doHttpRequest(HttpRequest( - principal, 'GET', '/loginUsingRedirect?forceAuthn=1')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1') self.failUnlessEqual(httpResponse.statusCode, 401) - httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap') self.failUnlessEqual(httpResponse.statusCode, 200) # Force authentication, but user won't authenticate. - httpResponse = spSite.doHttpRequest(HttpRequest( - principal, 'GET', '/loginUsingRedirect?forceAuthn=1')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/login?forceAuthn=1') self.failUnlessEqual(httpResponse.statusCode, 401) - httpResponse = spSite.doHttpRequest(HttpRequest(principal, 'GET', '/logoutUsingSoap')) + httpResponse = principal.sendHttpRequestToSite(spSite, 'GET', '/logoutUsingSoap') self.failUnlessEqual(httpResponse.statusCode, 401) ## def test06(self): diff --git a/python/tests/websimulator.py b/python/tests/websimulator.py index 806b5dd5..65064a0a 100644 --- a/python/tests/websimulator.py +++ b/python/tests/websimulator.py @@ -21,6 +21,8 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + # FIXME: Replace principal with client in most methods. # FIXME: Rename webUser to userAccount. @@ -28,21 +30,33 @@ class HttpRequest(object): client = None # Principal or web site sending the request. body = None - header = None + form = None + headers = None method = None # 'GET' or 'POST' or 'PUT' or... url = None - def __init__(self, client, method, url, body = None): + def __init__(self, client, method, url, headers = None, body = None, form = None): self.client = client self.method = method self.url = url + if headers: + self.headers = headers if body: self.body = body + if form: + self.form = form - def ask(self): + def send(self): webSite = self.client.internet.getWebSite(self.url) return webSite.doHttpRequest(self) + def getFormField(self, name, default = 'none'): + if self.form and name in self.form: + return self.form[name] + if default == 'none': + raise KeyError(name) + return default + def getQueryBoolean(self, name, default = 'none'): try: fieldValue = self.getQueryField(name) @@ -60,8 +74,7 @@ class HttpRequest(object): return '' def getQueryField(self, name, default = 'none'): - query = self.query - if query: + if self.query: for field in self.query.split('&'): fieldName, fieldValue = field.split('=') if name == fieldName: @@ -75,14 +88,16 @@ class HttpRequest(object): class HttpResponse(object): body = None - header = None + headers = None statusCode = None # 200 or... statusMessage = None - def __init__(self, statusCode, statusMessage = None, body = None): + def __init__(self, statusCode, statusMessage = None, headers = None, body = None): self.statusCode = statusCode if statusMessage: self.statusMessage = statusMessage + if headers: + self.headers = headers if body: self.body = body @@ -137,6 +152,10 @@ class Simulation(object): class WebClient(object): internet = None keyring = None + requestHeaders = { + 'User-Agent': 'LassoSimulator/0.0.0', + 'Accept': 'text/xml,application/xml,application/xhtml+xml,text/html', + } webSessionIds = None # Simulate the cookies, stored in user's navigator, and containing the # IDs of sessions already opened by the user. @@ -146,8 +165,30 @@ class WebClient(object): self.webSessionIds = {} def redirect(self, url): - webSite = self.internet.getWebSite(url) - return webSite.doHttpRequest(HttpRequest(self, 'GET', url)) + return self.sendHttpRequest('GET', url) + + def sendHttpRequest(self, method, url, headers = None, body = None, form = None): + requestHeaders = self.requestHeaders + if headers: + requestHeaders = self.requestHeaders.copy() + for name, value in headers.iteritems(): + requestHeaders[name] = value + else: + requestHeaders = self.requestHeaders + return HttpRequest( + self, method, url, headers = requestHeaders, body = body, form = form).send() + + def sendHttpRequestToSite(self, webSite, method, path, headers = None, body = None, + form = None): + url = webSite.url + if path: + if path[0] == '/': + while url[-1] == '/': + url = url[:-1] + elif url[-1] != '/': + url += '/' + url += path + return self.sendHttpRequest(method, url, headers = headers, body = body, form = form) class Principal(WebClient): @@ -189,11 +230,11 @@ class WebSite(WebClient, Simulation): lastWebSessionId = 0 providerId = None # The Liberty providerID of this web site - serverDump = None + responseHeaders = { + 'Server': 'Lasso Simulator Web Server', + } url = None # The main URL of web site - webUserIdsByNameIdentifier = None webUsers = None - webSessionIdsByNameIdentifier = None webSessions = None def __init__(self, test, internet, url): @@ -267,3 +308,14 @@ class WebSite(WebClient, Simulation): # The user has no account on this site. return None return self.webUsers.get(webUserId, None) + + def newHttpResponse(self, statusCode, statusMessage = None, headers = None, body = None): + responseHeaders = self.responseHeaders + if headers: + responseHeaders = self.responseHeaders.copy() + for name, value in headers.iteritems(): + responseHeaders[name] = value + else: + responseHeaders = self.responseHeaders + return HttpResponse( + statusCode, statusMessage = statusMessage, headers = headers, body = body)