From b0a3c22d39da3f62f6b036d9ab7921de409c7f3c Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Wed, 17 Feb 2010 10:14:47 +0000 Subject: [PATCH] in idwsf2_tests.py, merge test case for metadata registering, add test case for failure --- bindings/python/tests/idwsf2_tests.py | 388 ++++++++++++++------------ 1 file changed, 211 insertions(+), 177 deletions(-) diff --git a/bindings/python/tests/idwsf2_tests.py b/bindings/python/tests/idwsf2_tests.py index c4d64aa1..10647d5f 100755 --- a/bindings/python/tests/idwsf2_tests.py +++ b/bindings/python/tests/idwsf2_tests.py @@ -42,7 +42,8 @@ try: except NameError: dataDir = os.path.join(os.environ['TOP_SRCDIR'], 'tests', 'data') -disco_soap_url = 'http://idp1/soapEndpoint' +idpSoapEndpoint = 'http://idp1/soapEndpoint' +spSoapEndpoint = 'http://sp1/soapEndpoint' class IdWsf2TestCase(unittest.TestCase): def getWspServer(self): @@ -87,22 +88,21 @@ class IdWsf2TestCase(unittest.TestCase): def metadataRegister(self, wsp, idp): wsp_disco = lasso.IdWsf2Discovery(wsp) abstract = 'Personal Profile service' - soapEndpoint = 'http://idp1/soapEndpoint' wsp_disco.initMetadataRegister() wsp_disco.addSimpleServiceMetadata( service_types = ('urn:liberty:id-sis-pp:2005-05',), abstract = abstract, provider_id = wsp.providerIds[0], - address = soapEndpoint) + address = idpSoapEndpoint) wsp_disco.buildRequestMsg() idp_disco = lasso.IdWsf2Discovery(idp) idp_disco.processRequestMsg(wsp_disco.msgBody) idp_disco.validateRequest() - assert(len(idp_disco.metadatas) == 1) + self.failUnlessEqual(len(idp_disco.metadatas), 1) idp_disco.buildResponseMsg() wsp_disco.processResponseMsg(idp_disco.msgBody) - assert(len(wsp_disco.metadatas) == 1) - assert(wsp_disco.metadatas[0].svcMDID == wsp_disco.response.svcMDID[0]) + self.failUnlessEqual(len(wsp_disco.metadatas), 1) + self.failUnlessEqual(wsp_disco.metadatas[0].svcMDID, wsp_disco.response.svcMDID[0]) return idp, wsp_disco.metadatas[0] def login(self, sp, idp, sp_identity_dump=None, sp_session_dump=None, @@ -123,7 +123,7 @@ class IdWsf2TestCase(unittest.TestCase): idp_login.processAuthnRequestMsg(query) idp_login.validateRequestMsg(True, True) idp_login.buildAssertion(lasso.SAML_AUTHENTICATION_METHOD_PASSWORD, None, None, None, None) - idp_login.idwsf2AddDiscoveryBootstrapEpr(url = disco_soap_url, abstract = 'Discovery Service', security_mech_id = lasso.SECURITY_MECH_BEARER) + idp_login.idwsf2AddDiscoveryBootstrapEpr(url = idpSoapEndpoint, abstract = 'Discovery Service', security_mech_id = lasso.SECURITY_MECH_BEARER) idp_login.buildArtifactMsg(lasso.HTTP_METHOD_ARTIFACT_GET) artifact_message = idp_login.artifactMessage @@ -153,193 +153,227 @@ class IdWsf2TestCase(unittest.TestCase): return sp_identity_dump, sp_session_dump, idp_identity_dump, idp_session_dump, sp_login.idwsf2GetDiscoveryBootstrapEpr() -class MetadataRegisterTestCase(IdWsf2TestCase): +class MetadataTestCase(IdWsf2TestCase): def test01(self): - """Init metadata registration request""" - + """Test metadata registration on the IdP""" idp = self.getIdpServer() wsp = self.getWspServer() - wsp_disco = lasso.IdWsf2Discovery(wsp) wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) + + wsp_disco = lasso.IdWsf2Discovery(wsp) wsp_disco.setEpr(dst_epr) abstract = 'Personal Profile service' - soapEndpoint = 'http://idp1/soapEndpoint' - try: - wsp_disco.initMetadataRegister() - wsp_disco.addSimpleServiceMetadata(service_types = - ('urn:liberty:id-sis-pp:2005-05',), abstract = abstract, - provider_id = wsp.providerIds[0], address = soapEndpoint) - self.failUnless(wsp_disco.request is not None) - self.failUnlessEqual(len(wsp_disco.metadatas), 1) - except lasso.Error, e: - self.fail(e) - - - def test02(self): - """Build metadata registration request""" - idp = self.getIdpServer() - wsp = self.getWspServer() - wsp_disco = lasso.IdWsf2Discovery(wsp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) - wsp_disco.setEpr(dst_epr) - - abstract = 'Personal Profile service' - soapEndpoint = 'http://idp1/soapEndpoint' wsp_disco.initMetadataRegister() + self.failUnless(wsp_disco.request is not None) wsp_disco.addSimpleServiceMetadata(service_types = - ('urn:liberty:id-sis-pp:2005-05',), abstract = abstract, provider_id = wsp.providerIds[0], address = soapEndpoint) - wsp_disco.buildRequestMsg() - - self.failUnless(wsp_disco.msgUrl, 'missing soap URL target') - self.failUnless(wsp_disco.msgBody, 'missing soap request') - - def test03(self): - """Check metadata registration request type""" - idp = self.getIdpServer() - wsp = self.getWspServer() - wsp_disco = lasso.IdWsf2Discovery(wsp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) - wsp_disco.setEpr(dst_epr) - - abstract = 'Personal Profile service' - soapEndpoint = 'http://idp1/soapEndpoint' - wsp_disco.initMetadataRegister() - wsp_disco.addSimpleServiceMetadata(service_types = - ('urn:liberty:id-sis-pp:2005-05',), abstract = abstract, provider_id = wsp.providerIds[0], address = soapEndpoint) - wsp_disco.buildRequestMsg() - - request_type = lasso.getRequestTypeFromSoapMsg(wsp_disco.msgBody) - self.failUnlessEqual(request_type, lasso.REQUEST_TYPE_IDWSF2_DISCO_SVCMD_REGISTER, - 'wrong request type in metadata_register : %s' % request_type) - - def test04(self): - """Process metadata registration request""" - idp = self.getIdpServer() - wsp = self.getWspServer() - wsp_disco = lasso.IdWsf2Discovery(wsp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) - wsp_disco.setEpr(dst_epr) - - abstract = 'Personal Profile service' - soapEndpoint = 'http://idp1/soapEndpoint' - wsp_disco.initMetadataRegister() - wsp_disco.addSimpleServiceMetadata(service_types = - ('urn:liberty:id-sis-pp:2005-05',), abstract = abstract, provider_id = wsp.providerIds[0], address = soapEndpoint) - wsp_disco.buildRequestMsg() - - idp_disco = lasso.IdWsf2Discovery(idp) - try: - idp_disco.processRequestMsg(wsp_disco.msgBody) - self.failUnlessEqual(idp_disco.getRequestType(), lasso.IDWSF2_DISCOVERY_REQUEST_TYPE_MD_REGISTER) - except lasso.Error, e: - self.fail(e) - - try: - idp_disco.checkSecurityMechanism(lasso.SECURITY_MECH_BEARER) - except lasso.Error, e: - self.fail(e) - - def test05(self): - """Check metadata registration on the Discovery service""" - idp = self.getIdpServer() - wsp = self.getWspServer() - wsp_disco = lasso.IdWsf2Discovery(wsp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) - wsp_disco.setEpr(dst_epr) - - abstract = 'Personal Profile service' - soapEndpoint = 'http://idp1/soapEndpoint' - wsp_disco.initMetadataRegister() - wsp_disco.addSimpleServiceMetadata(service_types = - ('urn:liberty:id-sis-pp:2005-05',), abstract = abstract, provider_id = wsp.providerIds[0], address = soapEndpoint) + (lasso.PP11_HREF,), abstract = abstract, + provider_id = wsp.providerId, address = spSoapEndpoint, + security_mech_ids = (lasso.SECURITY_MECH_BEARER,)) + self.failUnlessEqual(len(wsp_disco.metadatas), 1) + metadata = wsp_disco.metadatas[0] + self.failUnlessEqual(metadata.abstract, abstract) + self.failUnlessEqual(metadata.providerId, wsp.providerId) + self.failUnlessEqual(len(metadata.serviceContext), 1) + self.failUnlessEqual(len(metadata.serviceContext[0].serviceType), 1) + self.failUnlessEqual(metadata.serviceContext[0].serviceType[0], + lasso.PP11_HREF) + self.failUnlessEqual(len(metadata.serviceContext[0].endpointContext), 1) + self.failUnlessEqual( + len(metadata.serviceContext[0].endpointContext[0].address), + 1) + self.failUnlessEqual(metadata.serviceContext[0].endpointContext[0].address[0], + spSoapEndpoint) + self.failUnlessEqual( + len(metadata.serviceContext[0].endpointContext[0].securityMechId), + 1) + self.failUnlessEqual( + metadata.serviceContext[0].endpointContext[0].securityMechId[0], + lasso.SECURITY_MECH_BEARER) + self.failUnless(metadata.svcMDID is None) wsp_disco.buildRequestMsg() + self.failUnlessEqual(wsp_disco.msgUrl, idpSoapEndpoint) + self.failUnless(wsp_disco.msgBody is not None) idp_disco = lasso.IdWsf2Discovery(idp) + self.failUnless(idp_disco is not None) idp_disco.processRequestMsg(wsp_disco.msgBody) - - try: - idp_disco.validateRequest() - except lasso.Error, e: - self.fail(e) - + self.failUnless(idp_disco.request is not None) + self.failUnlessEqual(len(idp_disco.request.svcMD), 1) + self.failUnless(idp_disco.request.svcMD[0].svcMDID is None) + idp_disco.validateRequest() + self.failUnless(idp_disco.response is not None) self.failUnlessEqual(len(idp_disco.metadatas), 1) - - def test06(self): - """Build metadata registration response""" - idp = self.getIdpServer() - wsp = self.getWspServer() - wsp_disco = lasso.IdWsf2Discovery(wsp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) - wsp_disco.setEpr(dst_epr) - - abstract = 'Personal Profile service' - soapEndpoint = 'http://idp1/soapEndpoint' - wsp_disco.initMetadataRegister() - wsp_disco.addSimpleServiceMetadata(service_types = - ('urn:liberty:id-sis-pp:2005-05',), abstract = abstract, provider_id = wsp.providerIds[0], address = soapEndpoint) - wsp_disco.buildRequestMsg() - - idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processRequestMsg(wsp_disco.msgBody) - idp_disco.validateRequest() - try: - idp_disco.buildResponseMsg() - except lasso.Error, e: - self.fail(e) - - self.failUnless(idp_disco.msgBody, 'missing soap answer') - - def test07(self): - """Process metadata registration response""" - idp = self.getIdpServer() - wsp = self.getWspServer() - wsp_disco = lasso.IdWsf2Discovery(wsp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) - wsp_disco.setEpr(dst_epr) - - abstract = 'Personal Profile service' - soapEndpoint = 'http://idp1/soapEndpoint' - wsp_disco.initMetadataRegister() - wsp_disco.addSimpleServiceMetadata(service_types = - ('urn:liberty:id-sis-pp:2005-05',), abstract = abstract, provider_id = wsp.providerIds[0], address = soapEndpoint) - wsp_disco.buildRequestMsg() - - idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processRequestMsg(wsp_disco.msgBody) - idp_disco.validateRequest() - idp_disco.buildResponseMsg() - - try: - wsp_disco.processResponseMsg(idp_disco.msgBody) - except lasso.Error, e: - self.fail(e) - - def test08(self): - """Check metadata registration on the WSP""" - idp = self.getIdpServer() - wsp = self.getWspServer() - wsp_disco = lasso.IdWsf2Discovery(wsp) - wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) - wsp_disco.setEpr(dst_epr) - - abstract = 'Personal Profile service' - soapEndpoint = 'http://idp1/soapEndpoint' - wsp_disco.initMetadataRegister() - wsp_disco.addSimpleServiceMetadata(service_types = - ('urn:liberty:id-sis-pp:2005-05',), abstract = abstract, provider_id = wsp.providerIds[0], address = soapEndpoint) - wsp_disco.buildRequestMsg() - - idp_disco = lasso.IdWsf2Discovery(idp) - idp_disco.processRequestMsg(wsp_disco.msgBody) - idp_disco.validateRequest() + metadata = idp_disco.metadatas[0] + self.failUnlessEqual(metadata.abstract, abstract) + self.failUnlessEqual(metadata.providerId, wsp.providerId) + self.failUnlessEqual(len(metadata.serviceContext), 1) + self.failUnlessEqual(len(metadata.serviceContext[0].serviceType), 1) + self.failUnlessEqual(metadata.serviceContext[0].serviceType[0], + lasso.PP11_HREF) + self.failUnlessEqual(len(metadata.serviceContext[0].endpointContext), 1) + self.failUnlessEqual( + len(metadata.serviceContext[0].endpointContext[0].address), + 1) + self.failUnlessEqual(metadata.serviceContext[0].endpointContext[0].address[0], + spSoapEndpoint) + self.failUnlessEqual( + len(metadata.serviceContext[0].endpointContext[0].securityMechId), + 1) + self.failUnlessEqual( + metadata.serviceContext[0].endpointContext[0].securityMechId[0], + lasso.SECURITY_MECH_BEARER) idp_disco.buildResponseMsg() + self.failUnless(metadata.svcMDID is not None) + self.failUnless(idp_disco.msgUrl is None) + self.failUnless(idp_disco.msgBody is not None) wsp_disco.processResponseMsg(idp_disco.msgBody) self.failUnless(len(wsp_disco.metadatas) == 1, 'missing svcMDID') self.failUnless(wsp_disco.metadatas[0].svcMDID, 'missing svcMDID') + def test01(self): + """Test metadata register with redirection""" + idp = self.getIdpServer() + wsp = self.getWspServer() + wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) + + wsp_disco = lasso.IdWsf2Discovery(wsp) + wsp_disco.setEpr(dst_epr) + + abstract = 'Personal Profile service' + wsp_disco.initMetadataRegister() + self.failUnless(wsp_disco.request is not None) + wsp_disco.addSimpleServiceMetadata(service_types = + (lasso.PP11_HREF,), abstract = abstract, + provider_id = wsp.providerId, address = spSoapEndpoint, + security_mech_ids = (lasso.SECURITY_MECH_BEARER,)) + self.failUnlessEqual(len(wsp_disco.metadatas), 1) + metadata = wsp_disco.metadatas[0] + self.failUnlessEqual(metadata.abstract, abstract) + self.failUnlessEqual(metadata.providerId, wsp.providerId) + self.failUnlessEqual(len(metadata.serviceContext), 1) + self.failUnlessEqual(len(metadata.serviceContext[0].serviceType), 1) + self.failUnlessEqual(metadata.serviceContext[0].serviceType[0], + lasso.PP11_HREF) + self.failUnlessEqual(len(metadata.serviceContext[0].endpointContext), 1) + self.failUnlessEqual( + len(metadata.serviceContext[0].endpointContext[0].address), + 1) + self.failUnlessEqual(metadata.serviceContext[0].endpointContext[0].address[0], + spSoapEndpoint) + self.failUnlessEqual( + len(metadata.serviceContext[0].endpointContext[0].securityMechId), + 1) + self.failUnlessEqual( + metadata.serviceContext[0].endpointContext[0].securityMechId[0], + lasso.SECURITY_MECH_BEARER) + self.failUnless(metadata.svcMDID is None) + wsp_disco.buildRequestMsg() + self.failUnlessEqual(wsp_disco.msgUrl, idpSoapEndpoint) + self.failUnless(wsp_disco.msgBody is not None) + + idp_disco = lasso.IdWsf2Discovery(idp) + self.failUnless(idp_disco is not None) + idp_disco.processRequestMsg(wsp_disco.msgBody) + self.failUnless(idp_disco.request is not None) + self.failUnlessEqual(len(idp_disco.request.svcMD), 1) + self.failUnless(idp_disco.request.svcMD[0].svcMDID is None) + idp_disco.validateRequest() + self.failUnless(idp_disco.response is not None) + self.failUnlessEqual(len(idp_disco.metadatas), 1) + metadata = idp_disco.metadatas[0] + self.failUnlessEqual(metadata.abstract, abstract) + self.failUnlessEqual(metadata.providerId, wsp.providerId) + self.failUnlessEqual(len(metadata.serviceContext), 1) + self.failUnlessEqual(len(metadata.serviceContext[0].serviceType), 1) + self.failUnlessEqual(metadata.serviceContext[0].serviceType[0], + lasso.PP11_HREF) + self.failUnlessEqual(len(metadata.serviceContext[0].endpointContext), 1) + self.failUnlessEqual( + len(metadata.serviceContext[0].endpointContext[0].address), + 1) + self.failUnlessEqual(metadata.serviceContext[0].endpointContext[0].address[0], + spSoapEndpoint) + self.failUnlessEqual( + len(metadata.serviceContext[0].endpointContext[0].securityMechId), + 1) + self.failUnlessEqual( + metadata.serviceContext[0].endpointContext[0].securityMechId[0], + lasso.SECURITY_MECH_BEARER) + idp_disco.buildResponseMsg() + self.failUnless(metadata.svcMDID is not None) + self.failUnless(idp_disco.msgUrl is None) + self.failUnless(idp_disco.msgBody is not None) + + wsp_disco.processResponseMsg(idp_disco.msgBody) + + self.failUnless(len(wsp_disco.metadatas) == 1, 'missing svcMDID') + self.failUnless(wsp_disco.metadatas[0].svcMDID, 'missing svcMDID') + + def test02(self): + "Test failure by IdP for register request" + idp = self.getIdpServer() + wsp = self.getWspServer() + wsp_identity_dump, wsp_session_dump, idp_identity_dump, idp_session_dump, dst_epr = self.login(wsp, idp) + + wsp_disco = lasso.IdWsf2Discovery(wsp) + wsp_disco.setEpr(dst_epr) + + abstract = 'Personal Profile service' + wsp_disco.initMetadataRegister() + self.failUnless(wsp_disco.request is not None) + wsp_disco.addSimpleServiceMetadata(service_types = + (lasso.PP11_HREF,), abstract = abstract, + provider_id = wsp.providerId, address = spSoapEndpoint, + security_mech_ids= (lasso.SECURITY_MECH_BEARER,)) + self.failUnlessEqual(len(wsp_disco.metadatas), 1) + metadata = wsp_disco.metadatas[0] + self.failUnlessEqual(metadata.abstract, abstract) + self.failUnlessEqual(metadata.providerId, wsp.providerId) + self.failUnlessEqual(len(metadata.serviceContext[0].serviceType), 1) + self.failUnlessEqual(metadata.serviceContext[0].serviceType[0], + lasso.PP11_HREF) + self.failUnlessEqual(len(metadata.serviceContext[0].endpointContext), 1) + self.failUnlessEqual( + len(metadata.serviceContext[0].endpointContext[0].address), + 1) + self.failUnlessEqual(metadata.serviceContext[0].endpointContext[0].address[0], + spSoapEndpoint) + self.failUnlessEqual( + len(metadata.serviceContext[0].endpointContext[0].securityMechId), + 1) + self.failUnlessEqual( + metadata.serviceContext[0].endpointContext[0].securityMechId[0], + lasso.SECURITY_MECH_BEARER) + self.failUnless(metadata.svcMDID is None) + wsp_disco.buildRequestMsg() + self.failUnlessEqual(wsp_disco.msgUrl, idpSoapEndpoint) + self.failUnless(wsp_disco.msgBody is not None) + print wsp_disco.dump() + + idp_disco = lasso.IdWsf2Discovery(idp) + self.failUnless(idp_disco is not None) + idp_disco.processRequestMsg(wsp_disco.msgBody) + self.failUnless(idp_disco.request is not None) + idp_disco.failRequest(lasso.IDWSF2_DISCOVERY_STATUS_CODE_FAILED, lasso.IDWSF2_DISCOVERY_STATUS_CODE_FORBIDDEN) + self.failUnless(idp_disco.response is not None) + self.failUnless(idp_disco.response.status is not None) + self.failUnless(idp_disco.response.status.code is not lasso.IDWSF2_DISCOVERY_STATUS_CODE_FAILED) + self.failUnlessEqual(len(idp_disco.response.status.status), 1) + self.failUnless(idp_disco.response.status.status[0].code is not lasso.IDWSF2_DISCOVERY_STATUS_CODE_FORBIDDEN) + idp_disco.buildResponseMsg() + self.failUnless(idp_disco.msgUrl is None) + self.failUnless(idp_disco.msgBody is not None) + + try: + wsp_disco.processResponseMsg(idp_disco.msgBody) + except lasso.Idwsf2DiscoveryForbiddenError: + pass + except lasso.Error, e: + self.fail(e) + class MetadataAssociationAddTestCase(IdWsf2TestCase): def test01(self): """Init metadata association add request""" @@ -1726,13 +1760,13 @@ class DataServiceQueryTestCase(IdWsf2TestCase): self.failUnless(email_strings[1] == email2) -metadataRegisterSuite = unittest.makeSuite(MetadataRegisterTestCase, 'test') -metadataAssociationAddSuite = unittest.makeSuite(MetadataAssociationAddTestCase, 'test') +metadataSuite = unittest.makeSuite(MetadataTestCase, 'test') +#metadataAssociationAddSuite = unittest.makeSuite(MetadataAssociationAddTestCase, 'test') #discoveryQuerySuite = unittest.makeSuite(DiscoveryQueryTestCase, 'test') #dataServiceQuerySuite = unittest.makeSuite(DataServiceQueryTestCase, 'test') -allTests = unittest.TestSuite((metadataRegisterSuite, - metadataAssociationAddSuite,)) # discoveryQuerySuite, dataServiceQuerySuite)) +allTests = unittest.TestSuite((metadataSuite,)) +# metadataAssociationAddSuite,)) # discoveryQuerySuite, dataServiceQuerySuite)) if __name__ == '__main__': sys.exit(not unittest.TextTestRunner(verbosity = 2).run(allTests).wasSuccessful())