This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
lcs/lcs/root.ptl

482 lines
18 KiB
Plaintext

import os
import base64
import urllib
import lasso
from quixote import get_publisher, get_response, get_session, redirect, get_session_manager
from quixote.directory import Directory
from quixote.util import StaticDirectory
import admin
import backoffice
import liberty
from qommon import saml2
from qommon import errors
from qommon import logger
from qommon import get_cfg
from qommon import template
from qommon.form import *
import qommon.ident
from users import User
from qommon.tokens import Token
import subprocess
# Absolute path of the external XML pretty-printer.  Formatting is optional:
# when this executable is absent, xml_format() returns its input untouched.
xmlindent = "/usr/bin/xmlindent"


def xml_format(str):
    """Return *str* pretty-printed by the external ``xmlindent`` program.

    Formatting is best-effort and only meant to make dumps easier to read:

    * a false value (``None`` or an empty string) is returned as is, without
      spawning a process;
    * when ``xmlindent`` is not installed, or cannot be executed, the
      original value is returned unchanged.

    The parameter keeps its historical name even though it shadows the
    builtin, so that existing callers are not affected.
    """
    if not str or not os.path.exists(xmlindent):
        return str
    try:
        p = subprocess.Popen(args=[xmlindent], stdin=subprocess.PIPE,
                stdout=subprocess.PIPE)
        formatted = p.communicate(str)[0]
    except (OSError, IOError):
        # The file exists but could not be run (permissions, broken pipe...):
        # showing the raw document is better than failing the whole page.
        return str
    return formatted
class CookieGetterDirectory(Directory):
_q_exports = ['', 'spintro']
def _q_index [html] (self):
template.html_top()
_('This domain is not for humans, it is only used to get identity '
'provider discovery cookie.')
def spintro(self):
tok = get_request().form.get('tok')
token = Token.get(tok)
session = get_session_manager().get(token.session_id)
request = get_request()
try:
session.saml_idp_cookie = request.cookies['_saml_idp']
except KeyError:
session.saml_idp_cookie = ''
session.store()
token.remove_self()
return redirect(token.next_url)
class IdentDirectory(Directory):
    """Hand ``ident/<component>`` URLs over to the qommon.ident machinery."""

    def _q_lookup(self, component):
        """Return the directory qommon.ident provides for *component*.

        An ``ident/`` entry (without label) is appended to the response
        breadcrumb before the lookup.
        """
        breadcrumb = get_response().breadcrumb
        breadcrumb.append(('ident/', None))
        method_directory = qommon.ident.get_method_directory(component)
        return method_directory
