"""SAML 2.0 SP implementation"""
|
|
|
|
import logging, operator
|
|
|
|
import lasso
|
|
|
|
import authentic2.idp.views as idp_views
|
|
|
|
from django.conf import settings
|
|
from django.shortcuts import render, redirect
|
|
from django.core.urlresolvers import reverse
|
|
from django.http import HttpResponse, HttpResponseRedirect, \
|
|
HttpResponseBadRequest, HttpResponseForbidden
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from django.contrib.auth import (login as auth_login,
|
|
REDIRECT_FIELD_NAME, authenticate)
|
|
from django.contrib import messages
|
|
from django.utils.translation import ugettext as _
|
|
from django.utils.http import urlencode
|
|
from django.core.cache import cache
|
|
from django.contrib.auth.decorators import login_required
|
|
|
|
from authentic2.saml.common import (load_provider,
|
|
return_saml2_response, return_saml2_request,
|
|
get_saml2_query_request, get_saml2_post_response, soap_call,
|
|
get_authorization_policy, get_idp_options_policy,
|
|
add_federation, send_soap_request, get_soap_message,
|
|
get_saml2_metadata, create_saml2_server,
|
|
maintain_liberty_session_on_service_provider,
|
|
get_session_not_on_or_after,
|
|
AUTHENTIC_STATUS_CODE_UNKNOWN_PROVIDER,
|
|
AUTHENTIC_STATUS_CODE_INTERNAL_SERVER_ERROR,
|
|
AUTHENTIC_STATUS_CODE_UNAUTHORIZED)
|
|
from authentic2.saml.models import (nameid2kwargs, LibertyProvider,
|
|
save_key_values, NAME_ID_FORMATS, get_and_delete_key_values)
|
|
from authentic2.saml.saml2utils import (authnresponse_checking,
|
|
get_attributes_from_assertion)
|
|
from authentic2.idp.saml.saml2_endpoints import return_logout_error
|
|
from authentic2_auth_saml2.utils import error_page
|
|
from authentic2_auth_saml2 import signals
|
|
from authentic2.utils import cache_and_validate, flush_django_session
|
|
|
|
from . import utils, app_settings
|
|
from .decorators import anonymous_only
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# String keys used by this module. CURRENT_IDP is the session key under which
# finish_sso() stores the remote provider id; MANAGE_DUMP_KEY is not
# referenced in the visible part of the file.
MANAGE_DUMP_KEY = 'manage-dump'
CURRENT_IDP = 'current-idp'

# Endpoint table handed as ``sp_map`` to get_saml2_metadata() and
# create_saml2_server(). Each entry is
# (service name, lasso binding constant, path[, second path]).
# NOTE(review): the optional second path looks like the response location for
# the redirect binding -- confirm against get_saml2_metadata().
metadata_map = (
        ('AssertionConsumerService',
            lasso.SAML2_METADATA_BINDING_ARTIFACT,
            '/singleSignOnArtifact'),
        ('AssertionConsumerService',
            lasso.SAML2_METADATA_BINDING_POST,
            '/singleSignOnPost'),
        ('SingleLogoutService',
            lasso.SAML2_METADATA_BINDING_REDIRECT,
            '/singleLogout', '/singleLogoutReturn'),
        ('SingleLogoutService',
            lasso.SAML2_METADATA_BINDING_SOAP,
            '/singleLogoutSOAP'),
        ('ManageNameIDService',
            lasso.SAML2_METADATA_BINDING_SOAP,
            '/manageNameIdSOAP'),
        ('ManageNameIDService',
            lasso.SAML2_METADATA_BINDING_REDIRECT,
            '/manageNameId', '/manageNameIdReturn'),
)
|
|
# Base options handed (together with metadata_map) to the metadata and server
# builders; always carries the public signature key.
metadata_options = {'key': settings.SAML_SIGNATURE_PUBLIC_KEY}
try:
    # Optionally advertise the discovery return endpoint in the metadata.
    if app_settings.SHOW_DISCO_IN_MD:
        metadata_options['disco'] = ('/discoveryReturn', )
except Exception:
    # Best effort: a missing or broken setting must not prevent the module
    # from importing, but leave a trace instead of swallowing it silently.
    # (A bare ``except:`` would also have hidden SystemExit and
    # KeyboardInterrupt.)
    logger.debug('SHOW_DISCO_IN_MD could not be read, discovery endpoint '
            'not advertised', exc_info=True)
|
|
|
|
@cache_and_validate(settings.LOCAL_METADATA_CACHE_TIMEOUT)
def metadata(request):
    '''Endpoint to retrieve the metadata file

       The metadata is generated by get_metadata() using the request path as
       provider id, and served as ``text/xml``.
    '''
    # ``content_type`` is the supported keyword; the legacy ``mimetype``
    # alias is rejected by newer Django releases.
    return HttpResponse(get_metadata(request, request.path),
            content_type='text/xml')
