misc: proxy passive SSO from SAML2 services to OIDC idps (#27135)
Behaviour of the SAML2 when receiving a Passive AuthnRequest and not user is logged is modified. Before an immediate response with StatusCode no-passive was returned. Now if one authenticator with the method passive_login is found, the request is transferred to this authentication source.
This commit is contained in:
parent
9d0d83b0e5
commit
e524c5f94d
|
@ -114,6 +114,7 @@ from authentic2.utils.misc import get_backends as get_idp_backends
|
|||
from authentic2.utils.misc import login_require, make_url
|
||||
from authentic2.utils.service import set_service
|
||||
from authentic2.utils.view_decorators import check_view_restriction, enable_view_restriction
|
||||
from authentic2.views import passive_login
|
||||
|
||||
from . import app_settings
|
||||
|
||||
|
@ -629,17 +630,28 @@ def sso(request):
|
|||
return sso_after_process_request(request, login, nid_format=nid_format)
|
||||
|
||||
|
||||
def make_continue_url(login, nid_format):
|
||||
nonce = login.request.id or get_nonce()
|
||||
save_key_values(nonce, force_str(login.dump()), False, nid_format)
|
||||
return make_url(continue_sso, params={NONCE_FIELD_NAME: nonce})
|
||||
|
||||
|
||||
def saml_passive_login(request, login, nid_format):
|
||||
return passive_login(
|
||||
request,
|
||||
next_url=make_continue_url(login, nid_format),
|
||||
login_hint=get_login_hints_extension(login),
|
||||
)
|
||||
|
||||
|
||||
def need_login(request, login, nid_format):
|
||||
"""Redirect to the login page with a nonce parameter to verify later that
|
||||
the login form was submitted
|
||||
"""
|
||||
nonce = login.request.id or get_nonce()
|
||||
save_key_values(nonce, force_str(login.dump()), False, nid_format)
|
||||
next_url = make_url(continue_sso, params={NONCE_FIELD_NAME: nonce})
|
||||
logger.debug('redirect to login page with next url %s', next_url)
|
||||
return login_require(
|
||||
request,
|
||||
next_url=next_url,
|
||||
next_url=make_continue_url(login, nid_format),
|
||||
params={NONCE_FIELD_NAME: nonce},
|
||||
login_hint=get_login_hints_extension(login),
|
||||
)
|
||||
|
@ -722,6 +734,7 @@ def continue_sso(request):
|
|||
consent_obtained=consent_obtained,
|
||||
consent_attribute_answer=consent_attribute_answer,
|
||||
nid_format=nid_format,
|
||||
passive_login=False,
|
||||
)
|
||||
|
||||
|
||||
|
@ -769,6 +782,7 @@ def sso_after_process_request(
|
|||
user=None,
|
||||
nid_format='transient',
|
||||
return_profile=False,
|
||||
passive_login=True,
|
||||
):
|
||||
"""Common path for sso and idp_initiated_sso.
|
||||
|
||||
|
@ -802,7 +816,12 @@ def sso_after_process_request(
|
|||
return need_login(request, login, nid_format)
|
||||
|
||||
# No user is authenticated and passive is True, deny request
|
||||
if passive and user.is_anonymous:
|
||||
if passive and not user.is_authenticated:
|
||||
# passive_login is false if caller is continue_sso (after a failed passive login)
|
||||
if passive_login:
|
||||
passive_login_response = saml_passive_login(request, login, nid_format)
|
||||
if passive_login_response is not None:
|
||||
return passive_login_response
|
||||
logger.debug('no user connected and passive request, returning NoPassive')
|
||||
set_saml2_response_responder_status_code(login.response, lasso.SAML2_STATUS_CODE_NO_PASSIVE)
|
||||
return finish_sso(request, login)
|
||||
|
|
|
@ -331,6 +331,38 @@ class EmailChangeVerifyView(TemplateView):
|
|||
email_change_verify = EmailChangeVerifyView.as_view()
|
||||
|
||||
|
||||
def passive_login(request, *, next_url, login_hint=None):
|
||||
'''View to use in IdP backends to implement passive login toward IdPs'''
|
||||
service = get_service(request)
|
||||
authenticators = utils_misc.get_authenticators()
|
||||
|
||||
login_hint = login_hint or {}
|
||||
show_ctx = make_condition_context(request=request, login_hint=login_hint)
|
||||
if service:
|
||||
show_ctx['service_ou_slug'] = service.ou and service.ou.slug
|
||||
show_ctx['service_slug'] = service.slug
|
||||
show_ctx['service'] = service
|
||||
else:
|
||||
show_ctx['service_ou_slug'] = ''
|
||||
show_ctx['service_slug'] = ''
|
||||
show_ctx['service'] = None
|
||||
visible_authenticators = [
|
||||
authenticator
|
||||
for authenticator in authenticators
|
||||
if (authenticator.shown(ctx=show_ctx) and getattr(authenticator, 'passive_login', None))
|
||||
]
|
||||
|
||||
if not visible_authenticators:
|
||||
return None
|
||||
|
||||
unique_authenticator = visible_authenticators[0]
|
||||
return unique_authenticator.passive_login(
|
||||
request,
|
||||
block_id=unique_authenticator.get_identifier(),
|
||||
next_url=next_url,
|
||||
)
|
||||
|
||||
|
||||
@csrf_exempt
|
||||
@ensure_csrf_cookie
|
||||
@never_cache
|
||||
|
|
|
@ -237,6 +237,11 @@ class OIDCProvider(BaseAuthenticator):
|
|||
|
||||
return views.oidc_login(request, pk=self.pk, next_url=next_url)
|
||||
|
||||
def passive_login(self, request, block_id, next_url):
|
||||
from . import views
|
||||
|
||||
return views.oidc_login(request, pk=self.pk, next_url=next_url, passive=True)
|
||||
|
||||
def login(self, request, *args, **kwargs):
|
||||
context = kwargs.get('context', {}).copy()
|
||||
context['provider'] = self
|
||||
|
|
|
@ -42,7 +42,7 @@ def make_nonce(state):
|
|||
return hashlib.sha256(state.encode() + settings.SECRET_KEY.encode()).hexdigest()
|
||||
|
||||
|
||||
def oidc_login(request, pk, next_url=None, *args, **kwargs):
|
||||
def oidc_login(request, pk, next_url=None, passive=None, *args, **kwargs):
|
||||
provider = get_provider(pk)
|
||||
scopes = set(provider.scopes.split()) | {'openid'}
|
||||
state_id = str(uuid.uuid4())
|
||||
|
@ -58,6 +58,11 @@ def oidc_login(request, pk, next_url=None, *args, **kwargs):
|
|||
}
|
||||
if next_url:
|
||||
state_content['next'] = next_url
|
||||
if passive is True or passive is False:
|
||||
if passive:
|
||||
prompt.add('none')
|
||||
else:
|
||||
prompt.add('login')
|
||||
params = {
|
||||
'client_id': provider.client_id,
|
||||
'scope': ' '.join(scopes),
|
||||
|
|
|
@ -21,6 +21,7 @@ import random
|
|||
import re
|
||||
import time
|
||||
import urllib.parse
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from django.contrib.auth import get_user_model
|
||||
|
@ -45,6 +46,7 @@ from authentic2.utils.misc import last_authentication_event
|
|||
from authentic2_auth_oidc.backends import OIDCBackend
|
||||
from authentic2_auth_oidc.models import OIDCAccount, OIDCClaimMapping, OIDCProvider
|
||||
from authentic2_auth_oidc.utils import IDToken, IDTokenError, parse_id_token, register_issuer
|
||||
from authentic2_auth_oidc.views import oidc_login
|
||||
|
||||
from . import utils
|
||||
|
||||
|
@ -1394,3 +1396,77 @@ def test_double_link(app, caplog, code, simple_user, oidc_provider_jwkset):
|
|||
warnings = response.pyquery('.warning')
|
||||
assert len(warnings) == 1
|
||||
assert 'Your email is already linked' in warnings.text()
|
||||
|
||||
|
||||
@mock.patch('authentic2_auth_oidc.views.get_provider')
|
||||
def test_oidc_login(get_provider, rf):
|
||||
AUTHORIZE_URL = 'https://op.example.com/authorize'
|
||||
SCOPES = {'profile'}
|
||||
|
||||
provider = OIDCProvider(
|
||||
pk=1, client_id='1234', authorization_endpoint=AUTHORIZE_URL, scopes=' '.join(SCOPES)
|
||||
)
|
||||
get_provider.return_value = provider
|
||||
|
||||
url = oidc_login(rf.get('/', secure=True), 1, next_url='/idp/x/').url
|
||||
assert url
|
||||
prefix, query = url.split('?', 1)
|
||||
assert prefix == AUTHORIZE_URL
|
||||
qs = dict(urllib.parse.parse_qsl(query))
|
||||
assert qs['client_id'] == '1234'
|
||||
assert qs['nonce']
|
||||
assert qs['state']
|
||||
assert qs['redirect_uri'] == 'https://testserver/accounts/oidc/callback/'
|
||||
assert qs['ui_locales'] == 'en'
|
||||
assert set(qs['scope'].split()) == {'profile', 'openid'}
|
||||
assert 'prompt' not in qs
|
||||
|
||||
# passive
|
||||
url = oidc_login(rf.get('/', secure=True), 1, next_url='/idp/x/', passive=True).url
|
||||
prefix, query = url.split('?', 1)
|
||||
qs = dict(urllib.parse.parse_qsl(query))
|
||||
assert qs['prompt'] == 'none'
|
||||
|
||||
# not passive
|
||||
url = oidc_login(rf.get('/', secure=True), 1, next_url='/idp/x/', passive=False).url
|
||||
prefix, query = url.split('?', 1)
|
||||
qs = dict(urllib.parse.parse_qsl(query))
|
||||
assert qs['prompt'] == 'login'
|
||||
|
||||
|
||||
@mock.patch('authentic2_auth_oidc.views.get_provider')
|
||||
def test_autorun(get_provider, rf):
|
||||
AUTHORIZE_URL = 'https://op.example.com/authorize'
|
||||
SCOPES = {'profile'}
|
||||
|
||||
provider = OIDCProvider(
|
||||
pk=1, client_id='1234', authorization_endpoint=AUTHORIZE_URL, scopes=' '.join(SCOPES)
|
||||
)
|
||||
get_provider.return_value = provider
|
||||
req = rf.get('/?next=/idp/x/')
|
||||
req.user = mock.Mock()
|
||||
req.user.is_authenticated = False
|
||||
|
||||
url = provider.autorun(req, block_id=1, next_url='/').url
|
||||
_, query = url.split('?', 1)
|
||||
qs = dict(urllib.parse.parse_qsl(query))
|
||||
assert 'prompt' not in qs
|
||||
|
||||
|
||||
@mock.patch('authentic2_auth_oidc.views.get_provider')
|
||||
def test_passive_login(get_provider, rf):
|
||||
AUTHORIZE_URL = 'https://op.example.com/authorize'
|
||||
SCOPES = {'profile'}
|
||||
|
||||
provider = OIDCProvider(
|
||||
pk=1, client_id='1234', authorization_endpoint=AUTHORIZE_URL, scopes=' '.join(SCOPES)
|
||||
)
|
||||
get_provider.return_value = provider
|
||||
req = rf.get('/?next=/idp/x/')
|
||||
req.user = mock.Mock()
|
||||
req.user.is_authenticated = False
|
||||
|
||||
url = provider.passive_login(req, block_id=1, next_url='/').url
|
||||
_, query = url.split('?', 1)
|
||||
qs = dict(urllib.parse.parse_qsl(query))
|
||||
assert qs['prompt'] == 'none'
|
||||
|
|
|
@ -27,6 +27,7 @@ import lasso
|
|||
import pytest
|
||||
from django.contrib.auth import REDIRECT_FIELD_NAME
|
||||
from django.core.files import File
|
||||
from django.http import HttpResponseRedirect
|
||||
from django.template import Context, Template
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_bytes, force_str
|
||||
|
@ -1055,3 +1056,99 @@ def test_sso_view_restriction(app, idp, user, cgu_attribute):
|
|||
scenario.launch_authn_request()
|
||||
scenario.login(user=user)
|
||||
assert scenario.idp_response.location.startswith('/accounts/edit/required/?')
|
||||
|
||||
|
||||
def test_sso_is_passive(app, idp, user, cgu_attribute, caplog):
|
||||
scenario = Scenario(
|
||||
app,
|
||||
make_authn_request_kwargs={'is_passive': True},
|
||||
authn_request_success=False,
|
||||
)
|
||||
scenario.launch_authn_request()
|
||||
|
||||
with pytest.raises(lasso.ProfileStatusNotSuccessError):
|
||||
scenario.handle_post_response()
|
||||
|
||||
assert (
|
||||
scenario.sp.login.response.status.statusCode.value == 'urn:oasis:names:tc:SAML:2.0:status:Responder'
|
||||
)
|
||||
assert (
|
||||
scenario.sp.login.response.status.statusCode.statusCode.value
|
||||
== 'urn:oasis:names:tc:SAML:2.0:status:NoPassive'
|
||||
)
|
||||
|
||||
|
||||
def test_sso_with_authenticator_passive_sso_canceled(app, idp):
|
||||
scenario = Scenario(
|
||||
app,
|
||||
make_authn_request_kwargs={'is_passive': True},
|
||||
authn_request_success=False,
|
||||
)
|
||||
|
||||
authenticator = mock.Mock()
|
||||
authenticator.show.return_value = True
|
||||
|
||||
mock_passive_login = mock.Mock()
|
||||
|
||||
def passive_login(request, block_id, next_url, passive=None):
|
||||
mock_passive_login(request=request, block_id=block_id, next_url=next_url, passive=passive)
|
||||
return HttpResponseRedirect('https://idp.example.com/?passive')
|
||||
|
||||
authenticator.passive_login = passive_login
|
||||
|
||||
with mock.patch('authentic2.utils.misc.get_authenticators', return_value=[authenticator]):
|
||||
scenario.launch_authn_request()
|
||||
|
||||
assert scenario.idp_response.location == 'https://idp.example.com/?passive'
|
||||
assert mock_passive_login.call_args[1]['next_url'].startswith('/idp/saml2/continue?nonce=')
|
||||
|
||||
# check NoPassive status code response after if conitnue is called and still no user is logged in
|
||||
response = app.get(mock_passive_login.call_args[1]['next_url'])
|
||||
scenario.idp_response = response
|
||||
with pytest.raises(lasso.ProfileStatusNotSuccessError):
|
||||
scenario.handle_post_response()
|
||||
assert (
|
||||
scenario.sp.login.response.status.statusCode.value == 'urn:oasis:names:tc:SAML:2.0:status:Responder'
|
||||
)
|
||||
assert (
|
||||
scenario.sp.login.response.status.statusCode.statusCode.value
|
||||
== 'urn:oasis:names:tc:SAML:2.0:status:NoPassive'
|
||||
)
|
||||
|
||||
|
||||
def test_sso_with_authenticator_passive_sso_authenticated(app, idp, user, monkeypatch):
|
||||
scenario = Scenario(
|
||||
app,
|
||||
make_authn_request_kwargs={'is_passive': True},
|
||||
authn_request_success=False,
|
||||
)
|
||||
|
||||
authenticator = mock.Mock()
|
||||
authenticator.show.return_value = True
|
||||
|
||||
mock_passive_login = mock.Mock()
|
||||
|
||||
def passive_login(request, block_id, next_url, passive=None):
|
||||
mock_passive_login(request=request, block_id=block_id, next_url=next_url, passive=passive)
|
||||
return HttpResponseRedirect('https://idp.example.com/?passive')
|
||||
|
||||
authenticator.passive_login = passive_login
|
||||
|
||||
with mock.patch('authentic2.utils.misc.get_authenticators', return_value=[authenticator]):
|
||||
scenario.launch_authn_request()
|
||||
|
||||
assert scenario.idp_response.location == 'https://idp.example.com/?passive'
|
||||
assert mock_passive_login.call_args[1]['next_url'].startswith('/idp/saml2/continue?nonce=')
|
||||
|
||||
# check a successfull response is returned if a user is logged in
|
||||
app.set_user(user.username)
|
||||
with monkeypatch.context() as m:
|
||||
m.setattr(
|
||||
'django_webtest.backends.WebtestUserBackend.get_saml2_authn_context',
|
||||
mock.Mock(return_value='webtest'),
|
||||
raising=False,
|
||||
)
|
||||
response = app.get(mock_passive_login.call_args[1]['next_url'])
|
||||
scenario.idp_response = response
|
||||
scenario.handle_post_response()
|
||||
assert scenario.sp.login.response.status.statusCode.value == 'urn:oasis:names:tc:SAML:2.0:status:Success'
|
||||
|
|
|
@ -26,6 +26,7 @@ from django.utils.html import escape
|
|||
from authentic2.custom_user.models import DeletedUser, User
|
||||
from authentic2.forms.passwords import PasswordChangeForm, SetPasswordForm
|
||||
from authentic2.models import Attribute
|
||||
from authentic2.views import passive_login
|
||||
|
||||
from .utils import assert_event, get_link_from_mail, login, logout
|
||||
|
||||
|
@ -419,3 +420,23 @@ def test_redirected_views(app):
|
|||
assert (
|
||||
app.get('/accounts/password/reset/confirm/abcd1234/').location == '/password/reset/confirm/abcd1234/'
|
||||
)
|
||||
|
||||
|
||||
def test_passive_login(rf):
|
||||
from django.contrib.sessions.middleware import SessionMiddleware
|
||||
|
||||
req = rf.get('/')
|
||||
SessionMiddleware(lambda x: None).process_request(req)
|
||||
assert passive_login(req, next_url='/', login_hint={'pop'}) is None
|
||||
|
||||
authenticator1 = mock.Mock()
|
||||
authenticator1.show.return_value = True
|
||||
authenticator1.passive_login.return_value = 'response1'
|
||||
authenticator2 = mock.Mock()
|
||||
authenticator2.show.return_value = True
|
||||
authenticator2.passive_login.return_value = 'response2'
|
||||
|
||||
with mock.patch(
|
||||
'authentic2.utils.misc.get_authenticators', return_value=[authenticator1, authenticator2]
|
||||
):
|
||||
assert passive_login(req, next_url='/', login_hint={'pop'}) == 'response1'
|
||||
|
|
Loading…
Reference in New Issue