This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
glasnost/shared/web/IdentitiesWeb.py

722 lines
31 KiB
Python

# -*- coding: iso-8859-15 -*-
# Glasnost
# By: Odile Bénassy <obenassy@entrouvert.com>
# Romain Chantereau <rchantereau@entrouvert.com>
# Nicolas Clapiès <nclapies@easter-eggs.org>
# Pierre-Antoine Dejace <padejace@entrouvert.be>
# Thierry Dulieu <tdulieu@easter-eggs.com>
# Florent Monnier <monnier@codelutin.com>
# Cédric Musso <cmusso@easter-eggs.org>
# Frédéric Péters <fpeters@entrouvert.be>
# Benjamin Poussin <poussin@codelutin.com>
# Emmanuel Raviart <eraviart@entrouvert.com>
# Sébastien Régnier <regnier@codelutin.com>
# Emmanuel Saracco <esaracco@easter-eggs.com>
#
# Copyright (C) 2000, 2001 Easter-eggs & Emmanuel Raviart
# Copyright (C) 2002 Odile Bénassy, Code Lutin, Thierry Dulieu, Easter-eggs,
# Entr'ouvert, Frédéric Péters, Benjamin Poussin, Emmanuel Raviart,
# Emmanuel Saracco & Théridion
# Copyright (C) 2003 Odile Bénassy, Romain Chantereau, Nicolas Clapiès,
# Code Lutin, Pierre-Antoine Dejace, Thierry Dulieu, Easter-eggs,
# Entr'ouvert, Florent Monnier, Cédric Musso, Ouvaton, Frédéric Péters,
# Benjamin Poussin, Rodolphe Quiédeville, Emmanuel Raviart, Sébastien
# Régnier, Emmanuel Saracco, Théridion & Vecam
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
__doc__ = """Glasnost Identities Web"""
__version__ = '$Revision$'[11:-2]
import httplib
import md5
from OpenSSL import SSL
import socket
import urlparse
import whrandom
from xml.dom.minidom import parseString
import lasso.Protocols.SingleSignOnAndFederation as sso
from lasso.Schemas.SchemaDom import importFromNode
import lasso.Soap
import glasnost.common.context as context
import glasnost.common.faults as faults
import glasnost.common.slots as slots
import glasnost.common.xhtmlgenerator as X
import glasnost.proxy.IdentitiesProxy as proxyIdentities
from glasnost.proxy.tools import getProxyForServerRole
import ObjectsWeb as objects
import things
from tools import OK, accessForbidden, getWeb, getWebForServerRole, redirect, writePageLayout
class AdminIdentities(objects.AdminMixin, proxyIdentities.AdminIdentities):
pass
objects.register(AdminIdentities)
class HTTPSConnection(httplib.HTTPConnection):
certificateFile = None
default_port = httplib.HTTPS_PORT
peerCaCertificateFile = None
privateKeyFile = None
def __init__(self, host, port = None, privateKeyFile = None,
certificateFile = None, peerCaCertificateFile = None,
strict = None):
httplib.HTTPConnection.__init__(self, host, port, strict)
self.privateKeyFile = privateKeyFile
self.certificateFile = certificateFile
self.peerCaCertificateFile = peerCaCertificateFile
def connect(self):
"Connect to a host on a given (SSL) port."
context = SSL.Context(SSL.SSLv23_METHOD)
# Demand a certificate.
context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
self.verifyCallback)
context.use_privatekey_file(self.privateKeyFile)
context.use_certificate_file(self.certificateFile)
context.load_verify_locations(self.peerCaCertificateFile)
# Strange hack, that is derivated from httplib.HTTPSConnection, but
# that I (Emmanuel) don't really understand...
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sslSocket = SSL.Connection(context, sock)
sslSocket.connect((self.host, self.port))
self.sock = httplib.FakeSocket(sslSocket, sslSocket)
def verifyCallback(self, connection, x509Object, errorNumber, errorDepth,
returnCode):
# FIXME: What should be done?
return returnCode
class Identification(things.ThingMixin, proxyIdentities.Identification):
pass
things.register(Identification)
class Identity(objects.ObjectWebMixin, proxyIdentities.Identity):
pass
objects.register(Identity)
class IdentitiesWeb(objects.ObjectsWebMixin, proxyIdentities.IdentitiesProxy):
def createSessionIfNeeded(self, nextUri):
"""Create a session, if it doesn't exist yet.
Return either None (if the session already existed or has been created
immediately) or a redirect page which must be returned to the user
agent, in order to create the session.
"""
session = context.getVar('session')
if session is None:
req = context.getVar('req')
sessionsProxy = getProxyForServerRole('sessions')
virtualHost = context.getVar('virtualHost')
try:
session = sessionsProxy.newSession(
req.connection.remote_ip,
serverId = virtualHost.defaultDispatcherId)
except: # Do a tighter check?
if context.getVar('debug'):
raise
return failure(_('Failed to initialize a session.'),
X.roleUrl('login'))
sessionToken = session['sessionToken']
context.setVar('sessionToken', sessionToken)
context.setVar('session', session)
canUseCookie = context.getVar('canUseCookie', default = 0)
if canUseCookie:
url = X.roleUrl('define', action = 'testCookie')
url.add('nextUri', nextUri)
url.add('sessionToken', sessionToken)
context.setVar('canUseCookie', 0)
url = url.getAsUrl()
context.setVar('canUseCookie', 1)
return redirect(url)
return None
def getViewAboveButtonsBarLayout(self, object, fields):
ballotsWeb = getWebForServerRole('ballots')
electionsWeb = getWebForServerRole('elections')
layout = X.array()
layout += objects.ObjectsWebMixin.getViewAboveButtonsBarLayout(
self, object, fields)
def getVotesSectionLayout(person, votes, elections, intertitle):
layout = None
if len(elections) > 0:
layout = X.array( X.h3()(intertitle) )
layout += X.br()
table = X.table()
layout += table
for election in elections:
if not person.id in election.getVoterIds():
continue
tr = X.tr(X.th(scope = 'row')(
X.a(href = X.idUrl(election.id))(
election.getLabelTranslated(
context.getVar('readLanguages')))))
table += tr
if not votes.has_key(election.id):
tr += X.td(_('Abstention'))
else:
vote = votes[election.id]
if vote == 'secret':
tr += X.td(_('Secret Ballot'))
else:
tr += X.td(X.a(href = X.idUrl(vote.id))(
X.asIs(vote.getLabelLine(election))))
return layout
userId = context.getVar('userId')
knownRoles = context.getVar('knownRoles')
if userId and 'ballots' in knownRoles and 'elections' in knownRoles:
# Only an identified user can see someone's votes.
votes = ballotsWeb.getVotesFromTokens(object.voteTokens)
try:
lastElections = electionsWeb.getLastObjectsWithVoter(
50, object.id, ['running'])
except faults.UserAccessDenied:
lastElections = []
layout += getVotesSectionLayout(
object, votes, lastElections,
_("""The votes for the elections in progress"""))
try:
lastElections = electionsWeb.getLastObjectsWithVoter(
10, object.id, ['closed'])
except faults.UserAccessDenied:
lastElections = []
layout += getVotesSectionLayout(
object, votes, lastElections,
_("""The votes for the closed elections"""))
return layout
def login(self): # FIXME: Rename to signOn or authentication or ..., because some authentication methods require no password?
identityProviderId = None
providersWeb = getWebForServerRole('providers')
if providersWeb is not None:
try:
identityProviderId = providersWeb.getRemoteIdentityProviderId()
except faults.MissingItem:
pass
if identityProviderId is None:
# Don't use Liberty Alliance protocol to authenticate user.
return self.loginLocal(context.getVar('nextUri'))
identityProvider = providersWeb.getObject(identityProviderId)
singleSignOnServiceUrl = identityProvider.singleSignOnServiceUrl
loginUrl = sso.getIdentityProviderAuthenticationUrl(
authenticationMethods = ['softwarePki', 'password'],
profile = 'artifact', # FIXME: or 'post'.
relayState = context.getVar('nextUri'), # FIXME: To encrypt.
serviceProviderId = context.getVar('httpHostName'),
singleSignOnServiceUrl = singleSignOnServiceUrl)
return redirect(loginUrl)
login.isPublicForWeb = 1
def loginLibertyAlliance(self, **authenticationRequestKeywords):
"""Liberty Alliance Identity Provider Login Method."""
nextUri = X.actionUrl('loginLibertyAlliance')
for key, value in authenticationRequestKeywords.items():
nextUri.add(key, value)
redirectPage = self.createSessionIfNeeded(nextUri)
if redirectPage is not None:
return redirectPage
session = context.getVar('session')
session['authenticationRequestKeywords'] \
= authenticationRequestKeywords
session['isDirty'] = 1
authenticationRequest = sso.buildAuthenticationRequestFromKeywords(
authenticationRequestKeywords)
authenticationMethods = authenticationRequest.getAuthenticationMethods(
)
isPassive, forceAuthentication = sso.processAuthenticationRequest(
authenticationRequest)
userId = context.getVar('userId')
if isPassive:
if not userId:
url = sso.getServiceProviderAssertionArtifactHandlerUrl(
serviceProvider.assertionConsumerServiceUrl, # FIXME
sso.buildStatus('noPassive'))
return redirect(url)
return self.loginSucceeded2(session['authenticationMethod'],
authenticationRequest)
elif forceAuthentication or not userId:
accountsServerRoles = {
'password': 'passwordaccounts',
'softwarePki': 'x509accounts',
}
accountsWeb = getWebForServerRole(
accountsServerRoles[authenticationMethods[0]]) # FIXME
return accountsWeb.login()
else:
return self.loginSucceeded2(session['authenticationMethod'],
authenticationRequest)
loginLibertyAlliance.isPublicForWeb = 1
def loginLocal(self, afterLoginUri = ''):
"""Non Liberty Alliance local login."""
nextUri = X.actionUrl('loginLocal')
nextUri.add('afterLoginUri', afterLoginUri)
redirectPage = self.createSessionIfNeeded(nextUri)
if redirectPage is not None:
return redirectPage
session = context.getVar('session')
session['afterLoginUri'] = afterLoginUri
session['isDirty'] = 1
authenticationMethods = ['password', 'softwarePki'] # FIXME
accountsServerRoles = {
'password': 'passwordaccounts',
'softwarePki': 'x509accounts',
}
accountsWeb = getWebForServerRole(
accountsServerRoles[authenticationMethods[0]]) # FIXME
return accountsWeb.login()
loginLocal.isPublicForWeb = 1
def loginSucceeded(self, userToken, authenticationMethod):
"""Liberty Alliance Identity Provider Login Succeeded Method."""
session = context.getVar('session')
session['authenticationMethod'] = authenticationMethod
context.setVar('userToken', userToken)
userId = self.getUserId()
user = None
if userId:
try:
user = self.getObject(userId)
except faults.MissingItem:
pass
if user is None:
userToken = ''
userId = None
if session.has_key('userToken'):
del session['userToken']
else:
session['userToken'] = userToken
session['isDirty'] = 1
context.setVar('userToken', userToken)
context.setVar('userId', userId)
context.setVar('user', user)
if not session.has_key('authenticationRequestKeywords'):
# Non Liberty Alliance local login.
if session.has_key('afterLoginUri') and session['afterLoginUri']:
nextUri = session['afterLoginUri']
canUseCookie = context.getVar('canUseCookie', default = 0)
if not canUseCookie:
sessionToken = context.getVar('sessionToken')
nextUri = appendToUri(nextUri,
'sessionToken=' + sessionToken)
else:
nextUri = X.rootUrl()
return redirect(nextUri)
# Liberty Alliance login.
authenticationRequestKeywords \
= session['authenticationRequestKeywords']
authenticationRequest = sso.buildAuthenticationRequestFromKeywords(
authenticationRequestKeywords)
return self.loginSucceeded2(authenticationMethod,
authenticationRequest)
def loginSucceeded2(self, authenticationMethod, authenticationRequest):
user = context.getVar('user')
serviceProviderHostName = authenticationRequest.getProviderID()
providersWeb = getWebForServerRole('providers')
serviceProviderId = providersWeb.getServiceProviderId(
serviceProviderHostName)
serviceProvider = providersWeb.getObject(serviceProviderId)
serviceIdentification = None
if user.serviceIdentifications is not None:
for serviceIdentification in user.serviceIdentifications:
if serviceIdentification.peerHostName \
== serviceProviderHostName:
break
if serviceIdentification is None:
serviceIdentification = Identification()
serviceIdentification.peerHostName = serviceProviderHostName
digest = md5.new(serviceIdentification.peerHostName)
randomGenerator = whrandom.whrandom()
randomSalt = str(randomGenerator.uniform(0.1, 1))[2:]
digest.update(randomSalt)
serviceIdentification.localNameIdentifier = digest.hexdigest()
if user.serviceIdentifications is None:
user.serviceIdentifications = []
user.serviceIdentifications.append(serviceIdentification)
self.modifyPartialObject(user, ['serviceIdentifications'])
peerNameIdentifier = serviceIdentification.peerNameIdentifier
if not peerNameIdentifier:
peerNameIdentifier = serviceIdentification.localNameIdentifier
nameIdentifierPolicy = authenticationRequest.getNameIDPolicy()
assertion = sso.buildAuthenticationAssertion(
authnRequest = authenticationRequest,
issuer = context.getVar('httpHostName'),
authenticationMethod = authenticationMethod,
nameIdentifier = peerNameIdentifier,
nameIdentifierFormat = 'federated', # FIXME
idpNameIdentifier = serviceIdentification.localNameIdentifier,
idpNameIdentifierFormat = 'federated', # FIXME
)
profile = authenticationRequest.getProtocolProfile()
if profile == 'artifact':
assertionsProxy = getProxyForServerRole('assertions')
artifact = assertionsProxy.addAssertion(assertion.exportToString())
url = sso.getServiceProviderAssertionArtifactHandlerUrl(
serviceProvider.assertionConsumerServiceUrl, # FIXME
artifact,
authenticationRequest.getRelayState())
return redirect(url)
else: # profile == 'post'
authenticationResponse \
= sso.buildAuthenticationResponseFromAuthnRequest(
providerId = context.getVar('httpHostName'),
statusCode = 'success', # FIXME
assertion = assertion,
authnRequest = authenticationRequest)
authenticationResponseEmbedded \
= authenticationResponse.exportToEmbeddedUrl()
context.push(_level = 'loginSucceeded', layoutMode = 'edit')
try:
layout = X.array()
submitUrl = serviceProvider.assertionConsumerServiceUrl # FIXME
form = X.form(action = submitUrl,
enctype = 'multipart/form-data', method = 'post')
layout += form
form += X.input(name = 'LARES', type = 'hidden',
value = authenticationResponseEmbedded)
buttonsBar = X.div(_class = 'buttons-bar')
form += buttonsBar
buttonsBar += X.buttonInForm('ok', 'okButton')
return writePageLayout(layout, _('Authentication Succeeded'))
finally:
context.pull(_level = 'loginSucceeded')
def logout(self):
self.deleteUserToken()
session = context.getVar('session')
if session is not None:
# Don't delete the session, just remove userToken from it.
context.setVar('userId', None)
context.setVar('user', None)
if session.has_key('userToken'):
del session['userToken']
session['isDirty'] = 1
context.setVar('userToken', '')
# if not context.getVar('sessionTokenInCookie', default = 0):
# # The sessionToken was not stored in a cookie, so don't try to
# # use
# # the cookie when loging out.
# context.setVar('canUseCookie', 0)
nextUri = context.getVar('nextUri') or ''
if not nextUri:
nextUri = '/'
else:
nextUri = cleanUpUri(nextUri, ['sessionToken'])
canUseCookie = context.getVar('canUseCookie', default = 0)
if not canUseCookie:
nextUri = appendToUri(nextUri,
'sessionToken=' + context.getVar('sessionToken'))
return redirect(nextUri)
logout.isPublicForWeb = 1
def newPerson(self):
peopleWeb = getWebForServerRole('people')
if not peopleWeb.canAddObject():
return accessForbidden()
person = peopleWeb.newObject()
return self.newPersonObject(person)
def newPersonObject(self, object):
context.push(_level = 'newPersonObject', layoutMode = 'edit')
try:
layout = X.array()
if context.getVar('error'):
layout += object.getErrorLayout()
form = X.form(action = X.actionUrl('newPersonSubmit'),
enctype = 'multipart/form-data', method = 'post')
layout += form
if context.getVar('nextUri'):
form += X.input(name = 'nextUri', type = 'hidden',
value = context.getVar('nextUri'))
slot = slots.Root(object)
widget = slot.getWidget()
form += widget.getModelPageBodyLayout(slot, fields = None)
form += X.div(_class = 'buttons-bar')(
X.span(_class = 'action-buttons-bar')(
X.buttonInForm('create', 'createButton')))
return writePageLayout(layout, _('New Account'))
finally:
context.pull(_level = 'newPersonObject')
def newPersonSubmit(self, **keywords):
if keywords is None:
keywords = {}
peopleWeb = getWebForServerRole('people')
person = peopleWeb.newObject(keywords)
person.submitFields(keywords)
if context.getVar('again'):
return self.newPersonObject(person)
try:
result = peopleWeb.submitAddObject(person)
except:
if context.getVar('debug'):
raise
return accessForbidden() # TODO: return failure ?
if result:
return result
user = context.getVar('user')
if user is not None:
user.personId = person.id
self.modifyPartialObject(user, ['personId'])
if context.getVar('nextUri'):
return redirect(context.getVar('nextUri'))
return redirect(X.idUrl(person.id))
newPersonSubmit.isPublicForWeb = 1
def processAuthenticationResponse(self, **keywords):
# Create the service provider side session.
nextUri = X.actionUrl('processAuthenticationResponse')
for key, value in keywords.items():
nextUri.add(key, value)
redirectPage = self.createSessionIfNeeded(nextUri)
if redirectPage is not None:
return redirectPage
if keywords.has_key('LARES'):
# POST
authenticationResponseEmbedded = keywords['LARES']
# Remove base64 url encoding.
import lasso.Schemas.Schema as schema
authenticationResponseEmbeddedDecoded = schema.decodeEmbeddedUrl(
authenticationResponseEmbedded)
authenticationResponseKeywords = {}
for segment in authenticationResponseEmbeddedDecoded.split('&'):
key, value = segment.split('=')
authenticationResponseKeywords[key] = value
response = sso.buildAuthenticationResponseFromKeywords(
authenticationResponseKeywords)
relayState = response.getRelayState() # FIXME: Uncrypt relayState.
identityProviderHostName = response.getProviderID()
elif keywords.has_key('samlp:AssertionArtifact'):
# Artifact.
if keywords.has_key('RelayState'):
# FIXME: Uncrypt relayState.
relayState = keywords['RelayState']
del keywords['RelayState']
else:
relayState = None
providersWeb = getWebForServerRole('providers')
# FIXME: Don't use getRemoteIdentityProviderId(). Use instead, the
# IdentityProviderSuccintID which is encoded into the artifact.
identityProviderId = providersWeb.getRemoteIdentityProviderId()
identityProvider = providersWeb.getObject(identityProviderId)
identityProviderHostName = identityProvider.providerId
idpParsedSoapEndpoint = urlparse.urlparse(
identityProvider.soapEndpoint)
idpAddressingScheme, idpHostName, idpPath \
= idpParsedSoapEndpoint[:3]
assertionArtifact = sso.buildAssertionArtifactFromKeywords(
keywords)
request = sso.buildRequest(assertionArtifact)
requestSoapMessage = lasso.Soap.buildMessage(
request.exportToString())
headers = {'Content-type': 'text/xml'}
if idpAddressingScheme == 'https':
# Use HTTPS protocol.
# FIXME: Use PEM stored in glasnost attributes.
shortHostName = context.getVar('httpHostName').split('.')[0]
connection = HTTPSConnection(
idpHostName,
privateKeyFile = '/home/manou/ssl-certificates/servers/%s/%s_private_key_uncrypted.pem' % (shortHostName, shortHostName),
certificateFile = '/home/manou/ssl-certificates/servers/%s/%s_entrouvert.crt' % (shortHostName, shortHostName),
peerCaCertificateFile = '/home/manou/ssl-certificates/certification-authorities/entrouvert_and_vandoeuvre-les-nancy_ca.crt')
else:
# Use HTTP protocol.
connection = httplib.HTTPConnection(idpHostName)
connection.request(
'POST',
idpPath,
requestSoapMessage,
headers)
responseSoapMessageFile = connection.getresponse()
responseSoapMessage = responseSoapMessageFile.read()
try:
responseXml = lasso.Soap.extractFromMessage(
responseSoapMessage)
except: # FIXME
raise responseSoapMessage
responseDom = parseString(responseXml)
response = importFromNode(responseDom, 1)
else:
# FIXME: Is this needed? Can idp directly return a status?
response = buildStatusFromKeywords(keywords)
statusCode = response.getStatusCode()
if statusCode != 'success':
layout = X.array()
layout += X.p(_class = 'alert')(
_('Liberty Alliance authentication assertion request'
' failed (reason = %s)') % statusCode)
return writePageLayout(layout, _('Failure'), canCache = 0)
assertion = response.getAssertion()
idpNameIdentifier = assertion.getIDPProvidedNameIdentifier()
nameIdentifier = assertion.getNameIdentifier()
if nameIdentifier and nameIdentifier != idpNameIdentifier:
try:
userToken = self.checkIdentityLocalNameIdentifier(
identityProviderHostName, nameIdentifier)
except faults.WrongNameIdentifier:
userToken = None
else:
try:
userToken = self.checkIdentityPeerNameIdentifier(
identityProviderHostName, idpNameIdentifier)
except faults.WrongNameIdentifier:
userToken = None
nextUri = relayState
if nextUri:
canUseCookie = context.getVar('canUseCookie', default = 0)
if not canUseCookie:
sessionToken = context.getVar('sessionToken')
nextUri = appendToUri(nextUri,
'sessionToken=' + sessionToken)
else:
nextUri = X.rootUrl()
session = context.getVar('session')
session['authenticationMethod'] = assertion.getAuthenticationMethod()
session['isDirty'] = 1
if userToken is None:
user = Identity()
user.language = context.getVar('readLanguages')[0]
identification = Identification()
if nameIdentifier and nameIdentifier != idpNameIdentifier:
identification.localNameIdentifier = nameIdentifier
identification.peerHostName = identityProviderHostName
identification.peerNameIdentifier = idpNameIdentifier
user.identityIdentifications = [identification]
userId = self.addObject(user)
if nameIdentifier and nameIdentifier != idpNameIdentifier:
userToken = self.checkIdentityLocalNameIdentifier(
identityProviderHostName, nameIdentifier)
else:
userToken = self.checkIdentityPeerNameIdentifier(
identityProviderHostName, idpNameIdentifier)
session['userToken'] = userToken
context.setVar('userToken', userToken)
context.setVar('userId', userId)
context.setVar('user', user)
context.push(_level = 'processAuthenticationResponse',
nextUri = nextUri)
try:
return self.newPerson()
finally:
context.pull(_level = 'processAuthenticationResponse')
context.setVar('userToken', userToken)
userId = self.getUserId()
user = None
if userId:
try:
user = self.getObject(userId)
except faults.MissingItem:
pass
if user is None:
userToken = ''
userId = None
if session.has_key('userToken'):
del session['userToken']
else:
session['userToken'] = userToken
session['isDirty'] = 1
context.setVar('userToken', userToken)
context.setVar('userId', userId)
context.setVar('user', user)
return redirect(nextUri)
processAuthenticationResponse.isPublicForWeb = 1
def processSoapRequest(self):
"""Liberty Alliance Identity Provider SOAP Request Processing Method.
"""
requestSoapMessage = context.getVar('xmlPostRaw') # FIXME: Use xmlPost.
requestXml = lasso.Soap.extractFromMessage(requestSoapMessage)
requestDom = parseString(requestXml)
request = importFromNode(requestDom, 1)
artifact = request.samlp_AssertionArtifact[0].PCDATA
assertionsProxy = getProxyForServerRole('assertions')
try:
assertionXml = assertionsProxy.getAssertion(artifact)
except faults.WrongArtifact:
# FIXME: Is it the proper way ton signal an error?
# I believe the good solution is to call buildResponse without any
# assertion => buildStatus should be modified.
response = sso.buildStatus('requestDenied') # FIXME
except:
# FIXME: Is it the proper way ton signal an error?
# I believe the good solution is to call buildResponse without any
# assertion => buildStatus should be modified.
response = sso.buildStatus('requestDenied') # FIXME
else:
assertionDom = parseString(assertionXml)
assertion = importFromNode(assertionDom, 1)
response = sso.buildResponse('success', assertion)
responseSoapMessage = lasso.Soap.buildMessage(
response.exportToString())
req = context.getVar('req')
req.content_type = 'text/xml'
req.send_http_header()
req.write(responseSoapMessage)
return OK
processSoapRequest.isPublicForWeb = 1