387 lines
15 KiB
Python
387 lines
15 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2010 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
|
|
try:
|
|
from entrouvert.wsgi.middleware import VersionMiddleware
|
|
except ImportError:
|
|
VersionMiddleware = None
|
|
|
|
from quixote import get_publisher, get_response, get_session, redirect, get_session_manager
|
|
from quixote.directory import Directory
|
|
from quixote.html import htmltext, TemplateIO
|
|
from quixote.util import StaticDirectory
|
|
|
|
import forms.root
|
|
import liberty
|
|
from qommon import saml2
|
|
|
|
from qommon import errors
|
|
from qommon import get_cfg, get_logger
|
|
from qommon import template
|
|
from qommon import misc
|
|
from qommon.form import *
|
|
import qommon.ident
|
|
import qommon.pages
|
|
|
|
from qommon.afterjobs import AfterJobStatusDirectory
|
|
|
|
from categories import Category
|
|
from formdef import FormDef
|
|
from anonylink import AnonymityLink
|
|
from roles import Role
|
|
from wcs.api import get_user_from_api_query_string
|
|
from myspace import MyspaceDirectory
|
|
|
|
|
|
class CompatibilityDirectory(Directory):
    """Directory exposing a single index page that redirects to its parent."""

    _q_exports = ['']

    def _q_index(self):
        """Send the visitor one level up (``..``)."""
        parent_url = '..'
        return redirect(parent_url)
|
|
|
|
|
|
class IdentDirectory(Directory):
    """Dispatch ``ident/<method>/`` URLs to the matching identification method."""

    def _q_lookup(self, component):
        """Return the directory handling identification method *component*.

        An ``ident/`` entry is added to the response breadcrumb before the
        lookup.  A ``KeyError`` raised by the lookup (unknown method) is
        converted into a ``TraversalError``.
        """
        response = get_response()
        response.breadcrumb.append(('ident/', None))
        try:
            method_directory = qommon.ident.get_method_directory(component)
        except KeyError:
            raise errors.TraversalError()
        return method_directory
|
|
|
|
|
|
class LoginDirectory(Directory):
    """Handle ``login/``: select an identification method and start the login."""

    _q_exports = ['']

    def _q_index(self):
        """Start the login process.

        The configured identification methods are read from the
        ``identification`` settings.  When none is configured the code falls
        back to ``idp`` if identity providers exist, otherwise an error page
        is returned.  With a single method the login is started right away;
        with several, a form lets the user pick one.

        Raises ``TraversalError`` when the chosen method is unknown to
        ``qommon.ident`` (signalled by a ``KeyError``).
        """
        get_logger().info('login')
        ident_methods = get_cfg('identification', {}).get('methods', [])

        return_url = get_request().form.get('ReturnUrl')
        if return_url:
            # kept on the session under after_url
            get_session().after_url = return_url

        if not ident_methods:
            get_logger().info('no ident method defined')
            if not get_cfg('idp', {}):
                return template.error_page(_('Authentication subsystem is not yet configured.'))
            ident_methods = ['idp']  # fallback to old behaviour; liberty.

        if 'IsPassive' in get_request().form and 'idp' in ident_methods:
            # if isPassive is given in query parameters, we restrict ourselves
            # to saml login.
            ident_methods = ['idp']

        if len(ident_methods) == 1:
            method = ident_methods[0]
            try:
                return qommon.ident.login(method)
            except KeyError:
                msg = 'failed to login with method %s' % method
                get_logger().error(msg)
                # the exception must be raised, not returned, to produce an
                # error response
                raise errors.TraversalError(msg)

        # several methods are available, let the user choose
        form = Form(enctype='multipart/form-data')
        form.add(RadiobuttonsWidget, 'method',
                 options=[(x.key, _(x.description))
                          for x in qommon.ident.get_method_classes()
                          if x.key in ident_methods],
                 delim=htmltext('<br/>'))
        form.add_submit('submit', _('Submit'))

        if form.is_submitted() and not form.has_errors():
            method = form.get_widget('method').parse()
            if get_publisher().ident_methods.get(method)().is_interactive():
                return redirect('../ident/%s/login' % method)
            try:
                return qommon.ident.login(method)
            except KeyError:
                msg = 'failed to login with method %s' % method
                get_logger().error(msg)
                raise errors.TraversalError()

        template.html_top(_('Login'))
        r = TemplateIO(html=True)
        r += htmltext('<p>%s</p>') % _('Select the identification method you want to use:')
        r += form.render()
        return r.getvalue()

    def _q_lookup(self, component):
        """Return the directory of identification method *component*.

        The returned directory gets its ``login`` page installed as index
        page.  Raises ``TraversalError`` when the method is unknown.
        """
        try:
            method_dir = qommon.ident.get_method_directory(component)
        except KeyError:
            raise errors.TraversalError()
        # set the login page as the index page, so the url can be
        # /login/password/ instead of /login/password/login
        if '' not in method_dir._q_exports:
            # work on a copy bound to this object: appending in place would
            # grow a possibly shared list on every request
            method_dir._q_exports = list(method_dir._q_exports) + ['']
        method_dir._q_index = method_dir.login
        return method_dir
