wcs/wcs/root.py

321 lines
12 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import json
import os
import re
try:
from hobo.scrutiny.wsgi.middleware import VersionMiddleware
except ImportError:
# also try older location
try:
from entrouvert.wsgi.middleware import VersionMiddleware
except ImportError:
VersionMiddleware = None
from quixote import get_publisher, get_response, get_session, redirect, get_session_manager
from quixote.directory import Directory
from quixote.html import htmltext, TemplateIO
from quixote.util import StaticDirectory
import forms.root
from qommon import saml2
from qommon import errors
from qommon import get_cfg, get_logger
from qommon import template
from qommon.form import *
import qommon.ident
import qommon.pages
from qommon.afterjobs import AfterJobStatusDirectory
from categories import Category
from data_sources import NamedDataSource
from wcs.api import ApiDirectory
from myspace import MyspaceDirectory
from forms.preview import PreviewDirectory
class CompatibilityDirectory(Directory):
_q_exports = ['']
def _q_index(self):
return redirect('..')
class IdentDirectory(Directory):
def _q_lookup(self, component):
get_response().breadcrumb.append(('ident/', None))
try:
return qommon.ident.get_method_directory(component)
except KeyError:
raise errors.TraversalError()
class LoginDirectory(Directory):
_q_exports = ['']
def _q_index(self):
get_logger().info('login')
ident_methods = get_cfg('identification', {}).get('methods', [])
if get_request().form.get('ReturnUrl'):
get_session().after_url = get_request().form.get('ReturnUrl')
if len(ident_methods) == 0:
get_logger().info('no ident method defined')
idps = get_cfg('idp', {})
if len(idps) == 0:
return template.error_page(_('Authentication subsystem is not yet configured.'))
ident_methods = ['idp'] # fallback to old behaviour; saml.
if 'IsPassive' in get_request().form and 'idp' in ident_methods:
# if isPassive is given in query parameters, we restrict ourselves
# to saml login.
ident_methods = ['idp']
if len(ident_methods) == 1:
method = ident_methods[0]
try:
return qommon.ident.login(method)
except KeyError:
msg = 'failed to login with method %s' % method
get_logger().error(msg)
return errors.TraversalError(msg)
else:
form = Form(enctype='multipart/form-data')
form.add(RadiobuttonsWidget, 'method',
options = [(x.key, _(x.description)) \
for x in qommon.ident.get_method_classes() if \
x.key in ident_methods],
delim = htmltext('<br/>'))
form.add_submit('submit', _('Submit'))
if form.is_submitted() and not form.has_errors():
method = form.get_widget('method').parse()
if get_publisher().ident_methods.get(method)().is_interactive():
return redirect('../ident/%s/login' % method)
else:
try:
return qommon.ident.login(method)
except KeyError:
msg = 'failed to login with method %s' % method
get_logger().error(msg)
return errors.TraversalError()
else:
template.html_top(_('Login'))
r = TemplateIO(html=True)
r += htmltext('<p>%s</p>') % _('Select the identification method you want to use:')
r += form.render()
return r.getvalue()
def _q_lookup(self, component):
try:
dir = qommon.ident.get_method_directory(component)
# set the register page as the index page, so the url can be
# /login/password/ instead of /login/password/login
dir._q_exports.append('')
dir._q_index = dir.login
return dir
except KeyError:
return errors.TraversalError()
class RegisterDirectory(Directory):
_q_exports = ['']
def _q_index(self):
get_logger().info('register')
ident_methods = get_cfg('identification', {}).get('methods', [])
if len(ident_methods) == 0:
idps = get_cfg('idp', {})
if len(idps) == 0:
return template.error_page(_('Authentication subsystem is not yet configured.'))
ident_methods = ['idp'] # fallback to old behaviour; saml.
if len(ident_methods) == 1:
method = ident_methods[0]
try:
return qommon.ident.register(method)
except KeyError:
get_logger().error('failed to register with method %s' % method)
return errors.TraversalError()
else:
form = Form(enctype='multipart/form-data')
form.add(RadiobuttonsWidget, 'method',
options = [(x.key, _(x.description)) \
for x in qommon.ident.get_method_classes() if \
x.key in ident_methods],
delim = htmltext('<br/>'))
form.add_submit('submit', _('Submit'))
if form.is_submitted() and not form.has_errors():
method = form.get_widget('method').parse()
return redirect('../ident/%s/register' % method)
else:
template.html_top(_('Register'))
r = TemplateIO(html=True)
r += htmltext('<p>%s</p>') % _('Select the registration method you want to use:')
r += htmltext(form.render())
return r.getvalue()
def _q_lookup(self, component):
try:
dir = qommon.ident.get_method_directory(component)
# set the register page as the index page, so the url can be
# /register/password/ instead of /register/password/register
dir._q_exports.append('')
dir._q_index = dir.register
return dir
except KeyError:
return errors.TraversalError()
class RootDirectory(Directory):
_q_exports = ['admin', 'backoffice', 'forms', 'login', 'logout', 'saml',
'ident', 'register', 'afterjobs', 'themes', 'myspace', 'user', 'roles',
'pages', ('tmp-upload', 'tmp_upload'), 'api', '__version__',
'tryauth', 'auth', 'preview']
api = ApiDirectory()
themes = template.ThemesDirectory()
myspace = MyspaceDirectory()
pages = qommon.pages.PagesDirectory()
def tryauth(self):
return forms.root.tryauth(get_publisher().get_root_url())
def auth(self):
return forms.root.auth(get_publisher().get_root_url())
def __version__(self):
if VersionMiddleware is None:
raise errors.TraversalError()
get_response().set_content_type('application/json')
return json.dumps(VersionMiddleware(None).get_packages_version())
def logout(self):
get_logger().info('logout')
session = get_session()
if not session:
return redirect(get_publisher().get_root_url())
ident_methods = get_cfg('identification', {}).get('methods', [])
if not 'idp' in ident_methods:
get_session_manager().expire_session()
return redirect(get_publisher().get_root_url())
if not get_session().lasso_identity_provider_id:
get_session_manager().expire_session()
return redirect(get_publisher().get_root_url())
# add settings to disable single logout?
# (and to set it as none/get/soap?)
return self.saml.slo_sp()
def user(self):
# endpoint for backward compatibility, new code should call /api/user/
if get_request().is_json():
return self.api.user._q_index()
return redirect('myspace/')
def roles(self):
# endpoint for backward compatibility, new code should call /api/roles
if not get_request().is_json():
return redirect('/')
return self.api.roles()
def tmp_upload(self):
results = []
for k, v in get_request().form.items():
if hasattr(v, 'fp'):
token = get_session().add_tempfile(v)
tempfile = get_session().tempfiles[token]
results.append({'name': tempfile.get('base_filename'),
'type': tempfile.get('content_type'),
'size': tempfile.get('size'),
'token': token})
get_response().set_content_type('application/json')
useragent = get_request().get_header('User-agent')
if re.findall(r'MSIE \d\.', useragent):
# hack around MSIE version < 10 as they do not have support for
# XmlHttpRequest 2 (hence the forced usage of an iframe to send
# a file in the background (cf jquery.iframe-transport.js); and
# they would propose the returned json content for download if
# it was served with the appropriate content type :/
get_response().set_content_type('text/plain')
return json.dumps(results)
def _q_traverse(self, path):
response = get_response()
if not hasattr(response, 'filter'):
response.filter = {}
if not hasattr(response, 'breadcrumb'):
response.breadcrumb = [ ('', _('Home')) ]
get_publisher().substitutions.feed(get_session())
get_publisher().substitutions.feed(get_request().user)
get_publisher().substitutions.feed(NamedDataSource)
if not self.admin:
self.admin = get_publisher().admin_directory_class()
if not self.backoffice:
self.backoffice = get_publisher().backoffice_directory_class()
try:
return Directory._q_traverse(self, path)
except errors.TraversalError:
pass
return forms.root.RootDirectory()._q_traverse(path)
def _q_lookup(self, component):
if component in ('css','images'):
return StaticDirectory(os.path.join(get_publisher().data_dir, 'web', component), follow_symlinks = True)
if component == 'qo':
dirname = os.path.join(get_publisher().data_dir, 'qommon')
return StaticDirectory(dirname, follow_symlinks = True)
# maps /leaflet/ to the directory provided by the libjs-openlayers package
if component == 'leaflet':
return StaticDirectory('/usr/share/javascript/leaflet')
# is this a category ?
try:
category = Category.get_by_urlname(component)
except KeyError:
pass
else:
return forms.root.RootDirectory(category)
# or a form ?
return forms.root.RootDirectory()._q_lookup(component)
admin = None
backoffice = None
saml = saml2.Saml2Directory()
forms = CompatibilityDirectory()
login = LoginDirectory()
register = RegisterDirectory()
ident = IdentDirectory()
afterjobs = AfterJobStatusDirectory()
preview = PreviewDirectory()