770 lines
32 KiB
Python
770 lines
32 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2010 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import os
|
|
import time
|
|
import sys
|
|
from xml.sax.saxutils import escape
|
|
|
|
try:
|
|
import lasso
|
|
except ImportError:
|
|
lasso = None
|
|
|
|
from django.utils.encoding import force_text
|
|
from django.utils.six.moves.urllib import parse as urlparse
|
|
|
|
from quixote import get_request, get_response, redirect, get_field, get_publisher
|
|
from quixote.http_request import parse_header
|
|
from quixote.directory import Directory
|
|
from quixote import get_session, get_session_manager
|
|
|
|
from . import misc
|
|
from .publisher import get_cfg, get_logger
|
|
from . import _, force_str
|
|
from .template import error_page
|
|
from wcs.roles import Role
|
|
|
|
from . import errors
|
|
|
|
|
|
class SOAPException(Exception):
    """Error raised when a SOAP exchange cannot be carried out.

    The ``url`` attribute holds the endpoint that was given to the
    constructor, or None when no endpoint was given.
    """

    # class-level default, so the attribute is also readable on the class
    url = None

    def __init__(self, url=None):
        self.url = url
|
|
|
|
|
|
def does_idp_authentication():
    """Tell whether 'idp' is one of the configured identification methods.

    Reads the 'methods' entry of the 'identification' configuration
    section.  A missing or empty entry means that no method is configured
    and yields False.
    """
    # 'methods' may be absent, in which case .get() gives None; fall back
    # to an empty tuple so that the membership test cannot raise TypeError.
    methods = get_cfg('identification', {}).get('methods') or ()
    return 'idp' in methods
|
|
|
|
|
|
def soap_call(url, msg, client_cert=None):
    """POST a SOAP message to *url* and hand back the payload of the answer.

    :param url: endpoint the message is posted to.
    :param msg: request body.
    :param client_cert: forwarded as ``cert_file`` to
        ``misc.http_post_request``.
    :returns: the data element of the answer.
    :raises SOAPException: when the connection fails, or when the HTTP
        status is neither 200 nor 204; a warning is logged beforehand.
    """
    request_headers = {'Content-Type': 'text/xml'}
    try:
        # only the status and the payload are used afterwards
        _response, status, data, _auth_header = misc.http_post_request(
            url, msg, headers=request_headers, cert_file=client_cert)
    except errors.ConnectionError as err:
        # exception could be raised by request
        get_logger().warn('SOAP error (on %s): %s' % (url, err))
        raise SOAPException(url)

    # 204 ok for federation termination
    if status in (200, 204):
        return data

    get_logger().warn('SOAP error (%s) (on %s)' % (status, url))
    raise SOAPException(url)
|
|
|
|
|
|
def soap_endpoint(method):
    """Decorate *method* so that it behaves as a SOAP endpoint.

    The returned wrapper:

    * raises ``errors.TraversalError`` unless the current request is a POST;
    * declares the response as ``text/xml`` in UTF-8;
    * logs any exception raised by *method* and returns a SOAP fault
      envelope built with lasso instead of letting it propagate.

    The wrapper keeps the name and docstring of *method*.
    """
    # local import, so the block does not depend on the file-level imports
    import functools

    @functools.wraps(method)
    def f(*args, **kwargs):
        if get_request().get_method() != 'POST':
            raise errors.TraversalError()
        response = get_response()
        response.set_content_type('text/xml', 'utf-8')
        try:
            return method(*args, **kwargs)
        except Exception as e:
            get_logger().error('Exception in method %r: %s; returning a SOAP error' % (method, e))
            # report the failure to the peer as a SOAP fault
            fault = lasso.SoapFault.newFull('Internal Server Error', str(e))
            body = lasso.SoapBody()
            body.any = [fault]
            envelope = lasso.SoapEnvelope(body)
            return envelope.exportToXml()
    return f
|
|
|
|
|
|
def saml2_status_summary(response):
    """Summarise the status of a SAML 2.0 response as a short string.

    Returns the value of the top-level status code, followed by ``':'`` and
    the value of the nested status code when there is one.  When the
    response has no status or no status code, a fixed explanatory string is
    returned instead.
    """
    status = response.status
    top_level = status.statusCode if status else None
    if not top_level:
        return 'No status or status code'

    summary = top_level.value
    second_level = top_level.statusCode
    if not second_level:
        return summary
    return summary + ':' + second_level.value
|
|
|
|
|
|
def get_remote_provider_cfg(profile):
    '''Return the stored configuration of the remote provider of a profile.

    The key is derived from ``profile.remoteProviderId`` and looked up in
    the 'idp' configuration section; the result is None when the section
    has no entry under that key.
    '''
    provider_key = misc.get_provider_key(profile.remoteProviderId)
    idp_section = get_cfg('idp', {})
    return idp_section.get(provider_key)