|
|
|
|
# Map binding names to the corresponding lasso HTTP method constants.
HTTP_METHODS = {
        'POST': lasso.HTTP_METHOD_POST,
        # Was mapped to HTTP_METHOD_POST, which made both keys identical.
        'REDIRECT': lasso.HTTP_METHOD_REDIRECT,
}
|
|
|
|
@anonymous_only
def sso(request):
    '''View for initiating a new authnrequest

       Query parameters:
       passive - if equal to 1, ask idp not to display any UI to the user
       force_authn - if equal to 1, ask idp to reauthenticate the user
       entity_id - identifier of the IdP to use; when absent the single
                   enabled IdP is selected

       The target URL (REDIRECT_FIELD_NAME parameter, defaulting to
       settings.LOGIN_REDIRECT_URL) is stored in the session under
       ``state-<request id>`` so the assertion consumer can find it again.

       Raises AssertionError on configuration problems and re-raises
       lasso.Error (through lasso_error()) when lasso fails.
    '''
    # Request parameters are strings: comparing them with the integer 1 was
    # always False, so the flags could never be enabled.
    # NOTE(review): both values are always booleans here, so the ``is None``
    # fallbacks to the policy defaults in setAuthnrequestOptions() are never
    # taken from this view -- confirm whether an absent parameter should
    # defer to the policy.
    is_passive = request.REQUEST.get('passive') == '1'
    force_authn = request.REQUEST.get('force_authn') == '1'

    entity_id = request.REQUEST.get('entity_id')
    # 1. Save the target page
    next_url = request.REQUEST.get(REDIRECT_FIELD_NAME,
            settings.LOGIN_REDIRECT_URL)

    # 2. Init the server object
    server = build_service_provider(request)
    assert server is not None, 'Service provider not configured'

    # 3. Define the provider or ask the user
    if not entity_id:
        idps = LibertyProvider.objects.idp_enabled()
        idp_count = idps.count()
        # Exactly one enabled IdP is required to pick it automatically; the
        # original conditions were inverted and rejected every configuration.
        assert idp_count <= 1, 'Too much IdP to select one'
        assert idp_count == 1, 'No IdP to select, add one'
        entity_id = idps[0].entity_id
    logger.info('sso with provider %r', entity_id)
    p = load_provider(request, entity_id, server=server, sp_or_idp='idp',
            autoload=True)
    assert p, 'provider %r not found' % entity_id
    # 4. Build authn request
    login = lasso.Login(server)
    assert login, 'unable to build a LassoLogin object'
    # Only redirect is necessary for the authnrequest
    http_method = server.getFirstHttpMethod(server.providers[p.entity_id],
            lasso.MD_PROTOCOL_TYPE_SINGLE_SIGN_ON)
    assert http_method != lasso.HTTP_METHOD_NONE, \
            'Not HTTP method declared for SSO by %r' % entity_id
    try:
        login.initAuthnRequest(p.entity_id, http_method)
    except lasso.Error as error:
        lasso_error(request, 'login.initAuthnRequest', error)

    # 5. Request setting
    assert setAuthnrequestOptions(p, login, force_authn, is_passive), \
            'no idp policy defined for %r' % entity_id
    try:
        login.buildAuthnRequestMsg()
    except lasso.Error as error:
        lasso_error(request, 'login.buildAuthnRequestMsg', error)

    # 6. Save the request ID (association with the target page)
    # NOTE(review): the debug line reads ``iD`` while the state key uses
    # ``id`` -- confirm both attributes exist on the lasso request object.
    logger.debug('RequestID: %r', login.request.iD)
    logger.debug('Session Key: %r', request.session.session_key)
    request_id = login.request.id
    request.session['state-%s' % request_id] = next_url

    # 7. Redirect the user
    title = _('Sending request to %s') % p.name
    return return_saml2_request(request, login, title)
