[idp] remove unused imports, remove * imports, remove long lines

This commit is contained in:
Benjamin Dauvergne 2011-04-27 11:49:21 +02:00
parent 4314565ee6
commit 63f5b675f5
11 changed files with 168 additions and 144 deletions

View File

@ -1,4 +1,3 @@
import urllib2
import logging
import random
import datetime
@ -9,16 +8,23 @@ from django.http import HttpResponseRedirect, HttpResponseBadRequest, \
from django.core.urlresolvers import reverse
from django.contrib.auth.views import redirect_to_login, logout
from django.utils.http import urlquote, urlencode
from django.conf.urls.defaults import *
from django.conf.urls.defaults import patterns, url
from django.conf import settings
from constants import *
from models import CasTicket
from authentic2.auth2_auth.views import redirect_to_login as auth2_redirect_to_login
from models import CasTicket
from authentic2.auth2_auth.views import redirect_to_login as \
auth2_redirect_to_login
import authentic2.auth2_auth.models as auth2_auth_models
from constants import SERVICE_PARAM, RENEW_PARAM, GATEWAY_PARAM, ID_PARAM, \
CANCEL_PARAM, SERVICE_TICKET_PREFIX, TICKET_PARAM, \
CAS10_VALIDATION_FAILURE, CAS10_VALIDATION_SUCCESS, PGT_URL_PARAM, \
INVALID_REQUEST_ERROR, INVALID_TICKET_ERROR, INVALID_SERVICE_ERROR, \
INTERNAL_ERROR, CAS20_VALIDATION_FAILURE, CAS20_VALIDATION_SUCCESS
logger = logging.getLogger('authentic2.idp.idp_cas')
ALPHABET = string.letters+string.digits+'-'
class CasProvider(object):
def get_url(self):
return patterns('cas',
@ -30,7 +36,8 @@ class CasProvider(object):
url = property(get_url)
def make_id(self, prefix='', length=29):
content = ( random.SystemRandom().choice(string.letters+string.digits+'-') for x in range(length-len(prefix)) )
l = length-len(prefix)
content = ( random.SystemRandom().choice(ALPHABET) for x in range(l) )
return prefix + ''.join(content)
def create_service_ticket(self, service, renew=False, validity=True,
@ -118,7 +125,8 @@ renew:%s and gateway:%s' % (service, renew, gateway))
return self.handle_login_after_authentication(request, st)
def cas_failure(self, request, st, reason):
logger.debug('%s, redirecting without ticket to %r' % (reason, service))
logger.debug('%s, redirecting without ticket to %r' % (reason, \
st.service))
st.delete()
return HttpResponseRedirect(st.service)
@ -175,12 +183,14 @@ renew:%s and gateway:%s' % (service, renew, gateway))
def handle_login_after_authentication(self, request, st):
if not st.valid():
return self.cas_failure(request, st, 'service ticket id is not valid')
return self.cas_failure(request, st,
'service ticket id is not valid')
else:
return self.return_ticket(request, st)
def return_ticket(self, request, st):
return HttpResponseRedirect('%s?ticket=%s' % (st.service, st.ticket_id))
return HttpResponseRedirect('%s?ticket=%s' % (st.service,
st.ticket_id))
def validate(self, request):
if request.method != 'GET':
@ -243,9 +253,10 @@ renew:%s and gateway:%s' % (service, renew, gateway))
if st.service != service:
return self.cas20_error(request, INVALID_SERVICE_ERROR)
if pgt_url:
raise NotImplementedError('CAS 2.0 pgtUrl parameter is not handled')
raise NotImplementedError(
'CAS 2.0 pgtUrl parameter is not handled')
return HttpResponse(CAS20_VALIDATION_SUCCESS % st.user)
except Exception, e:
except Exception:
return self.cas20_error(INTERNAL_ERROR)
def logout(self, request):
@ -254,13 +265,14 @@ renew:%s and gateway:%s' % (service, renew, gateway))
class Authentic2CasProvider(CasProvider):
def authenticate(self, request, st, passive=False):
next = '%s?id=%s' % (reverse(self.continue_cas), urlquote(st.ticket_id))
next = '%s?id=%s' % (reverse(self.continue_cas),
urlquote(st.ticket_id))
if passive:
if getattr(settings, 'AUTH_SSL', False):
query = { 'next': next,
'nonce': st.ticket_id }
return HttpResponseRedirect('%s?%s' % (reverse('user_signin_ssl'),
urlencode(query)))
return HttpResponseRedirect('%s?%s' %
(reverse('user_signin_ssl'), urlencode(query)))
else:
return self.cas_failure(request, st,
'''user needs to login and no passive authentication \

View File

@ -7,9 +7,7 @@ import calendar
import openid.association
import openid.store.nonce
from django.utils.translation import ugettext_lazy as _
from django.db import models
from django.contrib.auth.models import User
from authentic2.saml.fields import PickledObjectField
@ -26,10 +24,13 @@ class Association(models.Model):
handle = models.CharField(max_length=255, blank=False)
secret = PickledObjectField()
issued = models.DateTimeField(auto_now_add=True,
verbose_name="Issue time for this association, as seconds since EPOCH")
verbose_name="Issue time for this association, as seconds \
since EPOCH")
lifetime = models.IntegerField(
verbose_name="Lifetime of this association as seconds since the issued time")
expire = models.DateTimeField("After this time, the association will be expired")
verbose_name="Lifetime of this association as seconds since \
the issued time")
expire = models.DateTimeField("After this time, the association will \
be expired")
assoc_type = models.CharField(max_length=64, blank=False)
class Meta:
@ -106,7 +107,8 @@ class Nonce(models.Model):
if timestamp > now or timestamp + openid.store.nonce.SKEW < now:
return False
n, created = cls.objects.get_or_create(server_url=server_url, salt=salt)
n, created = cls.objects.get_or_create(server_url=server_url,
salt=salt)
if created:
n.timestamp = timestamp
n.save()

View File

@ -1,12 +1,13 @@
# -*- coding: utf-8 -*-
# vim: set ts=4 sw=4 : */
from django.conf.urls.defaults import *
from django.conf.urls.defaults import patterns, url
urlpatterns = patterns('authentic2.idp.idp_openid.views',
url(r'^$', 'openid_server', name='openid-provider-root'),
url(r'^decide/$', 'openid_decide', name='openid-provider-decide'),
url(r'^xrds/$', 'openid_xrds', name='openid-provider-xrds'),
url(r'^(?P<id>.+)/xrds$', 'openid_xrds', {'identity': True}, name='openid-provider-identity-xrds'),
url(r'^(?P<id>.+)/?$', 'openid_discovery', name='openid-provider-identity'),
url(r'^(?P<id>.+)/xrds$', 'openid_xrds', {'identity': True},
name='openid-provider-identity-xrds'),
url(r'^(?P<id>.+)/?$', 'openid_discovery',
name='openid-provider-identity'),
)

View File

@ -3,7 +3,6 @@
# some code from http://www.djangosnippets.org/snippets/310/ by simon
# and from examples/djopenid from python-openid-2.2.4
from openid.extensions import sreg
import openid.server
from django.core.exceptions import ImproperlyConfigured
from django.utils.importlib import import_module
@ -26,15 +25,6 @@ def import_module_attr(path):
package, module = path.rsplit('.', 1)
return getattr(import_module(package), module)
def add_sreg_data(request, orequest, oresponse):
sreg_req = sreg.SRegRequest.fromOpenIDRequest(orequest)
sreg_resp = sreg.SRegResponse.extractResponse(sreg_req, {
'email': request.user.email,
'nickname': request.user.username,
'fullname': request.user.get_full_name(),
})
oresponse.addExtension(sreg_resp)
def get_store(request):
try:
store_class = import_module_attr(conf.STORE)

View File

@ -7,20 +7,17 @@ import logging
import hashlib
import urlparse
from django.conf import settings
from django.core.urlresolvers import reverse
from django.http import HttpResponse, HttpResponseRedirect
from django.http import HttpResponseRedirect
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.utils.translation import ugettext as _
from django.utils.http import urlquote
try:
from django.views.decorators.csrf import csrf_exempt
except ImportError:
from django.contrib.csrf.middleware import csrf_exempt
import django.forms as forms
from django.contrib.auth import REDIRECT_FIELD_NAME
from openid.consumer.discover import OPENID_IDP_2_0_TYPE, \
OPENID_2_0_TYPE, OPENID_1_0_TYPE, OPENID_1_1_TYPE
@ -33,12 +30,21 @@ from openid.message import IDENTIFIER_SELECT
from openid.extensions.sreg import ns_uri as SREG_TYPE, SRegRequest, \
SRegResponse, data_fields
from utils import add_sreg_data, get_store, oresponse_to_response
from utils import get_store, oresponse_to_response
from authentic2.auth2_auth.views import redirect_to_login
import models
logger = logging.getLogger('authentic.idp.idp_openid')
def check_exploded(exploded, request):
username = request.user.username
return not exploded.path.startswith(request.path) \
or exploded.path[len(request.path):].strip('/') != username \
or exploded.params \
or exploded.query \
or exploded.fragment
@csrf_exempt
def openid_server(request):
"""
@ -46,7 +52,8 @@ def openid_server(request):
the <link rel="openid.server"> tag.
"""
server = Server(get_store(request),
op_endpoint=request.build_absolute_uri(reverse('openid-provider-root')))
op_endpoint=request.build_absolute_uri(
reverse('openid-provider-root')))
# Cancellation
if 'cancel' in request.REQUEST:
@ -86,38 +93,41 @@ def openid_server(request):
if not request.user.is_authenticated():
# Site does not want interaction
if orequest.immediate:
logger.debug('User not logged and checkid immediate request, returning OpenID failure')
logger.debug('User not logged and checkid immediate request, \
returning OpenID failure')
return oresponse_to_response(server, orequest.answer(False))
else:
# Try to login
request.session['OPENID_REQUEST'] = orequest
logger.debug('User not logged and checkid request, redirecting to login page')
logger.debug('User not logged and checkid request, \
redirecting to login page')
return redirect_to_login(request, nonce='1')
else:
identity = orequest.identity
if identity != IDENTIFIER_SELECT:
exploded = urlparse.urlparse(identity)
# Allows only /openid/<user_id>
if not exploded.path.startswith(request.path) \
or not exploded.path[len(request.path):].strip('/') == request.user.username \
or exploded.params \
or exploded.query \
or exploded.fragment :
if check_exploded(exploded, request):
# We only support directed identity
logger.debug('Invalid OpenID identity %s' % identity)
return oresponse_to_response(server, orequest.answer(False))
try:
trusted_root = models.TrustedRoot.objects.get(user=request.user.id,
trust_root=orequest.trust_root)
trusted_root = models.TrustedRoot.objects.get(
user=request.user.id, trust_root=orequest.trust_root)
# Check the choices are sufficient
if not set(sreg_request.required).issubset(set(trusted_root.choices)):
if not set(sreg_request.required)\
.issubset(set(trusted_root.choices)):
# Current assertion is not sufficent, ask again !
if orequest.immediate:
logger.debug('Attributes authorization unsufficient and checkid immediate, returning OpenID failure')
return oresponse_to_response(server, orequest.answer(False))
logger.debug('Attributes authorization unsufficient \
and checkid immediate, returning OpenID failure')
return oresponse_to_response(server,
orequest.answer(False))
request.session['OPENID_REQUEST'] = orequest
logger.debug('Attributes authorization unsufficient for %s, redirecting to consent page' % orequest.trust_root)
return HttpResponseRedirect(reverse('openid-provider-decide'))
logger.debug('Attributes authorization unsufficient \
for %s, redirecting to consent page' % orequest.trust_root)
return HttpResponseRedirect(
reverse('openid-provider-decide'))
user_data = {}
for field in trusted_root.choices:
if field == 'email':
@ -126,40 +136,48 @@ def openid_server(request):
user_data[field] = '%s %s' % (request.user.first_name,
request.user.last_name)
elif field == 'nickname':
user_data[field] = getattr(request.user, 'username', '')
user_data[field] = getattr(request.user, 'username',
'')
else:
logger.debug('Could not provide SReg field %s' % field)
except models.TrustedRoot.MultipleObjectsReturned:
# Too much trustedroots remove
models.TrustedRoot.objects.filter(user=request.user.id,
trust_root=orequest.trust_root).delete()
trust_root=orequest.trust_root).delete()
# RP does not want any interaction
if orequest.immediate:
logger.warning('Too much trusted root records and checkid immediate, returning OpenID failure')
return oresponse_to_response(server, orequest.answer(False))
logger.warning('Too much trusted root records and \
checkid immediate, returning OpenID failure')
return oresponse_to_response(server,
orequest.answer(False))
request.session['OPENID_REQUEST'] = orequest
logger.info('Too much trusted root for %s, redirecting to consent page' % orequest.trust_root)
logger.info('Too much trusted root for %s, redirecting to \
consent page' % orequest.trust_root)
return HttpResponseRedirect(reverse('openid-provider-decide'))
except models.TrustedRoot.DoesNotExist:
# RP does not want any interaction
if orequest.immediate:
logger.info('Trusted root unknown and checkid immediate, returning OpenID failure')
return oresponse_to_response(server, orequest.answer(False))
logger.info('Trusted root unknown and checkid \
immediate, returning OpenID failure')
return oresponse_to_response(server,
orequest.answer(False))
request.session['OPENID_REQUEST'] = orequest
logger.info('Trusted root %s unknown, redirecting to consent page' % orequest.trust_root)
logger.info('Trusted root %s unknown, redirecting to \
consent page' % orequest.trust_root)
return HttpResponseRedirect(reverse('openid-provider-decide'))
# Create a directed identity if needed
if identity == IDENTIFIER_SELECT:
hash = hashlib.sha1(str(request.user.id)+'|'+orequest.trust_root).hexdigest()
hash = hashlib.sha1(str(request.user.id)+'|'+orequest.trust_root) \
.hexdigest()
claimed_id = request.build_absolute_uri(
reverse('openid-provider-identity', args=[hash]))
logger.info('Giving directed identity %r to trusted root %r with sreg data %s' %
(claimed_id, orequest.trust_root, user_data))
logger.info('Giving directed identity %r to trusted root %r \
with sreg data %s' % (claimed_id, orequest.trust_root, user_data))
else:
claimed_id = identity
logger.info('Giving claimed identity %r to trusted root %r with sreg data %s' %
(claimed_id, orequest.trust_root, user_data))
logger.info('Giving claimed identity %r to trusted root %r \
with sreg data %s' % (claimed_id, orequest.trust_root, user_data))
oresponse = orequest.answer(True, identity=claimed_id)
sreg_response = SRegResponse.extractResponse(sreg_request, user_data)
@ -190,8 +208,8 @@ class DecideForm(forms.Form):
def __init__(self, sreg_request=None, *args, **kwargs):
super(DecideForm, self).__init__(*args, **kwargs)
for field in sreg_request.optional:
self.fields[str(field)] = forms.BooleanField(label=data_fields[str(field)],
required=False)
self.fields[str(field)] = forms.BooleanField(
label=data_fields[str(field)], required=False)
logger.info('3SREG request: %s' % self.fields)
def openid_decide(request):
@ -204,13 +222,15 @@ def openid_decide(request):
orequest = request.session.get('OPENID_REQUEST')
# No request ? Failure..
if not orequest:
logger.warning('OpenID decide view failed, because no OpenID request is saved')
logger.warning('OpenID decide view failed, \
because no OpenID request is saved')
return HttpResponseRedirect('/')
sreg_request = SRegRequest.fromOpenIDRequest(orequest)
logger.debug('SREG request: %s' % sreg_request.__dict__)
if not request.user.is_authenticated():
# Not authenticated ? Authenticate and go back to the server endpoint
return redirect_to_login(request, next=reverse(openid_server), nonce='1')
return redirect_to_login(request, next=reverse(openid_server),
nonce='1')
if request.method == 'POST':
if 'cancel' in request.POST:
@ -222,8 +242,8 @@ def openid_decide(request):
if form.is_valid():
data = form.cleaned_data
# Remember the choice
t, created = models.TrustedRoot.objects.get_or_create(user=request.user.id,
trust_root=orequest.trust_root)
t, created = models.TrustedRoot.objects.get_or_create(
user=request.user.id, trust_root=orequest.trust_root)
t.choices = sreg_request.required \
+ [ field for field in data if data[field] ]
t.save()
@ -235,7 +255,8 @@ def openid_decide(request):
# verify return_to of trust_root
try:
trust_root_valid = verifyReturnTo(orequest.trust_root, orequest.return_to) and "Valid" or "Invalid"
trust_root_valid = verifyReturnTo(orequest.trust_root,
orequest.return_to) and "Valid" or "Invalid"
except HTTPFetchingError:
trust_root_valid = "Unreachable"
except DiscoveryFailure:

View File

@ -1,5 +1,5 @@
from django.contrib.auth.decorators import login_required
from django.http import Http404, HttpResponseRedirect
from django.http import HttpResponseRedirect
from django.template import RequestContext
from django.shortcuts import render_to_response

View File

@ -1,4 +1,3 @@
import urllib
from django.core.urlresolvers import reverse
from django.utils.translation import ugettext as _
@ -8,7 +7,8 @@ import authentic2.idp.saml.saml2_endpoints as saml2_endpoints
class SamlBackend(object):
def service_list(self, request):
q = models.LibertyServiceProvider.objects.filter(enabled = True, idp_initiated_sso = True)
q = models.LibertyServiceProvider.objects.filter(enabled = True,
idp_initiated_sso = True)
list = []
for service_provider in q:
liberty_provider = service_provider.liberty_provider
@ -42,7 +42,7 @@ class SamlBackend(object):
except models.LibertyProvider.DoesNotExist:
pass
code = '<div>'
code += _('Sending logout to %(pid)s....') % { 'pid': pid }
code += _('Sending logout to %(name)s....') % { 'name': name}
code += '<iframe src="%s" marginwidth="0" marginheight="0" \
scrolling="no" style="border: none" width="16" height="16"></iframe></div>' % \
reverse(saml2_endpoints.idp_slo, args=[pid])

View File

@ -1,14 +1,13 @@
import logging
from django.shortcuts import render_to_response
from django.contrib.auth import REDIRECT_FIELD_NAME, SESSION_KEY
from django.utils.http import *
from django.utils.html import escape
from django.utils.http import urlencode
from django.utils.importlib import import_module
from django.conf import settings
from django.http import *
from django.http import HttpResponseRedirect
def redirect_to_login(next, login_url=None, redirect_field_name=REDIRECT_FIELD_NAME, other_keys = {}):
def redirect_to_login(next, login_url=None,
redirect_field_name=REDIRECT_FIELD_NAME, other_keys = {}):
"Redirects the user to the login page, passing the given 'next' page"
if not login_url:
login_url = settings.LOGIN_URL
@ -28,7 +27,8 @@ def kill_django_sessions(session_key):
try:
for key in session_key:
store = engine.SessionStore(key)
logging.debug('Killing session %s of user %s' % (key, store[SESSION_KEY]))
logging.debug('Killing session %s of user %s' %
(key, store[SESSION_KEY]))
store.delete()
except Exception, e:
logging.error(e)

View File

@ -4,15 +4,19 @@ import urllib
import lasso
from django.contrib.auth.views import redirect_to_login
from django.conf.urls.defaults import *
from django.http import *
from django.conf.urls.defaults import patterns
from django.http import HttpResponse, HttpResponseForbidden, \
HttpResponseRedirect
from django.utils.translation import ugettext as _
from django.views.decorators.csrf import csrf_exempt
from django.core.urlresolvers import reverse
from django.contrib.auth.models import User
from authentic2.saml.models import *
from authentic2.saml.common import *
from authentic2.idp.interactions import consent_federation
from authentic2.saml.models import LibertyArtifact
from authentic2.saml.common import get_idff12_metadata, create_idff12_server, \
load_provider, load_federation, load_session, save_federation, \
save_session, return_idff12_response, get_idff12_request_message, \
get_soap_message, return_saml_soap_response
def fill_assertion(request, saml_request, assertion, provider_id):
'''Stuff an assertion with information extracted from the user record

View File

@ -1,23 +1,40 @@
import datetime
import logging
import urllib
import lasso
from django.conf.urls.defaults import *
from django.conf.urls.defaults import patterns
from django.core.urlresolvers import reverse
from django.contrib.auth.decorators import login_required
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
from django.http import *
from django.shortcuts import render_to_response
from django.core.exceptions import ObjectDoesNotExist
from django.http import HttpResponse, HttpResponseRedirect, HttpResponseForbidden, HttpResponseBadRequest, Http404
from django.utils.translation import ugettext as _
from django.views.decorators.csrf import csrf_exempt, csrf_protect
from django.contrib.auth.decorators import login_required
from django.views.decorators.csrf import csrf_exempt
from django.contrib.auth import BACKEND_SESSION_KEY
from django.contrib.contenttypes.models import ContentType
from django.contrib.auth import User
from django.conf import settings
import authentic2.idp as idp
import authentic2.idp.views as idp_views
from authentic2.saml.models import *
from authentic2.saml.common import *
from authentic2.saml.models import LibertyAssertion, LibertyArtifact, LibertySession, LibertyFederation
from authentic2.saml.common import redirect_next, asynchronous_bindings, \
soap_bindings, nameid2kwargs, load_provider, get_saml2_request_message, \
error_page, set_saml2_response_responder_status_code, \
saml2_urn_to_nidformat, nidformat_to_saml2_urn, save_key_values, \
AUTHENTIC_STATUS_CODE_MISSING_DESTINATION, \
get_and_delete_key_values, load_federation, load_session, \
return_saml2_response, save_federation, save_session, \
get_soap_message, soap_fault, return_saml_soap_response, \
AUTHENTIC_STATUS_CODE_UNKNOWN_PROVIDER, \
AUTHENTIC_STATUS_CODE_MISSING_NAMEID, \
AUTHENTIC_STATUS_CODE_MISSING_SESSION_INDEX, \
AUTHENTIC_STATUS_CODE_UNKNOWN_SESSION, \
AUTHENTIC_STATUS_CODE_INTERNAL_SERVER_ERROR, \
send_soap_request, get_saml2_query_request, \
get_saml2_request_message_async_binding, create_saml2_server, \
get_saml2_metadata
import authentic2.saml.saml2utils as saml2utils
from authentic2.auth2_auth.models import AuthenticationEvent
from common import redirect_to_login, kill_django_sessions
@ -25,9 +42,9 @@ from authentic2.auth2_auth import NONCE_FIELD_NAME
from authentic2.idp.interactions import consent_federation, consent_attributes
from authentic2.idp import signals as idp_signals
from authentic2.idp.models import *
# from authentic2.idp.models import *
from authentic2.authsaml2.models import *
from authentic2.authsaml2.models import SAML2TransientUser
'''SAMLv2 IdP implementation
@ -174,7 +191,7 @@ def build_assertion(request, login, nid_format = 'transient', attributes=None):
ssl = request.environ.has_key('HTTPS')
if __user_backend_from_session:
backend = request.session[BACKEND_SESSION_KEY]
logger.debug("build_assertion: backend from session %s" %backend)
logger.debug("build_assertion: authentication from session %s" %backend)
if backend in ('django.contrib.auth.backends.ModelBackend',
'authentic2.idp.auth_backends.LogginBackend',
'django_auth_ldap.backend.LDAPBackend'):
@ -192,9 +209,9 @@ def build_assertion(request, login, nid_format = 'transient', attributes=None):
else:
raise Exception('unknown backend: ' + backend)
else:
logger.debug("build_assertion: backend from stored event %s" %sbackend)
try:
auth_event = AuthenticationEvent.objects.get(nonce = login.request.id)
logger.debug("build_assertion: authentication from stored event %s" % auth_event)
if auth_event.how == 'password':
authn_context = lasso.SAML2_AUTHN_CONTEXT_PASSWORD
elif auth_event.how == 'password-on-https':
@ -204,7 +221,6 @@ def build_assertion(request, login, nid_format = 'transient', attributes=None):
else:
raise NotImplementedError('Unknown authentication method %r' % auth_event.how)
except ObjectDoesNotExist:
logger.debug("build_assertion: backend from stored event not found %s" %backend)
# TODO: previous session over secure transport (ssl) ?
authn_context = lasso.SAML2_AUTHN_CONTEXT_PREVIOUS_SESSION
logger.info("build_assertion: authn_context %s" %authn_context)
@ -270,9 +286,9 @@ def sso(request):
'WebSSO profile with HTTP-Redirect binding: %r') % message)
except lasso.ProfileInvalidProtocolprofileError:
log_info_authn_request_details(login)
message = N_('SAMLv2 Single Sign On: the request cannot be answered because no valid protocol binding could be found')
message = _('SAMLv2 Single Sign On: the request cannot be answered because no valid protocol binding could be found')
logger.error('sso: the request cannot be answered because no valid protocol binding could be found')
return HttpResponseBadRequest(_(message))
return HttpResponseBadRequest(message)
except lasso.DsError, e:
log_info_authn_request_details(login)
logger.error('sso: digital signature treatment error: %s' % e)
@ -730,7 +746,7 @@ def finish_slo(request):
all_sessions = LibertySession.objects.filter(django_session_key=session_key)
if all_sessions.exists():
all_sessions.delete()
return return_logout_error(logout, lasso.SAML2_STATUS_CODE_PARTIAL_LOGOUT)
return return_logout_error(request, logout, lasso.SAML2_STATUS_CODE_PARTIAL_LOGOUT)
try:
logout.buildResponseMsg()
except:
@ -738,7 +754,7 @@ def finish_slo(request):
raise NotImplementedError()
return return_saml2_response(request, logout)
def return_logout_error(logout, error):
def return_logout_error(request, logout, error):
logout.buildResponseMsg()
set_saml2_response_responder_status_code(logout.response, error)
# Hack because response is not initialized before
@ -757,25 +773,25 @@ def process_logout_request(request, message, binding):
try:
try:
logout.processRequestMsg(message)
except (lasso.ServerProviderNotFoundError, lasso.ProfileUnknownProviderError), e:
except (lasso.ServerProviderNotFoundError, lasso.ProfileUnknownProviderError):
logger.debug('process_logout_request: loading provider %s' %logout.remoteProviderId)
p = load_provider(request, logout.remoteProviderId,
server=logout.server)
if not p:
logger.error('process_logout_request: slo unknown provider %s' %logout.remoteProviderId)
return logout, return_logout_error(logout,
return logout, return_logout_error(request, logout,
AUTHENTIC_STATUS_CODE_UNKNOWN_PROVIDER)
logout.processRequestMsg(message)
except lasso.DsError, e:
except lasso.DsError:
logger.error('process_logout_request: slo signature error on request %s' % message)
return logout, return_logout_error(logout,
return logout, return_logout_error(request, logout,
lasso.LIB_STATUS_CODE_INVALID_SIGNATURE)
except Exception, e:
except Exception:
logger.error('process_logout_request: slo unknown error when processing a request %s' % message)
return logout, HttpResponseBadRequest('Invalid logout request')
if binding != 'SOAP' and not check_destination(request, logout.request):
logger.error('process_logout_request: slo wrong or absent destination')
return logout, return_logout_error(AUTHENTIC_STATUS_CODE_MISSING_DESTINATION)
return logout, return_logout_error(request, logout, AUTHENTIC_STATUS_CODE_MISSING_DESTINATION)
return logout, None
def log_logout_request(logout):
@ -787,12 +803,12 @@ def log_logout_request(logout):
def validate_logout_request(request, logout, idp=True):
if not isinstance(logout.request.nameId, lasso.Saml2NameID):
logger.error('validate_logout_request: slo request lacks a NameID')
return return_logout_error(logout,
return return_logout_error(request, logout,
AUTHENTIC_STATUS_CODE_MISSING_NAMEID)
# only idp have the right to send logout request without session indexes
if not logout.request.sessionIndexes and idp:
logger.error('validate_logout_request: slo request lacks SessionIndex')
return return_logout_error(logout,
return return_logout_error(request, logout,
AUTHENTIC_STATUS_CODE_MISSING_SESSION_INDEX)
logger.info('validate_logout_request: valid logout request')
log_logout_request(logout)
@ -808,7 +824,7 @@ def logout_synchronous_other_backends(request, logout, django_sessions_keys):
for backend in backends:
ok = ok and backends.can_synchronous_logout(django_sessions_keys)
if not ok:
return return_logout_error(logout,
return return_logout_error(request, logout,
lasso.SAML2_STATUS_CODE_UNSUPPORTED_BINDING)
logger.info('logout_synchronous_other_backends: treatments ended')
return None
@ -875,7 +891,7 @@ def slo_soap(request):
logout.request.sessionIndexes, logout.remoteProviderId)
if not found:
logger.debug('slo_soap: no session found, return logout error')
return return_logout_error(logout, AUTHENTIC_STATUS_CODE_UNKNOWN_SESSION)
return return_logout_error(request, logout, AUTHENTIC_STATUS_CODE_UNKNOWN_SESSION)
for lib_session in lib_sessions:
p = load_provider(request, lib_session.provider_id,
server=logout.provider)
@ -886,12 +902,12 @@ def slo_soap(request):
set_session_dump_from_liberty_sessions(logout, found[0:1] + lib_sessions)
try:
logout.validateRequest()
except lasso.LogoutUnsupportedProfileError, e:
except lasso.LogoutUnsupportedProfileError:
logger.error('slo_soap: slo cannot do SOAP logout, one provider does \
not support it %s' % [ s.provider_id for s in lib_sessions])
logout.buildResponseMsg()
return return_saml2_response(request, logout)
except Exception, e:
except Exception:
logger.exception('slo_soap: slo, unknown error')
logout.buildResponseMsg()
return return_saml2_response(request, logout)
@ -956,7 +972,8 @@ def slo(request):
if not remote_provider_sessions.exists():
logger.error('slo: slo refused, since no session exists with the \
requesting provider')
return return_logout_error(logout, AUTHENTIC_STATUS_CODE_UNKNOWN_SESSION)
return return_logout_error(request, logout,
AUTHENTIC_STATUS_CODE_UNKNOWN_SESSION)
# Load session dump for the requesting provider
last_session = remote_provider_sessions.latest('creation')
set_session_dump_from_liberty_sessions(logout, [last_session])
@ -964,7 +981,7 @@ def slo(request):
logout.validateRequest()
except:
logger.exception('slo: slo error')
return return_logout_error(logout,
return return_logout_error(request, logout,
AUTHENTIC_STATUS_CODE_INTERNAL_SERVER_ERROR)
# Now clean sessions for this provider
LibertySession.objects.filter(provider_id=logout.remoteProviderId,

View File

@ -1,23 +0,0 @@
"""
This file demonstrates two different styles of tests (one doctest and one
unittest). These will both pass when you run "manage.py test".
Replace these with more appropriate tests for your application.
"""
from django.test import TestCase
class SimpleTest(TestCase):
def test_basic_addition(self):
"""
Tests that 1 + 1 always equals 2.
"""
self.failUnlessEqual(1 + 1, 2)
__test__ = {"doctest": """
Another way to test that 1 + 1 is equal to 2.
>>> 1 + 1 == 2
True
"""}