|
|
|
|
|
|
class Saml2Directory(Directory):
    """Directory exposing the SAML 2.0 endpoints listed in ``_q_exports``
    (login, assertion consumers, single logout, metadata and public key)."""

    # The ('metadata.xml', 'metadata') pair exports the ``metadata`` method
    # under the external name ``metadata.xml``.  The singleSignOn* names
    # are aliases of the assertionConsumer* methods.
    _q_exports = ['login',
            'singleSignOnArtifact', 'singleSignOnPost', 'singleSignOnRedirect',
            'assertionConsumerArtifact', 'assertionConsumerPost', 'assertionConsumerRedirect',
            'singleLogout', 'singleLogoutReturn', 'singleLogoutSOAP',
            'metadata', ('metadata.xml', 'metadata'), 'public_key']
|
|
|
|
def _q_traverse(self, path):
    """Traverse *path*, hiding every SAML endpoint when lasso is missing.

    With lasso available this defers to ``Directory._q_traverse``.
    Without it a ``TraversalError`` is raised; an error is logged first
    when identity-provider authentication is configured.
    """
    if lasso is not None:
        return Directory._q_traverse(self, path)

    # if lasso is not installed, hide the saml endpoints
    if does_idp_authentication():
        unavailable = os.path.join('/saml', *path)
        get_logger().error('%s unavailable - lasso is not installed' % unavailable)
    raise errors.TraversalError()
|
|
|
|
def log_profile_error(self, profile, error, what):
    """Log, at info level, that the operation *what* failed.

    :param profile: object whose ``remoteProviderId`` names the peer.
    :param error: indexable error; item 1 is logged as the message.
    :param what: short label of the operation that failed.
    """
    details = {
        'provider': profile.remoteProviderId,
        'message': error[1],
        'what': what,
    }
    get_logger().info('%(what)s from %(provider)s failed: %(message)s' % details)
|
|
|
|
def postForm(self, profile=None, url=None, body=None):
    """Return an auto-submitting XHTML page that POSTs a SAML message.

    :param profile: optional object providing ``msgUrl`` and ``msgBody``;
        when given (and truthy) they replace *url* and *body*.
    :param url: target of the form.
    :param body: value of the hidden ``SAMLRequest`` field.
    :returns: the XHTML document as a string.

    Both values are escaped before being placed in the markup, so that
    characters such as ``&``, ``<`` or ``"`` cannot break out of the
    attribute they are written into.
    """
    if profile:
        url = profile.msgUrl
        body = profile.msgBody
    # The values end up inside double-quoted attributes: escape the markup
    # characters as well as the double quote itself.  '%s' keeps the
    # previous rendering of non-string values (e.g. None).
    attribute_entities = {'"': '&quot;'}
    safe_url = escape('%s' % (url,), attribute_entities)
    safe_body = escape('%s' % (body,), attribute_entities)
    # NOTE(review): the hidden field is always named SAMLRequest, yet
    # slo_idp_finish() uses this page to send a logout *response* - confirm
    # whether a SAMLResponse field is expected in that case.
    # XXX: translate message in the form
    return """<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<title>Authentication Request</title>
</head>
<body onload="document.forms[0].submit()">
<h1>Authentication Request</h1>
<form action="%(url)s" method="post">
<p>You should be automaticaly redirected to the identity provider.</p>
<p>If this page is still visible after a few seconds, press the <em>Send</em> button below.</p>
<input type="hidden" name="SAMLRequest" value="%(body)s" />
<div class="buttons-bar">
<input type="submit" name="sendButton" value="Send" />
</div>
</form>
</body>
</html>
""" % {'url': safe_url, 'body': safe_body}
|
|
|
|
def login(self):
    """Exported entry point: run ``perform_login`` with its default arguments."""
    return self.perform_login()
|
|
|
|
def perform_login(self, idp=None):
    """Build a SAML 2.0 authentication request and redirect the browser to it.

    :param idp: forwarded to ``lasso.Login.initAuthnRequest``.
    :returns: an error page when no lasso server is configured, otherwise
        the result of redirecting to the URL built by lasso.

    Request parameters read from the form: ``forceAuthn`` and ``IsPassive``
    (enabled only by the literal value 'true') and ``next`` (used as relay
    state when it is a string).
    """
    server = misc.get_lasso_server()
    if not server:
        return error_page(_('SAML 2.0 support not yet configured.'))
    login = lasso.Login(server)
    login.initAuthnRequest(idp, lasso.HTTP_METHOD_REDIRECT)

    # The provider may have no stored configuration (the lookup then gives
    # None); use an empty mapping so the default name id format applies
    # instead of failing on None.get().
    idp_options = get_remote_provider_cfg(login) or {}
    nameid_format = idp_options.get('nameidformat')
    if nameid_format == 'unspecified':
        login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED
    elif nameid_format == 'email':
        login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_EMAIL
    else:
        login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
    login.request.nameIDPolicy.allowCreate = True

    form = get_request().form
    login.request.forceAuthn = form.get('forceAuthn') == 'true'
    login.request.isPassive = form.get('IsPassive') == 'true'
    login.request.consent = 'urn:oasis:names:tc:SAML:2.0:consent:current-implicit'
    next_param = form.get('next')
    if isinstance(next_param, str):
        login.msgRelayState = next_param

    # advertise the URL the user will land on through a request extension
    next_url = login.msgRelayState or get_publisher().get_frontoffice_url()
    samlp_extensions = '''<samlp:Extensions
xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol"
xmlns:eo="https://www.entrouvert.com/">
<eo:next_url>%s</eo:next_url>
</samlp:Extensions>''' % escape(next_url)
    # work around lasso bug https://dev.entrouvert.org/issues/23001
    if hasattr(lasso.Samlp2Extensions, 'any'):
        login.request.extensions = lasso.Node.newFromXmlNode(samlp_extensions)
    else:
        login.request.extensions = lasso.Samlp2Extensions()
        login.request.extensions.setOriginalXmlnode(samlp_extensions)

    login.buildAuthnRequestMsg()
    return redirect(login.msgUrl)
