wcs/wcs/root.py

393 lines
15 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
from importlib import import_module
import json
import os
import re
import urllib
from quixote import get_publisher, get_response, get_session, redirect, get_session_manager
from quixote.directory import Directory
from quixote.html import htmltext, TemplateIO
from quixote.util import StaticDirectory
import forms.root
from qommon import saml2
from qommon import _
from qommon import errors
from qommon import get_cfg, get_logger
from qommon import template
from qommon.form import *
import qommon.ident
import qommon.pages
from qommon.afterjobs import AfterJobStatusDirectory
from categories import Category
from formdef import FormDef
from data_sources import NamedDataSource
from wscalls import NamedWsCall
from wcs.api import ApiDirectory
from myspace import MyspaceDirectory
from forms.preview import PreviewDirectory
from wcs.forms.actions import ActionsDirectory
from wcs.scripts import Script
from wcs import portfolio
class CompatibilityDirectory(Directory):
    """Directory whose only page redirects to the parent URL.

    It is mounted as ``forms`` on the root directory further down in this
    file.
    """

    # only the index page ('') is published
    _q_exports = ['']

    def _q_index(self):
        """Redirect the index page to the parent location ('..')."""
        return redirect('..')
class IdentDirectory(Directory):
    """Dispatch ``ident/<method>/`` URLs to the matching identification
    method directory."""

    def _q_lookup(self, component):
        """Return the directory of the identification method `component`.

        An ('ident/', None) entry is appended to the response breadcrumb
        before the lookup is attempted.

        Raises errors.TraversalError when qommon.ident does not know the
        method (it signals this with a KeyError).
        """
        get_response().breadcrumb.append(('ident/', None))
        try:
            method_directory = qommon.ident.get_method_directory(component)
        except KeyError:
            raise errors.TraversalError()
        return method_directory
class LoginDirectory(Directory):
    """Login entry point, dispatching to the configured identification
    methods."""

    # only the index page ('') is published; method names go via _q_lookup
    _q_exports = ['']

    def _q_index(self):
        """Start the login process.

        When a single identification method is available its login is
        started right away; when there are several, a form is displayed to
        let the user select one.

        Returns the page content or a redirect.

        Raises errors.TraversalError when the selected method is not known
        to qommon.ident (which signals this with a KeyError).
        """
        get_logger().info('login')
        request = get_request()
        ident_methods = get_cfg('identification', {}).get('methods', [])

        # accept 'ReturnUrl' as an alias for the 'next' parameter
        if request.form.get('ReturnUrl'):
            request.form['next'] = request.form.pop('ReturnUrl')

        if not ident_methods:
            get_logger().info('no ident method defined')
            if not get_cfg('idp', {}):
                return template.error_page(_('Authentication subsystem is not yet configured.'))
            ident_methods = ['idp']  # fallback to old behaviour; saml.

        if 'IsPassive' in request.form and 'idp' in ident_methods:
            # if isPassive is given in query parameters, we restrict ourselves
            # to saml login.
            ident_methods = ['idp']

        if len(ident_methods) == 1:
            method = ident_methods[0]
            try:
                return qommon.ident.login(method)
            except KeyError:
                msg = 'failed to login with method %s' % method
                get_logger().error(msg)
                # the error has to be raised (not returned) for it to be
                # handled as a traversal error
                raise errors.TraversalError(msg)

        form = Form(enctype='multipart/form-data')
        form.add(RadiobuttonsWidget, 'method',
                 options=[(x.key, _(x.description))
                          for x in qommon.ident.get_method_classes()
                          if x.key in ident_methods],
                 delim=htmltext('<br/>'))
        form.add_submit('submit', _('Submit'))

        if form.is_submitted() and not form.has_errors():
            method = form.get_widget('method').parse()
            if get_publisher().ident_methods.get(method)().is_interactive():
                # interactive methods have their own login page; forward the
                # 'next' parameter to it when there is one
                login_url = '../ident/%s/login' % method
                next_url = request.form.get('next')
                if next_url:
                    login_url += '?' + urllib.urlencode({'next': next_url})
                return redirect(login_url)
            try:
                return qommon.ident.login(method)
            except KeyError:
                msg = 'failed to login with method %s' % method
                get_logger().error(msg)
                raise errors.TraversalError()

        template.html_top(_('Login'))
        r = TemplateIO(html=True)
        r += htmltext('<p>%s</p>') % _('Select the identification method you want to use:')
        r += form.render()
        return r.getvalue()

    def _q_lookup(self, component):
        """Return the directory of identification method `component`, with
        its login page set as index page.

        Raises errors.TraversalError when the method is unknown.
        """
        try:
            method_directory = qommon.ident.get_method_directory(component)
        except KeyError:
            raise errors.TraversalError()
        # set the login page as the index page, so the url can be
        # /login/password/ instead of /login/password/login
        if '' not in method_directory._q_exports:
            # do not add it again if the list already has it (it may be
            # shared between instances)
            method_directory._q_exports.append('')
        method_directory._q_index = method_directory.login
        return method_directory