|
|
|
|
|
|
@csrf_exempt
def assertion_consumer_artifact(request):
    '''Assertion consumer for the artifact binding

       Resolves the ``SAMLart`` parameter to exactly one enabled identity
       provider, exchanges the artifact for the response through a SOAP call
       and hands the result to sso_after_response().

       Raises AssertionError when the artifact matches no provider or more
       than one; re-raises lasso.Error (through lasso_error()).
    '''
    if request.method == 'GET':
        http_method = lasso.HTTP_METHOD_ARTIFACT_GET
    else:
        http_method = lasso.HTTP_METHOD_ARTIFACT_POST
    server = build_service_provider(request)

    # Load the provider metadata using the artifact
    artifact = request.REQUEST.get('SAMLart')
    logger.debug('artifact %r', artifact)
    qs = LibertyProvider.objects.by_artifact(artifact).filter(
            identity_provider__enabled=True)
    provider_count = len(qs)
    # The original conditions were inverted (``== 0`` and ``> 1``), which
    # made it impossible for any request to pass both checks.
    assert provider_count != 0, 'unable to resolve artifact %r' % artifact
    assert provider_count <= 1, \
            'too much provider found for artifact %r' % artifact
    p = load_provider(request, qs[0].entity_id, server=server, sp_or_idp='idp')
    logger.debug('loaded provider %r', p.entity_id)
    login = build_login(server)

    try:
        login.initRequest(artifact, http_method)
    except lasso.Error as e:
        lasso_error(request, 'login.initRequest', e)

    try:
        login.buildRequestMsg()
    except lasso.Error as e:
        lasso_error(request, 'login.buildRequestMsg', e)

    # Artifact resolution is a back-channel SOAP exchange with the IdP.
    soap_answer = soap_call(login.msgUrl, login.msgBody)

    try:
        login.processResponseMsg(soap_answer)
    except lasso.Error as error:
        lasso_error(request, 'login.processResponseMsg', error)
    return sso_after_response(request, login, provider=p)
|
|
|
|
|
|
@csrf_exempt
def assertion_consumer_post(request):
    '''Assertion consumer for the POST binding

       Parses the posted authn response; when lasso reports an unknown
       provider, the provider named by ``login.remoteProviderId`` is loaded
       and parsing is retried. The result is handed to sso_after_response().

       Raises AssertionError when the provider cannot be loaded; re-raises
       lasso.Error (through lasso_error()).
    '''
    server = build_service_provider(request)
    # build_login is a function; ``build_login.Login(server)`` raised
    # AttributeError on every request.
    login = build_login(server)
    message = get_saml2_post_response(request)

    # Must exist even if the first parse succeeds, otherwise the final call
    # below fails with a NameError.
    provider_loaded = None
    while True:
        try:
            login.processAuthnResponseMsg(message)
            break
        except (lasso.ServerProviderNotFoundError,
                lasso.ProfileUnknownProviderError):
            entity_id = login.remoteProviderId
            provider_loaded = load_provider(request, entity_id, server=server,
                    sp_or_idp='idp')
            assert provider_loaded, 'unable to find provider %r' % entity_id
        except lasso.Error as e:
            lasso_error(request, 'login.processAuthnResponseMsg', e)
    if provider_loaded is None:
        # The response was parsed without needing a retry; sso_after_response
        # still needs the provider object.
        entity_id = login.remoteProviderId
        provider_loaded = load_provider(request, entity_id, server=server,
                sp_or_idp='idp')
        assert provider_loaded, 'unable to find provider %r' % entity_id
    return sso_after_response(request, login, provider=provider_loaded)