|
|
|
|
def assertionConsumerArtifact(self):
    """Assertion consumer endpoint for the artifact binding.

    The artifact is read from the query string (GET) or from the
    ``SAMLart`` form field (POST), resolved with a SOAP call to the
    identity provider, and the answer is then processed like any other
    authentication response.

    :returns: an error page on failure, otherwise the result of
        ``sso_after_response``.
    """
    server = misc.get_lasso_server()
    if not server:
        return error_page(_('SAML 2.0 support not yet configured.'))
    login = lasso.Login(server)
    request = get_request()
    try:
        if request.get_method() == 'GET':
            message, method = request.get_query(), lasso.HTTP_METHOD_ARTIFACT_GET
        elif request.get_method() == 'POST':
            message, method = request.form.get('SAMLart', None), lasso.HTTP_METHOD_ARTIFACT_POST
        else:
            get_logger().info('Bad HTTP method on assertionConsumerArtifact endpoint')
            return error_page(_('Invalid authentication response'))
        login.initRequest(force_str(message), method)
    except lasso.Error as error:
        self.log_profile_error(login, error, 'login.initRequest')
        return error_page(_('Invalid authentication response'))

    login.buildRequestMsg()
    # Same lookup as the shared helper; an unknown provider yields None, so
    # fall back to an empty mapping (no client certificate) rather than
    # failing on None.get().
    remote_provider_cfg = get_remote_provider_cfg(login) or {}
    client_cert = remote_provider_cfg.get('clientcertificate')

    try:
        soap_answer = soap_call(login.msgUrl, login.msgBody, client_cert=client_cert)
    except SOAPException:
        return error_page(_('Failure to communicate with identity provider'))

    try:
        login.processResponseMsg(force_str(soap_answer))
    except lasso.Error as error:
        return self.assertion_consumer_process_response_error(login, error)
    return self.sso_after_response(login)
|
|
|
|
def assertion_consumer_process_response_error(self, login, error):
    """Turn a lasso error raised while processing a response into a page.

    Two "status not success" situations are not reported as errors: a
    failed passive login continues to the after-login URL, and a denied
    request redirects to the root URL.  Every other case is logged and
    rendered as an error page.

    :param login: the lasso login profile being processed.
    :param error: the lasso error; item 0 is compared with lasso error codes.
    """
    if isinstance(error, lasso.DsError):
        message = _('Signature verification failed')
    elif error[0] in (lasso.LOGIN_ERROR_STATUS_NOT_SUCCESS,
                      lasso.PROFILE_ERROR_STATUS_NOT_SUCCESS):
        try:
            second_level = login.response.status.statusCode.statusCode.value
            # Passive login failed, just continue
            if second_level == lasso.SAML2_STATUS_CODE_NO_PASSIVE:
                return self.continue_to_after_url()
            # if error code is request denied, it's probably because
            # the user pressed 'cancel' on the identity provider
            # authentication form.
            if second_level == lasso.SAML2_STATUS_CODE_REQUEST_DENIED:
                return redirect(get_publisher().get_root_url())
        except AttributeError:
            # no nested status code to inspect: report a plain failure
            pass
        message = _('Authentication failure %s') % saml2_status_summary(login.response)
    else:
        code = error[0]
        if code == lasso.SERVER_ERROR_PROVIDER_NOT_FOUND:
            message = _('Request from unknown provider ID')
        elif code == lasso.LOGIN_ERROR_UNKNOWN_PRINCIPAL:
            message = _('Authentication failure; unknown principal')
        elif code == lasso.LOGIN_ERROR_FEDERATION_NOT_FOUND:
            message = _('Authentication failure; federation not found')
        elif code == lasso.PROFILE_ERROR_MISSING_RESPONSE:
            message = _('Authentication failure; failed to get response')
        else:
            message = _('Unknown error')

    self.log_profile_error(login, error, 'assertion consumer error: %r' % message)
    return error_page(message)
