This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
authentic-adeline/extra/modules/root.ptl

622 lines
24 KiB
Plaintext

import lasso
import Cookie
from quixote import redirect, get_session, get_session_manager, get_response, get_field, get_request
from qommon import get_cfg, get_logger
from qommon.form import *
from qommon import template
from qommon.admin.texts import TextsDirectory
import authentic.root
from authentic import identities
from authentic import misc
OldRootDirectory = authentic.root.RootDirectory
from liberty import AlternateLibertyDirectory
from national_services import NationalService
import collectivity
from qommon.storage import StorableObject
from authentic.root import LoginError, RegistrationError
def is_federated_with_msp(self):
    """Tell whether this identity is federated with the configured MSP provider.

    The lasso identity is rebuilt from ``self.lasso_proxy_dump`` and its
    provider ids are compared with the ``msp_idp`` value of the ``adeline``
    configuration section.  An identity without a proxy dump is never
    federated.
    """
    proxy_dump = self.lasso_proxy_dump
    if not proxy_dump:
        return False
    lasso_identity = lasso.Identity.newFromDump(proxy_dump)
    msp_idp = get_cfg('adeline', {}).get('msp_idp')
    matching = [pid for pid in lasso_identity.providerIds if pid == msp_idp]
    return bool(matching)


# Attach the helper to authentic's Identity class so that every identity
# object can be asked directly.
identities.Identity.is_federated_with_msp = is_federated_with_msp
def get_key_from_provider_id(provider_id):
    """Flatten a provider id into a key free of URL separators.

    The scheme separator ``://`` is replaced first, then every remaining
    ``/``; both become a single ``-``.
    """
    without_scheme_separator = provider_id.replace('://', '-')
    return without_scheme_separator.replace('/', '-')
class MspUserHash(StorableObject):
    """Stored object looked up through the value of the 'mspuserhash' cookie."""

    _names = 'mspuserhashes'

    # Id handed to the identity store's get_identity() by the login page;
    # None until a value is assigned.
    user_id = None

    def get_by_cookie(cls):
        """Return the stored object named by the request's 'mspuserhash' cookie.

        Raises KeyError when the request carries no such cookie.  Whatever
        ``cls.get()`` raises for an unknown value propagates unchanged.
        """
        # The dictionary lookup already raises KeyError('mspuserhash') when
        # the cookie is missing; catching it only to raise a bare KeyError
        # would throw away the key name and the original traceback.
        cookie_value = get_request().cookies['mspuserhash']
        return cls.get(cookie_value)
    get_by_cookie = classmethod(get_by_cookie)
# Root directory of the site: subclasses authentic's RootDirectory (bound to
# OldRootDirectory earlier in this module) and adds the MSP-related pages
# defined below.
class AlternateRootDirectory(OldRootDirectory):
# Names exported for Quixote traversal.  Entries that have no definition in
# this class are presumably provided by OldRootDirectory -- TODO confirm.
_q_exports = ['', 'admin', 'liberty', 'login', 'logout', 'change_password', 'register',
'forgot_password', 'update_info', 'saml', 'singleLogout',
'federations', 'login_local', 'login_msp', 'federate_msp',
'migration_done', 'reset', 'add_msp_cookie']
# Sub-directory exported as 'liberty'; the methods of this class call it for
# proxy login, proxy logout and SSO continuation.
liberty = AlternateLibertyDirectory()
def _q_traverse(self, path):
    """Traverse as the parent class does, then publish MSP data to the page filter.

    After the parent traversal, two entries are stored in
    ``get_response().filter``: ``federated_with_msp`` (empty string, or a
    non-empty marker when the logged-in identity is federated with MSP) and
    ``msp_session_image_url`` (taken from the ``adeline`` configuration).
    The parent's traversal result is returned unchanged.
    """
    result = OldRootDirectory._q_traverse(self, path)

    msp_marker = ''
    session = get_session()
    if session and session.user:
        identity = identities.get_store().get_identity(session.user)
        if identity.is_federated_with_msp():
            msp_marker = 'plop' # any value will do

    page_filter = get_response().filter
    page_filter['federated_with_msp'] = msp_marker
    page_filter['msp_session_image_url'] = get_cfg('adeline', {}).get(
            'msp_session_image_url', '')
    return result