|
|
|
|
|
|
class RegisterDirectory(Directory):
    """Handle ``register/``: select an identification method and start registration."""

    _q_exports = ['']

    def _q_index(self):
        """Start the registration process.

        The configured identification methods are read from the
        ``identification`` settings.  When none is configured the code falls
        back to ``idp`` if identity providers exist, otherwise an error page
        is returned.  With a single method its registration is started right
        away; with several, a form lets the user pick one and redirects to
        ``../ident/<method>/register``.

        Raises ``TraversalError`` when the single configured method is
        unknown to ``qommon.ident`` (signalled by a ``KeyError``).
        """
        get_logger().info('register')
        ident_methods = get_cfg('identification', {}).get('methods', [])

        if not ident_methods:
            if not get_cfg('idp', {}):
                return template.error_page(_('Authentication subsystem is not yet configured.'))
            ident_methods = ['idp']  # fallback to old behaviour; liberty.

        if len(ident_methods) == 1:
            method = ident_methods[0]
            try:
                return qommon.ident.register(method)
            except KeyError:
                get_logger().error('failed to register with method %s' % method)
                # the exception must be raised, not returned, to produce an
                # error response
                raise errors.TraversalError()

        # several methods are available, let the user choose
        form = Form(enctype='multipart/form-data')
        form.add(RadiobuttonsWidget, 'method',
                 options=[(x.key, _(x.description))
                          for x in qommon.ident.get_method_classes()
                          if x.key in ident_methods],
                 delim=htmltext('<br/>'))
        form.add_submit('submit', _('Submit'))

        if form.is_submitted() and not form.has_errors():
            method = form.get_widget('method').parse()
            return redirect('../ident/%s/register' % method)

        template.html_top(_('Register'))
        r = TemplateIO(html=True)
        r += htmltext('<p>%s</p>') % _('Select the registration method you want to use:')
        r += htmltext(form.render())
        return r.getvalue()

    def _q_lookup(self, component):
        """Return the directory of identification method *component*.

        The returned directory gets its ``register`` page installed as index
        page.  Raises ``TraversalError`` when the method is unknown.
        """
        try:
            method_dir = qommon.ident.get_method_directory(component)
        except KeyError:
            raise errors.TraversalError()
        # set the register page as the index page, so the url can be
        # /register/password/ instead of /register/password/register
        if '' not in method_dir._q_exports:
            # work on a copy bound to this object: appending in place would
            # grow a possibly shared list on every request
            method_dir._q_exports = list(method_dir._q_exports) + ['']
        method_dir._q_index = method_dir.register
        return method_dir