class RegisterDirectory(Directory):
    """Registration entry point, dispatching to the configured
    identification methods."""

    # only the index page ('') is published; method names go via _q_lookup
    _q_exports = ['']

    def _q_index(self):
        """Start the registration process.

        When a single identification method is available its registration
        is started right away; when there are several, a form is displayed
        to let the user select one and the user is then redirected to
        ``../ident/<method>/register``.

        Raises errors.TraversalError when the single configured method is
        not known to qommon.ident (which signals this with a KeyError).
        """
        get_logger().info('register')
        ident_methods = get_cfg('identification', {}).get('methods', [])

        if not ident_methods:
            if not get_cfg('idp', {}):
                return template.error_page(_('Authentication subsystem is not yet configured.'))
            ident_methods = ['idp']  # fallback to old behaviour; saml.

        if len(ident_methods) == 1:
            method = ident_methods[0]
            try:
                return qommon.ident.register(method)
            except KeyError:
                get_logger().error('failed to register with method %s' % method)
                # the error has to be raised (not returned) for it to be
                # handled as a traversal error
                raise errors.TraversalError()

        form = Form(enctype='multipart/form-data')
        form.add(RadiobuttonsWidget, 'method',
                 options=[(x.key, _(x.description))
                          for x in qommon.ident.get_method_classes()
                          if x.key in ident_methods],
                 delim=htmltext('<br/>'))
        form.add_submit('submit', _('Submit'))

        if form.is_submitted() and not form.has_errors():
            method = form.get_widget('method').parse()
            return redirect('../ident/%s/register' % method)

        template.html_top(_('Register'))
        r = TemplateIO(html=True)
        r += htmltext('<p>%s</p>') % _('Select the registration method you want to use:')
        r += htmltext(form.render())
        return r.getvalue()

    def _q_lookup(self, component):
        """Return the directory of identification method `component`, with
        its register page set as index page.

        Raises errors.TraversalError when the method is unknown.
        """
        try:
            method_directory = qommon.ident.get_method_directory(component)
        except KeyError:
            raise errors.TraversalError()
        # set the register page as the index page, so the url can be
        # /register/password/ instead of /register/password/register
        if '' not in method_directory._q_exports:
            # do not add it again if the list already has it (it may be
            # shared between instances)
            method_directory._q_exports.append('')
        method_directory._q_index = method_directory.register
        return method_directory