def get_idp_sso_list [html] (self):
if not get_cfg('providers', {}).items():
return ''
'<ul>'
federated_with_msp = False
if get_session() and get_session().user:
identity = identities.get_store().get_identity(get_session().user)
if identity.is_federated_with_msp():
federated_with_msp = True
for klp, lp in get_cfg('providers', {}).items():
if lp['role'] == lasso.PROVIDER_ROLE_IDP:
continue # only allows initiated login to service providers
if lp.get('idp_initiated_sso', True) is False:
continue
if federated_with_msp and str('adeldata') in klp:
continue
try:
provider, label = misc.get_provider_and_label(klp)
except KeyError:
continue
if hasattr(provider, str('getProtocolConformance')) and \
provider.getProtocolConformance() == lasso.PROTOCOL_SAML_2_0:
url = 'saml/sp/%s/login' % klp
else:
url = 'liberty/sp/%s/login' % klp
'<li>%s <span class="but-right"><a href="%s">%s</a></span></li>' % (label, url, _('Connect'))
'</ul>'
# Home page.  This is a PTL [html] template: every bare expression below is
# appended to the page output.
def _q_index [html] (self):
template.html_top()
session = get_session()
# visitors without a logged-in user are redirected to the login page
if not session.user:
return redirect('login')
try:
identity = identities.get_store().get_identity(get_session().user)
except KeyError:
return template.error_page('Failed to get identity')
# first column: introductory text
'<div class="cdc_col1">'
'<p>'
_('Your keyring allows you to access...')
'</p>'
'<p>'
_('To access to the service of your choice...')
'</p>'
'</div>'
# second column: local services, partner services and a debug entry
'<div class="cdc_col2">'
'<fieldset class="cdc_connections">'
'<legend>%s</legend>' % _('Your local services')
self.get_idp_sso_list()
'</fieldset>'
'<fieldset class="cdc_connections">'
'<legend>%s</legend>' % _('Our partners services')
'<ul>'
# federated users get a link to the MSP portal and one to terminate the
# federation; the others get a link that starts the federation
if identity.is_federated_with_msp():
msp_provider_id = get_cfg('adeline', {}).get('msp_idp')
klp = get_key_from_provider_id(msp_provider_id)
terminate_url = '/liberty/sp/%s/proxy_terminate' % klp
msp_portal_url = get_cfg('adeline', {}).get('msp_portal_url')
'<li>%s <span class="but-right"><a href="%s">%s</a> <a href="%s">%s</a></span></li>' % (
_('Access MSP'), msp_portal_url, _('Connect'), terminate_url, _('Remove Link'))
else:
'<li>%s <span class="but-right"><a href="/federate_msp">%s</a></span></li>' % (
_('Create a Link with MSP'), _('Connect'))
# NOTE(review): presumably the national services are listed for every user
# and not only in the else branch above -- confirm against an indented copy
national_services = NationalService.select()
for nationalservice in national_services:
'<li><a href="%s">%s</a></li>' % (nationalservice.url,
_('Log on %s') % nationalservice.name)
'</ul>'
'</fieldset>'
# debug entry linking to the 'reset' page of this directory
'<fieldset class="cdc_connections">'
'<legend>%s</legend>' % _('Debug')
'<ul>'
'<li><a href="reset">%s</a></li>' % _('Reset account federations and offerings')
'</ul>'
'</fieldset>'
'</div> <!-- end of col2 -->'
'<div class="cdc_mrpropre">&nbsp;</div>'
# Build and return the login form (an MspLoginForm).  Reads self.collectivity,
# which login() assigns before calling this method.
def get_login_form(self):
identities_cfg = get_cfg('identities', {})
# NOTE(review): passwords_cfg and login_cfg are assigned but not used in
# this method
passwords_cfg = get_cfg('passwords', {})
login_cfg = get_cfg('login', {})
form = MspLoginForm(enctype="multipart/form-data", id = "login", use_tokens = False)
# collectivities without an identity provider of their own get a username
# field; its title depends on the 'email-as-username' identities option
if not self.collectivity.own_idp:
if identities_cfg.get('email-as-username', False):
form.add(StringWidget, "username", title = _("Username (your email address)"), size=20, required=True)
else:
form.add(StringWidget, "username", title = _("Username"), size=20, required=True)
# NOTE(review): whether the password widget and the "submit" button sit
# inside the `if not self.collectivity.own_idp` block cannot be told from
# this copy -- confirm against an indented copy
form.add(PasswordWidget, "password", title = _("Password"), size=20, required=True)
form.add_submit("submit", _("Log in"))
# button whose name ('collidp') login() checks to start a proxy login
# towards the collectivity's identity provider
if self.collectivity.own_idp:
form.add_submit('collidp', _('Move to %s Identity Provider') % self.collectivity.name)
#if self.collectivity.propose_msp:
# form.add_submit('msp', _('Use my MSP Identity'))
if not self.collectivity.own_idp:
form.add_submit('cancel', _('Cancel'))
return form
# Start a proxied login towards the MSP identity provider configured as
# 'msp_idp' in the 'adeline' section.  nameIdPolicy is handed unchanged to
# self.liberty.perform_proxy_login(), together with a list of (name, value)
# extension pairs built below.  Returns an error page when no MSP provider id
# is configured.
def sso_to_msp(self, nameIdPolicy = 'none'):
msp_provider_id = get_cfg('adeline', {}).get('msp_idp')
if not msp_provider_id:
return template.error_page(_('MSP Identity Provider Id is not known.'))
lasso_login_dump = get_session().lasso_login_dump
# always sent: this server's provider id with 'liberty/metadata' replaced
# by 'vandoeuvre'
extension = [('IdPortailRef', misc.get_lasso_server().providerId.replace('liberty/metadata', 'vandoeuvre'))]
# optional: collectivity id taken from the X-SPL-Collectivity-ID request header
collectivity_id = get_request().get_header('X-SPL-Collectivity-ID')
if collectivity_id:
extension.insert(0, ('SPLCOLID', collectivity_id))
if lasso_login_dump:
# coming from a service provider, pass Extension
login = lasso.Login.newFromDump(misc.get_lasso_server(), lasso_login_dump)
provider_key = get_key_from_provider_id(login.remoteProviderId)
msp_mapping = get_cfg('adeline', {}).get('msp_mapping', {})
try:
identity = identities.get_store().get_identity(get_session().user)
except KeyError:
identity = None
if identity:
# msp_federations is created in memory only; the identity is not saved here
if not hasattr(identity, 'msp_federations'):
identity.msp_federations = {}
# collect the mapped keys of the providers flagged in msp_federations
splist = []
for k in identity.msp_federations.keys():
if identity.msp_federations[k]:
key = msp_mapping.get(get_key_from_provider_id(k))
if key:
splist.append(key)
# NOTE(review): `and False` makes this condition always false, so the
# SPList extension is never added
if splist and False:
extension.append(('SPList', '%s' % ';'.join(splist)))
# mapped key of the requesting service provider, sent first as ORISP.
# NOTE(review): it cannot be told from this copy whether this lookup is
# nested under `if identity:` -- confirm against an indented copy
key = msp_mapping.get(provider_key)
if key:
extension.insert(0, ('ORISP', key))
# NOTE(review): extension always holds at least the IdPortailRef pair, so
# this condition is never true
if not extension:
extension = None
return self.liberty.perform_proxy_login(msp_provider_id, nameIdPolicy,
extensions = extension)
def federate_msp(self):
    """Start a federation with MSP for the logged-in user.

    When the request carries an ``idTs`` field matching the ``msp_tsid`` of a
    collectivity, that collectivity's id is sent back in the
    ``X-Gdd-Account-Number`` response header.  A visitor without a logged-in
    user is flagged with ``msp_init_federate`` on the session and redirected
    to the login page; otherwise a proxied login with the ``federated`` name
    id policy is started.
    """
    ts_id = get_field('idTs')
    if ts_id:
        for coll in collectivity.Collectivity.select():
            if coll.msp_tsid == ts_id:
                get_response().set_header('X-Gdd-Account-Number', coll.id)
                break

    session = get_session()
    if not session.user:
        session.msp_init_federate = True
        return redirect('login')

    session.peer_cancelled = False
    return self.sso_to_msp(nameIdPolicy = 'federated')