|
|
|
|
def sso_after_response(self, login):
    """Validate the assertion held by *login* and set up the local session.

    Checks, in order: the SubjectConfirmation recipient, assertion replay
    (one marker file per assertion id in the 'assertions' directory of the
    application), the SubjectConfirmation method, the audience
    restrictions and the validity period.  On success the lasso session
    data, authentication context, session index and expiration are copied
    to the session, the user is looked up, and the browser is sent to the
    after-login URL.

    :returns: an error page as soon as a check fails, otherwise the result
        of ``continue_to_after_url``.
    """
    # local import: only needed for the race-free filesystem operations
    import errno

    try:
        assertion = login.response.assertion[0]
        # the recipient must be the base URL followed by the last segment
        # of the URL that is being served
        last_slash = get_request().get_url().rfind("/")
        if assertion.subject.subjectConfirmation.subjectConfirmationData.recipient != \
                get_cfg('sp', {}).get('saml2_base_url') + get_request().get_url()[last_slash:]:
            return error_page('SubjectConfirmation Recipient Mismatch')
    except Exception:
        get_publisher().notify_of_exception(sys.exc_info(), context='[SAML]')
        return error_page('Error checking SubjectConfirmation Recipient')

    assertions_dir = os.path.join(get_publisher().app_dir, 'assertions')
    try:
        os.mkdir(assertions_dir)
    except OSError as e:
        # another request may have created the directory in the meantime
        if e.errno != errno.EEXIST:
            raise

    # Replay detection: create the marker atomically, so that two requests
    # carrying the same assertion cannot both pass an exists-then-create
    # check.
    assertion_fn = os.path.join(assertions_dir, assertion.iD)
    try:
        marker = os.open(assertion_fn, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o666)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
        return error_page('Assertion replay')
    os.close(marker)

    try:
        if assertion.subject.subjectConfirmation.method != \
                'urn:oasis:names:tc:SAML:2.0:cm:bearer':
            return error_page('Unknown SubjectConfirmation Method')
    except Exception:
        return error_page('Error checking SubjectConfirmation Method')

    try:
        # every restriction must name this provider, and there must be at
        # least one restriction
        audience_ok = False
        for audience_restriction in assertion.conditions.audienceRestriction:
            if audience_restriction.audience != login.server.providerId:
                return error_page('Incorrect AudienceRestriction')
            audience_ok = True
        if not audience_ok:
            return error_page('Incorrect AudienceRestriction')
    except Exception:
        return error_page('Error checking AudienceRestriction')

    try:
        # NOTE(review): the bounds are compared as strings with the current
        # UTC time; this presumably relies on the identity provider sending
        # timestamps in this exact format - confirm.
        current_time = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
        confirmation_data = assertion.subject.subjectConfirmation.subjectConfirmationData
        not_before = confirmation_data.notBefore
        not_on_or_after = confirmation_data.notOnOrAfter
        if not_before and current_time < not_before:
            return error_page('Assertion received too early')
        if not_on_or_after and current_time > not_on_or_after:
            return error_page('Assertion expired')
    except Exception:
        get_publisher().notify_of_exception(sys.exc_info(), context='[SAML]')
        return error_page('Error checking Assertion Time')

    # TODO: check for unknown conditions

    login.acceptSso()
    session = get_session()
    if login.isSessionDirty:
        if login.session:
            session.lasso_session_dump = login.session.dump()
        else:
            session.lasso_session_dump = None

    authn_statement = assertion.authnStatement[0]
    if authn_statement.authnContext and authn_statement.authnContext.authnContextClassRef:
        session.saml_authn_context = authn_statement.authnContext.authnContextClassRef
    if authn_statement.sessionIndex:
        session.lasso_session_index = authn_statement.sessionIndex

    if authn_statement.sessionNotOnOrAfter:
        try:
            t = misc.parse_isotime(authn_statement.sessionNotOnOrAfter)
        except ValueError:
            return error_page('Error extracting SessionNotOnOrAfter')
        session.set_expire(t)

    user = self.lookup_user(session, login)
    if user:
        session.set_user(user.id)
    else:
        session.set_user('anonymous-%s' % login.nameIdentifier.content)
        if login.identity:
            session.lasso_anonymous_identity_dump = login.identity.dump()
        else:
            # XXX: this situation happened with SSO initiated by IdP, this
            # is not normal
            pass
    session.lasso_identity_provider_id = login.remoteProviderId
    session.message = None
    return self.continue_to_after_url()
|
|
|
|
def continue_to_after_url(self):
    """Send the browser to the URL designated by the ``RelayState`` field.

    * ``'backoffice'`` designates the backoffice URL;
    * any other non-empty value is taken as a URL, completed with the
      scheme and server of the current request when it has none, and must
      start with the backoffice or the frontoffice URL;
    * without relay state the frontoffice URL is used.

    :returns: a short text body; the response is set up as a 303 redirect.
    :raises errors.RequestError: when the relay state designates a URL
        outside of both the backoffice and the frontoffice.
    """
    request = get_request()
    relay_state = request.form.get('RelayState', None)
    get_session()  # looked up as in the original flow; the value is not used
    response = get_response()
    publisher = get_publisher()

    if relay_state == 'backoffice':
        after_url = publisher.get_backoffice_url()
    elif not relay_state:
        after_url = publisher.get_frontoffice_url()
    else:
        parts = urlparse.urlparse(relay_state)
        after_url = urlparse.urlunsplit((
            parts.scheme or request.get_scheme(),
            parts.netloc or request.get_server(),
            parts.path,
            parts.query,
            parts.fragment,
        ))
        # NOTE(review): this is a plain prefix test; it presumably relies
        # on both base URLs ending with a slash - confirm.
        if not after_url.startswith(publisher.get_backoffice_url()) and \
                not after_url.startswith(publisher.get_frontoffice_url()):
            raise errors.RequestError()

    response.set_status(303)
    response.headers['location'] = after_url
    response.content_type = 'text/plain'
    return "Your browser should redirect you"
