149 lines
5.2 KiB
Python
149 lines
5.2 KiB
Python
import re
|
|
import time
|
|
import logging
|
|
|
|
from django.template import RequestContext
|
|
from django.contrib.auth import REDIRECT_FIELD_NAME
|
|
from django.utils.translation import ugettext as _
|
|
from django.shortcuts import render_to_response
|
|
from django.contrib import messages
|
|
from django.conf import settings
|
|
|
|
from authentic2.saml.models import (nameid2kwargs, LibertySession,
|
|
LibertyFederation)
|
|
|
|
__redirection_timeout = 1600
|
|
|
|
__root_refererer_re = re.compile('^(https?://[^/]*/?)')
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def error_page(request, message=None, back=None, logger=None,
|
|
default_message=True, timer=False):
|
|
'''View that show a simple error page to the user with a back link.
|
|
|
|
back - url for the back link, if None, return to root of the referer
|
|
or the local root.
|
|
'''
|
|
if logger:
|
|
logger.debug('Showing message %r on an error page' % message)
|
|
else:
|
|
logging.debug('Showing message %r on an error page' % message)
|
|
if back is None:
|
|
referer = request.META.get('HTTP_REFERER')
|
|
if referer:
|
|
root_referer = __root_refererer_re.match(referer)
|
|
if root_referer:
|
|
back = root_referer.group(1)
|
|
if back is None:
|
|
back = '/'
|
|
global __redirection_timeout
|
|
context = RequestContext(request)
|
|
if timer:
|
|
context['redir_timeout'] = __redirection_timeout
|
|
context['next_page'] = back
|
|
display_message = getattr(settings, 'DISPLAY_MESSAGE_ERROR_PAGE', ())
|
|
if default_message and not display_message:
|
|
messages.add_message(request, messages.ERROR,
|
|
_('An error happened. Report this %s to the administrator.') % \
|
|
time.strftime("[%Y-%m-%d %a %H:%M:%S]", time.localtime()))
|
|
elif message:
|
|
messages.add_message(request, messages.ERROR, message)
|
|
return render_to_response('authsaml2/error_authsaml2.html', {'back': back},
|
|
context_instance=context)
|
|
|
|
# Used to register requested url during SAML redirections
|
|
def register_next_target(request, url=None):
|
|
if url:
|
|
next = url
|
|
else:
|
|
next = request.GET.get(REDIRECT_FIELD_NAME)
|
|
if not next:
|
|
next = '/'
|
|
request.session['next'] = next
|
|
logger.debug('saving next target %r', next)
|
|
|
|
def get_registered_url(request):
|
|
if 'next' in request.session:
|
|
return request.session['next']
|
|
return None
|
|
|
|
def register_request_id(request, request_id):
|
|
request.session['saml_request_id'] = request_id
|
|
|
|
# Used for account linking
|
|
def save_federation_temp(request, login, attributes):
|
|
if login and login.identity:
|
|
request.session['identity_dump'] = login.identity.dump()
|
|
request.session['remoteProviderId'] = login.remoteProviderId
|
|
request.session['nameId'] = login.nameIdentifier
|
|
request.session['attributes'] = attributes
|
|
|
|
def load_federation_temp(request, login):
|
|
if 'identity_dump' in request.session:
|
|
login.setIdentityFromDump(request.session['identity_dump'])
|
|
|
|
def save_login_session(request, login):
|
|
try:
|
|
session_index = login.response.assertion[0].authnStatement[0].sessionIndex
|
|
except (AttributeError, IndexError):
|
|
return
|
|
LibertySession.objects.get_or_create(
|
|
django_session_key=request.session.session_key,
|
|
session_index=session_index,
|
|
provider_id=login.remoteProviderId,
|
|
**nameid2kwargs(login.nameIdentifier))
|
|
|
|
def logout_session(session_key, entity_id):
|
|
qs = LibertySession.objects.filter(
|
|
django_session_key=session_key)
|
|
qs = qs.filter(idp__entity_id=entity_id)
|
|
return qs
|
|
|
|
def kill_logout_session(session_key, entity_id):
|
|
qs = logout_session(session_key, entity_id)
|
|
qs.delete()
|
|
|
|
def load_logout_session(logout, session_key, entity_id):
|
|
qs = LibertySession.objects.filter(
|
|
django_session_key=session_key)
|
|
qs = qs.filter(idp__entity_id=entity_id)
|
|
assert qs.exists(), 'no session found for session_key %r and entity_id %r' % \
|
|
(session_key, entity_id)
|
|
logout.setSessionDump(qs.to_session_dump())
|
|
|
|
def has_federation(user, entity_id):
|
|
return LibertyFederation.objects.filter(user=user,
|
|
sp__entity_id=entity_id).exists()
|
|
|
|
def kill_federations(user, entity_id):
|
|
LibertyFederation.objects.filter(user=user,
|
|
sp__entity_id=entity_id).update(user=None)
|
|
|
|
def get_sessions(name_id, session_indexes=None):
|
|
qs = LibertySession.objects.filter(
|
|
**nameid2kwargs(name_id))
|
|
if session_indexes:
|
|
qs.filter(session_index__in=session_indexes)
|
|
return qs
|
|
|
|
def has_sessions(name_id, session_indexes=None):
|
|
return get_sessions(name_id, session_indexes).exists()
|
|
|
|
def get_session_keys(sessions):
|
|
return sessions.values_list('django_session_key', flat=True)
|
|
|
|
def get_django_session_key_for_session_index(session_index):
|
|
ls = LibertySession.objects.get(session_index=session_index)
|
|
return ls.django_session_key
|
|
|
|
def can_do_synchronous_logout(sessions):
|
|
django_session_keys = get_session_keys(sessions)
|
|
# FIXME: we should check every auth and idp for synchronous logout
|
|
if LibertySession.objects.filter(
|
|
django_session__in=django_session_keys,
|
|
idp__isnull=False).exists():
|
|
return False
|
|
return True
|
|
|