class StaticsDirectory(Directory):
    """Serve static files, looking them up in a list of directories."""

    # URL prefix -> directories to search, in order. An entry is either
    #  - an absolute path (starting with '/'),
    #  - a plain name, taken relative to the publisher data_dir,
    #  - 'xstatic:<name>', the BASE_DIR of module xstatic.pkg.<name>,
    #  - 'django:<module>', the 'static' directory next to <module>.
    static_directories = {
        '': ['web', 'qommon', 'django:gadjo', 'django:ckeditor'],
        'xstatic': ['xstatic:jquery', 'xstatic:jquery_ui',
                    'xstatic:font_awesome', 'xstatic:opensans',
                    'xstatic:leaflet'],
    }

    @classmethod
    def resolve_static_directories(cls, prefix):
        """Yield the filesystem directories registered for `prefix`.

        Entries referring to a module that cannot be imported are silently
        skipped, as are entries with an unknown '<type>:' part.
        """
        for entry in cls.static_directories[prefix]:
            if entry[0] == '/':
                yield entry
                continue
            if ':' not in entry:
                yield os.path.join(get_publisher().data_dir, entry)
                continue
            kind, name = entry.split(':')
            try:
                if kind == 'xstatic':
                    location = import_module('xstatic.pkg.%s' % name).BASE_DIR
                elif kind == 'django':
                    package = import_module(name)
                    location = os.path.join(
                        os.path.dirname(package.__file__), 'static')
                else:
                    continue
            except ImportError:
                continue
            yield location

    def _q_traverse(self, path):
        """Return the file at `path` from the first directory that has it.

        When the first path component is a key of static_directories it
        selects the set of directories to search and is removed from the
        path; otherwise the '' set is used with the whole path.

        Raises errors.AccessForbiddenError when no path is left after the
        prefix, and errors.TraversalError when no directory has the file.
        """
        first = path[0]
        if first in self.static_directories:
            prefix, rest = first, path[1:]
        else:
            prefix, rest = '', path

        if not rest:
            raise errors.AccessForbiddenError()

        for directory in self.resolve_static_directories(prefix):
            static_dir = StaticDirectory(directory, follow_symlinks=True)
            try:
                return static_dir._q_traverse(rest)
            except errors.TraversalError:
                # not in this directory, try the next one
                continue
        raise errors.TraversalError()
