misc: make logout work with transient NameID (#69740)
Implementation of transient NameID is special, the transient NameID is ignored and an attribut value is used as the federation key. But in order to producre a proper NameID for the logout request we need the transient NameID value. To work around this problem we add a transient_name_id attribute to the SessionIndex model representing the current SSO session, and we modify the session dump template to use this value instead of UserSAMLIdentifier.name_id if transient_name_id is not None.
This commit is contained in:
parent
7f9602c528
commit
218afde9cd
|
@ -316,6 +316,7 @@ class DefaultAdapter:
|
|||
transient_federation_attribute,
|
||||
)
|
||||
return None
|
||||
saml_attributes['transient_name_id_content'] = name_id
|
||||
else:
|
||||
if self.request:
|
||||
messages.warning(
|
||||
|
@ -459,8 +460,11 @@ class DefaultAdapter:
|
|||
return None
|
||||
|
||||
def _link_user(self, idp, saml_attributes, user):
|
||||
name_id_content = saml_attributes['name_id_content']
|
||||
if saml_attributes['name_id_format'] == lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT:
|
||||
name_id_content = saml_attributes['transient_name_id_content']
|
||||
saml_id, created = models.UserSAMLIdentifier.objects.get_or_create(
|
||||
name_id=saml_attributes['name_id_content'],
|
||||
name_id=name_id_content,
|
||||
issuer=models_utils.get_issuer(saml_attributes['issuer']),
|
||||
defaults={
|
||||
'user': user,
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
# Generated by Django 2.2.26 on 2022-10-03 15:23
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('mellon', '0006_nameid_attributes'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='sessionindex',
|
||||
name='transient_name_id',
|
||||
field=models.TextField(null=True, verbose_name='Transient NameID'),
|
||||
),
|
||||
]
|
|
@ -46,6 +46,7 @@ class UserSAMLIdentifier(models.Model):
|
|||
class SessionIndex(models.Model):
|
||||
session_index = models.TextField(_('SAML SessionIndex'))
|
||||
session_key = models.CharField(_('Django session key'), max_length=40)
|
||||
transient_name_id = models.TextField(verbose_name=_('Transient NameID'), null=True)
|
||||
saml_identifier = models.ForeignKey(
|
||||
verbose_name=_('SAML identifier'), to=UserSAMLIdentifier, on_delete=models.CASCADE
|
||||
)
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
<ns0:Session xmlns:ns0="http://www.entrouvert.org/namespaces/lasso/0.0" xmlns:ns1="urn:oasis:names:tc:SAML:2.0:assertion" Version="2">{% for session_index in session_indexes %}{% with nameid=session_index.saml_identifier %}
|
||||
<ns0:NidAndSessionIndex AssertionID="" ProviderID="{{ nameid.issuer.entity_id }}" SessionIndex="{{ session_index.session_index }}">
|
||||
<ns1:NameID Format="{{ nameid.nid_format }}"{% if nameid.nid_name_qualifier %} NameQualifier="{{ nameid.nid_name_qualifier }}"{% endif %}{% if nameid.nid_sp_name_qualifier %} SPNameQualifier="{{ nameid.nid_sp_name_qualifier }}"{% endif %}{% if nameid.nid_sp_provided_id %} SPProvidedId="{{ nameid.nid_sp_provided_id}}"{% endif %}>{{ nameid.name_id }}</ns1:NameID>
|
||||
<ns1:NameID Format="{{ nameid.nid_format }}"{% if nameid.nid_name_qualifier %} NameQualifier="{{ nameid.nid_name_qualifier }}"{% endif %}{% if nameid.nid_sp_name_qualifier %} SPNameQualifier="{{ nameid.nid_sp_name_qualifier }}"{% endif %}{% if nameid.nid_sp_provided_id %} SPProvidedId="{{ nameid.nid_sp_provided_id}}"{% endif %}>{% firstof session_index.transient_name_id nameid.name_id %}</ns1:NameID>
|
||||
</ns0:NidAndSessionIndex>{% endwith %}{% endfor %}
|
||||
</ns0:Session>
|
||||
|
|
|
@ -355,6 +355,10 @@ class LoginView(ProfileMixin, LogMixin, View):
|
|||
saml_identifier=user.saml_identifier,
|
||||
session_key=self.request.session.session_key,
|
||||
session_index=session_index,
|
||||
# keep transient nameid to be able to produce logout requests
|
||||
transient_name_id=attributes['name_id_content']
|
||||
if attributes['name_id_format'] == lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT
|
||||
else None,
|
||||
)
|
||||
self.log.info('user %s (NameID is %r) logged in using SAML', user, attributes['name_id_content'])
|
||||
self.request.session['mellon_session'] = utils.flatten_datetime(attributes)
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
import base64
|
||||
import datetime
|
||||
import logging
|
||||
import re
|
||||
import urllib.parse as urlparse
|
||||
import xml.etree.ElementTree as ET
|
||||
|
@ -33,6 +34,7 @@ from httmock import HTTMock, all_requests
|
|||
from httmock import response as mock_response
|
||||
from pytest import fixture
|
||||
|
||||
from mellon import models
|
||||
from mellon.utils import create_metadata
|
||||
from mellon.views import lasso_decode
|
||||
|
||||
|
@ -82,7 +84,7 @@ class MockIdp:
|
|||
session_dump = None
|
||||
identity_dump = None
|
||||
|
||||
def __init__(self, idp_metadata, private_key, sp_metadata):
|
||||
def __init__(self, idp_metadata, private_key, sp_metadata, name_id=None):
|
||||
self.server = server = lasso.Server.newFromBuffers(idp_metadata, private_key)
|
||||
self.server.signatureMethod = lasso.SIGNATURE_METHOD_RSA_SHA256
|
||||
server.addProviderFromBuffer(lasso.PROVIDER_ROLE_SP, sp_metadata)
|
||||
|
@ -90,7 +92,7 @@ class MockIdp:
|
|||
def reset_session_dump(self):
|
||||
self.session_dump = None
|
||||
|
||||
def process_authn_request_redirect(self, url, auth_result=True, consent=True, msg=None):
|
||||
def process_authn_request_redirect(self, url, auth_result=True, consent=True, msg=None, name_id=None):
|
||||
login = self.login = lasso.Login(self.server)
|
||||
if self.identity_dump:
|
||||
login.setIdentityFromDump(self.identity_dump)
|
||||
|
@ -121,6 +123,8 @@ class MockIdp:
|
|||
datetime.datetime.now().isoformat(),
|
||||
datetime.datetime.now().isoformat(),
|
||||
)
|
||||
for key in name_id or {}:
|
||||
setattr(login.assertion.subject.nameID, key, name_id[key])
|
||||
|
||||
def add_attribute(name, *values, **kwargs):
|
||||
fmt = kwargs.get('fmt', lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC)
|
||||
|
@ -833,3 +837,30 @@ def test_force_authn(db, app, idp, caplog, sp_settings):
|
|||
|
||||
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
|
||||
assert app.session['mellon_session']['force_authn']
|
||||
|
||||
|
||||
def test_sso_slo_transient_name_identifier(db, app, idp, caplog, sp_settings):
|
||||
caplog.set_level(logging.WARNING)
|
||||
sp_settings.MELLON_TRANSIENT_FEDERATION_ATTRIBUTE = 'email'
|
||||
response = app.get('/login/')
|
||||
url, body, relay_state = idp.process_authn_request_redirect(
|
||||
response['Location'],
|
||||
name_id={
|
||||
'format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT,
|
||||
'content': '1234',
|
||||
},
|
||||
)
|
||||
response = app.post('/login/', params={'SAMLResponse': body, 'RelayState': relay_state})
|
||||
|
||||
usi = models.UserSAMLIdentifier.objects.get()
|
||||
assert usi.name_id == 'john.doe@gmail.com'
|
||||
session_index = models.SessionIndex.objects.get(saml_identifier=usi)
|
||||
assert session_index.transient_name_id == '1234'
|
||||
|
||||
response = app.get('/logout/')
|
||||
assert urlparse.urlparse(response['Location']).path == '/singleLogout'
|
||||
url = idp.process_logout_request_redirect(response.location)
|
||||
caplog.clear()
|
||||
response = app.get(url)
|
||||
assert len(caplog.records) == 0, 'logout failed'
|
||||
assert response.location == '/'
|
||||
|
|
Loading…
Reference in New Issue