This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
lcs/lcs/root.ptl

283 lines
11 KiB
Plaintext

import os
import lasso
from quixote import get_publisher, get_response, get_session, redirect, get_session_manager
from quixote.directory import Directory
from quixote.util import StaticDirectory
import admin
import backoffice
import liberty
from qommon import saml2
from qommon import errors
from qommon import logger
from qommon import get_cfg
from qommon import template
from qommon.form import *
import qommon.ident
from users import User
class IdentDirectory(Directory):
def _q_lookup(self, component):
get_response().breadcrumb.append(('ident/', None))
return qommon.ident.get_method_directory(component)
class LoginDirectory(Directory):
_q_exports = ['']
def _q_index [html] (self):
logger.info('login')
ident_methods = get_cfg('identification', {}).get('methods', [])
if len(ident_methods) == 0:
idps = get_cfg('idp', {})
if len(idps) == 0:
return template.error_page(_('Authentication subsystem is not yet configured.'))
ident_methods = ['idp'] # fallback to old behaviour; liberty.
if len(ident_methods) == 1:
method = ident_methods[0]
return qommon.ident.login(method)
else:
form = Form(enctype='multipart/form-data')
form.add(RadiobuttonsWidget, 'method',
options = [(x.key, _(x.description)) \
for x in qommon.ident.get_method_classes() if \
x.key in ident_methods],
delim = '<br/>')
form.add_submit('submit', _('Submit'))
if form.is_submitted() and not form.has_errors():
method = form.get_widget('method').parse()
if qommon.ident.base.ident_classes[method]().is_interactive():
return redirect('../ident/%s/login' % method)
else:
return qommon.ident.login(method)
else:
template.html_top(_('Login'))
'<p>%s</p>' % _('Select the identification method you want to use :')
form.render()
class RootDirectory(Directory):
_q_exports = ['', 'admin', 'backoffice', 'login', 'logout', 'liberty', 'saml',
'ident', 'register']
def _q_index [html] (self):
template.html_top('Lasso Conformance SP')
if get_request().user:
self.loggedin_page()
else:
self.unlogged_page()
def unlogged_page [html] (self):
form = Form(enctype='multipart/form-data', id = 'sso')
form.add(HtmlWidget, '<div id="sso-options">')
form.add(SingleSelectWidget, 'binding',
title = _('Protocol Binding for <Response>'),
options = [(None, ''), ('post', 'POST'), ('artifact', 'Artifact')])
form.add(CheckboxWidget, 'force_authn', value = False,
title = _('Force Authentication (ForceAuthn)'))
form.add(CheckboxWidget, 'is_passive',
title = _('No interaction (IsPassive)'))
form.add(CheckboxWidget, 'allow_create', value = True,
title = _('Allow new federation'))
form.add(SingleSelectWidget, 'nid_format',
title = _('Name Identifier Format'),
options = [('persistent', _('Persistent')),
('transient', _('Transient')),
('none', _('(none'))])
# XXX: affiliation
form.add(SingleSelectWidget, 'consent',
title = _('Consent'),
options = ['', 'obtained', 'prior', 'current-implicit',
'current-explicit', 'unavailable', 'inapplicable'])
form.add(SingleSelectWidget, 'authn_context',
title = _('Authn Req Context'),
options = ['(empty)', 'password', 'password on protected transport',
'Client Certificate'],
disabled = 'disabled')
form.add(SingleSelectWidget, 'matching',
title = _('Matching Rule'),
options = ['(empty)', 'exact', 'minimum', 'maximum', 'better'],
disabled = 'disabled')
form.add(HtmlWidget, '</div>')
for kidp, idp in get_cfg('idp', {}).items():
form.add_submit(kidp, _('Log on %s') % kidp)
if form.is_submitted():
return self.do_login(form)
form.render()
def loggedin_page [html] (self):
identity_dump = get_request().user.lasso_dump
session_dump = get_session().lasso_session_dump
form = Form(enctype='multipart/form-data')
form.add_submit('logout', _('Local Logout'))
if session_dump:
form.add_submit('slo-soap', _('Single Logout (SOAP)'))
form.add_submit('slo-redirect', _('Single Logout (Redirect)'))
if identity_dump:
form.add_submit('fedterm-soap', _('Federation Termination (SOAP)'))
form.add_submit('fedterm-redirect', _('Federation Termination (Redirect)'))
if form.is_submitted():
if form.get_submit() == 'logout':
get_session_manager().expire_session()
return redirect('/')
if form.get_submit() == 'slo-soap':
return self.saml.slo_sp(lasso.HTTP_METHOD_SOAP)
if form.get_submit() == 'slo-redirect':
return self.saml.slo_sp(lasso.HTTP_METHOD_REDIRECT)
if form.get_submit() == 'fedterm-soap':
return self.saml.fedterm_sp(lasso.HTTP_METHOD_SOAP)
if form.get_submit() == 'fedterm-redirect':
return self.saml.fedterm_sp(lasso.HTTP_METHOD_REDIRECT)
return template.error_page(_('Unknown command'))
'<p>%s</p>' % _('Logged in (%s)') % get_request().user.display_name
if get_request().user.anonymous:
'<a href="register">%s</a>' % _('Register')
'<pre>'
get_session().lasso_identity_provider_id
'</pre>'
'<div id="logged-in-options">'
form.render()
'</div>'
def register [html] (self):
if not get_request().user:
raise errors.AccessUnauthorizedError()
if not get_request().user.anonymous:
raise errors.AccessForbiddenError()
if not get_session().lasso_anonymous_identity_dump:
raise errors.AccessForbiddenError()
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title = _('Name'), required = True, size=30)
form.add(EmailWidget, 'email', title = _('Email'), required = False, size=30)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_submit() == 'cancel':
return redirect('/')
if form.get_submit() and not form.has_errors():
get_request().user.id = get_request().user.get_new_id()
get_request().user.name = form.get_widget('name').parse()
get_request().user.email = form.get_widget('email').parse()
get_request().user.anonymous = False
if get_publisher().user_class.count() == 0:
get_request().user.is_admin = True
get_request().user.lasso_dump = get_session().lasso_anonymous_identity_dump
get_session().lasso_anonymous_identity_dump = None
get_request().user.name_identifiers = [get_session().name_identifier]
get_request().user.store()
get_session().set_user(get_request().user.id)
return redirect('/')
template.html_top(_('Register'))
get_response().breadcrumb.append(('register', _('Register')))
form.render()
def do_login(self, form):
server = misc.get_lasso_server(protocol = 'saml2')
login = lasso.Login(server)
idp = form.get_submit()
if idp:
p = misc.get_provider(idp)
idp = p.providerId
login.initAuthnRequest(idp, lasso.HTTP_METHOD_REDIRECT) # XXX: method must be an option
nid_format = form.get_widget('nid_format').parse()
if nid_format == 'persistent':
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
elif nid_format == 'transient':
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT
elif nid_format == 'none':
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_NONE
binding = form.get_widget('binding').parse()
if binding == 'artifact':
login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_ARTIFACT
elif binding == 'post':
login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_POST
login.request.nameIDPolicy.allowCreate = form.get_widget('allow_create').parse()
login.request.forceAuthn = form.get_widget('force_authn').parse()
login.request.isPassive = form.get_widget('is_passive').parse()
consent = form.get_widget('consent').parse()
if consent:
login.request.consent = 'urn:oasis:names:tc:SAML:2.0:consent:%s' % consent
# XXX: authn_context
login.buildAuthnRequestMsg()
return redirect(login.msgUrl)
def logout(self):
logger.info('logout')
session = get_session()
if not session:
return redirect('/')
ident_methods = get_cfg('identification', {}).get('methods', [])
if not 'idp' in ident_methods:
get_session_manager().expire_session()
return redirect('/')
provider = misc.get_provider(
misc.get_provider_key(get_session().lasso_identity_provider_id))
if provider.getProtocolConformance() == lasso.PROTOCOL_SAML_2_0:
return self.saml.slo_sp()
else:
return self.liberty.singleLogout()
def _q_traverse(self, path):
session = get_session()
if session:
get_request().user = session.get_user()
else:
get_request().user = None
response = get_response()
response.filter = {}
if not hasattr(response, 'breadcrumb'):
response.breadcrumb = [ ('', _('Home')) ]
return Directory._q_traverse(self, path)
def _q_lookup(self, component):
if component == 'themes':
dirname = os.path.join(get_publisher().data_dir, 'themes')
return StaticDirectory(dirname, follow_symlinks = True)
raise errors.TraversalError()
admin = admin.RootDirectory()
backoffice = backoffice.RootDirectory()
saml = saml2.Saml2Directory()
liberty = liberty.LibertyDirectory()
login = LoginDirectory()
ident = IdentDirectory()