class LoginDirectory(Directory):
_q_exports = ['']
def _q_index [html] (self):
logger.info('login')
ident_methods = get_cfg('identification', {}).get('methods', [])
if len(ident_methods) == 0:
idps = get_cfg('idp', {})
if len(idps) == 0:
return template.error_page(_('Authentication subsystem is not yet configured.'))
ident_methods = ['idp'] # fallback to old behaviour; liberty.
if len(ident_methods) == 1:
method = ident_methods[0]
return qommon.ident.login(method)
else:
form = Form(enctype='multipart/form-data')
form.add(RadiobuttonsWidget, 'method',
options = [(x.key, _(x.description)) \
for x in qommon.ident.get_method_classes() if \
x.key in ident_methods],
delim = '<br/>')
form.add_submit('submit', _('Submit'))
if form.is_submitted() and not form.has_errors():
method = form.get_widget('method').parse()
if qommon.ident.base.ident_classes[method]().is_interactive():
return redirect('../ident/%s/login' % method)
else:
return qommon.ident.login(method)
else:
template.html_top(_('Login'))
'<p>%s</p>' % _('Select the identification method you want to use :')
form.render()
class RootDirectory(Directory):
_q_exports = ['', 'admin', 'backoffice', 'login', 'logout', 'liberty', 'saml',
'ident', 'register', 'info', 'encryption', 'replay_artifact']
def _q_index [html] (self):
get_response().add_css_include('../../css/prettify.css')
get_response().add_javascript([ str('../../js/prettify.js') ])
template.html_top('Lasso Conformance SP', onload = "prettyPrint()")
if get_request().user:
self.loggedin_page()
else:
self.unlogged_page()
def info(self):
request = get_request()
http_accept = request.environ.get('HTTP_ACCEPT')
http_poas = request.environ.get('HTTP_PAOS')
if 'application/vnd.paos+xml' not in http_accept or \
'urn:liberty:paos:2003-08' not in http_poas:
return template.error_page(_('Invalid PAOS Request'))
server = misc.get_lasso_server(protocol = 'saml2')
if not server:
return template.error_page(_('SAML 2.0 support not yet configured.'))
login = lasso.Login(server)
login.initAuthnRequest(None, lasso.HTTP_METHOD_SOAP)
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
login.request.nameIDPolicy.allowCreate = True
login.request.forceAuthn = False
login.request.isPassive = False
login.request.consent = 'urn:oasis:names:tc:SAML:2.0:consent:current-implicit'
login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_PAOS
if False:
# NTT ECP requires this:
login.request.requestedAuthnContext = lasso.Samlp2RequestedAuthnContext()
t = lasso.NodeList()
ta = lasso.Saml2AuthnContextClassRef()
ta.content = 'urn:oasis:names:tc:SAML:2.0:ac:classes:MobileOneFactorContract'
t.append(ta)
login.request.requestedAuthnContext.authnContextClassRef = t
login.buildAuthnRequestMsg()
response = get_response()
response.set_content_type('application/vnd.paos+xml', 'UTF-8')
response.set_header('PAOS', 'ver=urn:liberty:paos:2003-08;urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp')
return login.msgBody
def unlogged_page [html] (self):
form = Form(enctype='multipart/form-data', id = 'sso')
form.add(HtmlWidget, '<div id="sso-options">')
form.add(SingleSelectWidget, 'binding',
title = _('Protocol Binding for <Response>'),
options = [(None, ''), ('post', 'POST'), ('artifact', 'Artifact')])
form.add(CheckboxWidget, 'force_authn', value = False,
title = _('Force Authentication (ForceAuthn)'))
form.add(CheckboxWidget, 'is_passive',
title = _('No interaction (IsPassive)'))
form.add(CheckboxWidget, 'allow_create', value = True,
title = _('Allow new federation'))
form.add(SingleSelectWidget, 'nid_format',
title = _('Name Identifier Format'),
options = [('persistent', _('Persistent')),
('transient', _('Transient')),
('encrypted', _('Encrypted')),
('none', _('(none)'))])
form.add(StringWidget, 'affiliation', title = _('Affiliation'))
form.add(SingleSelectWidget, 'consent',
title = _('Consent'),
options = ['', 'obtained', 'prior', 'current-implicit',
'current-explicit', 'unavailable', 'inapplicable'])
form.add(SingleSelectWidget, 'authn_context',
title = _('Authn Req Context'),
options = ['(empty)', 'password', 'password on protected transport',
'Client Certificate'],
disabled = 'disabled')
form.add(SingleSelectWidget, 'matching',
title = _('Matching Rule'),
options = ['(empty)', 'exact', 'minimum', 'maximum', 'better'],
disabled = 'disabled')
form.add(HtmlWidget, '</div>')
for kidp, idp in get_cfg('idp', {}).items():
form.add_submit(kidp, _('Log on %s') % kidp)
if get_session().saml_idp_cookie is None:
common_domain = get_cfg('sp', {}).get('common_domain')
if common_domain:
form.add_submit('intro', _('Get IdP via Introduction Cookie'))
elif get_session().saml_idp_cookie:
common_domain = get_cfg('sp', {}).get('common_domain')
intro_cookie_q = urllib.unquote_plus(get_session().saml_idp_cookie)
splitted_cookie = [x for x in intro_cookie_q.split(str(' ')) if x]
last_id = splitted_cookie[-1]
v = misc.get_provider_key(base64.decodestring(last_id))
form.add_submit('intro-%s' % v,
_('Log on using IdP discovered from IdP Introduction (%s)') % v)
if form.is_submitted():
return self.do_login(form)
form.render()
if not common_domain:
'<p><strong>'
_('Note: ')
'</strong>'
_('Service Provider not configured to use IdP Introduction Cookie')
'</p>'
'<p>'
'Sample <a href="/data/affiliations.xml">affiliations</a> file to download'
'</p>'
def loggedin_page [html] (self):
identity_dump = get_request().user.lasso_dump
session_dump = get_session().lasso_session_dump
form = Form(enctype='multipart/form-data')
form.add_submit('logout', _('Local Logout'))
if session_dump:
form.add_submit('slo-soap', _('Single Logout (SOAP)'))
form.add_submit('slo-redirect', _('Single Logout (Redirect)'))
if identity_dump:
form.add_submit('fedterm-soap', _('Federation Termination (SOAP)'))
form.add_submit('fedterm-redirect', _('Federation Termination (Redirect)'))
if form.is_submitted():
if form.get_submit() == 'logout':
get_session_manager().expire_session()
return redirect('/')
if form.get_submit() == 'slo-soap':
return self.saml.slo_sp(lasso.HTTP_METHOD_SOAP)
if form.get_submit() == 'slo-redirect':
return self.saml.slo_sp(lasso.HTTP_METHOD_REDIRECT)
if form.get_submit() == 'fedterm-soap':
return self.saml.fedterm_sp(lasso.HTTP_METHOD_SOAP)
if form.get_submit() == 'fedterm-redirect':
return self.saml.fedterm_sp(lasso.HTTP_METHOD_REDIRECT)
return template.error_page(_('Unknown command'))
'<p>%s</p>' % _('Logged in (%s)') % get_request().user.display_name
if get_request().user.anonymous:
if identity_dump:
'<strong><a href="register">%s</a></strong>' % _('Register')
'<pre>'
get_session().lasso_identity_provider_id
'</pre>'
'<pre>'
get_session().name_identifier
'</pre>'
'<h1>Session dump</h1>'
'<pre class="prettyprint lang-xml">'
xml_format(get_session().lasso_session_dump)
'</pre>'
'<h1>Identity dump</h1>'
'<pre class="prettyprint lang-xml">'
xml_format(get_request().user.lasso_dump)
'</pre>'
'<div id="logged-in-options">'
form.render()
'</div>'
if os.path.exists(str('/tmp/artifact-msg-url')):
'<p>'
'<a href="replay_artifact">%s</a>' % _('Replay last artifact response')
'</p>'
def replay_artifact [html] (self):
msg_url = file(str('/tmp/artifact-msg-url')).read()
msg_body = file(str('/tmp/artifact-msg-body')).read()
from qommon.liberty import soap_call
soap_answer = soap_call(msg_url, msg_body)
open(str('/tmp/replayed-artifact.xml'), str('w')).write(soap_answer)
formatted = os.popen(str('xmllint --format /tmp/replayed-artifact.xml')).read()
template.html_top(_('Artifact Replayed'))
'<pre>'
formatted
'</pre>'
def register [html] (self):
if not get_request().user:
raise errors.AccessUnauthorizedError()
if not get_request().user.anonymous:
raise errors.AccessForbiddenError()
if not get_session().lasso_anonymous_identity_dump:
raise errors.AccessForbiddenError()
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title = _('Name'), required = True, size=30)
form.add(EmailWidget, 'email', title = _('Email'), required = False, size=30)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_submit() == 'cancel':
return redirect('/')
if form.get_submit() and not form.has_errors():
get_request().user.id = get_request().user.get_new_id()
get_request().user.name = form.get_widget('name').parse()
get_request().user.email = form.get_widget('email').parse()
get_request().user.anonymous = False
if get_publisher().user_class.count() == 0:
get_request().user.is_admin = True
get_request().user.lasso_dump = get_session().lasso_anonymous_identity_dump
get_session().lasso_anonymous_identity_dump = None
get_request().user.name_identifiers = [get_session().name_identifier]
get_request().user.store()
get_session().set_user(get_request().user.id)
return redirect('/')
template.html_top(_('Register'))
get_response().breadcrumb.append(('register', _('Register')))
form.render()
def do_login(self, form):
server = misc.get_lasso_server(protocol = 'saml2')
login = lasso.Login(server)
idp = form.get_submit()
if idp == 'intro':
common_domain_getter_url = get_cfg('sp', {}).get('common_domain_getter_url')
token = Token(expiration_delay = 600) # ten minutes
token.session_id = get_session().id
token.protocol = 'saml2'
token.next_url = get_request().get_url()
token.store()
return redirect(common_domain_getter_url + '?tok=%s' % token.id)
if idp and idp.startswith('intro-'):
idp = str(idp)[6:]
if idp:
p = misc.get_provider(idp)
idp = p.providerId
login.initAuthnRequest(idp, lasso.HTTP_METHOD_REDIRECT) # XXX: method must be an option
nid_format = form.get_widget('nid_format').parse()
if nid_format == 'persistent':
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
elif nid_format == 'transient':
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT
elif nid_format == 'encrypted':
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_ENCRYPTED
elif nid_format == 'none':
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_NONE
binding = form.get_widget('binding').parse()
if binding == 'artifact':
login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_ARTIFACT
elif binding == 'post':
login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_POST
login.request.nameIDPolicy.allowCreate = form.get_widget('allow_create').parse()
login.request.forceAuthn = form.get_widget('force_authn').parse()
login.request.isPassive = form.get_widget('is_passive').parse()
consent = form.get_widget('consent').parse()
if consent:
login.request.consent = 'urn:oasis:names:tc:SAML:2.0:consent:%s' % consent
affiliation = form.get_widget('affiliation').parse()
if affiliation:
login.request.nameIDPolicy.spNameQualifier = affiliation
# XXX: authn_context
login.buildAuthnRequestMsg()
return redirect(login.msgUrl)
def logout(self):
logger.info('logout')
session = get_session()
if not session:
return redirect('/')
ident_methods = get_cfg('identification', {}).get('methods', [])
if not 'idp' in ident_methods:
get_session_manager().expire_session()
return redirect('/')
provider = misc.get_provider(
misc.get_provider_key(get_session().lasso_identity_provider_id))
if provider.getProtocolConformance() == lasso.PROTOCOL_SAML_2_0:
return self.saml.slo_sp()
else:
return self.liberty.singleLogout()
def _q_traverse(self, path):
fn = os.path.join(get_publisher().app_dir, 'common_cookie')
if os.path.exists(fn):
# on special domain to set cookie, nothing else, let's change root
get_publisher().app_dir = open(fn).read()
get_request().user = None
return CookieGetterDirectory()._q_traverse(path)
session = get_session()
if session:
get_request().user = session.get_user()
else:
get_request().user = None
response = get_response()
response.filter = {}
if not hasattr(response, 'breadcrumb'):
response.breadcrumb = [ ('', _('Home')) ]
return Directory._q_traverse(self, path)
def encryption [html] (self):
form = Form(enctype='multipart/form-data')
options = []
for klp, lp in get_cfg('idp', {}).items():
try:
label = misc.get_provider_label(misc.get_provider(klp))
except KeyError:
continue
options.append((klp, label, klp))
options.sort()
for klp, label, klp2 in options:
form.add(HtmlWidget, '<h3>%s</h3>' % label)
form.add(CheckboxWidget, 'encrypt_nameid_%s' % klp,
title = _('Encrypt NameID'),
value = get_cfg('idp')[klp].get('encrypt_nameid'))
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.is_submitted():
self.encryption_submit(form, options)
return redirect('.')
template.html_top()
if not get_cfg('sp').has_key('encryption_privatekey'):
'<div class="errornotice">'
_('There is currently no encryption key set on this server.')
'</div>'
form.render()
def encryption_submit(self, form, options):
for klp, label, klp2 in options:
get_cfg('idp')[klp]['encrypt_nameid'] = form.get_widget(
'encrypt_nameid_%s' % klp).parse()
get_publisher().write_cfg()
def _q_lookup(self, component):
if component in ('css','js'):
dirname = os.path.join(get_publisher().data_dir, 'web', component)
return StaticDirectory(dirname, follow_symlinks = True)
if component == 'qo':
dirname = os.path.join(get_publisher().data_dir, 'qommon')
if not os.path.exists(dirname):
dirname = os.path.join(os.path.dirname(__file__), 'qommon/static')
return StaticDirectory(dirname, follow_symlinks = True)
raise errors.TraversalError()
admin = admin.RootDirectory()
backoffice = backoffice.RootDirectory()
saml = saml2.Saml2Directory()
liberty = liberty.LibertyDirectory()
login = LoginDirectory()
ident = IdentDirectory()