|
|
|
|
|
|
def sso_after_response(request, login, relay_state=None,
        provider=None):
    '''Common assertionConsumer processing

       Steps, in order:
       - find the target URL saved by sso() for the request this response
         answers (settings.LOGIN_REDIRECT_URL for unsolicited responses);
       - check the response with authnresponse_checking() and accept it;
       - store the assertion attributes in the session;
       - run the ``authz_decision`` signal receivers and deny access if any
         of them refuses;
       - apply the IdP options policy for transient or persistent name
         identifiers (open a session, link an account, create a user).

       ``relay_state`` is accepted but not used by this function.

       Returns an HttpResponse. Raises AssertionError on inconsistent state,
       NotImplementedError for unknown policy values, and re-raises
       lasso.Error (through lasso_error()).
    '''
    try:
        request_id = login.response.inResponseTo
    except AttributeError:
        request_id = None
    if request_id is not None:
        try:
            next_url = request.session.get('state-%s' % request_id)
        except TypeError:
            # The message used to be passed as two separate arguments and
            # was therefore never formatted.
            raise AssertionError('no url stored for request id %r'
                    % (request_id,))
        assert next_url, 'missing next_url'
    else:
        next_url = settings.LOGIN_REDIRECT_URL
    # URL of this endpoint without its query string.
    subject_confirmation = request.build_absolute_uri().partition('?')[0]
    assert authnresponse_checking(login, subject_confirmation, logger,
            saml_request_id=request_id), 'authnresponse check failed'

    try:
        login.acceptSso()
    except lasso.Error as e:
        lasso_error(request, 'login.acceptSso', e)

    attributes = get_attributes_from_assertion(login.assertion, logger)
    # Register attributes in session for other applications
    request.session['attributes'] = attributes

    # Second view of the attributes, keyed by (name, format[, friendly name])
    # with the XML serialization of each value; only used for ``a8n`` and the
    # debug log below.
    attrs = {}

    for att_statement in login.assertion.attributeStatement:
        for attribute in att_statement.attribute:
            att_format = lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC
            nickname = None
            name = attribute.name.decode('ascii')
            if attribute.nameFormat:
                att_format = attribute.nameFormat.decode('ascii')
            if attribute.friendlyName:
                nickname = attribute.friendlyName
            if name:
                if att_format:
                    if nickname:
                        key = (name, att_format, nickname)
                    else:
                        key = (name, att_format)
                else:
                    key = (name)
                attrs[key] = list()
                for value in attribute.attributeValue:
                    # ``node`` instead of ``any`` so the builtin is not
                    # shadowed.
                    content = [node.exportToXml() for node in value.any]
                    content = ''.join(content)
                    attrs[key].append(content.decode('utf8'))

    entity_id = provider.entity_id
    # NOTE(review): ``a8n`` is built but never read afterwards in this
    # function -- kept as is, confirm whether it can be removed.
    a8n = {}
    a8n['certificate_type'] = 'SAML2_assertion'
    TRANSFER = (
            ('nameid',
                'subject.nameID.content'),
            ('subject_confirmation_method',
                'subject.subjectConfirmation.method'),
            ('not_before',
                'subject.subjectConfirmation.subjectConfirmationData.notBefore'),
            ('not_on_or_after',
                'subject.subjectConfirmation.subjectConfirmationData.notOnOrAfter'),
            ('authn_context',
                'assertion.authnStatement[0].authnContext.authnContextClassRef'),
            ('autn_instant',
                'assertion.authnStatement[0].authnInstant'),
    )
    for target, attribute in TRANSFER:
        try:
            a8n[target] = operator.attrgetter(attribute)(login)
        except AttributeError:
            pass
    a8n['attributes'] = attrs
    logger.debug('attributes in assertion %r from %r', attrs, entity_id)

    #Access control processing
    decisions = signals.authz_decision.send(sender=None, request=request,
            attributes=attributes, provider=provider)
    if not decisions:
        logger.debug('No authorization function connected')

    # Access is granted unless at least one receiver refuses it.
    access_granted = True
    one_message = False
    for decision in decisions:
        logger.debug('authorization function %r', decision[0].__name__)
        dic = decision[1]
        logger.debug('decision is %r', dic['authz'])
        if 'message' in dic:
            logger.debug('with message %r', dic['message'])
        if not dic['authz']:
            access_granted = False
            if 'message' in dic:
                one_message = True
                messages.add_message(request, messages.ERROR, dic['message'])

    if not access_granted:
        if not one_message:
            # No receiver explained the refusal: use the policy default.
            p = get_authorization_policy(provider)
            messages.add_message(request, messages.ERROR,
                    p.default_denial_message)
        return error_page(request, logger=logger, default_message=False,
                timer=True)

    #Access granted, now we deal with session management
    policy = get_idp_options_policy(provider)
    assert policy, 'missing idp options policy'

    user = request.user
    nid = login.nameIdentifier
    nid_dump = nid.dump()
    logger.debug('nid dump %r', nid_dump)
    nid_format = login.nameIdentifier.format
    if nid_format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT \
            and (policy is None or not policy.transient_is_persistent):
        logger.debug('nid is transient')
        if policy.handle_transient == 'AUTHSAML2_UNAUTH_TRANSIENT_ASK_AUTH':
            if not request.user.is_authenticated():
                maintain_liberty_session_on_service_provider(request, login)
                # account_linking() reads its state from the cache, so save
                # it through the helper that writes it there. The previous
                # inline code stored it in the session and called
                # redirect() with a literal ``kwargs=`` keyword.
                return redirect_to_account_linking(request, login,
                        attributes, next_url)
            if request.session.test_cookie_worked():
                request.session.delete_test_cookie()
            utils.save_login_session(request, login)
        elif policy.handle_transient == 'AUTHSAML2_UNAUTH_TRANSIENT_OPEN_SESSION':
            logger.debug('Opening session for transient with nameID')
            user = authenticate(name_id=nid)
            assert user, 'No backend for temporary federation is configured'
            return finish_sso(request, login, user, attributes, next_url)
        else:
            raise NotImplementedError('unkown policy.handler_transient')
        return HttpResponseRedirect(next_url)
    else:
        logger.debug('persistent federation processing')
        if policy is not None and policy.transient_is_persistent and \
                nid_format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT:
            logger.debug('use transient nid as persistent %r', nid_dump)
            if policy.persistent_identifier_attribute:
                ppid = policy.persistent_identifier_attribute
                logger.debug('persistent ID attribute: %r', ppid)
                identifier = None
                # Use the first value of the attribute whose name is ppid.
                for key in attributes:
                    if not isinstance(key, tuple):
                        continue
                    if key[0] == ppid and attributes[key]:
                        identifier = attributes[key][0]
                        break
                assert identifier, 'persistent ID attribute is missing'
                logger.debug('persistent ID attribute value: %r', identifier)
                login.nameIdentifier.content = identifier
            else:
                logger.debug('No persistent ID attribute configured')
            # Rewrite the name identifier so it is handled as persistent.
            login.nameIdentifier.format = \
                    lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
            login.nameIdentifier.nameQualifier = entity_id

        user = authenticate(name_id=nid, provider_id=entity_id)
        if not user and policy.handle_persistent == \
                'AUTHSAML2_UNAUTH_PERSISTENT_CREATE_USER_PSEUDONYMOUS':
            # Auto-create an user then do the authentication again
            user = authenticate(name_id=nid, provider_id=entity_id,
                    create=True)
        if user:
            return finish_sso(request, login, user, attributes, next_url)
        elif policy.handle_persistent == \
                'AUTHSAML2_UNAUTH_PERSISTENT_ACCOUNT_LINKING_BY_AUTH':
            # Check if the user consent for federation has been given
            if policy.force_user_consent \
                    and login.response.consent not in \
                    ('urn:oasis:names:tc:SAML:2.0:consent:obtained',
                    'urn:oasis:names:tc:SAML:2.0:consent:prior',
                    'urn:oasis:names:tc:SAML:2.0:consent:current-explicit',
                    'urn:oasis:names:tc:SAML:2.0:consent:current-implicit'):
                return error_page(request, _('You were '
                    'not asked your consent for account linking'),
                    logger=logger)
            if request.user.is_authenticated():
                add_federation(request.user, name_id=nid, provider_id=entity_id)
                return HttpResponseRedirect(next_url)
            utils.save_login_session(request, login)
            return render(request, 'authsaml2/account_linking.html')
    raise NotImplementedError
