This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
authentic2-auth-saml2/authentic2_auth_saml2/utils.py

149 lines
5.2 KiB
Python

import re
import time
import logging
from django.template import RequestContext
from django.contrib.auth import REDIRECT_FIELD_NAME
from django.utils.translation import ugettext as _
from django.shortcuts import render_to_response
from django.contrib import messages
from django.conf import settings
from authentic2.saml.models import (nameid2kwargs, LibertySession,
LibertyFederation)
# Value exposed to the error template as 'redir_timeout' when error_page()
# is called with timer=True (unit not visible here; presumably
# milliseconds -- TODO confirm against the template).
__redirection_timeout = 1600
# Captures the scheme and host part (plus an optional trailing slash) at the
# start of a URL; error_page() applies it to the Referer header to build the
# default back link.
__root_refererer_re = re.compile('^(https?://[^/]*/?)')
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def error_page(request, message=None, back=None, logger=None,
        default_message=True, timer=False):
    '''View that shows a simple error page to the user with a back link.

    request - current request; its META and its message storage are used
    message - text shown to the user when the generic message is not used
    back - url for the back link, if None, return to root of the referer
           or the local root.
    logger - logger receiving the debug trace; the root logger if not set
    default_message - show a generic timestamped message instead of
           `message`, unless settings.DISPLAY_MESSAGE_ERROR_PAGE is true
    timer - if true, 'redir_timeout' and 'next_page' are added to the
           template context

    Returns the rendered 'authsaml2/error_authsaml2.html' response.
    '''
    # Hand the argument to the logging call instead of formatting eagerly:
    # "'...%r...' % message" raised a TypeError when message was a tuple.
    log = logger if logger else logging
    log.debug('Showing message %r on an error page', message)

    if back is None:
        # Fall back on the root of the referer, then on the local root.
        referer = request.META.get('HTTP_REFERER')
        root_referer = __root_refererer_re.match(referer) if referer else None
        back = root_referer.group(1) if root_referer else '/'

    context = RequestContext(request)
    if timer:
        # NOTE(review): both keys are assumed to depend on `timer`; the
        # nesting was not recoverable from the flattened copy -- confirm.
        context['redir_timeout'] = __redirection_timeout
        context['next_page'] = back

    display_message = getattr(settings, 'DISPLAY_MESSAGE_ERROR_PAGE', ())
    if default_message and not display_message:
        # Generic message carrying a timestamp the user can report.
        timestamp = time.strftime("[%Y-%m-%d %a %H:%M:%S]", time.localtime())
        messages.add_message(request, messages.ERROR,
            _('An error happened. Report this %s to the administrator.')
            % timestamp)
    elif message:
        messages.add_message(request, messages.ERROR, message)

    return render_to_response('authsaml2/error_authsaml2.html',
        {'back': back}, context_instance=context)
# Used to register requested url during SAML redirections
def register_next_target(request, url=None):
    '''Store in the session the url to reach once the redirections are over.

    url - explicit target; when empty, the REDIRECT_FIELD_NAME query
          parameter is used, and '/' as a last resort.
    '''
    # First non-empty candidate wins.
    target = url or request.GET.get(REDIRECT_FIELD_NAME) or '/'
    request.session['next'] = target
    logger.debug('saving next target %r', target)
def get_registered_url(request):
    '''Return the url stored under 'next' in the session, or None.'''
    return request.session.get('next')
def register_request_id(request, request_id):
    '''Keep the id of a SAML request in the session ('saml_request_id').'''
    session = request.session
    session['saml_request_id'] = request_id
# Used for account linking
def save_federation_temp(request, login, attributes):
    '''Stash the state of a login in the session.

    Stored keys: 'identity_dump', 'remoteProviderId', 'nameId' and
    'attributes'. Nothing is stored unless `login` is set and carries an
    identity.
    '''
    # NOTE(review): the guard is assumed to cover all four assignments; the
    # nesting was not recoverable from the flattened copy -- confirm.
    if not login or not login.identity:
        return
    session = request.session
    session['identity_dump'] = login.identity.dump()
    session['remoteProviderId'] = login.remoteProviderId
    session['nameId'] = login.nameIdentifier
    session['attributes'] = attributes
def load_federation_temp(request, login):
    '''Restore on `login` the identity dump kept in the session, if any.'''
    session = request.session
    if 'identity_dump' not in session:
        return
    login.setIdentityFromDump(session['identity_dump'])