|
|
|
|
|
|
def assertionConsumerPost(self):
    """Assertion consumer endpoint reading the ``SAMLResponse`` field.

    :returns: an error page when the field is missing or empty, otherwise
        the result of ``assertion_consumer_process``.
    """
    saml_response = get_field('SAMLResponse')
    if saml_response:
        return self.assertion_consumer_process(saml_response)
    return error_page(_('No SAML Response'))
|
|
|
|
def assertionConsumerRedirect(self):
    """Assertion consumer endpoint reading the response from the query string.

    :returns: an error page when the query string is empty, otherwise the
        result of ``assertion_consumer_process``.
    """
    query = get_request().get_query()
    if query:
        return self.assertion_consumer_process(query)
    return error_page(_('No SAML Response in query string'))
|
|
|
|
def assertion_consumer_process(self, message):
    """Process an authentication response *message* with lasso.

    :returns: an error page when no lasso server is configured or when
        lasso rejects the message, otherwise the result of
        ``sso_after_response``.
    """
    server = misc.get_lasso_server()
    if not server:
        return error_page(_('SAML 2.0 support not yet configured.'))

    login = lasso.Login(server)
    try:
        login.processAuthnResponseMsg(message)
    except lasso.Error as error:
        return self.assertion_consumer_process_response_error(login, error)
    else:
        return self.sso_after_response(login)
|
|
|
|
def fill_user_attributes(self, session, login, user):
    '''Fill user fields from SAML2 assertion attributes.

    The assertion is read from the lasso session dump kept on *session*.
    Depending on the configuration of the identity provider and on the
    received attributes this updates, on *user*: the admin flag, the
    mapped form data fields, the roles (``role-slug`` attribute) and the
    verified fields (``verified_attributes`` attribute).  The user is
    stored only when something changed.

    :returns: None in every case; when no assertion can be read a warning
        is logged and nothing is modified.
    '''
    logger = get_logger()

    save = False
    # an unknown provider has no stored configuration: use the defaults
    idp = get_remote_provider_cfg(login) or {}
    # lookup for attributes in assertion and automatically create identity
    lasso_session = lasso.Session.newFromDump(session.lasso_session_dump)
    try:
        assertion = lasso_session.getAssertions(None)[0]
    except Exception:
        get_logger().warn('no assertion')
        return None

    d = {}  # attribute name -> first value
    m = {}  # attribute name -> list of all values
    try:
        for attribute in assertion.attributeStatement[0].attribute:
            # always mark the attribute as being present, even if it won't
            # have any value, as an empty value (role-slug) must not be
            # ignored.
            values = m.setdefault(attribute.name, [])
            try:
                d[attribute.name] = attribute.attributeValue[0].any[0].content
                for attribute_value in attribute.attributeValue:
                    values.append(attribute_value.any[0].content)
            except IndexError:
                # attribute without (usable) value: keep what was collected
                pass
    except IndexError:
        # no attribute statement in the assertion
        pass
    logger.debug('fill_user_attributes: received attributes %r', m)

    admin_attributes = idp.get('admin-attributes', {'local-admin': 'true'}) or {}
    if admin_attributes:
        is_admin = False
        for key, matching_value in admin_attributes.items():
            for value in m.get(key, []):
                if value == matching_value:
                    is_admin = True
        if user.is_admin != is_admin:
            user.is_admin = is_admin
            if user.is_admin:
                logger.info('giving user %s the admin rights', user.id)
            else:
                logger.info('taking user %s the admin rights', user.id)
            save = True
    attribute_mapping = idp.get('attribute-mapping') or {}

    from wcs.admin.settings import UserFieldsFormDef
    formdef = UserFieldsFormDef(publisher=get_publisher())
    if formdef:
        dict_fields = {x.id: x for x in formdef.fields}
    else:
        dict_fields = {}

    if user.form_data is None:
        user.form_data = {}
    for key, field_id in attribute_mapping.items():
        if key not in d:
            continue
        field_value = d[key]
        field = dict_fields.get(field_id)
        if field and field.convert_value_from_anything:
            try:
                field_value = field.convert_value_from_anything(field_value)
            except ValueError:
                # unusable value: report it and leave the field untouched
                get_publisher().notify_of_exception(sys.exc_info(), context='[SAML]')
                continue
        if user.form_data.get(field_id) != field_value:
            user.form_data[field_id] = field_value
            logger.info('setting field %s of user %s to value %r', field_id, user.id, field_value)
            save = True

    # update user roles from role-slug
    if 'role-slug' in m:
        role_ids = []
        names = []
        # uuid are in a role-slug attribute, it's historical, as at some
        # point roles in authentic were provisioned from w.c.s. and join
        # was done on the slug field.
        for role_uuid in m['role-slug']:
            role = Role.resolve(role_uuid)
            if not role:
                logger.warn('role uuid %s is unknown', role_uuid)
                continue
            role_ids.append(force_str(role.id))
            names.append(force_str(role.name))
        if set(user.roles or []) != set(role_ids):
            user.roles = role_ids
            logger.info('enrolling user %s in %s', user.id, ', '.join(names))
            save = True

    # verified attributes
    verified_attributes = m.get('verified_attributes')
    if verified_attributes is not None:
        if verified_attributes:
            # If there are any verified attributes we consider
            # first and last names are also verified. This is to work
            # around the fact that those attributes are handled
            # differently in authentic and cannot be marked as
            # verified.
            verified_attributes.extend(['first_name', 'last_name'])
        verified_fields = []
        verified_fieldnames = []
        if user.get_formdef() and user.get_formdef().fields:
            for field in user.get_formdef().fields:
                if field.varname in verified_attributes:
                    verified_fields.append(field.id)
                    verified_fieldnames.append(field.varname)
        if set(user.verified_fields or []) != set(verified_fields):
            user.verified_fields = verified_fields
            logger.info('verified attributes for user %s are %s', user.id,
                        ', '.join(verified_fieldnames))
            save = True

    if save:
        user.store()