|
|
|
|
def finish_sso(request, login, user, attributes, next_url):
    '''Open the local session for ``user`` and redirect to ``next_url``

       Records the federation between the user and the name identifier,
       logs the user in, applies the session expiration taken from the
       assertion, remembers the IdP in the session, emits the ``auth_login``
       signal and saves the lasso login session.
    '''
    idp_entity_id = login.remoteProviderId
    add_federation(user, login.nameIdentifier, provider_id=idp_entity_id)
    auth_login(request, user)
    set_session_expiration(request, login)
    request.session[CURRENT_IDP] = login.remoteProviderId
    # Clean up the test cookie once we know cookies are working.
    cookie_ok = request.session.test_cookie_worked()
    if cookie_ok:
        request.session.delete_test_cookie()
    signals.auth_login.send(sender=None, request=request,
            attributes=attributes)
    utils.save_login_session(request, login)
    response = HttpResponseRedirect(next_url)
    return response
|
|
|
|
def redirect_to_account_linking(request, login, attributes,
        next_url):
    '''Save current state and redirect to account linking view

       The tuple ``(login dump, attributes, next_url)`` is stored in the
       cache under ``state-<response id>``, which is where account_linking()
       looks it up.

       Raises AssertionError when the response carries no id.
    '''
    logger.debug('redirecting to account linking')
    response_id = login.response.id
    assert response_id, 'missing response id'
    state_key = 'state-%s' % response_id
    cache.set(state_key, (login.dump(), attributes, next_url))
    # redirect() forwards its keyword arguments to reverse(); wrapping them
    # in a ``kwargs=`` dict made the URL lookup fail.
    return redirect('a2-auth-saml2-account-linking',
            response_id=response_id)
|
|
|
|
|
|
@login_required
def account_linking(request, response_id):
    '''Called after assertionConsumer for account linking

       Reads the state cached under ``state-<response_id>``, rebuilds the
       lasso login from its dump and finishes the SSO for the logged-in user.
    '''
    assert response_id, 'missing response_id'
    saved_state = cache.get('state-%s' % response_id)
    assert saved_state, 'missing state'
    login_dump, attributes, next_url = saved_state
    restored_login = build_login_from_dump(request, login_dump)
    return finish_sso(request, restored_login, request.user, attributes,
            next_url)
|
|
|
|
|
|
'''
|
|
Single Logout (SLO)
|
|
|
|
Initiated by SP or by IdP with SOAP or with Redirect
|
|
'''
|
|
|
|
|
|
def ko_icon():
    '''Redirect to the static "ko" image'''
    icon_url = '%s/authentic2/images/ko.png' % settings.STATIC_URL
    return HttpResponseRedirect(icon_url)
|
|
|
|
|
|
def ok_icon():
    '''Redirect to the static "ok" image'''
    icon_url = '%s/authentic2/images/ok.png' % settings.STATIC_URL
    return HttpResponseRedirect(icon_url)
