This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
authentic-old/authentic/cas.py

496 lines
19 KiB
Python

import quixote.directory
import qommon.storage
import qommon.errors as errors
from quixote import get_session, get_field, get_request, \
redirect, get_publisher, get_session_manager
from qommon import get_logger
import random
import string
import datetime
import identities
import xml.etree.ElementTree as ET
import httplib
import urllib
import urllib2
try:
import ssl
except ImportError:
ssl = None
import os
import admin.configuration as configuration
import authentic.misc as misc
import authentic.login_token as login_token
'''CAS 1.0 and 2.0 implementation as Client and Server
Root certificates are to be put in /var/lib/app_name/domain/cas_trusted_root_certs.
FIXME: use another way to point to the trusted root certs.
'''
TRUSTED_ROOT_CERTS = 'cas_trusted_root_certs'
if ssl:
def cert_args():
certs = os.path.join(get_publisher().app_dir, TRUSTED_ROOT_CERTS)
if os.path.exists(certs):
return { 'cert_reqs': ssl.CERT_REQUIRED, 'ca_certs': certs }
return {}
# Helper class for verified HTTPS connections
class VerifiedHTTPSConnection(httplib.HTTPSConnection):
def connect(self):
# overrides the version in httplib so that we do
# certificate verification
sock = socket.create_connection((self.host, self.port),
self.timeout)
if self._tunnel_host:
self.sock = sock
self._tunnel()
# wrap the socket using verification with the root
# certs in trusted_root_certs
self.sock = ssl.wrap_socket(sock,
self.key_file,
self.cert_file,
**cert_args())
# wraps https connections with ssl certificate verification
class VerifiedHTTPSHandler(urllib2.HTTPSHandler):
def __init__(self, connection_class = VerifiedHTTPSConnection):
self.specialized_conn_class = connection_class
urllib2.HTTPSHandler.__init__(self)
def https_open(self, req):
return self.do_open(self.specialized_conn_class, req)
def urlopen(url):
https_handler = VerifiedHTTPSHandler()
url_opener = urllib2.build_opener(https_handler)
return url_opener.open(url)
else:
def urlopen(url):
return urllib.URLopener().open(url)
def test_pgt_url(pgt_url, pgt_id, pgt_iou):
try:
if '?' in url:
prefix = '&'
else:
prefix = '?'
url = '%s%s%s=%s&%s=%s' % (url, prefix, _pgt_id_param,
urllib.quote(str(pgt_id)), _pgt_id_iou, urllib.quote(str(pgt_iou)))
handle = urlopen(url)
handle.readlines()
handle.close()
return True
except:
return False
# Constants #
_CAS_NAMESPACE = 'http://www.yale.edu/tp/cas'
_renew_param = 'renew'
_service_param = 'service'
_gateway_param = 'gateway'
_warn_param = 'warn'
_url_param = 'url'
_ticket_param = 'ticket'
_pgt_url_param = 'pgtUrl'
_pgt_param = 'pgt'
_pgt_id_param = 'pgtId'
_pgt_iou_param = 'pgtIou'
_target_service_param = 'targetService'
_username_field = 'username' # unused
_password_field = 'password' # unused
_lt_field = 'lt' # unused
# ERROR codes
_INVALID_REQUEST_ERROR = 'INVALID_REQUEST'
_INVALID_TICKET_ERROR = 'INVALID_TICKET'
_INVALID_SERVICE_ERROR = 'INVALID_SERVICE'
_INTERNAL_ERROR = 'INTERNAL_ERROR'
_BAD_PGT_ERROR = 'BAD_PGT'
# XML Elements
SERVICE_RESPONSE_ELT = 'serviceResponse'
AUTHENTICATION_SUCCESS_ELT = 'authenticationSuccess'
USER_ELT = 'user'
PGT_ELT = 'proxyGrantingTicket'
PROXIES_ELT = 'proxies'
PROXY_ELT = 'proxy'
AUTHENTICATION_FAILURE_ELT = 'authenticationFailure'
CODE_ELT = 'code'
PROXY_SUCCESS_ELT = 'proxySuccess'
PROXY_TICKET_ELT = 'proxyTicket'
PROXY_FAILURE_ELT = 'proxyFailure'
# CODE_ELT
def make_id(prefix, length):
content = ( random.choice(string.letters+string.digits+'-') for x in range(length-len(prefix)) )
return prefix + ''.join(content)
class CASTicket(qommon.storage.StorableObject):
_names = 'cas-ticket'
def __init__(self, service, user, duration = None, prefix = '', length = 32, valid = True):
id = make_id(prefix, length)
self.service = service
self.session = get_session().id
self.user = user
self.creation_time = datetime.datetime.today()
# if duration is None it means use global setting for it
assert not duration or isinstance(duration, datetime.timedelta)
self.duration = duration
qommon.storage.StorableObject.__init__(self, id)
def stillValid(self):
if not self.user:
return False
if self.duration:
if datetime.datetime.today() > self.creation_time + self.duration:
return True
else:
return False
else:
return True
class ServiceTicket(CASTicket):
def __init__(self, service, user, renew = False, **kwargs):
self.renew = renew
CASTicket.__init__(self, service, user, prefix = 'ST-', **kwargs)
def migrate(self):
'''Retrieve user from a login token if absent'''
if not self.user:
tok = login_token.LoginToken.get(self.id, ignore_errors = True)
if tok.authentication:
self.user = tok.user
self.store()
def stillValid(self):
self.remove_self()
return CASTicket.stillValid(self)
class ProxyTicket(CASTicket):
def __init__(self, pgt_ticket, proxy_service, **kwargs):
CASTicket.__init__(self, proxy_service, pgt_ticket.user,
prefix = 'PT-', **kwargs)
self.proxies = pgt_ticket.proxies
self.session = pgt_ticket.session
def stillValid(self):
self.remove_self()
return CASTicket.stillValid(self)
class PGTTicket(CASTicket):
def __init__(self, proxy_or_service_ticket, pgt_url = None, **kwargs):
CASTicket.__init__(self, proxy_or_service_ticket.service,
proxy_or_service_ticket.user,
prefix = 'PGT-',
length = 64, **kwargs)
self.proxies = getattr(proxy_or_service_ticket, 'proxies', [])
# prepend traversed proxies
self.proxies.insert(0, pgt_url)
self.iou = make_id('PGTIOU-', 64)
self.session = proxy_or_service_ticket.session
def stillValid(self):
try:
# Verify session and session validity
session = get_session_manager()[self.session]
if session.user != self.user:
return False
except:
return False
return CASTicket.stillValid(self)
def cas_element(name, parent = None, **attrib):
if parent is None:
return ET.Element('{%s}%s' % (_CAS_NAMESPACE, name), attrib)
else:
return ET.SubElement(parent, '{%s}%s' % (_CAS_NAMESPACE, name), attrib)
class CASDirectory(quixote.directory.AccessControlled, quixote.directory.Directory):
_q_exports = [ 'login', 'logout', 'validate', 'proxy', 'serviceValidate', 'proxyValidate' ]
def _q_access(self):
if not configuration.get_configuration('cas').get('enable'):
raise errors.TraversalError()
self.logger = get_logger()
def get_cas_uid(self, id):
'''Extract a CAS user id from the Identity object of the current
user'''
user = identities.get_store().get_identity(id)
if isinstance(user.id, str) and user.id.startswith('http'):
return user.id
source = configuration.get_configuration('cas').get('uid_source')
if source == 'username':
try:
for account in user.accounts:
if hasattr(account, 'username'):
return account.username
except:
pass
else:
return user.id
def create_service_ticket(self, service, user, renew = False):
ticket = ServiceTicket(service, user, renew = renew)
ticket.store()
self.logger.info('''[CAS] created service ticket %r with params\
service: %r user: %r renew: %r''' % (ticket.id, service, user, renew))
if '?' in service:
prefix = '&'
else:
prefix = '?'
return '%s%s%s=%s' % (service, prefix, _ticket_param, ticket.id), ticket.id
def login(self):
# FIXME: Is POST accepted ?
# if get_request().get_method() != 'GET':
# raise errors.TraversalError()
# FIXME: do we really have to support the
# credential acceptor mode ? Wouldn't it be
# useless ?
service=get_field(_service_param)
renew=get_field(_renew_param) is not None
gateway=get_field(_gateway_param) is not None
if not service:
# Go back to homepage
return redirect('..')
self.logger.info('''[CAS] login request from %r with parameters \
(renew: %r, gateway: %r)''' % (service, renew, gateway))
session = get_session()
if not session or not session.user or renew:
# Return immediately if passive
if gateway and not renew:
return redirect(service)
return_url, id = self.create_service_ticket(service, None, renew = True)
login_token.LoginToken(id).store()
return_urls = (('okURL', return_url), ('cancelURL', service), ('LoginToken', id))
return misc.redirect_with_return_url('/login', return_urls)
url, _ = self.create_service_ticket(service, session.user)
self.logger.info('[CAS] redirecting to %r' % url)
return redirect(url)
def logout(self):
get_session().after_url = get_field(_url_param)
return redirect('../logout')
def validate(self):
message = 'validation '
try:
while True:
if get_request().get_method() != 'GET':
break
service = get_field(_service_param)
ticket = get_field(_ticket_param)
renew = get_field(_renew_param) is not None
message += ' from %r for ticket %r and params (renew: %r)' \
% (service, ticket, renew)
if not service or not ticket:
break
if not ticket.startswith('ST-'):
break
service_ticket = ServiceTicket.get(ticket, ignore_errors=True)
if not service_ticket:
break
if not service_ticket.stillValid():
break
try:
uid = self.get_cas_uid(service_ticket.user)
except:
break
if service_ticket.service != service:
break
if renew and (not service_ticket.renew or
not login_token.LoginToken.has_good_authentication(ticket)):
break
# Wrong really wrong, but what to return ?
self.logger.info('[CAS] %s, result is YES' % message)
return 'yes\n%s\n' % uid
except:
pass
self.logger.info('[CAS] %s, result is NO' % message)
return 'no\n\n'
def process_pgt_url(self, pgt_url, service_ticket, authentication_success):
'''Contact the passed pgtUrl and eventually produce
proxy-granting-ticket and a proxy-granting-ticket IOU'''
import pdb; pdb.set_trace()
if not pgt_url.startswith('https://'):
return
pgt = PGTTicket(service_ticket, pgt_url = pgt_url)
if test_pgt_url(pgt_url, pgt.id, pgt.iou):
self.logger.info('''[CAS] validated pgt callback URL %r for pgt ticket\
%r and iou %r''' % (pgt_url, pgt.id, pgt.iou))
pgt.store()
proxy_granting_ticket = cas_element(PGT_ELT, parent = authentication_success)
proxy_granting_ticket.text = pgt.iou
else:
self.logger.warn('''[CAS] validation failed pgt callback URL %r for pgt ticket\
%r and iou %r''' % (pgt_url, pgt.id, pgt.iou))
def serviceValidate(self):
return self.cas20_validate()
def proxyValidate(self):
return self.cas20_validate(proxy=True)
def cas20_validate(self, proxy=False):
service = get_field(_service_param)
ticket = get_field(_ticket_param)
pgt_url = get_field(_pgt_url_param)
renew = get_field(_renew_param) is not None
message = '''[CAS] validation 2.0 from %r for ticket %r \
with params (pgt_url: %r, renew: %r)''' % (service, ticket, pgt_url, renew)
service_response = cas_element(SERVICE_RESPONSE_ELT)
comment = ''
code = _INTERNAL_ERROR
try:
while True:
if get_request().get_method() != 'GET':
comment += 'request is not a GET. '
code = _INVALID_REQUEST_ERROR
break
if not service or not ticket:
if not service:
comment += 'service parameter is missing. '
if not ticket:
comment += 'ticket parameter is missing. '
code = _INVALID_REQUEST_ERROR
break
if not ticket.startswith('ST-') and \
(not proxy or not ticket.startswith('PT-')):
comment += 'ticket is not a valid service ticket. '
if proxy:
comment += 'ticket is not a valid proxy ticket. '
code = _INVALID_TICKET_ERROR
break
service_ticket = ServiceTicket.get(ticket, ignore_errors=True)
# Only validable one time
if service_ticket:
# service_ticket.remove_self()
pass
if not service_ticket or not service_ticket.stillValid() \
or (renew and not service_ticket.renew):
if not service_ticket:
comment += 'ticket is unknown. '
elif not service_ticket.stillValid():
comment += 'ticket is expired. '
elif renew and (not service_ticket.renew or
not login_token.LoginToken.has_good_authentication(ticket)):
comment += 'ticket not created with fresh credentials. '
code = _INVALID_TICKET_ERROR
break
if service_ticket.service != service:
comment += 'ticket service differ from service parameter. '
code = _INVALID_SERVICE_ERROR
break
try:
user = identities.get_store().get_identity(service_ticket.user)
except:
self.logger.error('CAS Service ticket %s contained an\
unknown user %s' % (ticket, service_ticket.user))
comment += 'internal error contact your administrator. '
code = _INTERNAL_ERROR
break
try:
# Wrong really wrong, but what to return ?
username = self.get_cas_uid(service_ticket.user)
except:
self.logger.error('User %s has no account name, cannot\
validate a CAS ticket %s' % (user, ticket))
comment += 'internal error contact your administrator. '
code = _INTERNAL_ERROR
break
# Everything is ok
authentication_success = cas_element(AUTHENTICATION_SUCCESS_ELT,
parent = service_response)
user = cas_element(USER_ELT, parent = authentication_success)
user.text = username
if pgt_url:
# Try to contact the callback PGT service
self.process_pgt_url(pgt_url, service_ticket,
authentication_success)
if ticket.startswith('PT-'):
proxies = cas_element(PROXIES_ELT, parent = authentication_success)
for proxy in service_ticket.proxies:
proxy = cas_element(PROXY_ELT, parent = proxies)
proxy.text = proxy
self.logger.info(message + ' is YES (comment: %r)' % comment)
return ET.tostring(service_response, 'utf-8')
except:
comment += 'internal error contact your administrator. '
authentication_failure = cas_element(AUTHENTICATION_FAILURE_ELT,
parent = service_response, code = code)
authentication_failure.text = comment
self.logger.info(message + ' is NO (comment: %r)' % comment)
return ET.tostring(service_response, 'utf-8')
def proxy(self):
pgt = get_field(_pgt_param)
target_service = get_firld(_target_service_param)
service_response = cas_element(SERVICE_RESPONSE_ELT)
comment = ''
code = _INTERNAL_ERROR
message = '''[CAS] proxy to %r for ticket %r ''' % \
(target_service, pgt)
try:
while True:
if get_request().get_method() != 'GET':
comment += 'request is not a GET. '
code = _INVALID_REQUEST_ERROR
break
if not target_service:
code = _INVALID_REQUEST_ERROR
comment += 'targetService parameter is missing. '
break
if not pgt:
code = _INVALID_REQUEST_ERROR
comment += 'pgt is missing. '
break
if not pgt.startswith('PGT-'):
code = _BAD_PGT_ERROR
comment += 'pgt does not start with PGT-. '
break
proxy_granting_ticket = PGTTicket.get(pgt)
if proxy_granting_ticket:
code = BAD_PGT
comment += 'unknown pgt. '
break
message += ' from pgt_url %r' % proxy_granting_ticket.proxies[0]
if not proxy_granting_ticket.stillValid():
code = BAD_PGT
comment += 'expired pgt. '
break
proxy_ticket = ProxyTicket(proxy_granting_ticket, target_service)
proxy_success = cas_element(PROXY_SUCCESS_ELT, parent = service_response)
proxy_ticket_elt = cas_element(PROXY_TICKET_ELT, parent = proxy_success)
proxy_ticket_elt.text = proxy_ticket.id
proxy_ticket.store()
self.logger.info(message + ' success')
return ET.tostring(service_response, 'utf-8')
except:
comment += 'internal error contact your administrator. '
proxy_failure = cas_element(PROXY_FAILURE_ELT,
parent = service_response, code = code)
proxy_failure.text = comment
self.logger.warn(message + ' failure (comment: %r)' % comment)
return ET.tostring(service_response, 'utf-8')