qommon/saml2: on artifact resolution error, show an error page with a retry button (#53362)
This commit is contained in:
parent
4c4a5e228f
commit
59f45ab29c
|
@ -10,6 +10,7 @@ try:
|
|||
except ImportError:
|
||||
lasso = None
|
||||
|
||||
import mock
|
||||
import pytest
|
||||
from quixote import get_session_manager
|
||||
from quixote.errors import RequestError
|
||||
|
@ -18,7 +19,7 @@ from wcs.qommon import x509utils
|
|||
from wcs.qommon.http_request import HTTPRequest
|
||||
from wcs.qommon.ident.idp import MethodAdminDirectory
|
||||
from wcs.qommon.misc import get_lasso_server
|
||||
from wcs.qommon.saml2 import Saml2Directory
|
||||
from wcs.qommon.saml2 import Saml2Directory, SOAPException
|
||||
|
||||
from .test_hobo_notify import PROFILE
|
||||
from .utilities import clean_temporary_pub, create_temporary_pub, get_app
|
||||
|
@ -115,7 +116,11 @@ def test_login(pub):
|
|||
assert 'rsa-sha256' in req.response.headers['location']
|
||||
|
||||
|
||||
def get_authn_response_msg(pub, ni_format=lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT):
|
||||
def get_authn_response_msg(
|
||||
pub,
|
||||
ni_format=lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
|
||||
protocol_binding=lasso.SAML2_METADATA_BINDING_POST,
|
||||
):
|
||||
idp_metadata_filepath = os.path.join(pub.app_dir, 'idp-http-sso.example.net-saml2-metadata-metadata.xml')
|
||||
idp_key_filepath = os.path.join(pub.app_dir, 'idp-http-sso.example.net-saml2-metadata-privatekey.pem')
|
||||
idp = lasso.Server(idp_metadata_filepath, idp_key_filepath, None, None)
|
||||
|
@ -128,7 +133,7 @@ def get_authn_response_msg(pub, ni_format=lasso.SAML2_NAME_IDENTIFIER_FORMAT_PER
|
|||
login.initIdpInitiatedAuthnRequest(pub.cfg['sp']['saml2_providerid'])
|
||||
login.request.nameIDPolicy.format = ni_format
|
||||
login.request.nameIDPolicy.allowCreate = True
|
||||
login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_POST
|
||||
login.request.protocolBinding = protocol_binding
|
||||
login.processAuthnRequestMsg(None)
|
||||
login.validateRequestMsg(True, True)
|
||||
login.buildAssertion(
|
||||
|
@ -178,8 +183,12 @@ def get_authn_response_msg(pub, ni_format=lasso.SAML2_NAME_IDENTIFIER_FORMAT_PER
|
|||
attributes.append(role_slug_attribute)
|
||||
login.assertion.attributeStatement[0].attribute = attributes
|
||||
|
||||
login.buildAuthnResponseMsg()
|
||||
return login.msgBody
|
||||
if protocol_binding == lasso.SAML2_METADATA_BINDING_POST:
|
||||
login.buildAuthnResponseMsg()
|
||||
return login.msgBody
|
||||
else:
|
||||
login.buildArtifactMsg(lasso.HTTP_METHOD_ARTIFACT_GET)
|
||||
return login.msgUrl
|
||||
|
||||
|
||||
def get_assertion_consumer_request(pub, ni_format=lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT):
|
||||
|
@ -340,6 +349,45 @@ def test_assertion_consumer_external_url_redirect_after_url(pub):
|
|||
saml2.assertionConsumerPost()
|
||||
|
||||
|
||||
def test_assertion_consumer_artifact_error(pub):
|
||||
def get_assertion_consumer_request(pub, ni_format=lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT):
|
||||
msg_url = get_authn_response_msg(pub, protocol_binding=lasso.SAML2_METADATA_BINDING_ARTIFACT)
|
||||
artifact = urllib.parse.parse_qs(urllib.parse.urlparse(msg_url).query)['SAMLart'][0]
|
||||
req = HTTPRequest(
|
||||
None,
|
||||
{
|
||||
'SERVER_NAME': 'example.net',
|
||||
'SCRIPT_NAME': '',
|
||||
'PATH_INFO': '/saml/assertionConsumerArtifact',
|
||||
'QUERY_STRING': urllib.parse.urlencode(
|
||||
{'SAMLart': artifact, 'RelayState': '/foobar/?test=ok'}
|
||||
),
|
||||
},
|
||||
)
|
||||
req.process_inputs()
|
||||
pub._set_request(req)
|
||||
pub.session_class.wipe()
|
||||
req.session = pub.session_class(id=1)
|
||||
assert req.session.user is None
|
||||
return req
|
||||
|
||||
with mock.patch('wcs.qommon.saml2.soap_call', side_effet=SOAPException()):
|
||||
req = get_assertion_consumer_request(pub)
|
||||
saml2 = Saml2Directory()
|
||||
saml2.assertionConsumerArtifact()
|
||||
assert req.response.status_code == 302
|
||||
assert req.response.headers['location'] == 'http://example.net/saml/error?RelayState=/foobar/%3Ftest%3Dok'
|
||||
|
||||
|
||||
def test_saml_error_page(pub):
|
||||
resp = get_app(pub).get('/saml/error?RelayState=/foobar/%3Ftest%3Dok')
|
||||
resp = resp.form.submit()
|
||||
assert resp.status_int == 302
|
||||
assert urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['RelayState'] == [
|
||||
'/foobar/?test=ok'
|
||||
]
|
||||
|
||||
|
||||
def test_saml_login_page(pub):
|
||||
resp = get_app(pub).get('/login/')
|
||||
assert resp.status_int == 302
|
||||
|
|
|
@ -41,7 +41,7 @@ from quixote.http_request import parse_header
|
|||
|
||||
from . import _, errors, force_str, misc
|
||||
from .publisher import get_cfg, get_logger
|
||||
from .template import error_page
|
||||
from .template import QommonTemplateResponse, error_page, html_top
|
||||
|
||||
|
||||
class SOAPException(Exception):
|
||||
|
@ -120,6 +120,7 @@ class Saml2Directory(Directory):
|
|||
'metadata',
|
||||
('metadata.xml', 'metadata'),
|
||||
'public_key',
|
||||
'error',
|
||||
]
|
||||
|
||||
def _q_traverse(self, path):
|
||||
|
@ -168,7 +169,7 @@ class Saml2Directory(Directory):
|
|||
def login(self):
|
||||
return self.perform_login()
|
||||
|
||||
def perform_login(self, idp=None):
|
||||
def perform_login(self, idp=None, relay_state=None):
|
||||
server = misc.get_lasso_server()
|
||||
if not server:
|
||||
return error_page(_('SAML 2.0 support not yet configured.'))
|
||||
|
@ -185,10 +186,13 @@ class Saml2Directory(Directory):
|
|||
login.request.forceAuthn = get_request().form.get('forceAuthn') == 'true'
|
||||
login.request.isPassive = get_request().form.get('IsPassive') == 'true'
|
||||
login.request.consent = 'urn:oasis:names:tc:SAML:2.0:consent:current-implicit'
|
||||
if isinstance(get_request().form.get('next'), str):
|
||||
login.msgRelayState = get_request().form.get('next')
|
||||
|
||||
next_url = login.msgRelayState or get_publisher().get_frontoffice_url()
|
||||
if not relay_state and isinstance(get_request().form.get('next'), str):
|
||||
relay_state = get_request().form.get('next')
|
||||
if relay_state:
|
||||
login.msgRelayState = relay_state
|
||||
|
||||
next_url = relay_state or get_publisher().get_frontoffice_url()
|
||||
parsed_url = urllib.parse.urlparse(next_url)
|
||||
request = get_request()
|
||||
scheme = parsed_url.scheme or request.get_scheme()
|
||||
|
@ -242,7 +246,11 @@ class Saml2Directory(Directory):
|
|||
try:
|
||||
soap_answer = soap_call(login.msgUrl, login.msgBody, client_cert=client_cert)
|
||||
except SOAPException:
|
||||
return error_page(_('Failure to communicate with identity provider'))
|
||||
relay_state = request.form.get('RelayState', None)
|
||||
path = '/saml/error'
|
||||
if relay_state:
|
||||
path += '?RelayState=' + urllib.parse.quote(relay_state)
|
||||
return redirect(path)
|
||||
|
||||
try:
|
||||
login.processResponseMsg(force_str(soap_answer))
|
||||
|
@ -786,6 +794,13 @@ class Saml2Directory(Directory):
|
|||
publickey = open(misc.get_abs_path(get_cfg('sp')['publickey'])).read()
|
||||
return publickey
|
||||
|
||||
def error(self):
|
||||
request = get_request()
|
||||
if request.get_method() == 'POST':
|
||||
return self.perform_login(relay_state=request.form.get('RelayState'))
|
||||
html_top(title=_('Authentication error'))
|
||||
return QommonTemplateResponse(templates=['qommon/saml-error.html'], context={})
|
||||
|
||||
# retain compatibility with old metadatas
|
||||
singleSignOnArtifact = assertionConsumerArtifact
|
||||
singleSignOnPost = assertionConsumerPost
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
{% extends template_base %}
|
||||
{% load i18n %}
|
||||
|
||||
{% block body %}
|
||||
<div class="warningnotice">{% trans "There was a temporary error during your authentication, please retry later." %}</div>
|
||||
|
||||
<form method="post">
|
||||
<button>{% trans "Retry" %}</button>
|
||||
</form>
|
||||
{% endblock %}
|
Loading…
Reference in New Issue