import lasso
from quixote import get_request, get_response, get_session, get_session_manager, get_publisher, redirect
import authentic.root
from authentic.form import *
from qommon import template
from qommon.publisher import get_cfg
import authentic.identities as identities
import authentic.misc as misc
from authentic.liberty import saml2
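# Python < 2.4 compatibility: fall back to the `sets` module when the
# builtin set type is missing.  The previous `if not set:` test raised
# NameError on exactly the interpreters it was meant to help, and the
# module is spelled `sets`, not `Sets`.
try:
    set
except NameError:
    from sets import Set
    set = Set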
# Keep a handle on the stock root directory: the alternate directory below
# subclasses it and is installed in its place at the bottom of this file.
OldRootDirectory = authentic.root.RootDirectory
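class AlternateRootDirectory(OldRootDirectory):
    """Root directory installed in place of authentic.root.RootDirectory.

    It keeps every export of the original directory and adds pages to start
    IdP-initiated SSO, single logout and federation termination, to edit the
    per-provider encryption settings and to wipe federations or sessions.

    Methods declared with ``[html]`` are Quixote PTL templates: bare string
    expressions in their body are emitted as HTML output, so they carry
    ``#`` comments rather than docstrings.
    """
    _q_exports = set(OldRootDirectory._q_exports + ['', 'admin', 'liberty', 'login', 'logout', 'change_password', 'register',
                     'forgot_password', 'update_info', 'saml', 'singleLogout', 'fedterm', 'sso',
                     'encryption', 'destroy_federations', 'destroy_sessions', 'destroy_user_federations'])

    # ---- helpers shared by several pages -------------------------------

    def _require_user(self):
        """Return the current session when a user is logged in, else None."""
        session = get_session()
        if not session or not session.user:
            return None
        return session

    def _sp_keys(self, require_idp_initiated=False):
        """Return the keys of the configured providers that are not IdPs.

        With ``require_idp_initiated`` set, providers whose
        ``idp_initiated_sso`` setting is explicitly False are skipped too.
        """
        keys = []
        for klp, lp in get_cfg('providers', {}).items():
            if lp['role'] == lasso.PROVIDER_ROLE_IDP:
                continue  # only allows initiated login to service providers
            if require_idp_initiated and lp.get('idp_initiated_sso', True) is False:
                continue
            keys.append(klp)
        return keys

    def _saml2_sp_options(self, klps):
        """Return sorted ``(key, label, key)`` select options for ``klps``.

        Keys without a provider and label (KeyError) and providers that are
        not SAML 2.0 conformant are left out.
        """
        options = []
        for klp in klps:
            try:
                provider, label = misc.get_provider_and_label(klp)
            except KeyError:
                continue
            if provider.getProtocolConformance() != lasso.PROTOCOL_SAML_2_0:
                continue
            options.append((klp, label, klp))
        options.sort()
        return options

    def _add_sso_widgets(self, form):
        """Add the binding and name identifier format selectors of an SSO form."""
        form.add(SingleSelectWidget, 'binding', title = 'Binding',
                 options = [('default', '(default)'),
                            ('artifact', 'Artifact'),
                            ('post', 'HTTP-POST'), ])
        form.add(SingleSelectWidget, 'nid_format',
                 title = _('Name Identifier Format'),
                 options = [('persistent', _('Persistent')),
                            ('transient', _('Transient')),
                            ('encrypted', _('Encrypted')),
                            ('none', _('(none)'))])

    def _clear_federations(self, identity):
        """Forget every federation and proxy link of ``identity`` and save it."""
        identity.lasso_dump = None
        identity.resource_id = None
        identity.entry_id = None
        identity.lasso_proxy_dump = None
        identity.proxied_identity_origin = None
        identity.ecp_id = None
        identities.get_store().save(identity)

    # ---- pages ---------------------------------------------------------

    def _q_index [html] (self):
        # Home page of a logged-in user: IdP-initiated SSO form, logout
        # links, federation termination form, option links and the ECP id.
        session = self._require_user()
        if session is None:
            return self.login()

        # Load the identity once, guarded: the store may be unreachable or
        # the user may have vanished from it since the session was opened.
        try:
            identities.get_store().load_identities()
        except identities.IdentityStoreException:
            return template.error_page(_('Failed to connect to identities storage.'))
        identities.get_store().connect(session)
        try:
            identity = identities.get_store().get_identity(session.user)
        except KeyError:
            # identity no longer available; perhaps identity store changed ?
            return self.logout()

        # IdP-initiated SSO: the form posts to 'sso', which parses it, so
        # nothing is done here on submission.
        sp_options = self._saml2_sp_options(self._sp_keys(require_idp_initiated=True))
        sso_form = Form(enctype='multipart/form-data', action = 'sso')
        if sp_options:
            sso_form.add(SingleSelectWidget, 'sp', title = _('Service Provider'),
                         options = sp_options)
            self._add_sso_widgets(sso_form)
            sso_form.add_submit('sso', _('Log on'))

        template.html_top()

        if sp_options:
            sso_form.render()

        '<h3>%s</h3>' % _('Logout')
        '<p><a href="/logout">%s</a></p>' % _('Local Logout')

        # Single logout only makes sense once a lasso session exists.
        if session.lasso_session_dump:
            slo_form = Form(enctype='multipart/form-data', action = 'singleLogout')
            slo_form.add(SingleSelectWidget, 'binding', title = 'Binding',
                         options = [('redirect', 'HTTP-Redirect'),
                                    ('get', 'HTTP GET'),
                                    ('soap', 'SOAP')])
            slo_form.add_submit('slo', _('Single Logout'))
            slo_form.render()

        # Federation termination: one entry per SAML 2.0 provider the
        # identity is federated with.
        if identity.lasso_dump:
            # NOTE(review): `server` is not used below; the call is kept in
            # case get_lasso_server() has side effects - confirm.
            server = misc.get_lasso_server(protocol = 'saml2')
            lasso_identity = lasso.Identity.newFromDump(identity.lasso_dump)
            fed_options = self._saml2_sp_options(
                    [misc.get_provider_key(k) for k in lasso_identity.providerIds])
            if fed_options:
                fedterm_form = Form(enctype='multipart/form-data', action = 'fedterm')
                fedterm_form.add(SingleSelectWidget, 'sp', title = _('Service Provider'),
                                 options = fed_options)
                fedterm_form.add(SingleSelectWidget, 'binding', title = 'Binding',
                                 options = [('redirect', 'HTTP-Redirect'),
                                            ('soap', 'SOAP')])
                fedterm_form.add_submit('fedterm', _('Terminate Federation'))
                '<h3>%s</h3>' % _('Federation Termination')
                fedterm_form.render()

        self.options()
        '<p>%s</p>' % identity.ecp_id

    def options [html] (self):
        # Link list; the destructive entries are only offered to logged-in
        # users (the pages themselves enforce the same check).
        session = get_session()
        '<h3>%s</h3>' % _('Options')
        '<ul>'
        '<li><a href="encryption">%s</a></li>' % _('Encryption')
        if session and session.user:
            '<li><a href="destroy_user_federations">%s</a></li>' % _('Destroy My Federations')
            '<li><a href="destroy_federations">%s</a></li>' % _('Destroy All Federations')
            '<li><a href="destroy_sessions">%s</a></li>' % _('Destroy All Current Sessions')
        '</ul>'

    def more_login_text(self):
        """Extra text shown on the login page: the options link list."""
        return self.options()

    def destroy_federations(self):
        """Drop the federations of every identity in the store, then go home.

        The page is only linked for logged-in users, so anonymous requests
        are sent to the login page instead of wiping the store.
        NOTE(review): any logged-in user can still clear everybody's
        federations; confirm whether this should be limited to admins.
        """
        if self._require_user() is None:
            return self.login()
        for identity in identities.get_store().values():
            self._clear_federations(identity)
        return redirect('.')

    def destroy_user_federations(self):
        """Drop the federations of the logged-in user only, then go home.

        Anonymous requests are sent to the login page; previously they
        crashed on ``get_session().user`` when no session existed.
        """
        session = self._require_user()
        if session is None:
            return self.login()
        for identity in identities.get_store().values():
            if identity.id == session.user:
                self._clear_federations(identity)
        return redirect('.')

    def destroy_sessions(self):
        """Expire the current session and delete every session of the manager.

        Only linked for logged-in users; anonymous requests go to the login
        page.  NOTE(review): this logs out every user of the server - confirm
        it should not be restricted to admins.
        """
        if self._require_user() is None:
            return self.login()
        manager = get_session_manager()
        manager.expire_session()
        # Copy the keys: entries are deleted while the manager is walked.
        for session_key in list(manager.keys()):
            try:
                manager.get(session_key)
            except AttributeError:
                # Entry cannot be loaded any more; it is dropped all the same.
                pass
            del manager[session_key]
        return redirect('.')

    def encryption [html] (self):
        # Per-provider encryption settings (NameID / assertion encryption,
        # symmetric key type) for every non-IdP SAML 2.0 provider.
        # NOTE(review): the page is linked for everybody and the submit path
        # rewrites the server configuration without any authentication
        # check; confirm that is intended.  The 'cancel' button also counts
        # as a submission and therefore saves the form too.
        form = Form(enctype='multipart/form-data')
        options = self._saml2_sp_options(self._sp_keys())
        providers = get_cfg('providers')

        for klp, label, _key in options:
            lp = providers[klp]
            form.add(HtmlWidget, '<h3>%s</h3>' % label)
            form.add(CheckboxWidget, 'encrypt_nameid_%s' % klp,
                     title = _('Encrypt NameID'),
                     value = lp.get('encrypt_nameid'))
            form.add(CheckboxWidget, 'encrypt_assertion_%s' % klp,
                     title = _('Encrypt Assertion'),
                     value = lp.get('encrypt_assertion'))
            form.add(SingleSelectWidget, 'sym_key_type_%s' % klp, title = 'Symetric key type',
                     options = [('aes256', 'AES 256'),
                                ('aes128', 'AES 128'),
                                ('3des', 'Triple DES')],
                     value = lp.get('sym_key_type'))

        form.add_submit('submit', _('Submit'))
        form.add_submit('cancel', _('Back'))

        if form.is_submitted():
            self.encryption_submit(form, options)
            return redirect('encryption')

        template.html_top()

        # An 'idp' section may be missing altogether; treat that as "no key".
        if 'encryption_privatekey' not in get_cfg('idp', {}):
            '<div class="errornotice">'
            _('There is currently no encryption key set on this server.')
            '</div>'

        form.render()

    def encryption_submit(self, form, options):
        """Store the submitted settings of each provider in ``options`` and write the config."""
        providers = get_cfg('providers')
        for klp, label, _key in options:
            lp = providers[klp]
            lp['encrypt_nameid'] = form.get_widget('encrypt_nameid_%s' % klp).parse()
            lp['encrypt_assertion'] = form.get_widget('encrypt_assertion_%s' % klp).parse()
            lp['sym_key_type'] = str(form.get_widget('sym_key_type_%s' % klp).parse())
        get_publisher().write_cfg()

    def singleLogout [html] (self):
        # Choose a binding and start an IdP-initiated single logout; an
        # unknown binding just redisplays the form.
        form = Form(enctype='multipart/form-data', action = 'slo')
        form.add(SingleSelectWidget, 'binding', title = 'Binding',
                 options = [
                     ('get', 'HTTP GET'),
                     ('redirect', 'HTTP-Redirect'),
                     ('soap', 'SOAP')])
        form.add_submit('slo', _('Single Logout'))
        if form.is_submitted():
            methods = {
                'get': lasso.HTTP_METHOD_GET,
                'redirect': lasso.HTTP_METHOD_REDIRECT,
                'soap': lasso.HTTP_METHOD_SOAP,
            }
            binding = form.get_widget('binding').parse()
            if binding in methods:
                return self.saml.slo_idp(method = methods[binding])

        template.html_top(_('Single Logout'))
        form.render()

    def fedterm [html] (self):
        # Terminate the federation with the provider named by the 'sp'
        # field; renders nothing when not submitted (as before).
        form = Form(enctype='multipart/form-data', action = 'fedterm')
        form.add(StringWidget, 'sp')
        form.add(SingleSelectWidget, 'binding', title = 'Binding',
                 options = [('redirect', 'HTTP-Redirect'), ('soap', 'SOAP')])
        form.add_submit('fedterm', _('Terminate Federation'))

        if form.is_submitted():
            methods = {
                'redirect': lasso.HTTP_METHOD_REDIRECT,
                'soap': lasso.HTTP_METHOD_SOAP,
            }
            binding = form.get_widget('binding').parse()
            klp = form.get_widget('sp').parse()
            # 'sp' is a free-text field: only accept a configured provider
            # key, and never reach SpUI with an unbound `method`.
            if binding not in methods or klp not in get_cfg('providers', {}):
                return redirect('.')
            spui = saml2.SpUI(klp)
            return spui.terminate(methods[binding])

    def sso(self):
        """Start an IdP-initiated login towards the provider named by 'sp'.

        Returns the SpUI login response on a valid submission, and redirects
        to '/' otherwise (not submitted, or unknown provider key).
        """
        form = Form(enctype='multipart/form-data', action = 'sso')
        form.add(StringWidget, 'sp')
        self._add_sso_widgets(form)

        if form.is_submitted():
            klp = form.get_widget('sp').parse()
            # 'sp' is a free-text field: only accept a configured provider key.
            if klp not in get_cfg('providers', {}):
                return redirect('/')
            spui = saml2.SpUI(klp)
            binding = form.get_widget('binding').parse()
            if binding == 'post':
                method = lasso.SAML2_METADATA_BINDING_POST
            elif binding == 'artifact':
                method = lasso.SAML2_METADATA_BINDING_ARTIFACT
            else:
                method = None  # 'default': let SpUI pick the binding
            nid_format = form.get_widget('nid_format').parse()
            return spui.login(method = method, nid_format = nid_format)

        return redirect('/')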
# Install the alternate directory as the publisher's root; this runs at
# import time, so importing this module is what switches the root.
from qommon.publisher import get_publisher_class
get_publisher_class().root_directory_class = AlternateRootDirectory