|
|
|
|
|
|
class RootDirectory(Directory):
    """Root of the published site.

    Exposes the authentication related pages (login, logout, register,
    token, saml, liberty, ident), a few JSON endpoints (user, roles,
    tmp-upload, __version__) and static resources.  Any path it cannot
    resolve itself is handed over to ``forms.root.RootDirectory``.
    """

    _q_exports = ['admin', 'backoffice', 'forms', 'login', 'logout', 'liberty', 'token', 'saml',
                  'ident', 'register', 'afterjobs', 'themes', 'myspace', 'user', 'roles',
                  'pages', ('tmp-upload', 'tmp_upload'), '__version__']

    themes = template.ThemesDirectory()
    myspace = MyspaceDirectory()
    pages = qommon.pages.PagesDirectory()

    def __version__(self):
        """Return the packages versions as JSON.

        Raises ``TraversalError`` when ``VersionMiddleware`` could not be
        imported.
        """
        if VersionMiddleware is None:
            raise errors.TraversalError()
        get_response().set_content_type('application/json')
        return json.dumps(VersionMiddleware(None).get_packages_version())

    def logout(self):
        """Log the user out.

        Without a session the user is simply redirected to the site root.
        When ``idp`` is not a configured identification method, or the
        session has no identity provider attached, the session is expired
        locally; otherwise a single logout is started with the protocol in
        use (SAML 2.0 or Liberty).
        """
        get_logger().info('logout')
        session = get_session()
        if not session:
            return redirect(get_publisher().get_root_url())

        ident_methods = get_cfg('identification', {}).get('methods', [])
        if 'idp' not in ident_methods or not session.lasso_identity_provider_id:
            get_session_manager().expire_session()
            return redirect(get_publisher().get_root_url())

        # add settings to disable single logout?
        #  (and to set it as none/get/soap?)
        import lasso
        if misc.get_current_protocol() == lasso.PROTOCOL_SAML_2_0:
            return self.saml.slo_sp()
        return self.liberty.singleLogout()

    def token(self):
        """Let a logged-in user claim an account with an identification token.

        On valid submission the first user whose ``identification_token``
        matches gets the session name identifier and the current user's
        lasso dump, its token is cleared, it becomes the session user, and
        form data referenced by anonymity links carrying the same name
        identifier are assigned to it.

        Raises ``AccessUnauthorizedError`` when no user is logged in.
        """
        if not get_request().user:
            raise errors.AccessUnauthorizedError()

        form = Form(enctype='multipart/form-data')
        form.add(StringWidget, 'token', title=_('Identification Token'),
                 required=True, size=30)
        form.add_submit('submit', _('Submit'))
        form.add_submit('cancel', _('Cancel'))

        if form.get_widget('cancel').parse():
            return redirect('.')

        if not form.is_submitted() or form.has_errors():
            template.html_top(_('Identification Token'))
            # TODO: include explanation about identification token (?)
            r = TemplateIO(html=True)
            r += htmltext('<p>%s</p>') % _('Please enter your identification token.')
            r += htmltext(form.render())
            return r.getvalue()

        session = get_session()
        # the user is known to be set, this was checked on entry
        lasso_dump = get_request().user.lasso_dump
        token = form.get_widget('token').parse()
        users_with_token = list(get_publisher().user_class.select(
            lambda x: x.identification_token == token))
        if not users_with_token:
            return template.error_page(_('Unknown Token'))

        user = users_with_token[0]
        user.name_identifiers.append(session.name_identifier)
        user.lasso_dump = str(lasso_dump)
        user.identification_token = None
        user.store()

        session.set_user(user.id)

        for anonylink in AnonymityLink.select(
                lambda x: x.name_identifier == session.name_identifier):
            if anonylink.formdata_type != 'form':
                continue  # ?
            fdef = FormDef.get(anonylink.formdata_def_id)
            data = fdef.data_class().get(anonylink.formdata_id)
            data.user_id = user.id
            data.store()
            anonylink.remove_self()

        return redirect('.')

    def user(self):
        """Serve the user details as JSON, or redirect to ``myspace/``."""
        if get_request().is_json():
            return self.user_json()
        return redirect('myspace/')

    def user_json(self):
        """Return the substitution variables and role names of the user as JSON.

        The user comes from the API query string or, failing that, from the
        request.  Raises ``AccessForbiddenError`` when there is none.
        """
        get_response().set_content_type('application/json')
        user = get_user_from_api_query_string() or get_request().user
        if not user:
            # raise (not return) so an error response is produced
            raise errors.AccessForbiddenError()
        user_info = user.get_substitution_variables(prefix='')
        del user_info['user']
        user_info['user_roles'] = [Role.get(role_id).name
                                   for role_id in user.roles or []]
        return json.dumps(user_info)

    def roles(self):
        """Return the list of roles as JSON.

        Non JSON requests are redirected to ``/``.  Raises
        ``AccessForbiddenError`` unless the request user can go in admin or
        a user is identified by the API query string.
        """
        if not get_request().is_json():
            return redirect('/')
        get_response().set_content_type('application/json')
        if not (get_request().user and get_request().user.can_go_in_admin()) and \
                not get_user_from_api_query_string():
            raise errors.AccessForbiddenError()
        charset = get_publisher().site_charset
        list_roles = [{'text': unicode(role.name, charset),
                       'allows_backoffice_access': role.allows_backoffice_access,
                       'id': role.id}
                      for role in Role.select()]
        return json.dumps({'data': list_roles})

    def tmp_upload(self):
        """Store uploaded files in the session and describe them as JSON.

        Every form value with an ``fp`` attribute is added to the session
        as a temporary file; the result lists name, type, size and token of
        each of them.
        """
        results = []
        for value in get_request().form.values():
            if not hasattr(value, 'fp'):
                continue
            token = get_session().add_tempfile(value)
            tempfile = get_session().tempfiles[token]
            results.append({'name': tempfile.get('base_filename'),
                            'type': tempfile.get('content_type'),
                            'size': tempfile.get('size'),
                            'token': token})

        get_response().set_content_type('application/json')
        # the header may be absent, re.findall() would fail on None
        useragent = get_request().get_header('User-agent') or ''
        if re.findall(r'MSIE \d\.', useragent):
            # hack around MSIE version < 10 as they do not have support for
            # XmlHttpRequest 2 (hence the forced usage of an iframe to send
            # a file in the background (cf jquery.iframe-transport.js); and
            # they would propose the returned json content for download if
            # it was served with the appropriate content type :/
            get_response().set_content_type('text/plain')
        return json.dumps(results)

    def _q_traverse(self, path):
        """Prepare the response and resolve *path*.

        ``filter`` and ``breadcrumb`` are initialised on the response when
        missing, the request user is fed to the publisher substitutions and
        the admin and backoffice directories are created on first use.  A
        ``TraversalError`` from the regular traversal makes the path be
        retried against ``forms.root.RootDirectory``.
        """
        response = get_response()
        if not hasattr(response, 'filter'):
            response.filter = {}
        if not hasattr(response, 'breadcrumb'):
            response.breadcrumb = [('', _('Home'))]

        get_publisher().substitutions.feed(get_request().user)

        if not self.admin:
            self.admin = get_publisher().admin_directory_class()

        if not self.backoffice:
            self.backoffice = get_publisher().backoffice_directory_class()

        try:
            return Directory._q_traverse(self, path)
        except errors.TraversalError:
            pass

        return forms.root.RootDirectory()._q_traverse(path)

    def _q_lookup(self, component):
        """Resolve *component* as static resources, a category, or a form."""
        if component in ('css', 'images'):
            return StaticDirectory(
                os.path.join(get_publisher().data_dir, 'web', component),
                follow_symlinks=True)
        if component == 'qo':
            dirname = os.path.join(get_publisher().data_dir, 'qommon')
            return StaticDirectory(dirname, follow_symlinks=True)

        # maps /leaflet/ to the system-wide leaflet javascript directory
        if component == 'leaflet':
            return StaticDirectory('/usr/share/javascript/leaflet')

        # is this a category ?
        try:
            category = Category.get_by_urlname(component)
        except KeyError:
            pass
        else:
            return forms.root.RootDirectory(category)

        # or a form ?
        return forms.root.RootDirectory()._q_lookup(component)

    # created lazily in _q_traverse, from the classes given by the publisher
    admin = None
    backoffice = None

    saml = saml2.Saml2Directory()
    liberty = liberty.LibertyDirectory()
    forms = CompatibilityDirectory()
    login = LoginDirectory()
    register = RegisterDirectory()
    ident = IdentDirectory()
    afterjobs = AfterJobStatusDirectory()
|