|
|
|
|
def lookup_user(self, session, login):
    """Return the user matching the name identifier of *login*, creating it
    when needed.

    Returns None when *login* carries no name identifier, or when the user
    still has no name once the assertion attributes have been applied.
    The name identifier is recorded on *session* as ``name_identifier``.
    """
    if not login.nameIdentifier or not login.nameIdentifier.content:
        return None
    user_class = get_publisher().user_class
    ni = login.nameIdentifier.content
    session.name_identifier = ni
    # NOTE(review): the nesting of this loop was reconstructed from a source
    # whose indentation was lost; the duplicate check is assumed to belong
    # to the creation branch - confirm.
    while True:
        # ascending sort: the last item has the highest last_seen and, for
        # equal last_seen, the lowest numeric id
        users = sorted(user_class.get_users_with_name_identifier(ni),
                       key=lambda u: (u.last_seen or 0, -int(u.id)))
        if users:
            # if multiple users, use the more recently used or the younger
            user = users[-1]
        else:
            user = get_publisher().user_class(ni)
            user.name_identifiers = [ni]
            if login.identity:
                user.lasso_dump = login.identity.dump()
            user.store()

            others = user_class.get_users_with_name_identifier(ni)
            # there is an user mapping to the same id with a younger id:
            # try again.
            if any(int(other.id) < int(user.id) for other in others):
                # drop the user that was just created and redo the lookup
                user.remove_self()
                continue
        break

    self.fill_user_attributes(session, login, user)

    if user.form_data:
        user.set_attributes_from_formdata(user.form_data)

    if not user.name:
        # we didn't get useful attributes, forget it.
        get_logger().warn('failed to get useful attributes from the assertion')
        return None

    user.store()
    return user
|
|
|
|
def slo_sp(self, method = None):
    """Start a logout initiated by this service provider.

    *method* selects the binding: ``lasso.HTTP_METHOD_REDIRECT`` (the
    default) or ``lasso.HTTP_METHOD_SOAP``.  Any other value falls through
    and returns None.
    """
    if method is None:
        method = lasso.HTTP_METHOD_REDIRECT

    logout = lasso.Logout(misc.get_lasso_server())
    session = get_session()

    # give lasso the session and identity dumps it needs for the request
    if session.lasso_session_dump:
        logout.setSessionFromDump(session.lasso_session_dump)

    user = get_request().user
    if user and user.lasso_dump:
        logout.setIdentityFromDump(user.lasso_dump)
        # NOTE(review): indentation was lost in the source; this reset is
        # assumed to belong to the branch above - confirm.
        session.user = None

    if method == lasso.HTTP_METHOD_REDIRECT:
        return self.slo_sp_redirect(logout)

    if method == lasso.HTTP_METHOD_SOAP:
        return self.slo_sp_soap(logout)
|
|
|
|
def slo_sp_redirect(self, logout):
    """Send the logout request of *logout* through a browser redirect.

    When lasso cannot initialise the request, the error is logged, the
    local session is expired and the browser goes to the root URL.
    Otherwise the session index kept on the session (if any) is added to
    the request and the browser is redirected to the URL built by lasso.
    """
    session = get_session()
    try:
        logout.initRequest(None, lasso.HTTP_METHOD_REDIRECT)
    except lasso.Error as error:
        self.log_profile_error(logout, error, 'logout.initRequest')
        get_session_manager().expire_session()
        root_url = get_publisher().get_root_url()
        return redirect(root_url)

    session_index = session.lasso_session_index
    if session_index:
        logout.request.sessionIndex = session_index
    logout.buildRequestMsg()
    return redirect(logout.msgUrl)
|
|
|
|
|
|
def singleLogoutReturn(self):
    """Endpoint receiving, in the query string, the answer to a logout request.

    A lasso logout profile is loaded with the session dump kept on the
    session (when there is one), then the query string is handed to
    ``slo_return``.
    """
    logout = lasso.Logout(misc.get_lasso_server())
    session_dump = get_session().lasso_session_dump
    if session_dump:
        logout.setSessionFromDump(session_dump)
    query = get_request().get_query()
    return self.slo_return(logout, query)
