This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
auquotidien/auquotidien/modules/root.py

615 lines
24 KiB
Python

from quixote import get_publisher, get_response, get_request, redirect, get_session
from quixote.directory import Directory
from quixote.html import TemplateIO, htmltext
from wcs.qommon import _
from wcs.qommon.misc import get_variadic_url, simplify
import os
import re
import string
from django.utils.six.moves.urllib import parse as urlparse
try:
import lasso
except ImportError:
pass
import wcs
import wcs.root
from wcs import qommon
from wcs.forms.root import RootDirectory as FormsRootDirectory
from wcs.qommon import get_cfg, get_logger
from wcs.qommon import template
from wcs.qommon import errors
from wcs.qommon.form import *
from wcs.qommon import logger
from wcs.roles import logged_users_role
from wcs.qommon import emails
from wcs.qommon.sms import SMS
from wcs.categories import Category
from wcs.formdef import FormDef
from wcs.data_sources import NamedDataSource
from wcs.qommon.tokens import Token
from wcs.qommon.admin.emails import EmailsDirectory
from wcs.qommon.admin.texts import TextsDirectory
from .myspace import MyspaceDirectory
from .payments import PublicPaymentDirectory
from .payments_ui import InvoicesDirectory
from . import admin
import wcs.forms.root
from wcs.workflows import Workflow
from wcs.forms.preview import PreviewDirectory
from .saml2 import Saml2Directory
OldRootDirectory = wcs.root.RootDirectory
import wcs.qommon.ident.password
import wcs.qommon.ident.idp
def category_get_homepage_position(self):
    """Return where this category is shown on the home page.

    An explicitly configured, non-empty ``homepage_position`` attribute
    wins.  Otherwise the category whose ``url_name`` is ``consultations``
    goes to ``'2nd'`` and every other category to ``'1st'``.
    """
    configured = getattr(self, 'homepage_position', None)
    if configured:
        return configured
    return '2nd' if self.url_name == 'consultations' else '1st'
# Attach the helper above to Category so it can be called as a method
# (category.get_homepage_position()).
Category.get_homepage_position = category_get_homepage_position
def category_get_limit(self):
    """Return the maximum number of forms listed for this category.

    Uses the ``limit`` attribute when it exists and is not ``None`` (so an
    explicit ``0`` is honoured), and falls back to 7 otherwise.
    """
    configured = getattr(self, 'limit', None)
    return 7 if configured is None else configured
# Attach the helper above to Category so it can be called as a method
# (category.get_limit()).
Category.get_limit = category_get_limit
# Replace Category's attribute lists so they include the two attributes read
# by the helpers above (homepage_position as text, limit as integer).
# NOTE(review): presumably these lists drive Category import/export -- confirm
# against wcs.categories.
Category.TEXT_ATTRIBUTES = ['name', 'url_name', 'description', 'homepage_position']
Category.INT_ATTRIBUTES = ['position', 'limit']
OldRegisterDirectory = wcs.root.RegisterDirectory


class AlternateRegisterDirectory(OldRegisterDirectory):
    """Registration directory that tags its pages and picks the method itself."""

    def _q_traverse(self, path):
        """Mark the response as a 'new_member' page, then traverse as usual."""
        response = get_response()
        response.filter['bigdiv'] = 'new_member'
        return OldRegisterDirectory._q_traverse(self, path)

    def _q_index(self):
        """Start registration with the configured identification method.

        With no method configured, falls back to 'idp' when at least one
        identity provider is configured, and shows an error page otherwise.
        When several methods are configured, 'password' is used.
        """
        get_logger().info('register')
        methods = get_cfg('identification', {}).get('methods', [])
        if len(methods) == 0:
            if len(get_cfg('idp', {})) == 0:
                return template.error_page(_('Authentication subsystem is not yet configured.'))
            # fallback to old behaviour; saml.
            methods = ['idp']

        method = methods[0] if len(methods) == 1 else 'password'
        return wcs.qommon.ident.register(method)