def login_msp(self):
    """Start a single sign-on towards MSP with the default name id policy.

    When the request carries an ``idTs`` field matching the ``msp_tsid`` of a
    collectivity, that collectivity's id is sent back in the
    ``X-Gdd-Account-Number`` response header before the proxied login starts.
    """
    ts_id = get_field('idTs')
    if ts_id:
        for coll in collectivity.Collectivity.select():
            if coll.msp_tsid == ts_id:
                get_response().set_header('X-Gdd-Account-Number', coll.id)
                break

    session = get_session()
    session.peer_cancelled = False
    return self.sso_to_msp()
# Login page entry point.  Depending on cookies, request headers, submitted
# buttons and session state it either hands over to another identity provider
# (MSP or the collectivity's own) or renders the local login form through
# login_html().
def login(self):
# visitors carrying the 'msp-user' cookie whose session has no msp_login_dump
# are sent straight to MSP
move = get_request().cookies.has_key('msp-user')
if move and get_session().msp_login_dump is None:
get_session().peer_cancelled = False
return self.sso_to_msp()
# pick the collectivity named by the X-SPL-Collectivity-ID request header;
# an empty Collectivity is kept when the header is absent or unknown
collectivity_id = get_request().get_header('X-SPL-Collectivity-ID')
self.collectivity = collectivity.Collectivity()
if collectivity_id:
try:
self.collectivity = collectivity.Collectivity.get(collectivity_id)
get_session().collectivity_id = self.collectivity.id
except KeyError:
pass
session = get_session()
session.wrong_idp = False # just to be sure
form = self.get_login_form()
# form posted with an 'msp' field: go to MSP
if form.is_submitted() and get_request().form.get('msp'):
get_session().peer_cancelled = False
return self.sso_to_msp()
# NOTE(review): same condition as the block just above, which always
# returns, so this block looks unreachable -- confirm which of the two
# behaviours is the intended one
if form.is_submitted() and get_request().form.get('msp'):
if session.lasso_login_dump:
login = lasso.Login.newFromDump(misc.get_lasso_server(), session.lasso_login_dump)
session.lasso_login_dump = None
return self.liberty.sso_after_authentication(login, False)
else:
# This button should finally federate accounts, not only make a SSO
return self.sso_to_msp(nameIdPolicy = 'federated')
# collectivity with its own identity provider: proxy the login there unless
# the session is flagged peer_cancelled
if self.collectivity.own_idp and not get_session().peer_cancelled:
return self.liberty.perform_proxy_login(self.collectivity.own_idp)
# explicit request (the 'collidp' button) to use the collectivity's provider
if form.is_submitted() and form.get_submit() == 'collidp':
get_session().peer_cancelled = False
return self.liberty.perform_proxy_login(self.collectivity.own_idp)
# redirecting to MSP must only happen if this is SSO, not on direct
# access to this page
if session.lasso_login_dump:
move = get_request().cookies.has_key('msp-user')
# get user magic msp hash and check if he once said he agreed to
# move to msp for this service provider
try:
hash = MspUserHash.get_by_cookie()
except KeyError:
hash = None
if hash and hash.user_id:
try:
identity = identities.get_store().get_identity(hash.user_id)
except KeyError:
identity = None
if identity:
lasso_login_dump = session.lasso_login_dump
# the per-provider choice stored on the identity overrides the cookie
if hasattr(identity, 'msp_federations') and lasso_login_dump:
login = lasso.Login.newFromDump(misc.get_lasso_server(), lasso_login_dump)
move = identity.msp_federations.get(login.remoteProviderId, False)
# NOTE(review): the nesting level of the two tests on `move` below cannot
# be told from this copy -- confirm against an indented copy
if not move:
session.set_user(None)
if move:
# the user is ultimately coming from MSP
# nameIdPolicy is always none in first request to MSP
return self.sso_to_msp('none')
return self.login_html(form)
# Render the login page for the given form and handle its submission.  This
# is a PTL [html] template: every bare expression below is appended to the
# page output.
def login_html [html] (self, form):
# similar to authentic/login but not displaying 'lost password' and with
# option about MSP
authentication_failure = None
# a valid submission is handed to login_submit(); its result is returned
# directly unless it raises LoginError, in which case the page is rendered
# again with an error notice
if form.is_submitted() and not form.has_errors():
# NOTE(review): lasso_login_dump is assigned but not used in this method
lasso_login_dump = get_session().lasso_login_dump
try:
t = self.login_submit(form)
except LoginError:
authentication_failure = _('Authentication Failure')
get_logger().info('login page (after failed attempt)')
else:
return t
site_name = get_cfg('misc', {}).get('sitename', _('Login'))
template.html_top(site_name)
get_response().breadcrumb.append( ('login', _('Login')) )
get_response().filter['body_class'] = 'login'
# NOTE(review): of the four configuration sections read here only
# identities_cfg is used below
identities_cfg = get_cfg('identities', {})
passwords_cfg = get_cfg('passwords', {})
login_cfg = get_cfg('login', {})
branding_cfg = get_cfg('branding', {})
# first column: prompt, optional error notice and the form itself
'<div class="cdc_col1">'
'<p>Identifiez-vous pour acc&#xE9;der &#xE0; vos services personnalis&#xE9;s :</p>'
if authentication_failure:
'<div class="errornotice">%s</div>' % authentication_failure
form.render()
# invitation to open an account, skipped while an MSP login or federation
# is in progress on the session
if not (get_session().msp_login_dump or get_session().msp_init_federate):
'''
<p class="petit">Vous &#xEA;tes nouveau &#xE0; Vand&#x153;uvre ou vous souhaitez vous y installer ? Ouvrez un compte afin de b&#xE9;n&#xE9;ficier de l'acc&#xE8;s aux services offerts par la mairie.</p>
'''
# link to the registration page when account creation is 'self' or 'moderated'
if identities_cfg.get('creation') in ('self', 'moderated'):
'''<div class="cdc_plus"><a href="register">Cr&#xE9;er mon compte</a></div>'''
'</div>'
# second column: either an explanation of account linking (MSP login or
# federation in progress) or, for visitors without the 'msp-user' cookie,
# links to log in through MSP or to create an MSP account
'<div class="cdc_col2">'
if get_session().msp_login_dump or get_session().msp_init_federate:
'''
<div class="cdc_bloc" id="cdc_bloc_pourquoi">
<div class="cdc_tete">
<div class="cdc_coindroit">
<h2>POURQUOI LIER MES COMPTES ?</h2>
</div>
</div>
<div class="cdc_content">
<div class="cdc_fond">
<p>Cr&eacute;er une liaison entre vos<br />
comptes personnels Vandoeuvre<br />
et mon.service-public vous permettra de :</p>
<ul>
<li><span>vous authentifier une seule fois aupr&eacute;s des diff&eacute;rentes administrations.</span></li>
<li><span>pr&eacute;remplir les formulaires en ligne.</span></li>
<li><span>transmettre facilement et de mani&egrave;re s&eacute;curis&eacute;e, vos documents officiels.</span></li>
</ul>
</div>
</div>
<div class="cdc_pied">
<div class="cdc_coingauche">&nbsp;</div>
<div class="cdc_coindroit">&nbsp;</div>
<div class="cdc_mrpropre">&nbsp;</div>
</div>
</div>
'''
elif not get_request().cookies.has_key('msp-user'):
'''
<p>Si vous disposez d'un compte mon.service-public.fr, connectez-vous ci-dessous : </p>
<div class="cdc_boite_connexion">
<div class="cdc_bouton"><a href="%(login_msp_url)s">Connexion</a></div>
<div class="cdc_logo"><img src="/themes/adeline/images/logo_servicepublic_connexion.gif" alt="monservicepublic.fr" /></div>
<div class="cdc_mrpropre">&nbsp;</div>
</div>
<p class="petit">Vous n'avez pas encore de compte mon.service-public.fr ?<br />
Cr&#xE9;ez d&#xE8;s maintenant votre espace personnel pour simplifier vos &#xE9;changes avec l'administration ! </p>
<div class="cdc_plus"><a href="%(new_msp_account_url)s">Cr&#xE9;er mon compte</a></div>
''' % {'login_msp_url': '/login_msp',
'new_msp_account_url': get_cfg('adeline', {}).get('msp_new_account_url', '#')}
'</div>'
'<div class="cdc_mrpropre">&nbsp;</div>'
# Decide where to go after a successful local login: finish an MSP-initiated
# login, start a federation with MSP, or fall back to the parent class.
# NOTE(review): presumably called by the inherited login_submit() -- confirm.
def login_success(self, identity):
session = get_session()
if session.msp_login_dump:
# the identity argument is replaced by the one of the session user
identity = identities.get_store().get_identity(get_session().user)
# the user ticked the federation checkbox, or the page filter already marks
# the user as federated with MSP
if get_request().form.get('tfConfirmFederationFromMSP') or \
get_response().filter.get('federated_with_msp'):
return self.liberty.proxyAssertionConsumer()
# NOTE(review): if this elif pairs with the test just above it can never be
# true (the same form field was already tested there); it presumably pairs
# with `if session.msp_login_dump` -- confirm against an indented copy
elif get_request().form.get('tfConfirmFederationFromMSP'):
if session.msp_init_federate:
session.msp_init_federate = False
return self.federate_msp()
return OldRootDirectory.login_success(self, identity)
def migration_done(self):
    """Resume the interrupted flow once the data service migration has completed.

    In order of precedence:

    * a pending lasso login stored on the session is rebuilt, removed from
      the session and completed through
      ``self.liberty.sso_after_authentication``;
    * otherwise a pending ``after_url`` is removed from the session and
      redirected to;
    * otherwise the visitor is redirected to the site root.
    """
    # To be called when the data service migration has completed
    session = get_session()
    if session.lasso_login_dump:
        # get_lasso_server is reached through the misc module, as everywhere
        # else in this file; the bare name is never imported here and would
        # raise NameError.
        login = lasso.Login.newFromDump(misc.get_lasso_server(), session.lasso_login_dump)
        session.lasso_login_dump = None
        return self.liberty.sso_after_authentication(login, True, proxied = True)
    if session.after_url:
        after_url = session.after_url
        session.after_url = None
        return redirect(after_url)
    return redirect(get_request().environ['SCRIPT_NAME'] + '/')
