# Source listing metadata: 494 lines, 19 KiB, plaintext.
import os
|
|
import base64
|
|
import urllib
|
|
import lasso
|
|
|
|
from quixote import get_publisher, get_response, get_session, redirect, get_session_manager
|
|
from quixote.directory import Directory
|
|
from quixote.util import StaticDirectory
|
|
|
|
import admin
|
|
import backoffice
|
|
import liberty
|
|
from qommon import saml2
|
|
|
|
from qommon import errors
|
|
from qommon import get_logger
|
|
from qommon import get_cfg
|
|
from qommon import template
|
|
from qommon.form import *
|
|
import qommon.ident
|
|
|
|
from users import User
|
|
|
|
from qommon.tokens import Token
|
|
import subprocess
|
|
import logging
|
|
|
|
logging.basicConfig()

# Location of the optional external XML pretty-printer used by xml_format().
xmlindent = "/usr/bin/xmlindent"


def xml_format(str):
    """Return *str* re-indented by the external ``xmlindent`` program.

    The input is piped to the executable named by the module-level
    ``xmlindent`` path and whatever it writes on stdout is returned.

    The input is returned unchanged when:
      * it is empty or None (nothing to format),
      * the ``xmlindent`` executable does not exist,
      * the program cannot be started, exits with a non-zero status or
        produces no output.

    The parameter keeps its historical name (which shadows the builtin) so
    that existing keyword callers keep working.
    """
    if not str or not os.path.exists(xmlindent):
        return str
    try:
        p = subprocess.Popen(args=[xmlindent], stdin=subprocess.PIPE,
                stdout=subprocess.PIPE)
        formatted = p.communicate(str)[0]
    except OSError:
        # The executable vanished or is not runnable: degrade to raw output.
        return str
    if p.returncode != 0 or not formatted:
        # Showing the unformatted dump beats showing nothing at all.
        return str
    return formatted
|
|
|
|
class CookieGetterDirectory(Directory):
|
|
_q_exports = ['', 'spintro']
|
|
|
|
def _q_index [html] (self):
|
|
template.html_top()
|
|
_('This domain is not for humans, it is only used to get identity '
|
|
'provider discovery cookie.')
|
|
|
|
def spintro(self):
|
|
tok = get_request().form.get('tok')
|
|
token = Token.get(tok)
|
|
|
|
session = get_session_manager().get(token.session_id)
|
|
|
|
request = get_request()
|
|
try:
|
|
session.saml_idp_cookie = request.cookies['_saml_idp']
|
|
except KeyError:
|
|
session.saml_idp_cookie = ''
|
|
|
|
session.store()
|
|
token.remove_self()
|
|
|
|
return redirect(token.next_url)
|
|
|
|
|
|
class IdentDirectory(Directory):
    """Dispatches `ident/<method>/...` URLs to the matching ident method."""

    def _q_lookup(self, component):
        """Return the directory qommon.ident provides for *component*.

        An ('ident/', None) entry is appended to the response breadcrumb
        before the lookup is made.
        """
        crumbs = get_response().breadcrumb
        crumbs.append(('ident/', None))
        method_directory = qommon.ident.get_method_directory(component)
        return method_directory
