This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
lcs/lcs/root.ptl

494 lines
19 KiB
Plaintext

import os
import base64
import urllib
import lasso
from quixote import get_publisher, get_response, get_session, redirect, get_session_manager
from quixote.directory import Directory
from quixote.util import StaticDirectory
import admin
import backoffice
import liberty
from qommon import saml2
from qommon import errors
from qommon import get_logger
from qommon import get_cfg
from qommon import template
from qommon.form import *
import qommon.ident
from users import User
from qommon.tokens import Token
import subprocess
import logging
logging.basicConfig()

# Optional external pretty-printer; xml_format() is a no-op when it is absent.
xmlindent = "/usr/bin/xmlindent"


def xml_format(str):
    """Return *str* re-indented by the ``xmlindent`` program when possible.

    The text is piped through the executable named by the module-level
    ``xmlindent`` path.  The input is returned unchanged when that path does
    not exist, when the program cannot be started, or when it exits with a
    non-zero status, so callers always get something displayable back.

    The parameter shadows the ``str`` builtin; the name is kept so that the
    signature stays compatible with existing callers.
    """
    if not os.path.exists(xmlindent):
        return str
    try:
        p = subprocess.Popen(args=[xmlindent], stdin=subprocess.PIPE,
                stdout=subprocess.PIPE)
        formatted = p.communicate(str)[0]
    except (OSError, IOError):
        # not executable, vanished since the check, or broken pipe
        return str
    if p.returncode != 0:
        # do not hand back the partial/empty output of a failed run
        return str
    return formatted
class CookieGetterDirectory(Directory):
_q_exports = ['', 'spintro']
def _q_index [html] (self):
template.html_top()
_('This domain is not for humans, it is only used to get identity '
'provider discovery cookie.')
def spintro(self):
tok = get_request().form.get('tok')
token = Token.get(tok)
session = get_session_manager().get(token.session_id)
request = get_request()
try:
session.saml_idp_cookie = request.cookies['_saml_idp']
except KeyError:
session.saml_idp_cookie = ''
session.store()
token.remove_self()
return redirect(token.next_url)
class IdentDirectory(Directory):
    """Hands each path component over to the directory of the identification
    method of the same name."""

    def _q_lookup(self, component):
        """Add the ``ident/`` breadcrumb entry and return the directory that
        qommon.ident provides for *component*."""
        crumb = ('ident/', None)
        get_response().breadcrumb.append(crumb)
        method_directory = qommon.ident.get_method_directory(component)
        return method_directory