def reset(self):
    """Debug helper: forget the MSP cookie and the user's lasso dumps.

    The ``msp-user`` cookie is expired in every case.  A visitor without a
    logged-in user is then redirected to the login page; otherwise both
    ``lasso_dump`` and ``lasso_proxy_dump`` of the identity are cleared, the
    identity is saved and the visitor is redirected to ``/``.  An error page
    is returned when the identity cannot be found.
    """
    get_response().expire_cookie(
            'msp-user',
            domain = get_publisher().config.session_cookie_domain,
            path = '/')

    session = get_session()
    if not session.user:
        return redirect('login')

    try:
        identity = identities.get_store().get_identity(session.user)
    except KeyError:
        return template.error_page('Failed to get identity')

    identity.lasso_dump = identity.lasso_proxy_dump = None
    identities.get_store().save(identity)
    return redirect('/')
def add_msp_cookie [html] (self):
response = get_response()
response.set_cookie('msp-user', 'true',
domain = get_publisher().config.session_cookie_domain,
expires = Cookie._getdate(3*365*86400),
path = '/')
template.html_top(_('Debug page'))
'<p>'
_('Out-of-flow MSP cookie set')
'</p>'
def logout(self):
    """Log the user out, propagating the logout to the proxied provider first.

    When the session holds a lasso proxy session dump, the result of the
    proxied single logout is returned.  If that fails with ``lasso.Error``,
    or there is nothing to propagate, the local session is expired and the
    visitor is redirected to the logout URL of the collectivity named by the
    ``X-Gdd-Account-Number`` request header, or to ``/`` when there is none.
    """
    get_logger().info('logout')
    session = get_session()
    if session and session.lasso_proxy_session_dump:
        try:
            return self.liberty.proxySingleLogout(session)
        except lasso.Error:
            # silently ignore the error and goes on to logout; this could
            # have been fixed if cap gemini described their actions...
            pass

    get_session_manager().expire_session()

    account_number = get_request().get_header('X-Gdd-Account-Number')
    if account_number:
        try:
            logout_url = collectivity.Collectivity.get(account_number).url_on_logout
            if logout_url:
                return redirect(logout_url)
        except KeyError:
            pass
    return redirect('/')