|
|
|
|
def slo_return(self, logout, message):
    """Process the logout response *message* and end the local session.

    Whatever the outcome of the processing, the local session is expired
    and the browser is redirected to the root URL.
    """
    session = get_session()

    # NOTE(review): indentation was lost in the source; the nesting below
    # (reset of the dump inside the "unknown principal" branch, session
    # update inside the success branch) is the most plausible reading -
    # confirm.
    try:
        logout.processResponseMsg(force_str(message))
    except lasso.Error as error:
        self.log_profile_error(logout, error, 'logout.processResponseMsg')
        if error[0] == lasso.LOGOUT_ERROR_UNKNOWN_PRINCIPAL:
            get_logger().warn('Unknown principal on logout, probably session stopped already on IdP')
            # XXX: wouldn't work when logged on two IdP
            session.lasso_session_dump = None
    else:
        get_logger().info('Successful logout from %s' % logout.remoteProviderId)
        # keep the stored lasso session in line with what lasso now holds
        if logout.isSessionDirty:
            if logout.session:
                session.lasso_session_dump = logout.session.dump()
            else:
                session.lasso_session_dump = None

    get_session_manager().expire_session()
    return redirect(get_publisher().get_root_url())
|
|
|
|
|
|
def slo_sp_soap(self, logout):
    """Send the logout request of *logout* to the identity provider over SOAP.

    :returns: a redirect to the root URL when lasso cannot initialise the
        request (the local session is expired and an error message is
        left on the session), an error page when the SOAP call fails,
        otherwise the result of ``slo_return`` on the answer.
    """
    try:
        logout.initRequest(None, lasso.HTTP_METHOD_SOAP)
    except lasso.Error as error:
        self.log_profile_error(logout, error, 'logout.initRequest SOAP')
        get_session_manager().expire_session()
        get_session().message = ('error', _('Could not send logout request to the identity provider'))
        return redirect(get_publisher().get_root_url())

    logout.buildRequestMsg()
    # Same lookup as the shared helper; an unknown provider yields None, so
    # fall back to an empty mapping (no client certificate) rather than
    # failing on None.get().
    remote_provider_cfg = get_remote_provider_cfg(logout) or {}
    client_cert = remote_provider_cfg.get('clientcertificate')
    try:
        soap_answer = soap_call(logout.msgUrl, logout.msgBody, client_cert=client_cert)
    except SOAPException:
        return error_page(_('Failure to communicate with identity provider'))

    return self.slo_return(logout, soap_answer)
|
|
|
|
def get_soap_message(self):
    """Read the body of the current request as a SOAP message.

    :returns: ``CONTENT_LENGTH`` bytes read from the request input.
    :raises SOAPException: when the content type is missing or is neither
        ``text/xml`` nor ``application/vnd.paos+xml``, or when the content
        length is missing, not a number, or negative.  A warning is logged
        in each case.
    """
    request = get_request()
    ctype = request.environ.get('CONTENT_TYPE')
    if not ctype:
        get_logger().warn('SOAP Endpoint got message without content-type')
        raise SOAPException()

    # only the media type matters, its parameters are ignored
    ctype, _ctype_params = parse_header(ctype)
    if ctype not in ('text/xml', 'application/vnd.paos+xml'):
        get_logger().warn('SOAP Endpoint got message with wrong content-type (%s)' % ctype)
        raise SOAPException()

    raw_length = request.environ.get('CONTENT_LENGTH')
    try:
        length = int(raw_length)
    except (TypeError, ValueError):
        # header missing (None) or not a number
        length = -1
    if length < 0:
        # a negative size would make read() consume the whole input
        get_logger().warn('SOAP Endpoint got message with invalid content-length (%s)' % raw_length)
        raise SOAPException()
    return request.stdin.read(length)
|
|
|
|
#
|
|
# SLO IdP Section
|
|
#
|
|
def kill_sessions(self, nameid, session_indexes=(), not_current_session=False):
    """Remove the sessions opened for the name identifier *nameid*.

    :param nameid: object whose ``content`` is the name identifier.
    :param session_indexes: forwarded to the session lookup.
    :param not_current_session: when true, the current session is not
        expired first.
    """
    manager = get_session_manager()
    if not not_current_session:
        manager.expire_session()

    # session has not been found, this may be because the user has
    # its browser configured so that cookies are not sent for
    # remote queries and IdP is using image-based SLO.
    # so we look up a session with the appropriate name identifier
    for saml_session in manager.get_sessions_for_saml(nameid.content, session_indexes):
        session_id = saml_session.id
        if not session_id:
            continue
        saml_session.id = None
        try:
            del manager[session_id]
        except KeyError:
            # already gone
            pass
|
|
|
|
def singleLogoutPOST(self):
    """Handle a logout request received in the ``SAMLRequest`` form field."""
    message = get_field('SAMLRequest')
    return self.slo_idp(message)