|
|
|
|
|
|
def sp_slo(request):
    '''
       To make another module call the SLO function.
       Does not deal with the local django session.

       Expects ``session_key`` and ``entity_id`` request parameters; the
       REDIRECT_FIELD_NAME parameter (default settings.LOGIN_URL) is the URL
       to go to once the logout is done.

       With the SOAP binding the IdP answer is processed immediately; with
       the redirect binding the state is saved in the session under
       ``state-<request id>`` for slo_sp_response() and the user is
       redirected to the IdP.
    '''
    assert 'session_key' in request.REQUEST, 'missing session key'
    assert 'entity_id' in request.REQUEST, 'missing entity_id'
    session_key = request.REQUEST['session_key']
    entity_id = request.REQUEST.get('entity_id')
    next_url = request.REQUEST.get(REDIRECT_FIELD_NAME) \
            or settings.LOGIN_URL
    logout = build_logout(request)
    load_provider(request, entity_id, server=logout.server, sp_or_idp='idp')
    utils.load_logout_session(logout, session_key, entity_id)
    try:
        logout.initRequest(entity_id)
    except lasso.Error as e:
        lasso_error(request, 'logout.initRequest', e)
    request_id = logout.request.id
    try:
        logout.buildRequestMsg()
    except lasso.Error as e:
        lasso_error(request, 'logout.buildRequestMsg', e)
    # SOAP case
    if logout.msgBody:
        soap_response = send_soap_request(request, logout)
        return process_logout_response(request, logout, soap_response,
                session_key, next_url)
    else:
        # The state must go into the session: slo_sp_response() reads it
        # from there, and the request object itself does not support item
        # assignment.
        request.session['state-%s' % request_id] = \
                logout.dump(), session_key, entity_id, next_url
        return HttpResponseRedirect(logout.msgUrl)
|
|
|
|
|
|
def process_logout_response(request, logout, soap_response,
        session_key, next_url, entity_id=None):
    '''Process the IdP answer to a SOAP logout request

       On success the logout session identified by ``session_key`` and
       ``entity_id`` is killed and the user is redirected to ``next_url``.
       ``entity_id`` defaults to ``logout.remoteProviderId``; it used to be
       read from an undefined name, raising NameError on every successful
       logout.

       Re-raises lasso.Error (through lasso_error()).
    '''
    try:
        logout.processResponseMsg(soap_response)
    except lasso.Error as error:
        lasso_error(request, 'logout.processResponseMsg', error)
    else:
        if entity_id is None:
            entity_id = logout.remoteProviderId
        utils.kill_logout_session(session_key, entity_id)
    return HttpResponseRedirect(next_url)
|
|
|
|
|
|
def slo_sp_response(request):
    '''
       IdP response to a SLO SP initiated by redirect

       Parses the response (loading the provider on demand), finds the state
       saved in the session under ``state-<inResponseTo>``, kills the
       matching logout session and redirects to the saved URL.
    '''
    logout = build_logout(request)
    utils.load_logout_session(logout, request=request)
    query = get_saml2_query_request(request)
    parsed = False
    while not parsed:
        try:
            logout.processResponseMsg(query)
            parsed = True
        except (lasso.ServerProviderNotFoundError,
                lasso.ProfileUnknownProviderError):
            # Provider unknown to the server yet: load it, then parse again.
            idp = load_provider(request, logout.remoteProviderId,
                    server=logout.server, sp_or_idp='idp')
            assert idp, 'unable to load provider'
        except lasso.Error as e:
            lasso_error(request, 'logout.processResponseMsg', e)
    try:
        request_id = logout.response.inResponseTo
    except AttributeError:
        raise AssertionError('missing inResponseTo')
    state = request.session.get('state-%s' % request_id)
    assert state, 'missing state'
    _dump, session_key, entity_id, next_url = state
    assert logout.remoteProviderId == entity_id, 'entityID mismatch'
    utils.kill_logout_session(session_key, entity_id)
    return HttpResponseRedirect(next_url)
