wcs/wcs/liberty.ptl

89 lines
3.4 KiB
Plaintext

import libxml2
from quixote import get_publisher
from qommon import get_cfg
import qommon.liberty
class LibertyDirectory(qommon.liberty.LibertyDirectory):
    """Liberty directory that maps a Liberty login onto a local user."""

    def lookup_user(self, session, login):
        """Return the local user matching the name identifier of *login*.

        The name identifier carried by *login* is recorded on *session* and
        looked up among the stored users.  When a user is found, its lasso
        identity dump is refreshed and the user is stored and returned.

        When no user is found and ID-WSF user grabbing is enabled (lasso
        built with WSF support and either the 'grab-user-with-wsf' key of
        the 'misc' configuration or the 'grab_user_with_wsf' key of the
        'saml_identities' configuration is set), the personal profile
        service is queried for a name and an email address; if both are
        obtained a new user is created, stored and returned.

        Returns None whenever no user could be found or created.
        """
        # lasso is imported here, as in the original code, so that importing
        # this module does not itself require lasso.
        import lasso

        ni = login.nameIdentifier.content
        session.name_identifier = ni

        user_class = get_publisher().user_class
        known_users = list(user_class.select(lambda x: ni in x.name_identifiers))
        if known_users:
            # Already federated: just refresh the stored identity dump.
            user = known_users[0]
            user.lasso_dump = login.identity.dump()
            user.store()
            return user

        # 'misc' / 'grab-user-with-wsf' is the older spelling of the option,
        # kept for compatibility with existing configurations.
        compatibility_id_wsf_user = get_cfg('misc', {}).get('grab-user-with-wsf')
        id_wsf_user = get_cfg('saml_identities', {}).get('grab_user_with_wsf')
        if not (lasso.WSF_SUPPORT and (compatibility_id_wsf_user or id_wsf_user)):
            return None

        # Ask the discovery service where the personal profile service is.
        disco = lasso.Discovery(login.server)
        disco.setSessionFromDump(session.lasso_session_dump)
        try:
            disco.initQuery()
        except lasso.Error:
            # XXX: there is no defined error code on lasso side
            service = None
        else:
            disco.addRequestedServiceType(lasso.PP_HREF)
            disco.buildRequestMsg()
            soap_answer = qommon.liberty.soap_call(disco.msgUrl, disco.msgBody)
            disco.processQueryResponseMsg(soap_answer)
            service = disco.getService()

        if not service:
            return None

        # Query the personal profile for the informal name and the contact.
        service.initQuery('/pp:PP/pp:InformalName', 'name')
        service.addQueryItem('/pp:PP/pp:MsgContact', 'email')
        service.buildRequestMsg()
        try:
            soap_answer = qommon.liberty.soap_call(service.msgUrl, service.msgBody)
        except qommon.liberty.SOAPException:
            # it was advertised, it didn't work, too bad.
            return None
        service.processQueryResponseMsg(soap_answer)

        email, name = None, None

        email_node = service.getAnswer('/pp:PP/pp:MsgContact')
        if email_node:
            # <MsgContact> splits the address in two elements; rebuild it
            # as account@provider.
            account, provider = None, None
            doc = libxml2.parseDoc(email_node)
            try:
                node = doc.children.children
                while node:
                    if node.name == 'MsgAccount':
                        account = node.getContent()
                    if node.name == 'MsgProvider':
                        provider = node.getContent()
                    node = node.next
            finally:
                # libxml2 documents are not released by the Python bindings
                # on their own; free them explicitly to avoid leaking.
                doc.freeDoc()
            if account and provider:
                email = '%s@%s' % (account, provider)
            else:
                email = ''

        name_node = service.getAnswer('/pp:PP/pp:InformalName')
        if name_node:
            doc = libxml2.parseDoc(name_node)
            try:
                # The content is decoded as UTF-8 and re-encoded to
                # iso-8859-1 before being stored on the user.
                name = unicode(doc.getContent(), 'utf-8').encode('iso-8859-1')
            finally:
                doc.freeDoc()

        if not (email and name):
            # Both attributes are required to create a user.
            return None

        user = user_class()
        user.email = email
        user.name = name
        user.name_identifiers.append(ni)
        user.lasso_dump = login.identity.dump()
        user.store()
        return user