|
|
|
|
|
|
class LoginDirectory(Directory):
|
|
_q_exports = ['']
|
|
|
|
def _q_index [html] (self):
|
|
get_logger().info('login')
|
|
ident_methods = get_cfg('identification', {}).get('methods', [])
|
|
|
|
if len(ident_methods) == 0:
|
|
idps = get_cfg('idp', {})
|
|
if len(idps) == 0:
|
|
return template.error_page(_('Authentication subsystem is not yet configured.'))
|
|
ident_methods = ['idp'] # fallback to old behaviour; liberty.
|
|
|
|
if len(ident_methods) == 1:
|
|
method = ident_methods[0]
|
|
return qommon.ident.login(method)
|
|
else:
|
|
form = Form(enctype='multipart/form-data')
|
|
form.add(RadiobuttonsWidget, 'method',
|
|
options = [(x.key, _(x.description)) \
|
|
for x in qommon.ident.get_method_classes() if \
|
|
x.key in ident_methods],
|
|
delim = '<br/>')
|
|
form.add_submit('submit', _('Submit'))
|
|
|
|
if form.is_submitted() and not form.has_errors():
|
|
method = form.get_widget('method').parse()
|
|
if qommon.ident.base.ident_classes[method]().is_interactive():
|
|
return redirect('../ident/%s/login' % method)
|
|
else:
|
|
return qommon.ident.login(method)
|
|
else:
|
|
template.html_top(_('Login'))
|
|
'<p>%s</p>' % _('Select the identification method you want to use :')
|
|
form.render()
|
|
|
|
class RootDirectory(Directory):
|
|
_q_exports = ['', 'admin', 'backoffice', 'login', 'logout', 'liberty', 'saml',
|
|
'ident', 'register', 'info', 'encryption', 'replay_artifact']
|
|
|
|
def _q_index [html] (self):
|
|
get_response().add_css_include('../../css/prettify.css')
|
|
get_response().add_javascript([ str('../../js/prettify.js') ])
|
|
template.html_top('Lasso Conformance SP', onload = "prettyPrint()")
|
|
|
|
get_session().display_message()
|
|
|
|
if get_request().user:
|
|
self.loggedin_page()
|
|
else:
|
|
self.unlogged_page()
|
|
|
|
def info(self):
|
|
request = get_request()
|
|
http_accept = request.environ.get('HTTP_ACCEPT')
|
|
http_poas = request.environ.get('HTTP_PAOS')
|
|
|
|
if 'application/vnd.paos+xml' not in http_accept or \
|
|
'urn:liberty:paos:2003-08' not in http_poas:
|
|
return template.error_page(_('Invalid PAOS Request'))
|
|
|
|
server = misc.get_lasso_server(protocol = 'saml2')
|
|
if not server:
|
|
return template.error_page(_('SAML 2.0 support not yet configured.'))
|
|
login = lasso.Login(server)
|
|
|
|
login.initAuthnRequest(None, lasso.HTTP_METHOD_SOAP)
|
|
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
|
|
login.request.nameIDPolicy.allowCreate = True
|
|
# work around forced initialization in lasso
|
|
login.request.nameIDPolicy.spNameQualifier = None
|
|
login.request.forceAuthn = False
|
|
login.request.isPassive = False
|
|
login.request.consent = 'urn:oasis:names:tc:SAML:2.0:consent:current-implicit'
|
|
login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_PAOS
|
|
if False:
|
|
# NTT ECP requires this:
|
|
login.request.requestedAuthnContext = lasso.Samlp2RequestedAuthnContext()
|
|
t = lasso.NodeList()
|
|
ta = lasso.Saml2AuthnContextClassRef()
|
|
ta.content = 'urn:oasis:names:tc:SAML:2.0:ac:classes:MobileOneFactorContract'
|
|
t.append(ta)
|
|
login.request.requestedAuthnContext.authnContextClassRef = t
|
|
login.buildAuthnRequestMsg()
|
|
|
|
response = get_response()
|
|
response.set_content_type('application/vnd.paos+xml', 'UTF-8')
|
|
response.set_header('PAOS', 'ver=urn:liberty:paos:2003-08;urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp')
|
|
return login.msgBody
|
|
|
|
def unlogged_page [html] (self):
|
|
form = Form(enctype='multipart/form-data', id = 'sso')
|
|
form.add(HtmlWidget, '<div id="sso-options">')
|
|
form.add(SingleSelectWidget, 'binding',
|
|
title = _('Protocol Binding for <Response>'),
|
|
options = [(None, ''), ('post', 'POST'), ('artifact', 'Artifact')])
|
|
form.add(CheckboxWidget, 'force_authn', value = False,
|
|
title = _('Force Authentication (ForceAuthn)'))
|
|
form.add(CheckboxWidget, 'is_passive',
|
|
title = _('No interaction (IsPassive)'))
|
|
form.add(CheckboxWidget, 'allow_create', value = True,
|
|
title = _('Allow new federation'))
|
|
form.add(SingleSelectWidget, 'nid_format',
|
|
title = _('Name Identifier Format'),
|
|
options = [('none', _('(none)')),
|
|
('persistent', _('Persistent')),
|
|
('transient', _('Transient')),
|
|
('encrypted', _('Encrypted')),
|
|
('email', _('Email')), ])
|
|
form.add(StringWidget, 'affiliation', title = _('Affiliation'))
|
|
form.add(SingleSelectWidget, 'consent',
|
|
title = _('Consent'),
|
|
options = ['', 'obtained', 'prior', 'current-implicit',
|
|
'current-explicit', 'unavailable', 'inapplicable'])
|
|
form.add(SingleSelectWidget, 'authn_context',
|
|
title = _('Authn Req Context'),
|
|
options = ['(empty)', 'password', 'password on protected transport',
|
|
'Client Certificate'],
|
|
disabled = 'disabled')
|
|
form.add(SingleSelectWidget, 'matching',
|
|
title = _('Matching Rule'),
|
|
options = ['(empty)', 'exact', 'minimum', 'maximum', 'better'],
|
|
disabled = 'disabled')
|
|
form.add(HtmlWidget, '</div>')
|
|
|
|
for kidp, idp in get_cfg('idp', {}).items():
|
|
form.add_submit(kidp, _('Log on %s') % kidp)
|
|
|
|
if get_session().saml_idp_cookie is None:
|
|
common_domain = get_cfg('sp', {}).get('common_domain')
|
|
if common_domain:
|
|
form.add_submit('intro', _('Get IdP via Introduction Cookie'))
|
|
elif get_session().saml_idp_cookie:
|
|
common_domain = get_cfg('sp', {}).get('common_domain')
|
|
intro_cookie_q = urllib.unquote_plus(get_session().saml_idp_cookie)
|
|
splitted_cookie = [x for x in intro_cookie_q.split(str(' ')) if x]
|
|
last_id = splitted_cookie[-1]
|
|
v = misc.get_provider_key(base64.decodestring(last_id))
|
|
form.add_submit('intro-%s' % v,
|
|
_('Log on using IdP discovered from IdP Introduction (%s)') % v)
|
|
|
|
if form.is_submitted():
|
|
return self.do_login(form)
|
|
|
|
form.render()
|
|
|
|
if not common_domain:
|
|
'<p><strong>'
|
|
_('Note: ')
|
|
'</strong>'
|
|
_('Service Provider not configured to use IdP Introduction Cookie')
|
|
'</p>'
|
|
|
|
'<p>'
|
|
'Sample <a href="/data/affiliations.xml">affiliations</a> file to download'
|
|
'</p>'
|
|
|
|
def loggedin_page [html] (self):
|
|
identity_dump = get_request().user.lasso_dump
|
|
session_dump = get_session().lasso_session_dump
|
|
|
|
form = Form(enctype='multipart/form-data')
|
|
form.add_submit('logout', _('Local Logout'))
|
|
if session_dump:
|
|
form.add_submit('slo-soap', _('Single Logout (SOAP)'))
|
|
form.add_submit('slo-redirect', _('Single Logout (Redirect)'))
|
|
if identity_dump:
|
|
form.add_submit('fedterm-soap', _('Federation Termination (SOAP)'))
|
|
form.add_submit('fedterm-redirect', _('Federation Termination (Redirect)'))
|
|
|
|
if form.is_submitted():
|
|
if form.get_submit() == 'logout':
|
|
get_session_manager().expire_session()
|
|
return redirect('/')
|
|
if form.get_submit() == 'slo-soap':
|
|
return self.saml.slo_sp(lasso.HTTP_METHOD_SOAP)
|
|
if form.get_submit() == 'slo-redirect':
|
|
return self.saml.slo_sp(lasso.HTTP_METHOD_REDIRECT)
|
|
|
|
if form.get_submit() == 'fedterm-soap':
|
|
return self.saml.fedterm_sp(lasso.HTTP_METHOD_SOAP)
|
|
if form.get_submit() == 'fedterm-redirect':
|
|
return self.saml.fedterm_sp(lasso.HTTP_METHOD_REDIRECT)
|
|
|
|
return template.error_page(_('Unknown command'))
|
|
|
|
'<p>%s</p>' % _('Logged in (%s)') % get_request().user.display_name
|
|
if get_request().user.anonymous:
|
|
if identity_dump:
|
|
'<strong><a href="register">%s</a></strong>' % _('Register')
|
|
|
|
'<pre>'
|
|
get_session().lasso_identity_provider_id
|
|
'</pre>'
|
|
|
|
'<pre>'
|
|
get_session().name_identifier
|
|
'</pre>'
|
|
'<h1>Session dump</h1>'
|
|
'<pre class="prettyprint lang-xml">'
|
|
xml_format(get_session().lasso_session_dump)
|
|
'</pre>'
|
|
'<h1>Identity dump</h1>'
|
|
'<pre class="prettyprint lang-xml">'
|
|
xml_format(get_request().user.lasso_dump)
|
|
'</pre>'
|
|
'<div id="logged-in-options">'
|
|
form.render()
|
|
'</div>'
|
|
|
|
if os.path.exists(str('/tmp/artifact-msg-url')):
|
|
'<p>'
|
|
'<a href="replay_artifact">%s</a>' % _('Replay last artifact response')
|
|
'</p>'
|
|
|
|
def replay_artifact [html] (self):
|
|
msg_url = file(str('/tmp/artifact-msg-url')).read()
|
|
msg_body = file(str('/tmp/artifact-msg-body')).read()
|
|
from qommon.liberty import soap_call
|
|
soap_answer = soap_call(msg_url, msg_body)
|
|
open(str('/tmp/replayed-artifact.xml'), str('w')).write(soap_answer)
|
|
formatted = os.popen(str('xmllint --format /tmp/replayed-artifact.xml')).read()
|
|
|
|
template.html_top(_('Artifact Replayed'))
|
|
'<pre>'
|
|
formatted
|
|
'</pre>'
|
|
|
|
|
|
def register [html] (self):
|
|
if not get_request().user:
|
|
raise errors.AccessUnauthorizedError()
|
|
|
|
if not get_request().user.anonymous:
|
|
raise errors.AccessForbiddenError()
|
|
|
|
if not get_session().lasso_anonymous_identity_dump:
|
|
raise errors.AccessForbiddenError()
|
|
|
|
form = Form(enctype='multipart/form-data')
|
|
form.add(StringWidget, 'name', title = _('Name'), required = True, size=30)
|
|
form.add(EmailWidget, 'email', title = _('Email'), required = False, size=30)
|
|
|
|
form.add_submit('submit', _('Submit'))
|
|
form.add_submit('cancel', _('Cancel'))
|
|
|
|
if form.get_submit() == 'cancel':
|
|
return redirect('/')
|
|
|
|
if form.get_submit() and not form.has_errors():
|
|
get_request().user.id = get_request().user.get_new_id()
|
|
get_request().user.name = form.get_widget('name').parse()
|
|
get_request().user.email = form.get_widget('email').parse()
|
|
get_request().user.anonymous = False
|
|
if get_publisher().user_class.count() == 0:
|
|
get_request().user.is_admin = True
|
|
get_request().user.lasso_dump = get_session().lasso_anonymous_identity_dump
|
|
get_session().lasso_anonymous_identity_dump = None
|
|
get_request().user.name_identifiers = [get_session().name_identifier]
|
|
get_request().user.store()
|
|
get_session().set_user(get_request().user.id)
|
|
return redirect('/')
|
|
|
|
template.html_top(_('Register'))
|
|
get_response().breadcrumb.append(('register', _('Register')))
|
|
form.render()
|
|
|
|
def do_login(self, form):
|
|
server = misc.get_lasso_server(protocol = 'saml2')
|
|
login = lasso.Login(server)
|
|
|
|
idp = form.get_submit()
|
|
if idp == 'intro':
|
|
common_domain_getter_url = get_cfg('sp', {}).get('common_domain_getter_url')
|
|
token = Token(expiration_delay = 600) # ten minutes
|
|
token.session_id = get_session().id
|
|
token.protocol = 'saml2'
|
|
token.next_url = get_request().get_url()
|
|
token.store()
|
|
return redirect(common_domain_getter_url + '?tok=%s' % token.id)
|
|
|
|
if idp and idp.startswith('intro-'):
|
|
idp = str(idp)[6:]
|
|
|
|
if idp:
|
|
p = misc.get_provider(idp)
|
|
idp = p.providerId
|
|
|
|
login.initAuthnRequest(idp, lasso.HTTP_METHOD_REDIRECT) # XXX: method must be an option
|
|
|
|
nid_format = form.get_widget('nid_format').parse()
|
|
if nid_format == 'persistent':
|
|
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
|
|
elif nid_format == 'transient':
|
|
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT
|
|
elif nid_format == 'encrypted':
|
|
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_ENCRYPTED
|
|
elif nid_format == 'email':
|
|
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_EMAIL
|
|
elif nid_format == 'none':
|
|
login.request.nameIDPolicy.format = None
|
|
|
|
binding = form.get_widget('binding').parse()
|
|
if binding == 'artifact':
|
|
login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_ARTIFACT
|
|
elif binding == 'post':
|
|
login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_POST
|
|
|
|
login.request.nameIDPolicy.allowCreate = form.get_widget('allow_create').parse()
|
|
# work around forced initialization in lasso
|
|
login.request.nameIDPolicy.spNameQualifier = None
|
|
login.request.forceAuthn = form.get_widget('force_authn').parse()
|
|
login.request.isPassive = form.get_widget('is_passive').parse()
|
|
|
|
consent = form.get_widget('consent').parse()
|
|
if consent:
|
|
login.request.consent = 'urn:oasis:names:tc:SAML:2.0:consent:%s' % consent
|
|
|
|
affiliation = form.get_widget('affiliation').parse()
|
|
if affiliation:
|
|
login.request.nameIDPolicy.spNameQualifier = affiliation
|
|
|
|
# XXX: authn_context
|
|
|
|
login.buildAuthnRequestMsg()
|
|
return redirect(login.msgUrl)
|
|
|
|
|
|
def logout(self):
|
|
get_logger().info('logout')
|
|
session = get_session()
|
|
if not session:
|
|
return redirect('/')
|
|
ident_methods = get_cfg('identification', {}).get('methods', [])
|
|
if not 'idp' in ident_methods:
|
|
get_session_manager().expire_session()
|
|
return redirect('/')
|
|
|
|
provider = misc.get_provider(
|
|
misc.get_provider_key(get_session().lasso_identity_provider_id))
|
|
|
|
if provider.getProtocolConformance() == lasso.PROTOCOL_SAML_2_0:
|
|
return self.saml.slo_sp()
|
|
else:
|
|
return self.liberty.singleLogout()
|
|
|
|
def _q_traverse(self, path):
|
|
fn = os.path.join(get_publisher().app_dir, 'common_cookie')
|
|
if os.path.exists(fn):
|
|
# on special domain to set cookie, nothing else, let's change root
|
|
get_publisher().app_dir = open(fn).read()
|
|
get_request().user = None
|
|
return CookieGetterDirectory()._q_traverse(path)
|
|
|
|
session = get_session()
|
|
if session:
|
|
get_request().user = session.get_user()
|
|
else:
|
|
get_request().user = None
|
|
|
|
response = get_response()
|
|
response.filter = {}
|
|
if not hasattr(response, 'breadcrumb'):
|
|
response.breadcrumb = [ ('', _('Home')) ]
|
|
|
|
return Directory._q_traverse(self, path)
|
|
|
|
def encryption [html] (self):
|
|
form = Form(enctype='multipart/form-data')
|
|
options = []
|
|
for klp, lp in get_cfg('idp', {}).items():
|
|
try:
|
|
label = misc.get_provider_label(misc.get_provider(klp))
|
|
except KeyError:
|
|
continue
|
|
options.append((klp, label, klp))
|
|
options.sort()
|
|
|
|
for klp, label, klp2 in options:
|
|
form.add(HtmlWidget, '<h3>%s</h3>' % label)
|
|
form.add(CheckboxWidget, 'encrypt_nameid_%s' % klp,
|
|
title = _('Encrypt NameID'),
|
|
value = get_cfg('idp')[klp].get('encrypt_nameid'))
|
|
|
|
form.add_submit('submit', _('Submit'))
|
|
form.add_submit('cancel', _('Cancel'))
|
|
|
|
if form.is_submitted():
|
|
self.encryption_submit(form, options)
|
|
return redirect('.')
|
|
|
|
template.html_top()
|
|
|
|
if not get_cfg('sp').has_key('encryption_privatekey'):
|
|
'<div class="errornotice">'
|
|
_('There is currently no encryption key set on this server.')
|
|
'</div>'
|
|
|
|
form.render()
|
|
|
|
def encryption_submit(self, form, options):
|
|
for klp, label, klp2 in options:
|
|
get_cfg('idp')[klp]['encrypt_nameid'] = form.get_widget(
|
|
'encrypt_nameid_%s' % klp).parse()
|
|
get_publisher().write_cfg()
|
|
|
|
def _q_lookup(self, component):
|
|
if component in ('css','js'):
|
|
dirname = os.path.join(get_publisher().data_dir, 'web', component)
|
|
return StaticDirectory(dirname, follow_symlinks = True)
|
|
if component == 'qo':
|
|
dirname = os.path.join(get_publisher().data_dir, 'qommon')
|
|
if not os.path.exists(dirname):
|
|
dirname = os.path.join(os.path.dirname(__file__), 'qommon/static')
|
|
return StaticDirectory(dirname, follow_symlinks = True)
|
|
raise errors.TraversalError()
|
|
|
|
admin = admin.RootDirectory()
|
|
backoffice = backoffice.RootDirectory()
|
|
saml = saml2.Saml2Directory()
|
|
liberty = liberty.LibertyDirectory()
|
|
login = LoginDirectory()
|
|
ident = IdentDirectory()
|
|
|