wcs/wcs/root.py

461 lines
17 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import json
import os
import re
import urllib.parse
from importlib import import_module
from quixote import get_publisher, get_request, get_response, get_session, get_session_manager, redirect
from quixote.directory import Directory
from quixote.html import TemplateIO, htmltext
from quixote.util import StaticDirectory
from . import portfolio
from .api import ApiDirectory
from .categories import Category
from .formdef import FormDef
from .forms import root
from .forms.actions import ActionsDirectory
from .forms.preview import PreviewDirectory
from .myspace import MyspaceDirectory
from .qommon import _, errors, get_cfg, get_logger, ident, saml2, template
from .qommon.afterjobs import AfterJobStatusDirectory
from .qommon.form import Form, RadiobuttonsWidget
from .qommon.pages import PagesDirectory
from .qommon.upload_storage import UploadStorageError
class CompatibilityDirectory(Directory):
    """Directory whose only page is an index redirecting to its parent."""

    _q_exports = ['']

    def _q_index(self):
        """Redirect the index page to the parent URL (``..``)."""
        parent_url = '..'
        return redirect(parent_url)
class IdentDirectory(Directory):
    """Dispatch each sub-path to the directory of an identification method."""

    def _q_lookup(self, component):
        """Return the directory ``ident`` provides for ``component``.

        An ``ident/`` entry is appended to the response breadcrumb before the
        lookup.  A ``KeyError`` from ``ident.get_method_directory`` is turned
        into ``errors.TraversalError``.
        """
        crumb = ('ident/', None)
        get_response().breadcrumb.append(crumb)
        try:
            method_directory = ident.get_method_directory(component)
        except KeyError:
            raise errors.TraversalError()
        return method_directory
class LoginDirectory(Directory):
    """Login entry point: picks an identification method and starts the login."""

    _q_exports = ['']

    def _q_index(self):
        """Start the login process with the configured identification method.

        With a single usable method the login is delegated to ``ident.login``
        right away; with several methods a form is displayed so the user can
        pick one.

        Raises:
            errors.TraversalError: when ``ident.login`` raises ``KeyError``
                for the selected method.
        """
        get_logger().info('login')
        request = get_request()
        ident_methods = get_cfg('identification', {}).get('methods', [])

        # "ReturnUrl" is accepted as an alias of the "next" parameter
        if request.form.get('ReturnUrl'):
            request.form['next'] = request.form.pop('ReturnUrl')

        if not ident_methods:
            get_logger().info('no ident method defined')
            idps = get_cfg('idp', {})
            if not idps:
                return template.error_page(_('Authentication subsystem is not yet configured.'))
            ident_methods = ['idp']  # fallback to old behaviour; saml.

        if 'IsPassive' in request.form and 'idp' in ident_methods:
            # if isPassive is given in query parameters, we restrict ourselves
            # to saml login.
            ident_methods = ['idp']

        if len(ident_methods) == 1:
            method = ident_methods[0]
            try:
                return ident.login(method)
            except KeyError:
                msg = 'failed to login with method %s' % method
                get_logger().error(msg)
                # the error must be raised (not returned) to be handled as a
                # traversal failure by the publisher
                raise errors.TraversalError(msg)

        form = Form(enctype='multipart/form-data')
        form.add(
            RadiobuttonsWidget,
            'method',
            options=[
                (x.key, _(x.description)) for x in ident.get_method_classes() if x.key in ident_methods
            ],
            delim=htmltext('<br/>'),
        )
        form.add_submit('submit', _('Submit'))

        if form.is_submitted() and not form.has_errors():
            method = form.get_widget('method').parse()
            if get_publisher().ident_methods.get(method)().is_interactive():
                # interactive methods have their own login page; forward the
                # "next" parameter to it
                login_url = '../ident/%s/login' % method
                if request.form.get('next'):
                    login_url += '?' + urllib.parse.urlencode({'next': request.form.get('next')})
                return redirect(login_url)
            try:
                return ident.login(method)
            except KeyError:
                msg = 'failed to login with method %s' % method
                get_logger().error(msg)
                raise errors.TraversalError()

        template.html_top(_('Login'))
        r = TemplateIO(html=True)
        r += htmltext('<p>%s</p>') % _('Select the identification method you want to use:')
        r += form.render()
        return r.getvalue()

    def _q_lookup(self, component):
        """Return the directory of the ``component`` method, index set to login.

        Raises:
            errors.TraversalError: when ``ident.get_method_directory`` raises
                ``KeyError`` for ``component``.
        """
        try:
            method_directory = ident.get_method_directory(component)
        except KeyError:
            raise errors.TraversalError()
        # set the login page as the index page, so the url can be
        # /login/password/ instead of /login/password/login
        if '' not in method_directory._q_exports:
            # guard against growing the exports list on every lookup
            method_directory._q_exports.append('')
        method_directory._q_index = method_directory.login
        return method_directory
