This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
mandaye/mandaye/auth/saml2.py

207 lines
8.1 KiB
Python

import datetime
import os
import urllib2
import lasso
from urlparse import parse_qs
from mandaye import config, utils
from mandaye.saml import saml2utils
from mandaye.auth.authform import AuthForm
from mandaye.response import _302, _500
from mandaye.log import logger
from mandaye.template import serve_template
from mandaye.http import HTTPResponse, HTTPHeader
class SAML2Auth(AuthForm):
""" SAML 2 authentification
"""
def __init__(self, form_values, site_name, saml2_config):
""" saml2_config: saml 2 config module
"""
self.config = saml2_config
self.metadata_map = (
('AssertionConsumerService',
lasso.SAML2_METADATA_BINDING_POST ,
self.config.END_POINTS_PATH['single_sign_on_post']
),
)
self.metadata_options = { 'key': self.config.SAML_SIGNATURE_PUBLIC_KEY }
super(SAML2Auth, self).__init__(form_values, site_name)
def get_default_mapping(self):
return [
{
'path': r'/mandaye/logout',
'on_response': [{
'filter': self.logout,
},]
},
{
'path': r'/mandaye/login$',
'method': 'GET',
'response': [{
'filter': self.login,
'values': {
'associate_url': '/mandaye/associate',
},
'condition': 'response.code==302',
},]
},
{
'path': r'/mandaye/sso$',
'method': 'GET',
'response': [{
'filter': self.sso,
}]
},
{
'path': r'%s$' % self.config.END_POINTS_PATH['metadata'],
'method': 'GET',
'on_response': [{
'filter': self.metadata,
}]
},
{
'path': r'%s' % self.config.END_POINTS_PATH['single_sign_on_post'],
'method': 'POST',
'response': [{
'filter': self.single_sign_on_post,
'values': {
'next_url': '/mandaye/login',
}
}]
},
]
def _get_idp_metadata_file_path(self):
metadata_file_path = None
if self.config.IDP_METADATA:
metadata_file_path = os.path.join(config.data_dir,
self.config.IDP_METADATA.\
replace('://', '_').\
replace('/', '_')
)
if not os.path.isfile(metadata_file_path):
try:
response = urllib2.urlopen(self.config.IDP_METADATA)
metadata = response.read()
except:
return _500('sso', 'Unable to find metadata.')
metadata_file = open(metadata_file_path, 'a+')
metadata_file.write(metadata)
metadata_file.close()
return metadata_file_path
def _get_metadata(self, env):
url_prefix = env['mandaye.scheme'] + '://' + env['HTTP_HOST']
metadata_path = self.config.END_POINTS_PATH['metadata']
single_sign_on_post_path = \
self.config.END_POINTS_PATH['single_sign_on_post']
metagen = saml2utils.Saml2Metadata(url_prefix + metadata_path,
url_prefix = url_prefix)
metagen.add_sp_descriptor(self.metadata_map, self.metadata_options)
return str(metagen)
def sso(self, env, values, request, response):
target_idp = None
metadata_file_path = self._get_idp_metadata_file_path()
if not metadata_file_path:
return _500('sso', 'Unable to load provider.')
logger.debug('sso: target_idp is %s' % target_idp)
logger.debug('sso: metadata url is %s' % self.config.IDP_METADATA)
logger.debug('sso: mandaye metadata are %s' % self._get_metadata(env))
server = lasso.Server.newFromBuffers(self._get_metadata(env),
self.config.SAML_SIGNATURE_PRIVATE_KEY)
if not server:
return _500('sso', 'Error creating server object.')
logger.debug('sso: mandaye server object created')
server.addProvider(lasso.PROVIDER_ROLE_IDP, metadata_file_path)
login = lasso.Login(server)
if not login:
return _500('sso', 'Error creating login object.')
http_method = self.config.AUTHNREQ_HTTP_METHOD
try:
login.initAuthnRequest(target_idp, http_method)
except lasso.Error, error:
return _500('sso', 'Error initiating request.', error)
login.request.nameIDPolicy.format = self.config.SAML2_NAME_IDENTIFIER_FORMAT
login.request.nameIDPolicy.allowCreate = self.config.ALLOW_CREATE
login.request.nameIDPolicy.spNameQualifier = self.config.SP_NAME_QUALIFIER
login.request.protocolBinding = self.config.AUTHNRESP_BINDING
try:
login.buildAuthnRequestMsg()
except lasso.Error, error:
return _500('sso', 'Error initiating request.', error)
logger.debug('sso: set request id in session %s' % login.request.iD)
env['beaker.session']['request_id'] = login.request.iD
env['beaker.session'].save()
if not login.msgUrl:
return _500('sso', 'Enable to perform sso by redirection.')
return _302(login.msgUrl)
def single_sign_on_post(self, env, values, request, response):
target_idp = None
metadata_file_path = self._get_idp_metadata_file_path()
if not metadata_file_path:
return _500('single_sign_on_post', 'Unable to load provider.')
server = lasso.Server.newFromBuffers(self._get_metadata(env),
self.config.SAML_SIGNATURE_PRIVATE_KEY)
if not server:
return _500('singleSignOnPost', 'Error creating server object.')
server.addProvider(lasso.PROVIDER_ROLE_IDP, metadata_file_path)
login = lasso.Login(server)
if not login:
return _500('singleSignOnPost', 'Error creating login object.')
if env['REQUEST_METHOD'] != 'POST':
return _500('singleSignOnPost', 'Not a POST request.')
msg = env['wsgi.input']
params = parse_qs(msg.read())
if not params or not lasso.SAML2_FIELD_RESPONSE in params.keys():
return _500('singleSignOnPost', 'Missing response.')
message = params[lasso.SAML2_FIELD_RESPONSE][0]
logger.debug('sso: message posted %s' % message)
try:
login.processAuthnResponseMsg(message)
except:
return _500('singleSignOnPost', 'Unable to proces authnresponse message.')
subject_confirmation = utils.get_absolute_uri(env)
saml_request_id = env['beaker.session']['request_id']
check = saml2utils.authnresponse_checking(login, subject_confirmation,
logger, saml_request_id=saml_request_id)
if not check:
return _500('singleSignOnPost', 'error checking authn response %s' % env)
logger.debug('sso: response successfully checked')
try:
login.acceptSso()
except lasso.Error, error:
return _500('sso', 'Error validating sso.', error)
logger.debug('sso: sso accepted, session validation')
env['beaker.session']['validated'] = True
attributes = saml2utils.get_attributes_from_assertion(login.assertion,
logger)
env['beaker.session']['attributes'] = attributes
env['beaker.session']['unique_id'] = login.nameIdentifier.content
env['beaker.session'].save()
return _302(values['next_url'])
def metadata(self, env, values, request, response):
title='metadata'
headers = HTTPHeader({'Content-Type': ['text/xml']})
return HTTPResponse(200, 'Found', headers,
self._get_metadata(env))