class LoginDirectory(Directory):
_q_exports = ['']
def _q_index [html] (self):
get_logger().info('login')
ident_methods = get_cfg('identification', {}).get('methods', [])
if len(ident_methods) == 0:
idps = get_cfg('idp', {})
if len(idps) == 0:
return template.error_page(_('Authentication subsystem is not yet configured.'))
ident_methods = ['idp'] # fallback to old behaviour; liberty.
if len(ident_methods) == 1:
method = ident_methods[0]
return qommon.ident.login(method)
else:
form = Form(enctype='multipart/form-data')
form.add(RadiobuttonsWidget, 'method',
options = [(x.key, _(x.description)) \
for x in qommon.ident.get_method_classes() if \
x.key in ident_methods],
delim = '<br/>')
form.add_submit('submit', _('Submit'))
if form.is_submitted() and not form.has_errors():
method = form.get_widget('method').parse()
if qommon.ident.base.ident_classes[method]().is_interactive():
return redirect('../ident/%s/login' % method)
else:
return qommon.ident.login(method)
else:
template.html_top(_('Login'))
'<p>%s</p>' % _('Select the identification method you want to use :')
form.render()
class RootDirectory(Directory):
_q_exports = ['', 'admin', 'backoffice', 'login', 'logout', 'liberty', 'saml',
'ident', 'register', 'info', 'encryption', 'replay_artifact']
def _q_index [html] (self):
get_response().add_css_include('../../css/prettify.css')
get_response().add_javascript([ str('../../js/prettify.js') ])
template.html_top('Lasso Conformance SP', onload = "prettyPrint()")
get_session().display_message()
if get_request().user:
self.loggedin_page()
else:
self.unlogged_page()
def info(self):
request = get_request()
http_accept = request.environ.get('HTTP_ACCEPT')
http_poas = request.environ.get('HTTP_PAOS')
if 'application/vnd.paos+xml' not in http_accept or \
'urn:liberty:paos:2003-08' not in http_poas:
return template.error_page(_('Invalid PAOS Request'))
server = misc.get_lasso_server(protocol = 'saml2')
if not server:
return template.error_page(_('SAML 2.0 support not yet configured.'))
login = lasso.Login(server)
login.initAuthnRequest(None, lasso.HTTP_METHOD_SOAP)
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
login.request.nameIDPolicy.allowCreate = True
# work around forced initialization in lasso
login.request.nameIDPolicy.spNameQualifier = None
login.request.forceAuthn = False
login.request.isPassive = False
login.request.consent = 'urn:oasis:names:tc:SAML:2.0:consent:current-implicit'
login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_PAOS
if False:
# NTT ECP requires this:
login.request.requestedAuthnContext = lasso.Samlp2RequestedAuthnContext()
t = lasso.NodeList()
ta = lasso.Saml2AuthnContextClassRef()
ta.content = 'urn:oasis:names:tc:SAML:2.0:ac:classes:MobileOneFactorContract'
t.append(ta)
login.request.requestedAuthnContext.authnContextClassRef = t
login.buildAuthnRequestMsg()
response = get_response()
response.set_content_type('application/vnd.paos+xml', 'UTF-8')
response.set_header('PAOS', 'ver=urn:liberty:paos:2003-08;urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp')
return login.msgBody
def unlogged_page [html] (self):
form = Form(enctype='multipart/form-data', id = 'sso')
form.add(HtmlWidget, '<div id="sso-options">')
form.add(SingleSelectWidget, 'binding',
title = _('Protocol Binding for <Response>'),
options = [(None, '(none)'), ('post', 'POST'), ('artifact', 'Artifact')])
form.add(CheckboxWidget, 'force_authn', value = False,
title = _('Force Authentication (ForceAuthn)'))
form.add(CheckboxWidget, 'is_passive',
title = _('No interaction (IsPassive)'))
form.add(CheckboxWidget, 'allow_create', value = True,
title = _('Allow new federation'))
form.add(SingleSelectWidget, 'nid_format',
title = _('Name Identifier Format'),
options = [('none', _('(none)')),
('persistent', _('Persistent')),
('transient', _('Transient')),
('encrypted', _('Encrypted')),
('email', _('Email')), ])
form.add(StringWidget, 'affiliation', title = _('Affiliation'))
form.add(SingleSelectWidget, 'consent',
title = _('Consent'),
options = ['', 'obtained', 'prior', 'current-implicit',
'current-explicit', 'unavailable', 'inapplicable'])
form.add(SingleSelectWidget, 'authn_context',
title = _('Authn Req Context'),
options = ['(empty)', 'password', 'password on protected transport',
'Client Certificate'],
disabled = 'disabled')
form.add(SingleSelectWidget, 'matching',
title = _('Matching Rule'),
options = ['(empty)', 'exact', 'minimum', 'maximum', 'better'],
disabled = 'disabled')
form.add(HtmlWidget, '</div>')
for kidp, idp in get_cfg('idp', {}).items():
form.add_submit(kidp, _('Log on %s') % kidp)
if get_session().saml_idp_cookie is None:
common_domain = get_cfg('sp', {}).get('common_domain')
if common_domain:
form.add_submit('intro', _('Get IdP via Introduction Cookie'))
elif get_session().saml_idp_cookie:
common_domain = get_cfg('sp', {}).get('common_domain')
intro_cookie_q = urllib.unquote_plus(get_session().saml_idp_cookie)
splitted_cookie = [x for x in intro_cookie_q.split(str(' ')) if x]
last_id = splitted_cookie[-1]
v = misc.get_provider_key(base64.decodestring(last_id))
form.add_submit('intro-%s' % v,
_('Log on using IdP discovered from IdP Introduction (%s)') % v)
if form.is_submitted():
return self.do_login(form)
form.render()
if not common_domain:
'<p><strong>'
_('Note: ')
'</strong>'
_('Service Provider not configured to use IdP Introduction Cookie')
'</p>'
'<p>'
'Sample <a href="/data/affiliations.xml">affiliations</a> file to download'
'</p>'
def loggedin_page [html] (self):
identity_dump = get_request().user.lasso_dump
session_dump = get_session().lasso_session_dump
form = Form(enctype='multipart/form-data')
form.add_submit('logout', _('Local Logout'))
if session_dump:
form.add_submit('slo-soap', _('Single Logout (SOAP)'))
form.add_submit('slo-redirect', _('Single Logout (Redirect)'))
if identity_dump:
form.add_submit('fedterm-soap', _('Federation Termination (SOAP)'))
form.add_submit('fedterm-redirect', _('Federation Termination (Redirect)'))
if form.is_submitted():
if form.get_submit() == 'logout':
get_session_manager().expire_session()
return redirect('/')
if form.get_submit() == 'slo-soap':
return self.saml.slo_sp(lasso.HTTP_METHOD_SOAP)
if form.get_submit() == 'slo-redirect':
return self.saml.slo_sp(lasso.HTTP_METHOD_REDIRECT)
if form.get_submit() == 'fedterm-soap':
return self.saml.fedterm_sp(lasso.HTTP_METHOD_SOAP)
if form.get_submit() == 'fedterm-redirect':
return self.saml.fedterm_sp(lasso.HTTP_METHOD_REDIRECT)
return template.error_page(_('Unknown command'))
'<p>%s</p>' % _('Logged in (%s)') % get_request().user.display_name
if get_request().user.anonymous:
if identity_dump:
'<strong><a href="register">%s</a></strong>' % _('Register')
'<pre>'
get_session().lasso_identity_provider_id
'</pre>'
'<pre>'
get_session().name_identifier
'</pre>'
'<h1>Session dump</h1>'
'<pre class="prettyprint lang-xml">'
xml_format(get_session().lasso_session_dump)
'</pre>'
'<h1>Identity dump</h1>'
'<pre class="prettyprint lang-xml">'
xml_format(get_request().user.lasso_dump)
'</pre>'
'<div id="logged-in-options">'
form.render()
'</div>'
if os.path.exists(str('/tmp/artifact-msg-url')):
'<p>'
'<a href="replay_artifact">%s</a>' % _('Replay last artifact response')
'</p>'
def replay_artifact [html] (self):
msg_url = file(str('/tmp/artifact-msg-url')).read()
msg_body = file(str('/tmp/artifact-msg-body')).read()
from qommon.liberty import soap_call
soap_answer = soap_call(msg_url, msg_body)
open(str('/tmp/replayed-artifact.xml'), str('w')).write(soap_answer)
formatted = os.popen(str('xmllint --format /tmp/replayed-artifact.xml')).read()
template.html_top(_('Artifact Replayed'))
'<pre>'
formatted
'</pre>'
def register [html] (self):
if not get_request().user:
raise errors.AccessUnauthorizedError()
if not get_request().user.anonymous:
raise errors.AccessForbiddenError()
if not get_session().lasso_anonymous_identity_dump:
raise errors.AccessForbiddenError()
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title = _('Name'), required = True, size=30)
form.add(EmailWidget, 'email', title = _('Email'), required = False, size=30)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_submit() == 'cancel':
return redirect('/')
if form.get_submit() and not form.has_errors():
get_request().user.id = get_request().user.get_new_id()
get_request().user.name = form.get_widget('name').parse()
get_request().user.email = form.get_widget('email').parse()
get_request().user.anonymous = False
if get_publisher().user_class.count() == 0:
get_request().user.is_admin = True
get_request().user.lasso_dump = get_session().lasso_anonymous_identity_dump
get_session().lasso_anonymous_identity_dump = None
get_request().user.name_identifiers = [get_session().name_identifier]
get_request().user.store()
get_session().set_user(get_request().user.id)
return redirect('/')
template.html_top(_('Register'))
get_response().breadcrumb.append(('register', _('Register')))
form.render()
def do_login(self, form):
server = misc.get_lasso_server(protocol = 'saml2')
login = lasso.Login(server)
idp = form.get_submit()
if idp == 'intro':
common_domain_getter_url = get_cfg('sp', {}).get('common_domain_getter_url')
token = Token(expiration_delay = 600) # ten minutes
token.session_id = get_session().id
token.protocol = 'saml2'
token.next_url = get_request().get_url()
token.store()
return redirect(common_domain_getter_url + '?tok=%s' % token.id)
if idp and idp.startswith('intro-'):
idp = str(idp)[6:]
if idp:
p = misc.get_provider(idp)
idp = p.providerId
login.initAuthnRequest(idp, lasso.HTTP_METHOD_REDIRECT) # XXX: method must be an option
nid_format = form.get_widget('nid_format').parse()
if nid_format == 'persistent':
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
elif nid_format == 'transient':
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT
elif nid_format == 'encrypted':
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_ENCRYPTED
elif nid_format == 'email':
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_EMAIL
elif nid_format == 'none':
login.request.nameIDPolicy.format = None
binding = form.get_widget('binding').parse()
if binding == 'artifact':
login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_ARTIFACT
elif binding == 'post':
login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_POST
login.request.nameIDPolicy.allowCreate = form.get_widget('allow_create').parse()
# work around forced initialization in lasso
login.request.nameIDPolicy.spNameQualifier = None
login.request.forceAuthn = form.get_widget('force_authn').parse()
login.request.isPassive = form.get_widget('is_passive').parse()
consent = form.get_widget('consent').parse()
if consent:
login.request.consent = 'urn:oasis:names:tc:SAML:2.0:consent:%s' % consent
affiliation = form.get_widget('affiliation').parse()
if affiliation:
login.request.nameIDPolicy.spNameQualifier = affiliation
# XXX: authn_context
login.buildAuthnRequestMsg()
return redirect(login.msgUrl)
def logout(self):
get_logger().info('logout')
session = get_session()
if not session:
return redirect('/')
ident_methods = get_cfg('identification', {}).get('methods', [])
if not 'idp' in ident_methods:
get_session_manager().expire_session()
return redirect('/')
provider = misc.get_provider(
misc.get_provider_key(get_session().lasso_identity_provider_id))
if provider.getProtocolConformance() == lasso.PROTOCOL_SAML_2_0:
return self.saml.slo_sp()
else:
return self.liberty.singleLogout()
def _q_traverse(self, path):
fn = os.path.join(get_publisher().app_dir, 'common_cookie')
if os.path.exists(fn):
# on special domain to set cookie, nothing else, let's change root
get_publisher().app_dir = open(fn).read()
get_request().user = None
return CookieGetterDirectory()._q_traverse(path)
session = get_session()
if session:
get_request().user = session.get_user()
else:
get_request().user = None
response = get_response()
response.filter = {}
if not hasattr(response, 'breadcrumb'):
response.breadcrumb = [ ('', _('Home')) ]
return Directory._q_traverse(self, path)
def encryption [html] (self):
form = Form(enctype='multipart/form-data')
options = []
for klp, lp in get_cfg('idp', {}).items():
try:
label = misc.get_provider_label(misc.get_provider(klp))
except KeyError:
continue
options.append((klp, label, klp))
options.sort()
for klp, label, klp2 in options:
form.add(HtmlWidget, '<h3>%s</h3>' % label)
form.add(CheckboxWidget, 'encrypt_nameid_%s' % klp,
title = _('Encrypt NameID'),
value = get_cfg('idp')[klp].get('encrypt_nameid'))
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.is_submitted():
self.encryption_submit(form, options)
return redirect('.')
template.html_top()
if not get_cfg('sp').has_key('encryption_privatekey'):
'<div class="errornotice">'
_('There is currently no encryption key set on this server.')
'</div>'
form.render()
def encryption_submit(self, form, options):
for klp, label, klp2 in options:
get_cfg('idp')[klp]['encrypt_nameid'] = form.get_widget(
'encrypt_nameid_%s' % klp).parse()
get_publisher().write_cfg()
def _q_lookup(self, component):
if component in ('css','js'):
dirname = os.path.join(get_publisher().data_dir, 'web', component)
return StaticDirectory(dirname, follow_symlinks = True)
if component == 'qo':
dirname = os.path.join(get_publisher().data_dir, 'qommon')
if not os.path.exists(dirname):
dirname = os.path.join(os.path.dirname(__file__), 'qommon/static')
return StaticDirectory(dirname, follow_symlinks = True)
raise errors.TraversalError()
admin = admin.RootDirectory()
backoffice = backoffice.RootDirectory()
saml = saml2.Saml2Directory()
liberty = liberty.LibertyDirectory()
login = LoginDirectory()
ident = IdentDirectory()