OldLoginDirectory = wcs.root.LoginDirectory


class AlternateLoginDirectory(OldLoginDirectory):
    """Login directory offering a combined password + SSO page.

    When exactly the 'idp' and 'password' identification methods are
    configured, both login forms are rendered on one page.  A single method
    is used directly, and any other combination is left to the stock login
    directory.
    """

    def _q_traverse(self, path):
        """Mark the response as a 'member' page, then traverse as usual."""
        get_response().filter['bigdiv'] = 'member'
        return OldLoginDirectory._q_traverse(self, path)

    def _q_index(self):
        """Render the login page or hand over to a single login method.

        Raises:
            errors.TraversalError: when the single selected method cannot be
                used (the login call raised KeyError).
        """
        # `misc` is not imported by name at the top of this file (it would
        # only be reachable through a star import), so bring it into scope
        # explicitly.
        from wcs.qommon import misc

        get_logger().info('login')
        request = get_request()
        ident_methods = get_cfg('identification', {}).get('methods', [])

        if request.form.get('ReturnUrl'):
            request.form['next'] = request.form.pop('ReturnUrl')

        if 'IsPassive' in request.form and 'idp' in ident_methods:
            # if isPassive is given in query parameters, we restrict ourselves
            # to saml login.
            ident_methods = ['idp']

        if len(ident_methods) > 1 and 'idp' in ident_methods:
            # if there is more than one identification method, and there is a
            # possibility of SSO, if we got there as a consequence of an access
            # unauthorized url on admin/ or backoffice/, then idp auth method
            # is chosen forcefully.
            after_url = request.form.get('next')
            if after_url:
                root_url = get_publisher().get_root_url()
                after_path = urlparse.urlparse(after_url)[2]
                # Only strip the root prefix when it is really there, so an
                # unrelated path cannot be mistaken for admin/ or backoffice/.
                if after_path.startswith(root_url):
                    after_path = after_path[len(root_url):]
                    if after_path.startswith((str('admin'), str('backoffice'))):
                        ident_methods = ['idp']

        # don't display authentication system choice
        if len(ident_methods) == 1:
            method = ident_methods[0]
            try:
                return wcs.qommon.ident.login(method)
            except KeyError:
                get_logger().error('failed to login with method %s' % method)
                # The exception has to be raised: returning the instance
                # would hand it to the publisher as if it were page content.
                raise errors.TraversalError()

        if sorted(ident_methods) != ['idp', 'password']:
            return OldLoginDirectory._q_index(self)

        r = TemplateIO(html=True)
        get_response().breadcrumb.append(('login', _('Login')))
        identities_cfg = get_cfg('identities', {})

        # --- password form ---
        form = Form(enctype='multipart/form-data', id='login-form', use_tokens=False)
        if identities_cfg.get('email-as-username', False):
            form.add(StringWidget, 'username', title=_('Email'), size=25, required=True)
        else:
            form.add(StringWidget, 'username', title=_('Username'), size=25, required=True)
        form.add(PasswordWidget, 'password', title=_('Password'), size=25, required=True)
        form.add_submit('submit', _('Connect'))
        if form.is_submitted() and not form.has_errors():
            tmp = wcs.qommon.ident.password.MethodDirectory().login_submit(form)
            # login_submit may flag errors on the form; in that case fall
            # through and render the page again.
            if not form.has_errors():
                return tmp

        r += htmltext('<div id="login-password">')
        r += get_session().display_message()
        r += form.render()

        base_url = get_publisher().get_root_url()
        r += htmltext('<p><a href="%sident/password/forgotten">%s</a></p>') % (
                base_url, _('Forgotten password ?'))
        r += htmltext('</div>')

        # --- SSO form ---
        # XXX: this part only supports a single IdP
        r += htmltext('<div id="login-sso">')
        r += TextsDirectory.get_html_text('aq-sso-text')
        form = Form(enctype='multipart/form-data',
                action='%sident/idp/login' % base_url)
        form.add_hidden('method', 'idp')
        for idp in get_cfg('idp', {}).values():
            p = lasso.Provider(lasso.PROVIDER_ROLE_IDP,
                    misc.get_abs_path(idp['metadata']),
                    misc.get_abs_path(idp.get('publickey')), None)
            form.add_hidden('idp', p.providerId)
            break  # only the first configured provider is offered
        form.add_submit('submit', _('Connect'))

        r += form.render()
        r += htmltext('</div>')

        get_request().environ['REQUEST_METHOD'] = 'GET'

        r += htmltext("""<script type="text/javascript">
document.getElementById('login-form')['username'].focus();
</script>""")
        return r.getvalue()