|
|
|
|
@soap_endpoint
def singleLogoutSOAP(self):
    """SOAP endpoint for logout requests sent by the identity provider.

    :returns: None (an empty answer) when the SOAP message cannot be read,
        otherwise the result of ``slo_idp`` in SOAP mode.
    """
    try:
        soap_message = self.get_soap_message()
    except Exception:
        # Unreadable request: answer with an empty body.  Only regular
        # errors are absorbed; SystemExit and KeyboardInterrupt propagate.
        return None
    return self.slo_idp(soap_message, soap=True)
|
|
|
|
def singleLogout(self):
    """Handle a logout request received in the query string."""
    return self.slo_idp(get_request().get_query())
|
|
|
|
def slo_idp(self, message, soap=False):
    """Process a logout request *message* received from the identity provider.

    Steps: parse the request, load the lasso session (and identity) of
    the first matching local session, remove the matching local sessions,
    validate the request.  Every path ends with ``slo_idp_finish``, which
    builds the answer.

    :param message: the serialized logout request.
    :param soap: forwarded to ``slo_idp_finish``.
    """
    logout = lasso.Logout(misc.get_lasso_server())
    try:
        logout.processRequestMsg(force_str(message))
    except lasso.Error as error:
        # XXX: add option to ignore signature errors for a specific sp
        self.log_profile_error(logout, error, 'logout.processRequestMsg')
        return self.slo_idp_finish(logout, soap)

    try:
        sessions = get_session_manager().get_sessions_for_saml(
            logout.nameIdentifier.content, logout.request.sessionIndexes)
        sessions = list(sessions)
        if sessions:
            logout.setSessionFromDump(sessions[0].lasso_session_dump)
        else:
            get_logger().info('No session open for nameid %s', logout.nameIdentifier.content)
            return self.slo_idp_finish(logout, soap)
        user = sessions[0] and sessions[0].get_user()
        if user and user.lasso_dump:
            logout.setIdentityFromDump(user.lasso_dump)
    except lasso.Error as error:
        self.log_profile_error(logout, error, 'logout.setSessionFromDump')
        return self.slo_idp_finish(logout, soap)

    # Request is good (no problem of signature) kill all sessions
    try:
        self.kill_sessions(logout.nameIdentifier, logout.request.sessionIndexes)
    except Exception as error:
        get_session_manager().expire_session()
        get_logger().info('kill_session: %s' % error)
        return self.slo_idp_finish(logout, soap)

    # if session is still not good, clean it
    try:
        assertion = logout.session.getAssertions(logout.remoteProviderId)[0]
        if logout.request.sessionIndex and (
                assertion.authnStatement[0].sessionIndex not in logout.request.sessionIndexes):
            logout.session.removeAssertion(logout.remoteProviderId)
    except Exception:
        # best-effort cleanup: a failure here must not prevent the answer.
        # Only regular errors are absorbed, not SystemExit/KeyboardInterrupt.
        pass

    try:
        logout.validateRequest()
    except lasso.Error as error:
        self.log_profile_error(logout, error, 'logout.validateRequest')
    return self.slo_idp_finish(logout, soap)
|
|
|
|
def slo_idp_finish(self, logout, soap):
    """Build the logout response of *logout* and return it in the right form.

    * SOAP mode, or a message body without URL: the body is returned as
      ``text/xml``;
    * a message body and a URL: an auto-submitting form is returned;
    * otherwise: the browser is redirected to the URL.

    :returns: None when lasso cannot build the response (the error is
        logged).
    """
    try:
        logout.buildResponseMsg()
    except lasso.Error as error:
        self.log_profile_error(logout, error, 'logout.buildResponseMsg')
        return None

    if soap or (logout.msgBody and not logout.msgUrl):
        get_response().set_content_type('text/xml', 'utf-8')
        return logout.msgBody
    if logout.msgBody and logout.msgUrl:
        return self.postForm(logout)
    return redirect(logout.msgUrl)
|
|
|
|
def metadata(self):
    """Serve the SAML 2.0 metadata file of this service provider.

    The file is the one designated by the 'saml2_metadata' entry of the
    'sp' configuration section.

    :returns: the content of the file, served as ``text/xml`` in UTF-8.
    :raises errors.TraversalError: when no metadata file is configured.
    """
    try:
        # the {} default turns a missing 'sp' section into a KeyError too
        with open(misc.get_abs_path(get_cfg('sp', {})['saml2_metadata'])) as fd:
            # the context manager closes the file once it has been read
            metadata = force_text(fd.read(), 'utf-8')
    except KeyError:
        raise errors.TraversalError()
    response = get_response()
    response.set_content_type('text/xml', 'utf-8')
    return metadata
|
|
|
|
def public_key(self):
    """Serve the public key file of this service provider.

    The file is the one designated by the 'publickey' entry of the 'sp'
    configuration section; it is served as ``application/octet-stream``.
    """
    response = get_response()
    response.set_content_type('application/octet-stream')
    # the context manager closes the file once it has been read
    with open(misc.get_abs_path(get_cfg('sp')['publickey'])) as fd:
        publickey = fd.read()
    return publickey
|
|
|
|
# Retain compatibility with old metadata: the singleSignOn* names are
# kept as aliases of the assertionConsumer* methods.
singleSignOnArtifact = assertionConsumerArtifact
singleSignOnPost = assertionConsumerPost
singleSignOnRedirect = assertionConsumerRedirect
|