wcs/wcs/root.ptl

245 lines
8.4 KiB
Plaintext

import os

from quixote import get_publisher, get_response, get_session, redirect, get_session_manager
from quixote import get_request
from quixote.directory import Directory
from quixote.util import StaticDirectory

import forms.root
import liberty
from qommon import saml2
from qommon import errors
from qommon import get_cfg, get_logger
from qommon import template
from qommon import misc
from qommon.form import *
import qommon.ident
from qommon.afterjobs import AfterJobStatusDirectory
from categories import Category
from users import User
from formdef import FormDef
from formdata import FormData
from anonylink import AnonymityLink
class CompatibilityDirectory(Directory):
    """Directory that only exposes an index page redirecting one level up."""

    _q_exports = ['']

    def _q_index(self):
        """Redirect the index request to the parent URL ('..')."""
        parent_url = '..'
        return redirect(parent_url)
class IdentDirectory(Directory):
    """Dispatches ``ident/<method>/`` URLs to the matching method directory."""

    def _q_lookup(self, component):
        """Return the directory registered for identification method *component*.

        An 'ident/' entry (without label) is appended to the response
        breadcrumb before the lookup is attempted.

        Raises errors.TraversalError when qommon.ident reports the method as
        unknown (KeyError).
        """
        breadcrumb = get_response().breadcrumb
        breadcrumb.append(('ident/', None))
        try:
            method_directory = qommon.ident.get_method_directory(component)
        except KeyError:
            raise errors.TraversalError()
        return method_directory
class LoginDirectory(Directory):
_q_exports = ['']
def _q_index [html] (self):
get_logger().info('login')
ident_methods = get_cfg('identification', {}).get('methods', [])
if len(ident_methods) == 0:
get_logger().info('no ident method defined')
idps = get_cfg('idp', {})
if len(idps) == 0:
return template.error_page(_('Authentication subsystem is not yet configured.'))
ident_methods = ['idp'] # fallback to old behaviour; liberty.
if len(ident_methods) == 1:
method = ident_methods[0]
try:
return qommon.ident.login(method)
except KeyError:
get_logger().error('failed to login with method %s' % method)
return errors.TraversalError()
else:
form = Form(enctype='multipart/form-data')
form.add(RadiobuttonsWidget, 'method',
options = [(x.key, _(x.description)) \
for x in qommon.ident.get_method_classes() if \
x.key in ident_methods],
delim = '<br/>')
form.add_submit('submit', _('Submit'))
if form.is_submitted() and not form.has_errors():
method = form.get_widget('method').parse()
if get_publisher().ident_methods.get(method)().is_interactive():
return redirect('../ident/%s/login' % method)
else:
try:
return qommon.ident.login(method)
except KeyError:
get_logger().error('failed to login with method %s' % method)
return errors.TraversalError()
else:
template.html_top(_('Login'))
'<p>%s</p>' % _('Select the identification method you want to use :')
form.render()
class RegisterDirectory(Directory):
_q_exports = ['']
def _q_index [html] (self):
get_logger().info('register')
ident_methods = get_cfg('identification', {}).get('methods', [])
if len(ident_methods) == 0:
idps = get_cfg('idp', {})
if len(idps) == 0:
return template.error_page(_('Authentication subsystem is not yet configured.'))
ident_methods = ['idp'] # fallback to old behaviour; liberty.
if len(ident_methods) == 1:
method = ident_methods[0]
try:
return qommon.ident.register(method)
except KeyError:
get_logger().error('failed to register with method %s' % method)
return errors.TraversalError()
else:
pass # XXX: register page when there is more than one ident method
def _q_lookup(self, component):
try:
return qommon.ident.get_method_directory(component)
except KeyError:
return errors.TraversalError()
class RootDirectory(Directory):
_q_exports = ['admin', 'backoffice', 'forms', 'login', 'logout', 'liberty', 'token', 'saml',
'ident', 'register', 'afterjobs', 'themes']
themes = template.ThemesDirectory()
def logout(self):
get_logger().info('logout')
session = get_session()
if not session:
return redirect(get_publisher().get_root_url())
ident_methods = get_cfg('identification', {}).get('methods', [])
if not 'idp' in ident_methods:
get_session_manager().expire_session()
return redirect(get_publisher().get_root_url())
if not get_session().lasso_identity_provider_id:
get_session_manager().expire_session()
return redirect(get_publisher().get_root_url())
# add settings to disable single logout?
# (and to set it as none/get/soap?)
import lasso
if misc.get_current_protocol() == lasso.PROTOCOL_SAML_2_0:
return self.saml.slo_sp()
else:
return self.liberty.singleLogout()
def token [html] (self):
if not get_request().user:
raise errors.AccessUnauthorizedError()
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'token', title = _('Identification Token'),
required = True, size = 30)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('.')
if not form.is_submitted() or form.has_errors():
template.html_top(_('Identification Token'))
'<p>' # XXX: include explanation about identification token (?)
_('Please enter your identification token.')
'</p>'
form.render()
else:
session = get_session()
if get_request().user:
lasso_dump = get_request().user.lasso_dump
else:
return template.error_page('No Lasso Identity Dump (???)')
token = form.get_widget('token').parse()
users_with_token = list(User.select(lambda x: x.identification_token == token))
if len(users_with_token) == 0:
return template.error_page(_('Unknown Token'))
user = users_with_token[0]
user.name_identifiers.append(session.name_identifier)
user.lasso_dump = str(lasso_dump)
user.identification_token = None
user.store()
old_name = session.user
session.set_user(user.id)
for anonylink in AnonymityLink.select(
lambda x: x.name_identifier == session.name_identifier):
if anonylink.formdata_type == 'form':
fdef = FormDef.get(anonylink.formdata_def_id)
else:
continue # ?
data = fdef.data_class().get(anonylink.formdata_id)
data.user_id = user.id
data.store()
anonylink.remove_self()
return redirect('.')
def _q_traverse(self, path):
response = get_response()
if not hasattr(response, 'filter'):
response.filter = {}
if not hasattr(response, 'breadcrumb'):
response.breadcrumb = [ ('', _('Home')) ]
if not self.admin:
self.admin = get_publisher().admin_directory_class()
if not self.backoffice:
self.backoffice = get_publisher().backoffice_directory_class()
try:
return Directory._q_traverse(self, path)
except errors.TraversalError:
pass
return forms.root.RootDirectory()._q_traverse(path)
def _q_lookup(self, component):
if component == 'qo':
dirname = os.path.join(get_publisher().data_dir, 'qommon')
return StaticDirectory(dirname, follow_symlinks = True)
# is this a category ?
try:
category = Category.get_by_urlname(component)
except KeyError:
pass
else:
return forms.root.RootDirectory(category)
# or a form ?
return forms.root.RootDirectory()._q_lookup(component)
admin = None
backoffice = None
saml = saml2.Saml2Directory()
liberty = liberty.LibertyDirectory()
forms = CompatibilityDirectory()
login = LoginDirectory()
register = RegisterDirectory()
ident = IdentDirectory()
afterjobs = AfterJobStatusDirectory()