OldIdentDirectory = wcs.root.IdentDirectory


class AlternateIdentDirectory(OldIdentDirectory):
    """Ident directory whose pages are tagged with the 'member' bigdiv."""

    def _q_traverse(self, path):
        """Set the 'bigdiv' response filter, then traverse as usual."""
        response = get_response()
        response.filter['bigdiv'] = 'member'
        return OldIdentDirectory._q_traverse(self, path)
class AlternatePreviewDirectory(PreviewDirectory):
    """Preview directory whose pages are tagged with the 'rub_service' bigdiv."""

    def _q_traverse(self, path):
        """Set the 'bigdiv' response filter, then traverse as usual."""
        response = get_response()
        response.filter['bigdiv'] = 'rub_service'
        parent = super(AlternatePreviewDirectory, self)
        return parent._q_traverse(path)
class AlternateRootDirectory(OldRootDirectory):
    """Root directory of the site, replacing the stock root directory.

    It plugs in the alternate login/register/ident/preview directories,
    renders the home page (services, consultations, "my space" box), builds
    the side box, and resolves category and form slugs at the top level.
    """

    _q_exports = ['', 'admin', 'backoffice', 'forms', 'login', 'logout',
            'saml', 'register', 'ident', 'afterjobs',
            ('informations-editeur', 'informations_editeur'),
            'myspace', 'services', 'categories', 'user',
            ('tmp-upload', 'tmp_upload'), 'json', '__version__',
            'themes', 'pages', 'payment', 'invoices', 'roles',
            'api', 'code', 'fargo', 'tryauth', 'auth', 'preview',
            ('reload-top', 'reload_top'), 'static',
            ('i18n.js', 'i18n_js'), 'actions',]

    register = AlternateRegisterDirectory()
    login = AlternateLoginDirectory()
    ident = AlternateIdentDirectory()
    myspace = MyspaceDirectory()
    saml = Saml2Directory()
    payment = PublicPaymentDirectory()
    invoices = InvoicesDirectory()
    code = wcs.forms.root.TrackingCodesDirectory()
    preview = AlternatePreviewDirectory()

    def get_substitution_variables(self):
        """Return the substitution variables contributed by this directory."""
        return {'links': ''}

    def _theme_keywords(self):
        """Return the keywords stored in the response filter, never None."""
        return get_response().filter.get('keywords') or []

    def _q_traverse(self, path):
        """Prepare the response (filters, breadcrumb) and traverse ``path``.

        When normal traversal fails and the first path component is the slug
        of a form that has a category, the request is either served by that
        category (category and form share a slug) or redirected to the URL
        embedding the category.  Otherwise the TraversalError is re-raised.
        """
        self.feed_substitution_parts()

        # set app_label to Publik if none was specified (this is used in
        # backoffice header top line)
        publisher = get_publisher()
        if not publisher.get_site_option('app_label'):
            if not publisher.site_options.has_section('options'):
                publisher.site_options.add_section('options')
            publisher.site_options.set('options', 'app_label', 'Publik')

        response = get_response()
        if not hasattr(response, 'filter'):
            response.filter = {}

        response.filter['auquotidien'] = True
        # The keywords must be stored before box_side() runs, as box_side()
        # reads them to decide whether to show the tracking code form.
        response.filter['keywords'] = template.get_current_theme().get('keywords')
        if not path or (path[0] not in ('api', 'backoffice') and not get_request().is_json()):
            # api & backoffice have no use for a side box
            response.filter['gauche'] = self.box_side(path)
        get_publisher().substitutions.feed(self)

        response.breadcrumb = [ ('', _('Home')) ]

        if not self.admin:
            self.admin = get_publisher().admin_directory_class()

        if not self.backoffice:
            self.backoffice = get_publisher().backoffice_directory_class()

        try:
            return Directory._q_traverse(self, path)
        except errors.TraversalError as e:
            try:
                f = FormDef.get_by_urlname(path[0])
            except KeyError:
                pass
            else:
                base_url = get_publisher().get_root_url()

                uri_rest = get_request().environ.get('REQUEST_URI')
                if not uri_rest:
                    # REQUEST_URI doesn't exist when using internal HTTP server
                    # (--http)
                    uri_rest = get_request().get_path()
                    if get_request().get_query():
                        uri_rest += '?' + get_request().get_query()
                if uri_rest.startswith(base_url):
                    uri_rest = uri_rest[len(base_url):]
                if f.category:
                    if f.category.url_name == f.url_name:
                        return FormsRootDirectory(f.category)._q_traverse(path[1:])
                    return redirect('%s%s/%s' % (base_url, f.category.url_name, uri_rest))

            # Re-raise by name: on Python 2 a bare `raise` here could
            # re-raise the KeyError handled just above instead.
            raise e

    def _q_lookup(self, component):
        """Resolve ``component`` as a category, a form or a static text page.

        Returns None when nothing matches, or when the form belongs to a
        category and must be reached through the category URL (the redirect
        is then done by _q_traverse).
        """
        # is this a category ?
        try:
            category = Category.get_by_urlname(component)
        except KeyError:
            category = None

        # is this a formdef ?
        try:
            formdef = FormDef.get_by_urlname(component)
        except KeyError:
            if category:
                return FormsRootDirectory(category)
        else:
            # if the form has no category, or the request is a POST, or the
            # slug matches both a category and a formdef, directly call
            # into FormsRootDirectory.
            if formdef.category_id is None or get_request().get_method() == 'POST' or (
                    formdef and category):
                get_response().filter['bigdiv'] = 'rub_service'
                return FormsRootDirectory()._q_lookup(component)

            # if there is category, let it fall back to raise TraversalError,
            # it will get caught in _q_traverse that will redirect it to an
            # URL embedding the category

        # static text pages are only served when their text has been filled
        if component in ('accessibility', 'contact', 'help'):
            if TextsDirectory.get_html_text('aq-' + component):
                return getattr(self, component)()

        return None

    def json(self):
        """Return the JSON listing produced by the forms root directory."""
        return FormsRootDirectory().json()

    def categories(self):
        """Return the categories page produced by the forms root directory."""
        return FormsRootDirectory().categories()

    def _q_index(self):
        """Render the home page, or redirect / answer in JSON when relevant."""
        request = get_request()
        if request.is_json():
            return FormsRootDirectory().json()

        root_url = get_publisher().get_root_url()
        user = request.user
        if user and user.anonymous and user.lasso_dump:
            return redirect('%smyspace/new' % root_url)

        redirect_url = get_cfg('misc', {}).get('homepage-redirect-url')
        if redirect_url:
            # get_variadic_url is imported by name at the top of the file;
            # the `misc` module itself is not.
            return redirect(get_variadic_url(redirect_url,
                    get_publisher().substitutions.get_context_variables()))

        template.html_top()
        r = TemplateIO(html=True)
        get_response().filter['is_index'] = True

        # with this keyword the intro text is shown by box_services() instead
        if 'auquotidien-welcome-in-services' not in self._theme_keywords():
            t = TextsDirectory.get_html_text('aq-home-page')
            if not t:
                if request.user:
                    t = TextsDirectory.get_html_text('welcome-logged')
                else:
                    t = TextsDirectory.get_html_text('welcome-unlogged')
            if t:
                r += htmltext('<div id="home-page-intro">')
                r += t
                r += htmltext('</div>')

        r += htmltext('<div id="centre">')
        r += self.box_services(position='1st')
        r += htmltext('</div>')
        r += htmltext('<div id="droite">')
        r += self.myspace_snippet()
        r += self.box_services(position='2nd')
        r += self.consultations()
        r += htmltext('</div>')

        user = get_request().user
        if user and user.can_go_in_backoffice():
            get_response().filter['backoffice'] = True

        return r.getvalue()

    def services(self):
        """Render the standalone services page."""
        template.html_top()
        get_response().filter['bigdiv'] = 'rub_service'
        return self.box_services(level = 2)

    def box_services(self, level=3, position=None):
        """Render the services box.

        ``level`` selects the heading (h2 when 2, h3 otherwise).  ``position``
        restricts the box to categories with that home page position.
        Returns None when there is nothing to list.
        """
        ## Services
        user = get_request().user
        if user and user.roles:
            accepted_roles = user.roles
        else:
            accepted_roles = []

        cats = Category.select(order_by = 'name')
        cats = [x for x in cats if x.url_name != 'consultations']
        Category.sort_by_position(cats)

        all_formdefs = FormDef.select(lambda x: not x.is_disabled() or x.disabled_redirection,
                order_by = 'name')

        if position:
            cats = [x for x in cats if x.get_homepage_position() == position]
        t = self.display_list_of_formdefs(cats, all_formdefs, accepted_roles)

        if not t:
            return None

        r = TemplateIO(html=True)

        if position == '2nd':
            r += htmltext('<div id="services-2nd">')
        else:
            r += htmltext('<div id="services">')
        if level == 2:
            r += htmltext('<h2>%s</h2>') % _('Services')
        else:
            r += htmltext('<h3>%s</h3>') % _('Services')

        if 'auquotidien-welcome-in-services' in self._theme_keywords():
            homepage_text = TextsDirectory.get_html_text('aq-home-page')
            if homepage_text:
                r += htmltext('<div id="home-page-intro">')
                r += homepage_text
                r += htmltext('</div>')

        r += htmltext('<ul>')
        r += t
        r += htmltext('</ul>')

        r += htmltext('</div>')
        return r.getvalue()

    def _is_listed_for_user(self, formdef, accepted_roles):
        """Tell whether ``formdef`` is listed as a regular entry for the user."""
        if formdef.is_disabled():  # is a redirection
            return True
        if not formdef.roles:
            return True
        if not get_request().user:
            return False
        if logged_users_role().id in formdef.roles:
            return True
        return any(role in formdef.roles for role in accepted_roles)

    def display_list_of_formdefs(self, cats, all_formdefs, accepted_roles):
        """Render one ``<li>`` per category, each with its list of forms.

        Forms not listed for the current user are dropped, unless they are
        flagged ``always_advertise``: those are listed as requiring
        authentication, in the room left under the category limit.  The
        'consultations' category is skipped and remembered on ``self``.
        """
        r = TemplateIO(html=True)
        for category in cats:
            if category.url_name == 'consultations':
                self.consultations_category = category
                continue

            formdefs = []
            formdefs_advertise = []
            for formdef in all_formdefs:
                if str(formdef.category_id) != str(category.id):
                    continue
                if self._is_listed_for_user(formdef, accepted_roles):
                    formdefs.append(formdef)
                elif formdef.always_advertise:
                    formdefs_advertise.append(formdef)

            if not formdefs and not formdefs_advertise:
                continue

            # a dict is used as an insertion-ordered set of keywords
            keywords = {}
            for formdef in formdefs:
                for keyword in formdef.keywords_list:
                    keywords[keyword] = True

            r += htmltext('<li id="category-%s" data-keywords="%s">') % (
                    category.url_name, ' '.join(keywords))
            r += htmltext('<strong>')
            r += htmltext('<a href="%s/">') % category.url_name
            r += category.name
            r += htmltext('</a></strong>\n')
            r += category.get_description_html_text()
            r += htmltext('<ul>')
            limit = category.get_limit()
            for formdef in formdefs[:limit]:
                r += htmltext('<li data-keywords="%s">') % ' '.join(formdef.keywords_list)
                classes = []
                if formdef.is_disabled() and formdef.disabled_redirection:
                    classes.append('redirection')
                r += htmltext('<a class="%s" href="%s/%s/">%s</a>') % (
                        ' '.join(classes), category.url_name, formdef.url_name, formdef.name)
                r += htmltext('</li>\n')
            if len(formdefs) < limit:
                for formdef in formdefs_advertise[:limit-len(formdefs)]:
                    r += htmltext('<li class="required-authentication">')
                    r += htmltext('<a href="%s/%s/">%s</a>') % (category.url_name, formdef.url_name, formdef.name)
                    r += htmltext('<span> (%s)</span>') % _('authentication required')
                    r += htmltext('</li>\n')
            if (len(formdefs)+len(formdefs_advertise)) > limit:
                r += htmltext('<li class="all-forms"><a href="%s/" title="%s">%s</a></li>') % (category.url_name,
                        _('Access to all forms of the "%s" category') % category.name,
                        _('Access to all forms in this category'))
            r += htmltext('</ul>')
            r += htmltext('</li>\n')

        return r.getvalue()

    def consultations(self):
        """Render the consultations box, or return None when it would be empty."""
        cats = [x for x in Category.select() if x.url_name == 'consultations']
        if not cats:
            return None
        consultations_category = cats[0]
        formdefs = FormDef.select(lambda x: (
                    str(x.category_id) == str(consultations_category.id) and
                        (not x.is_disabled() or x.disabled_redirection)),
                    order_by = 'name')
        if not formdefs:
            return None
        ## Consultations
        r = TemplateIO(html=True)
        r += htmltext('<div id="consultations">')
        r += htmltext('<h3>%s</h3>') % _('Consultations')
        r += consultations_category.get_description_html_text()
        r += htmltext('<ul>')
        for formdef in formdefs:
            r += htmltext('<li>')
            r += htmltext('<a href="%s/%s/">%s</a>') % (consultations_category.url_name,
                formdef.url_name, formdef.name)
            r += htmltext('</li>')
        r += htmltext('</ul>')
        r += htmltext('</div>')
        return r.getvalue()

    def box_side(self, path):
        """Render the side box, or return None when it has no content.

        The box holds the tracking code form (home page only, when the
        'include-tracking-code-form' keyword is set and at least one form
        enables tracking codes) and the categories positioned on the side.
        """
        r = TemplateIO(html=True)
        root_url = get_publisher().get_root_url()

        if (path == [''] and
                'include-tracking-code-form' in self._theme_keywords() and
                self.has_anonymous_access_codes()):
            r += htmltext('<form id="follow-form" action="%scode/load">') % root_url
            r += htmltext('<h3>%s</h3>') % _('Tracking code')
            r += htmltext('<input size="12" name="code" placeholder="%s"/>') % _('ex: RPQDFVCD')
            r += htmltext('<input type="submit" value="%s"/>') % _('Load')
            r += htmltext('</form>')

        cats = Category.select(order_by = 'name')
        cats = [x for x in cats if x.url_name != 'consultations' and x.get_homepage_position() == 'side']
        Category.sort_by_position(cats)
        if cats:
            r += htmltext('<div id="side-services">')
            r += htmltext('<h3>%s</h3>') % _('Services')
            r += htmltext('<ul>')
            for cat in cats:
                r += htmltext('<li><a href="%s/">%s</a></li>') % (cat.url_name, cat.name)
            r += htmltext('</ul>')
            r += htmltext('</div>')

        v = r.getvalue()
        if v:
            r = TemplateIO(html=True)
            r += htmltext('<div id="sidebox">')
            r += v
            r += htmltext('</div>')
            return r.getvalue()

        return None

    def has_anonymous_access_codes(self):
        """Tell whether at least one form has tracking codes enabled."""
        # test the flag itself rather than the truthiness of the form objects
        return any(x.enable_tracking_codes for x in FormDef.select())

    def myspace_snippet(self):
        """Render the "My Space" box: account links, or register/login links."""
        r = TemplateIO(html=True)
        r += htmltext('<div id="myspace">')
        r += htmltext('<h3>%s</h3>') % _('My Space')
        r += htmltext('<ul>')
        user = get_request().user
        if user and not user.anonymous:
            r += htmltext('  <li><a href="myspace/" id="member">%s</a></li>') % _('Access to your personal space')
            r += htmltext('  <li><a href="logout" id="logout">%s</a></li>') % _('Logout')
        else:
            r += htmltext('  <li><a href="register/" id="inscr">%s</a></li>') % _('Registration')
            r += htmltext('  <li><a href="login/" id="login">%s</a></li>') % _('Login')
        r += htmltext('</ul>')
        r += htmltext('</div>')
        return r.getvalue()

    def page_view(self, key, title, urlname = None):
        """Render the custom text ``key`` as an article page titled ``title``.

        ``urlname`` is the breadcrumb URL component.  By default it is derived
        from ``key`` by dropping its first three characters (the 'aq-' prefix
        of the keys used in this class) and replacing '_' with '-'.
        """
        if not urlname:
            urlname = key[3:].replace(str('_'), str('-'))
        get_response().breadcrumb.append((urlname, title))
        template.html_top(title)
        r = TemplateIO(html=True)
        r += htmltext('<div class="article">')
        r += htmltext(TextsDirectory.get_html_text(key))
        r += htmltext('</div>')
        return r.getvalue()

    def informations_editeur(self):
        """Render the editor information page."""
        get_response().filter['bigdiv'] = 'info'
        # NOTE(review): the page is exported as 'informations-editeur' but the
        # breadcrumb component uses an underscore -- confirm this is intended.
        return self.page_view('aq-editor-info', _('Editor Informations'),
                urlname = 'informations_editeur')

    def accessibility(self):
        """Render the accessibility statement page."""
        get_response().filter['bigdiv'] = 'accessibility'
        return self.page_view('aq-accessibility', _('Accessibility Statement'))

    def contact(self):
        """Render the contact page."""
        get_response().filter['bigdiv'] = 'contact'
        return self.page_view('aq-contact', _('Contact'))

    def help(self):
        """Render the help page."""
        get_response().filter['bigdiv'] = 'help'
        return self.page_view('aq-help', _('Help'))
# Install this module's root directory on the publisher class and set the
# site-wide defaults used by this flavour.
# NOTE(review): this imports from the top-level `qommon` package while the
# rest of the file uses `wcs.qommon` -- confirm both resolve to the same module.
from qommon.publisher import get_publisher_class
get_publisher_class().root_directory_class = AlternateRootDirectory
get_publisher_class().after_login_url = 'myspace/'
get_publisher_class().use_sms_feature = True
# Register the custom texts that this module reads through
# TextsDirectory.get_html_text().
# NOTE(review): N_ is not imported by name in this file; presumably it comes
# from the `wcs.qommon.form` star import -- confirm.
TextsDirectory.register('aq-editor-info', N_('Editor Informations'))
TextsDirectory.register('aq-accessibility', N_('Accessibility Statement'))
TextsDirectory.register('aq-contact', N_('Contact Information'))
TextsDirectory.register('aq-help', N_('Help'))
TextsDirectory.register('aq-sso-text', N_('Connecting with Identity Provider'),
default = N_('''<h3>Connecting with Identity Provider</h3>
<p>You can also use your identity provider to connect.
</p>'''))
TextsDirectory.register('aq-home-page', N_('Home Page'), wysiwyg = True)