class RootDirectory(Directory):
    """Root directory of the site.

    Publishes the top-level endpoints listed in _q_exports; URLs that are
    not handled here are passed to forms.root.RootDirectory (see
    _q_traverse and _q_lookup).
    """

    _q_exports = ['admin', 'backoffice', 'forms', 'login', 'logout', 'saml',
                  'ident', 'register', 'afterjobs', 'themes', 'myspace', 'user', 'roles',
                  'pages', ('tmp-upload', 'tmp_upload'), 'api',
                  'tryauth', 'auth', 'preview', ('reload-top', 'reload_top'),
                  'fargo', ('i18n.js', 'i18n_js'), 'static', 'actions']

    api = ApiDirectory()
    themes = template.ThemesDirectory()
    myspace = MyspaceDirectory()
    pages = qommon.pages.PagesDirectory()
    fargo = portfolio.FargoDirectory()
    static = StaticsDirectory()
    actions = ActionsDirectory()

    def tryauth(self):
        """Delegate to forms.root.tryauth, with the site root URL."""
        return forms.root.tryauth(get_publisher().get_root_url())

    def auth(self):
        """Delegate to forms.root.auth, with the site root URL."""
        return forms.root.auth(get_publisher().get_root_url())

    def logout(self):
        """Log the user out.

        Without a session the user is just redirected to the root URL.
        Otherwise logout is handed to the 'fc' identification method when
        it is enabled and the session holds an 'fc_id_token', or to the
        SAML single logout when 'idp' is enabled and the session has an
        identity provider; in all other cases the session is expired and
        the user redirected to the root URL.
        """
        get_logger().info('logout')
        session = get_session()
        if not session:
            return redirect(get_publisher().get_root_url())

        ident_methods = get_cfg('identification', {}).get('methods', [])

        if ('fc' in ident_methods and session.extra_user_variables
                and 'fc_id_token' in session.extra_user_variables):
            return get_publisher().ident_methods['fc']().logout()

        if 'idp' not in ident_methods or not session.lasso_identity_provider_id:
            # no SAML session to terminate, a local logout is enough
            get_session_manager().expire_session()
            return redirect(get_publisher().get_root_url())

        # add settings to disable single logout?
        #  (and to set it as none/get/soap?)
        return self.saml.slo_sp()

    def user(self):
        """Return user details as JSON, or redirect to myspace/."""
        # endpoint for backward compatibility, new code should call /api/user/
        if get_request().is_json():
            return self.api.user._q_index()
        return redirect('myspace/')

    def roles(self):
        """Return roles as JSON, or redirect to '/' for non-JSON requests."""
        # endpoint for backward compatibility, new code should call /api/roles
        if not get_request().is_json():
            return redirect('/')
        return self.api.roles()

    def tmp_upload(self):
        """Store uploaded files in the session and describe them in JSON.

        Every submitted value with an `fp` attribute is added to the
        session as a temporary file. The response is a JSON list with the
        name, type, size and token of each stored file.
        """
        results = []
        for value in get_request().form.values():
            if hasattr(value, 'fp'):
                tempfile = get_session().add_tempfile(value)
                results.append({'name': tempfile.get('base_filename'),
                                'type': tempfile.get('content_type'),
                                'size': tempfile.get('size'),
                                'token': tempfile.get('token')})

        get_response().set_content_type('application/json')
        # the header may be missing, in which case get_header() gives None
        # and the regular expression match would raise a TypeError
        useragent = get_request().get_header('User-agent') or ''
        if re.search(r'MSIE \d\.', useragent):
            # hack around MSIE version < 10 as they do not have support for
            # XmlHttpRequest 2 (hence the forced usage of an iframe to send
            # a file in the background (cf jquery.iframe-transport.js); and
            # they would propose the returned json content for download if
            # it was served with the appropriate content type :/
            get_response().set_content_type('text/plain')
        return json.dumps(results)

    def feed_substitution_parts(self):
        """Feed the publisher substitutions with the session, the request
        user, and the data source, webservice call and script classes."""
        substitutions = get_publisher().substitutions
        substitutions.feed(get_session())
        substitutions.feed(get_request().user)
        substitutions.feed(NamedDataSource)
        substitutions.feed(NamedWsCall)
        substitutions.feed(Script)

    def _q_traverse(self, path):
        """Traverse `path`, falling back on forms.root.RootDirectory.

        Before traversal the substitution variables are fed, the response
        gets `filter` and `breadcrumb` attributes if it has none, and the
        admin and backoffice directories are created if they are not set.
        """
        self.feed_substitution_parts()

        response = get_response()
        if not hasattr(response, 'filter'):
            response.filter = {}
        if not hasattr(response, 'breadcrumb'):
            response.breadcrumb = [('', _('Home'))]

        # these directories are created from the classes configured on the
        # publisher, the first time they are needed
        if not self.admin:
            self.admin = get_publisher().admin_directory_class()
        if not self.backoffice:
            self.backoffice = get_publisher().backoffice_directory_class()

        try:
            return Directory._q_traverse(self, path)
        except errors.TraversalError:
            pass

        return forms.root.RootDirectory()._q_traverse(path)

    def _q_lookup(self, component):
        """Resolve `component` as a category or a form URL name."""
        # is this a category ?
        try:
            category = Category.get_by_urlname(component)
        except KeyError:
            pass
        else:
            # redirect to category if there's not a formdef with same slug
            try:
                FormDef.get_by_urlname(component)
            except KeyError:
                return forms.root.RootDirectory(category)

        # or a form ?
        return forms.root.RootDirectory()._q_lookup(component)

    def reload_top(self):
        """Return a page whose script reloads the top-level window."""
        # reset the response filter for this page
        get_response().filter = {}
        return htmltext('<html><body><script>window.top.document.location.reload();</script></body></html>')

    def i18n_js(self):
        """Return a javascript snippet defining WCS_I18N, an object holding
        the translated strings used by client-side code."""
        get_response().set_content_type('text/javascript')
        strings = {
            'confirmation': _('Are you sure?'),
            'geoloc_unknown_error': _('Geolocation: unknown error'),
            'geoloc_permission_denied': _('Geolocation: permission denied'),
            'geoloc_position_unavailable': _('Geolocation: position unavailable'),
            'geoloc_timeout': _('Geolocation: timeout'),
            'map_zoom_in': _('Zoom in'),
            'map_zoom_out': _('Zoom out'),
            'map_display_position': _('Display my position'),
            's2_errorloading': _('The results could not be loaded'),
            's2_nomatches': _('No matches found'),
            's2_tooshort': _('Please enter more characters'),
            's2_loadmore': _('Loading more results...'),
            's2_searching': _('Searching...'),
        }
        return 'WCS_I18N = %s;\n' % json.dumps(strings)

    # set on first traversal, see _q_traverse
    admin = None
    backoffice = None

    saml = saml2.Saml2Directory()
    forms = CompatibilityDirectory()
    login = LoginDirectory()
    register = RegisterDirectory()
    ident = IdentDirectory()
    afterjobs = AfterJobStatusDirectory()
    preview = PreviewDirectory()