class RegisterDirectory(Directory):
    """Registration entry point: picks an identification method to register with."""

    _q_exports = ['']

    def _q_index(self):
        """Start the registration with the configured identification method.

        With a single usable method the registration is delegated to
        ``ident.register``; with several methods a form is displayed so the
        user can pick one and is then redirected to its register page.

        Raises:
            errors.TraversalError: when ``ident.register`` raises ``KeyError``
                for the selected method.
        """
        get_logger().info('register')
        ident_methods = get_cfg('identification', {}).get('methods', [])

        if not ident_methods:
            idps = get_cfg('idp', {})
            if not idps:
                return template.error_page(_('Authentication subsystem is not yet configured.'))
            ident_methods = ['idp']  # fallback to old behaviour; saml.

        if len(ident_methods) == 1:
            method = ident_methods[0]
            try:
                return ident.register(method)
            except KeyError:
                get_logger().error('failed to register with method %s' % method)
                # the error must be raised (not returned) to be handled as a
                # traversal failure by the publisher
                raise errors.TraversalError()

        form = Form(enctype='multipart/form-data')
        form.add(
            RadiobuttonsWidget,
            'method',
            options=[
                (x.key, _(x.description)) for x in ident.get_method_classes() if x.key in ident_methods
            ],
            delim=htmltext('<br/>'),
        )
        form.add_submit('submit', _('Submit'))

        if form.is_submitted() and not form.has_errors():
            method = form.get_widget('method').parse()
            return redirect('../ident/%s/register' % method)

        template.html_top(_('Register'))
        r = TemplateIO(html=True)
        r += htmltext('<p>%s</p>') % _('Select the registration method you want to use:')
        r += htmltext(form.render())
        return r.getvalue()

    def _q_lookup(self, component):
        """Return the directory of the ``component`` method, index set to register.

        Raises:
            errors.TraversalError: when ``ident.get_method_directory`` raises
                ``KeyError`` or when the directory does not export
                ``register``.
        """
        try:
            method_directory = ident.get_method_directory(component)
        except KeyError:
            raise errors.TraversalError()
        if 'register' not in method_directory._q_exports:
            raise errors.TraversalError()
        # set the register page as the index page, so the url can be
        # /register/password/ instead of /register/password/register
        if '' not in method_directory._q_exports:
            # guard against growing the exports list on every lookup
            method_directory._q_exports.append('')
        method_directory._q_index = method_directory.register
        return method_directory
class StaticsDirectory(Directory):
    """Serve static files looked up in an ordered list of directories."""

    # URL prefix -> ordered list of directory specifications.  A specification
    # is an absolute path, a path relative to the publisher data directory, or
    # a "<type>:<value>" pair where <type> is "xstatic" or "django".
    static_directories = {
        '': ['web', 'qommon', 'django:gadjo', 'django:ckeditor'],
        'xstatic': [
            'xstatic:jquery',
            'xstatic:jquery_ui',
            'xstatic:font_awesome',
            'xstatic:opensans',
            'xstatic:leaflet',
            'xstatic:leaflet_gesturehandling',
        ],
    }

    @classmethod
    def resolve_static_directories(cls, prefix):
        """Yield the filesystem directories configured for ``prefix``.

        Specifications whose module cannot be imported are skipped, as are
        "<type>:<value>" specifications with an unknown type.

        Raises:
            KeyError: when ``prefix`` is not a key of ``static_directories``.
        """
        for directory in cls.static_directories[prefix]:
            if directory.startswith('/'):
                yield directory
            elif ':' not in directory:
                yield os.path.join(get_publisher().data_dir, directory)
            else:
                # split on the first colon only, so a value containing a colon
                # does not break the unpacking
                directory_type, value = directory.split(':', 1)
                try:
                    if directory_type == 'xstatic':
                        module = import_module('xstatic.pkg.%s' % value)
                        yield module.BASE_DIR
                    elif directory_type == 'django':
                        module = import_module(value)
                        yield os.path.join(os.path.dirname(module.__file__), 'static')
                except ImportError:
                    # best effort: the package is not available, skip it
                    pass

    def _q_traverse(self, path):
        """Serve ``path`` from the first directory that contains it.

        The first path component selects a prefix of ``static_directories``
        when it matches one; otherwise the default (``''``) prefix is used.

        Raises:
            errors.AccessForbiddenError: when no file path remains after the
                prefix.
            errors.TraversalError: when no directory can serve the path.
        """
        if path and path[0] in self.static_directories:
            prefix, rest = path[0], path[1:]
        else:
            prefix, rest = '', path

        if not rest:
            raise errors.AccessForbiddenError()

        for directory in self.resolve_static_directories(prefix):
            try:
                return StaticDirectory(directory, follow_symlinks=True)._q_traverse(rest)
            except errors.TraversalError:
                # not in this directory, try the next one
                continue
        raise errors.TraversalError()