def register [html] (self):
identities_cfg = get_cfg('identities', {})
if not identities_cfg.get('creation') in ('self', 'moderated'):
raise errors.TraversalError()
form = Form(enctype="multipart/form-data")
fields = [ identities.Field('name', N_('First Name / Last Name'), required = True, size = 30),
identities.Field('email', N_('Email'), required = True, size = 30) ]
for field in fields:
field.add_to_form(form) and None
has_email = 'email' in [x.key for x in identities.fields]
if not has_email or not identities_cfg.get('email-as-username', False):
form.add(StringWidget, "username", title=_("Username"), size=30, required=True)
passwords_cfg = get_cfg('passwords', {})
if passwords_cfg.get('can_change', False) and not passwords_cfg.get('generate', False):
form.add(PasswordWidget, "password", title=_("Password"), size=30, required=True)
else:
form.add(HtmlWidget, '<p>%s</p>' % _('A password will be mailed to you.'))
if passwords_cfg.get('lost_password_behaviour', 'nothing') == 'dumb_question':
dumb_questions_options = [(str(x), _(identities.dumb_questions[x])) \
for x in identities.available_dumb_questions]
form.add(HtmlWidget, '<p>%s</p>' % _('If you forget your password...'))
form.add(SingleSelectWidget, 'dumb_question',
title = _('Security question'), required=True,
options = [(None, _('[Select a question]'))] + dumb_questions_options)
form.add(StringWidget, 'smart_answer', title = _('Your answer'), required=True)
form.add_submit("submit", _("Submit"))
form.add_submit("cancel", _("Cancel"))
if form.get_submit() == 'cancel':
return redirect('.')
if form.is_submitted() and not form.has_errors():
try:
return self.register_submit(form)
except RegistrationError:
pass
template.html_top(_('Registration'))
get_response().breadcrumb.append( ('register', _('Registration')) )
vars = {
'register_form': str(form.render())
}
return template.process_template(
str(TextsDirectory.get_html_text('register')), vars)
class MspLoginForm(Form):
    """Login form rendered from fixed HTML instead of from its widgets."""

    def render(self):
        """Return the login form markup as htmltext.

        While an MSP login or an MSP federation is in progress on the
        session, the markup includes a pre-checked checkbox named
        ``tfConfirmFederationFromMSP``; otherwise the plain
        username/password form is returned.
        """
        session = get_session()
        if not (session.msp_login_dump or session.msp_init_federate):
            return htmltext('''
<form method="post">
<fieldset>
<label for="username">Courriel</label>
<input type="text" id="username" name="username" />
</fieldset>
<fieldset>
<label for="password">Mot de passe</label>
<input type="password" id="password" name="password" />
</fieldset>
<fieldset class="cdc_valider">
<div class="cdc_plus"><a href="forgot_password">Mot de passe perdu ?</a></div>
<input type="submit" value="Valider" />
</fieldset>
</form>
''')
        return htmltext('''
<form method="post">
<fieldset>
<label for="username">Courriel</label>
<input type="text" id="username" name="username" />
</fieldset>
<fieldset>
<label for="password">Mot de passe</label>
<input type="password" id="password" name="password" />
</fieldset>
<fieldset class="cdc_valider">
<div class="cdc_plus"><a href="forgot_password">Mot de passe perdu ?</a></div>
</fieldset>
<fieldset class="cdc_coche">
<input type="checkbox" id="tfConfirmFederationFromMSP" name="tfConfirmFederationFromMSP" checked="checked"/>
<label for="tfConfirmFederationFromMSP">Cr&eacute;er une liaison avec <img src="/themes/adeline/images/logo_servicepublic_mini.gif" alt="mon servicepublic.fr" /></label>
</fieldset>
<fieldset class="cdc_valider">
<input type="submit" value="Valider" />
</fieldset>
</form>
''')
# Register AlternateRootDirectory as the publisher's root directory class, so
# that it replaces the stock one as soon as this module is imported.
from qommon.publisher import get_publisher_class
get_publisher_class().root_directory_class = AlternateRootDirectory