def save_login_session(request, login):
    '''Record a LibertySession for the session index found in `login`.

    The session index is read from the first authentication statement of
    the first assertion of the response. When that path cannot be followed
    (missing attribute or empty list) nothing is recorded.
    '''
    try:
        assertion = login.response.assertion[0]
        session_index = assertion.authnStatement[0].sessionIndex
    except (AttributeError, IndexError):
        # no session index to remember
        return
    # get_or_create: an already recorded session is left as it is
    LibertySession.objects.get_or_create(
        django_session_key=request.session.session_key,
        session_index=session_index,
        provider_id=login.remoteProviderId,
        **nameid2kwargs(login.nameIdentifier))
def logout_session(session_key, entity_id):
    '''Return the LibertySession queryset for a Django session key,
    restricted to the idp whose entity id is `entity_id`.
    '''
    by_session = LibertySession.objects.filter(django_session_key=session_key)
    return by_session.filter(idp__entity_id=entity_id)
def kill_logout_session(session_key, entity_id):
    '''Delete the sessions selected by logout_session() for these arguments.'''
    logout_session(session_key, entity_id).delete()
def load_logout_session(logout, session_key, entity_id):
    '''Load into `logout` the dump of the sessions known for an idp.

    logout - object receiving the dump through setSessionDump()
    session_key - Django session key the sessions are attached to
    entity_id - entity id of the idp

    Raises AssertionError when no matching session exists.
    '''
    # Same selection as the one used to kill the sessions.
    qs = logout_session(session_key, entity_id)
    # Raised explicitly: an assert statement is removed when Python runs
    # with -O, which would let an empty dump through. The exception type is
    # kept for existing callers.
    if not qs.exists():
        raise AssertionError(
            'no session found for session_key %r and entity_id %r' %
            (session_key, entity_id))
    logout.setSessionDump(qs.to_session_dump())
def has_federation(user, entity_id):
    '''Tell whether a LibertyFederation exists for `user` with the sp whose
    entity id is `entity_id`.
    '''
    federations = LibertyFederation.objects.filter(
        user=user, sp__entity_id=entity_id)
    return federations.exists()
def kill_federations(user, entity_id):
    '''Detach from `user` its federations with the sp whose entity id is
    `entity_id`; the rows are kept, only their user is set to None.
    '''
    federations = LibertyFederation.objects.filter(
        user=user, sp__entity_id=entity_id)
    federations.update(user=None)
def get_sessions(name_id, session_indexes=None):
    '''Return the LibertySession queryset matching a name identifier.

    name_id - name identifier, turned into filter arguments by
              nameid2kwargs()
    session_indexes - optional collection of session indexes; when given
              and not empty, only sessions with one of these indexes are
              returned
    '''
    qs = LibertySession.objects.filter(**nameid2kwargs(name_id))
    if session_indexes:
        # filter() returns a new queryset and leaves the receiver untouched:
        # the result must be rebound or the restriction is silently lost.
        qs = qs.filter(session_index__in=session_indexes)
    return qs
def has_sessions(name_id, session_indexes=None):
    '''Tell whether get_sessions() finds at least one session for these
    arguments.
    '''
    matching = get_sessions(name_id, session_indexes)
    return matching.exists()
def get_session_keys(sessions):
    '''Return the flat list of 'django_session_key' values of `sessions`.'''
    keys = sessions.values_list('django_session_key', flat=True)
    return keys
def get_django_session_key_for_session_index(session_index):
    '''Return the Django session key of the LibertySession having this
    session index.

    The lookup goes through objects.get(), so it is expected to match
    exactly one session.
    '''
    return LibertySession.objects.get(
        session_index=session_index).django_session_key
def can_do_synchronous_logout(sessions):
    '''Tell whether the given sessions can be logged out synchronously.

    Returns False when a LibertySession with a non-null idp is found for
    the Django sessions of `sessions`, True otherwise.
    '''
    keys = get_session_keys(sessions)
    # FIXME: we should check every auth and idp for synchronous logout
    # NOTE(review): this lookup is spelled 'django_session' while every other
    # query in this module uses 'django_session_key' -- confirm against the
    # LibertySession model.
    blocking = LibertySession.objects.filter(
        django_session__in=keys,
        idp__isnull=False)
    return not blocking.exists()