class RootDirectory(Directory):
    """Root of the site: exports the top-level pages and sub-directories.

    Paths that are not handled here fall back to ``root.RootDirectory`` (see
    ``_q_traverse`` and ``_q_lookup``).
    """

    _q_exports = [
        'admin',
        'backoffice',
        'forms',
        'login',
        'logout',
        'saml',
        'ident',
        'register',
        'afterjobs',
        'themes',
        'myspace',
        'user',
        'roles',
        'pages',
        ('tmp-upload', 'tmp_upload'),
        'api',
        'tryauth',
        'auth',
        'preview',
        'fargo',
        ('i18n.js', 'i18n_js'),
        'static',
        'actions',
    ]

    api = ApiDirectory()
    themes = template.ThemesDirectory()
    myspace = MyspaceDirectory()
    pages = PagesDirectory()
    fargo = portfolio.FargoDirectory()
    static = StaticsDirectory()
    actions = ActionsDirectory()

    def tryauth(self):
        """Delegate to ``root.tryauth`` with the site root URL."""
        root_url = get_publisher().get_root_url()
        return root.tryauth(root_url)

    def auth(self):
        """Delegate to ``root.auth`` with the site root URL."""
        root_url = get_publisher().get_root_url()
        return root.auth(root_url)

    def logout(self):
        """Close the current session, through the identity provider if needed.

        Without a session the user is redirected to the site root.  A session
        holding an ``fc_id_token`` is logged out by the ``fc`` method; a
        session tied to a SAML identity provider goes through single logout;
        any other session is expired locally.
        """
        get_logger().info('logout')
        session = get_session()
        if not session:
            return redirect(get_publisher().get_root_url())

        ident_methods = get_cfg('identification', {}).get('methods', [])

        has_fc_token = (
            'fc' in ident_methods
            and session.extra_user_variables
            and 'fc_id_token' in session.extra_user_variables
        )
        if has_fc_token:
            return get_publisher().ident_methods['fc']().logout()

        # no identity provider involved: a local logout is enough
        if 'idp' not in ident_methods or not get_session().lasso_identity_provider_id:
            get_session_manager().expire_session()
            return redirect(get_publisher().get_root_url())

        # add settings to disable single logout?
        # (and to set it as none/get/soap?)
        return self.saml.slo_sp()

    def user(self):
        """Backward compatibility endpoint, new code should call /api/user/."""
        if not get_request().is_json():
            return redirect('myspace/')
        return self.api.user._q_index()

    def roles(self):
        """Backward compatibility endpoint, new code should call /api/roles."""
        if get_request().is_json():
            return self.api.roles()
        return redirect('/')

    def tmp_upload(self):
        """Store the uploaded files in the session and describe them in JSON.

        Returns a JSON list with one entry per uploaded file: its name, type,
        size and token, or an ``error`` message when the storage failed.
        """
        results = []
        storage = get_request().form.get('storage')
        for value in get_request().form.values():
            if not hasattr(value, 'fp'):
                # not an uploaded file
                continue
            try:
                stored = get_session().add_tempfile(value, storage=storage)
                entry = {
                    'name': stored.get('base_filename'),
                    'type': stored.get('content_type'),
                    'size': stored.get('size'),
                    'token': stored.get('token'),
                }
                results.append(entry)
            except UploadStorageError as e:
                get_logger().error('upload storage error: %s' % e)
                results.append({'error': _('failed to store file (system error)')})

        get_response().set_content_type('application/json')
        useragent = get_request().get_header('User-agent') or ''
        if re.search(r'MSIE \d\.', useragent):
            # hack around MSIE version < 10 as they do not have support for
            # XmlHttpRequest 2 (hence the forced usage of an iframe to send
            # a file in the background (cf jquery.iframe-transport.js); and
            # they would propose the returned json content for download if
            # it was served with the appropriate content type :/
            get_response().set_content_type('text/plain')
        return json.dumps(results)

    def feed_substitution_parts(self):
        """Feed the publisher substitutions with the session and the user."""
        get_publisher().substitutions.feed(get_session())
        current_user = get_request().user
        get_publisher().substitutions.feed(current_user)

    def _q_traverse(self, path):
        """Traverse ``path``, falling back to ``root.RootDirectory``.

        The response gets default ``filter`` and ``breadcrumb`` attributes and
        the ``admin`` and ``backoffice`` directories are instantiated on first
        use.  Only the output of the fallback traversal goes through
        ``automatic_sso``.
        """
        self.feed_substitution_parts()

        response = get_response()
        if not hasattr(response, 'filter'):
            response.filter = {}
        if not hasattr(response, 'breadcrumb'):
            response.breadcrumb = [('', _('Home'))]

        # directory classes are provided by the publisher, create them lazily
        if not self.admin:
            self.admin = get_publisher().admin_directory_class()
        if not self.backoffice:
            self.backoffice = get_publisher().backoffice_directory_class()

        try:
            return Directory._q_traverse(self, path)
        except errors.TraversalError:
            # not one of our exports, let the forms root directory handle it
            pass

        fallback_output = root.RootDirectory()._q_traverse(path)
        return self.automatic_sso(fallback_output)

    def automatic_sso(self, output):
        """Return ``output`` or start a passive authentication attempt.

        An attempt is started with ``root.tryauth`` when the identity provider
        session cookie is present, no attempt was recorded yet, there is no
        logged in user, exactly one identity provider is configured and the
        ``idp`` method is not excluded by the configuration.
        """
        request = get_request()
        response = get_response()
        publisher = get_publisher()

        OPENED_SESSION_COOKIE = publisher.get_site_option('idp_session_cookie_name')
        PASSIVE_TRIED_COOKIE = '%s-passive-auth-tried' % publisher.config.session_cookie_name

        session_opened = OPENED_SESSION_COOKIE in request.cookies
        passive_tried = PASSIVE_TRIED_COOKIE in request.cookies

        if not session_opened and passive_tried:
            # the identity provider session is gone, forget the past attempt
            response.expire_cookie(PASSIVE_TRIED_COOKIE)
            return output

        if not session_opened or passive_tried:
            return output

        ident_methods = get_cfg('identification', {}).get('methods', [])
        idps = get_cfg('idp', {})
        if request.user or len(idps) != 1 or (ident_methods and 'idp' not in ident_methods):
            return output

        # remember the attempt so it is done only once
        response.set_cookie(
            PASSIVE_TRIED_COOKIE,
            '1',
            secure=1,
            httponly=1,
            path=publisher.config.session_cookie_path,
            domain=publisher.config.session_cookie_domain,
        )
        url = request.get_url()
        query = request.get_query()
        if query:
            url += '?' + query
        return root.tryauth(url)

    def _q_lookup(self, component):
        """Resolve ``component`` as a category or hand it to the forms root."""
        # is this a category ?
        try:
            category = Category.get_by_urlname(component)
        except KeyError:
            pass
        else:
            # redirect to category if there's not a formdef with same slug
            try:
                FormDef.get_by_urlname(component)
            except KeyError:
                return root.RootDirectory(category)

        # or a form ?
        forms_root = root.RootDirectory()
        return forms_root._q_lookup(component)

    def i18n_js(self):
        """Return a JavaScript snippet defining the ``WCS_I18N`` translations."""
        get_response().set_content_type('text/javascript')
        strings = {
            'confirmation': _('Are you sure?'),
            'file_type_error': _('Invalid file type'),
            'file_size_error': _('File size exceeds limits'),
            'geoloc_unknown_error': _('Geolocation: unknown error'),
            'geoloc_permission_denied': _('Geolocation: permission denied'),
            'geoloc_position_unavailable': _('Geolocation: position unavailable'),
            'geoloc_timeout': _('Geolocation: timeout'),
            'map_zoom_in': _('Zoom in'),
            'map_zoom_out': _('Zoom out'),
            'map_display_position': _('Display my position'),
            's2_errorloading': _('The results could not be loaded'),
            's2_nomatches': _('No matches found'),
            's2_tooshort': _('Please enter more characters'),
            's2_loadmore': _('Loading more results...'),
            's2_searching': _('Searching...'),
            'close': _('Close'),
            'email_domain_suggest': _('Did you want to write'),
            'email_domain_fix': _('Apply fix'),
        }
        serialized = json.dumps(strings)
        return 'WCS_I18N = %s;\n' % serialized

    # instantiated on first traversal, see _q_traverse
    admin = None
    backoffice = None

    saml = saml2.Saml2Directory()
    forms = CompatibilityDirectory()
    login = LoginDirectory()
    register = RegisterDirectory()
    ident = IdentDirectory()
    afterjobs = AfterJobStatusDirectory()
    preview = PreviewDirectory()