|
|
|
|
|
|
def slo_common(request, message):
    '''Common processing between SOAP and asynchronous SLO request handlers

       Returns a 3-tuple ``(error, logout, sessions)``:
       - on failure ``error`` is the response built by return_logout_error()
         and the two other items are None;
       - on success ``error`` is None, ``logout`` is the lasso logout object
         and ``sessions`` is the result of utils.get_session().

       Raises AssertionError when the request has no NameID and re-raises
       lasso.Error (through lasso_error()) when validation fails.
    '''
    logout = build_logout(request)

    # NOTE(review): stays None when the first parse succeeds, in which case
    # the policy lookup below receives None -- confirm that
    # get_idp_options_policy() copes with it.
    provider_loaded = None
    while True:
        try:
            logout.processRequestMsg(message)
            break
        except (lasso.ServerProviderNotFoundError,
                lasso.ProfileUnknownProviderError):
            provider_id = logout.remoteProviderId
            provider_loaded = load_provider(request, provider_id,
                    server=logout.server, sp_or_idp='idp')

            if not provider_loaded:
                logger.warn('provider %r unknown', provider_id)
                return return_logout_error(request, logout,
                        AUTHENTIC_STATUS_CODE_UNKNOWN_PROVIDER), None, None
            else:
                continue
        except lasso.Error as e:
            logger.error('logout.processRequestMsg %s', e)
            return return_logout_error(request, logout,
                    AUTHENTIC_STATUS_CODE_INTERNAL_SERVER_ERROR), None, None

    policy = get_idp_options_policy(provider_loaded)
    if not policy or not policy.accept_slo:
        if not policy:
            logger.error('no policy found for %r',
                    logout.remoteProviderId)
        elif not policy.accept_slo:
            logger.warn('received slo from %r not authorized',
                    logout.remoteProviderId)
        return return_logout_error(request, logout,
                AUTHENTIC_STATUS_CODE_UNAUTHORIZED), None, None

    # Look for a session index
    if hasattr(logout.request, 'sessionIndexes'):
        session_indexes = logout.request.sessionIndexes
    else:
        # Fall back on the single-valued attribute.
        session_indexes = []
        if logout.request.sessionIndex:
            session_indexes.append(logout.request.sessionIndex)

    name_id = logout.request.nameId
    assert name_id, 'missing NameID'
    if not utils.has_sessions(name_id, session_indexes):
        logger.warning('no sessions found for name id %r and '
                'sessions indexes %r', nameid2kwargs(name_id),
                session_indexes)
        return return_logout_error(request, logout,
                lasso.SAML2_STATUS_CODE_REQUESTER), None, None
    try:
        logout.validateRequest()
    except lasso.Error as e:
        # lasso_error() takes the request first; omitting it raised a
        # TypeError that hid the lasso error.
        lasso_error(request, 'logout.validateRequest', e)
    sessions = utils.get_session(name_id, session_indexes)
    return None, logout, sessions
|
|
|
|
@csrf_exempt
def slo_soap(request):
    '''SOAP SLO

       Handles a logout request received over SOAP: when the matching
       sessions allow a synchronous logout, the Django sessions are flushed,
       the session records deleted and the logout response returned;
       otherwise an "unsupported profile" error is returned.
    '''
    soap_message = get_soap_message(request)
    kind = lasso.getRequestTypeFromSoapMsg(soap_message)
    assert kind == lasso.REQUEST_TYPE_LOGOUT, 'not a logout request'

    error, logout, sessions = slo_common(request, soap_message)
    if error:  # early return
        return error
    if utils.can_do_synchronous_logout(sessions):
        for django_session_key in utils.get_session_keys(sessions):
            flush_django_session(django_session_key)
        sessions.delete()
        return slo_return_response(request, logout)
    logger.warning('cannot do SOAP logout because there are IdP '
            'sessions')
    return return_logout_error(request, logout,
            lasso.SAML2_STATUS_CODE_UNSUPPORTED_PROFILE)
|
|
|
|
def finish_slo(request):
    '''Return response to the IdP issuer of the logout request

       The ``id`` request parameter is the key under which singleLogout()
       saved the logout dump, session key and entity id.
    '''
    saved_key = request.REQUEST.get('id')
    assert saved_key, 'missing id argument'
    stored = get_and_delete_key_values(saved_key)
    logout_dump, session_key, entity_id = stored
    logout = lasso.Logout.newFromDump(create_server(request), logout_dump)
    load_provider(request, entity_id, server=logout.server, sp_or_idp='idp')
    return slo_return_response(request, logout)
|
|
|
|
def singleLogout(request):
    '''POST or Redirect SLO by IdP

       Deletes the sessions matching the logout request, saves the logout
       dump, the current session key and the IdP entity id under the logout
       request id, then redirects to the local logout with finish_slo() as
       next page so the response can be sent back to the IdP afterwards.
    '''
    message = get_saml2_query_request(request)
    # slo_common() returns (error, logout, sessions); the two last items
    # used to be unpacked in the wrong order.
    error, logout, sessions = slo_common(request, message)
    if error:  # early return
        return error
    sessions.delete()
    key = logout.request.id
    dump = logout.dump()
    session_key = request.session.session_key
    entity_id = logout.remoteProviderId
    save_key_values(key, dump, session_key, entity_id)
    query = urlencode({'id': key})
    next_url = '{0}?{1}'.format(reverse('a2-auth-saml2-finish-slo'), query)
    return idp_views.redirect_to_logout(request, next_page=next_url)
|
|
|
|
def slo_return_response(request, logout):
    '''Return response to requesting IdP

       Builds the logout response message and wraps it with
       return_saml2_response(); lasso errors are logged and re-raised by
       lasso_error().
    '''
    try:
        logout.buildResponseMsg()
    except lasso.Error as build_error:
        lasso_error(request, 'logout.buildResponseMsg', build_error)
    response = return_saml2_response(request, logout)
    return response
|
|
|
|
#############################################
|
|
# Helper functions
|
|
#############################################
|
|
|
|
def get_provider_id_and_options(provider_id):
    '''Return ``(provider_id, options)`` for the metadata/server builders

       ``provider_id`` defaults to the URL of the metadata view. ``options``
       is a fresh dict made of the module-level ``metadata_options``
       overridden by ``app_settings.METADATA_OPTIONS``.
    '''
    if not provider_id:
        provider_id = reverse('a2-auth-saml2-metadata')
    # Work on a copy: updating ``metadata_options`` in place leaked the
    # settings overrides into the shared module-level dict.
    options = dict(metadata_options)
    options.update(app_settings.METADATA_OPTIONS)
    return provider_id, options
|
|
|
|
def get_metadata(request, provider_id=None):
    '''Build the SAML 2.0 metadata of this service provider'''
    resolved_id, md_options = get_provider_id_and_options(provider_id)
    return get_saml2_metadata(request, resolved_id, sp_map=metadata_map,
            options=md_options)
|
|
|
|
def create_server(request, provider_id=None):
    '''Build the lasso server object of this service provider'''
    resolved_id, md_options = get_provider_id_and_options(provider_id)
    return create_saml2_server(request, resolved_id, sp_map=metadata_map,
            options=md_options)
|
|
|
|
def http_response_bad_request(message):
    '''Log ``message`` as an error and return it, translated, as a 400'''
    logger.error(message)
    translated = _(message)
    return HttpResponseBadRequest(translated)
|
|
|
|
def http_response_forbidden_request(message):
    '''Log ``message`` as an error and return it, translated, as a 403'''
    logger.error(message)
    translated = _(message)
    return HttpResponseForbidden(translated)
|
|
|
|
def build_service_provider(request):
    '''Create the lasso server whose provider id is the metadata URL'''
    metadata_url = reverse('a2-auth-saml2-metadata')
    server = create_server(request, metadata_url)
    assert server is not None, 'unable to build a LassoServer object'
    return server
|
|
|
|
def build_login(server):
    '''Create a lasso Login profile bound to ``server``'''
    profile = lasso.Login(server)
    assert profile is not None, 'unable to build a LassoLogin object'
    return profile
|
|
|
|
def build_login_from_dump(request, dump):
    '''Rebuild a lasso Login profile from ``dump`` on a fresh server'''
    profile = lasso.Login.newFromDump(build_service_provider(request), dump)
    assert profile is not None, 'unable to build a LassoLogin object'
    return profile
|
|
|
|
def build_logout(request):
    '''Create a lasso Logout profile on a fresh server'''
    profile = lasso.Logout(build_service_provider(request))
    assert profile is not None, 'unable to build a LassoLogout objects'
    return profile
|
|
|
|
def setAuthnrequestOptions(provider, login, force_authn, is_passive):
    '''Configure the authn request of ``login`` from the IdP options policy

       Returns the policy that was applied, or None when ``provider`` or
       ``login`` is missing or no policy is found. ``force_authn`` and
       ``is_passive`` fall back on the policy values when they are None.
    '''
    if not (provider and login):
        return None

    options = get_idp_options_policy(provider)
    if not options:
        return None

    authn_request = login.request
    if options.no_nameid_policy:
        authn_request.nameIDPolicy = None
    else:
        name_id_policy = authn_request.nameIDPolicy
        name_id_policy.format = \
                NAME_ID_FORMATS[options.requested_name_id_format]['samlv2']
        name_id_policy.allowCreate = options.allow_create
        name_id_policy.spNameQualifier = None

    if options.enable_binding_for_sso_response:
        authn_request.protocolBinding = options.binding_for_sso_response

    # None means "not specified by the caller": use the policy value.
    authn_request.forceAuthn = (options.want_force_authn_request
            if force_authn is None else force_authn)
    authn_request.isPassive = (options.want_is_passive_authn_request
            if is_passive is None else is_passive)

    return options
|
|
|
|
def lasso_error(request, call_name, error):
    '''Log a lasso failure raised by ``call_name`` and re-raise it'''
    message = str(error)
    logger.error('in %s: %s', call_name, message)
    raise error
|
|
|
|
def set_session_expiration(request, login):
    '''Apply the assertion's SessionNotOnOrAfter to the Django session

       The value is logged at debug level even when it is empty.
    '''
    expiry = get_session_not_on_or_after(login.assertion)
    if expiry:
        request.session.set_expiry(expiry)
    logger.debug('session expiration %r', expiry)
|