This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
authentic-old/authentic/root.ptl

1348 lines
53 KiB
Plaintext

import re
import os
import random
import string
import urllib
import base64
import Cookie
import lasso
from quixote import get_request, get_response, get_session, get_session_manager, get_publisher, redirect, get_field
from quixote.html import htmltext
from quixote.directory import Directory
from quixote.util import StaticDirectory
from authentic.form import *
from qommon import get_cfg, get_logger
from qommon import errors
from qommon import emails
from qommon.admin.emails import EmailsDirectory
from qommon.admin.texts import TextsDirectory
from qommon import template
from qommon.tokens import Token
import authentic.identities as identities
import authentic.misc as misc
import authentic.liberty.root as liberty_root
import authentic.liberty.saml2 as saml2_root
import authentic.liberty.idwsf2 as idwsf2_root
import authentic.admin.root as root
import admin.configuration as configuration
try:
import cas
except ImportError:
cas = None
import datetime
import login_token
class LoginError(Exception):
pass
class RegistrationError(Exception):
pass
class ForgotPassword(RuntimeError):
pass
class CookieSetterDirectory(Directory):
_q_exports = ['', 'idpintro']
def _q_index [html] (self):
template.html_top()
_('This domain is not for humans, it is only used to set identity '
'provider discovery cookie.')
def idpintro(self):
tok = get_request().form.get('tok')
token = Token.get(tok)
session = get_session_manager().get(token.session_id)
request = get_request()
try:
intro_cookie = request.cookies['_saml_idp']
except KeyError:
intro_cookie = ''
intro_cookie_q = urllib.unquote(intro_cookie)
splitted_cookie = [x for x in intro_cookie_q.split(' ') if x]
succinct_id = base64.encodestring(token.provider_id).strip()
if succinct_id in splitted_cookie:
splitted_cookie.remove(succinct_id)
splitted_cookie.append(succinct_id)
new_cookie = urllib.quote(' '.join(splitted_cookie))
if new_cookie != intro_cookie:
response = get_response()
response.set_cookie('_saml_idp', new_cookie,
domain = '.' + token.common_domain, path = '/',
expires = Cookie._getdate(3*365*86400))
token.remove_self()
return redirect(token.next_url)
class RootDirectory(Directory):
_q_exports = ['', 'admin', 'liberty', 'login', 'logout', 'change_password', 'register',
'forgot_password', 'update_info', 'saml', 'singleLogout',
'federations', 'login_local', 'login_ssl', 'associate_certificate',
'themes', 'disco', 'cas', 'forgot_identifier']
admin = root.RootDirectory()
disco = idwsf2_root.DiscoveryServiceDirectory()
saml = saml2_root.RootDirectory()
liberty = liberty_root.RootDirectory()
themes = template.ThemesDirectory()
if cas:
cas = cas.CASDirectory()
def _q_traverse(self, path):
if get_request().environ.get('HTTPS') and not get_request().session.ssl and get_request().session.id:
get_session_manager().expire_session()
get_request().session = get_session_manager().get_session()
fn = os.path.join(get_publisher().app_dir, 'common_cookie')
if os.path.exists(fn):
# on special domain to set cookie, nothing else, let's change root
get_publisher().app_dir = open(fn).read()
return CookieSetterDirectory()._q_traverse(path)
session = get_session()
if session:
get_request().user = session.get_user()
else:
get_request().user = None
response = get_response()
response.filter = {}
if not hasattr(response, 'breadcrumb'):
response.breadcrumb = [ ('', _('Home')) ]
return Directory._q_traverse(self, path)
def _q_lookup(self, component):
if component in ('css','images'):
return StaticDirectory(os.path.join(get_publisher().data_dir, 'web', component), follow_symlinks = True)
if component == 'qo':
dirname = os.path.join(get_publisher().data_dir, 'qommon')
return StaticDirectory(dirname, follow_symlinks = True)
if get_publisher().WEBROOT_DIR:
dirname = os.path.abspath(os.path.join(get_publisher().WEBROOT_DIR, component))
if os.path.exists(dirname):
return StaticDirectory(dirname, follow_symlinks = True)
raise errors.TraversalError()
def _q_index [html] (self):
session = get_session()
if not session or not session.user:
return self.login()
try:
identities.get_store().load_identities()
except identities.IdentityStoreException:
return template.error_page(_('Failed to connect to identities storage.'))
identities.get_store().connect(session)
try:
identity = identities.get_store().get_identity(session.user)
except KeyError:
# identity no longer available; perhaps identity store changed ?
return self.logout()
alternate_homepage_url = configuration.get_configuration(str('homepage')).get(str('alternate_homepage_url'))
if alternate_homepage_url:
return redirect(alternate_homepage_url)
identities_cfg = get_cfg('identities', {})
branding_cfg = get_cfg('branding', {})
passwords_cfg = get_cfg('passwords', {})
ssl_cfg = get_cfg('ssl', {})
template.html_top(_('Account Management'))
get_response().breadcrumb.append( ('', _('Account Management')) )
allow_certificate_federation = ssl_cfg.get('allow_certificate_federation', False)
vars = {
'can_change_password': str(passwords_cfg.get('can_change', False)),
'creation_mode': identities_cfg.get('creation'),
'identity_label': str(identity),
'idp_sso_list': str(self.get_idp_sso_list()),
'federations_list': str(self.get_idp_federations_list(identity)),
'admin': identity.is_admin(),
'show_federations': get_cfg('idp', {}).get('defederation', False),
}
if allow_certificate_federation:
vars['allow_certificate_federation_url'] = htmltext('https://' + get_request().environ['HTTP_HOST'] + get_request().environ['SCRIPT_NAME'] + '/associate_certificate')
certificates = [ x for x in identity.accounts if isinstance(x, identities.CertificateAccount) ]
certificate_list = '<p>' + _('Certificates federated:') + '\n<ol>\n'
for x in certificates:
certificate_list = certificate_list + '<li>' + htmltext(x.dn or x.certificate_sha1) + '</li>\n'
certificate_list = certificate_list + '</ol>\n</p>\n'
if certificates:
vars['certificate_list'] = certificate_list
return template.process_template(
str(TextsDirectory.get_html_text('account')), vars)
def federations [html] (self):
if not get_cfg('idp', {}).get('defederation', False):
return template.error_page(_('You are not allowed to manipulate your federations'))
session = get_session()
if not session or not session.user:
return redirect(get_request().environ['SCRIPT_NAME'] + '/login')
try:
identities.get_store().load_identities()
except identities.IdentityStoreException:
return template.error_page(_('Failed to connect to identities storage.'))
identities.get_store().connect(session)
try:
identity = identities.get_store().get_identity(session.user)
except KeyError:
# identity no longer available; perhaps identity store changed ?
return self.logout()
identities_cfg = get_cfg('identities', {})
branding_cfg = get_cfg('branding', {})
passwords_cfg = get_cfg('passwords', {})
template.html_top(_('Federations'))
get_response().breadcrumb.append( ('federations', _('Federations')) )
vars = {
'identity_label': str(identity),
'federations_list': str(self.get_idp_federations_list(identity))
}
return template.process_template(
str(TextsDirectory.get_html_text('federations')), vars)
def get_idp_sso_list [html] (self):
if not get_cfg('providers', {}).items():
return ''
'<ul>'
for klp, lp in get_cfg('providers', {}).items():
if lp['role'] == lasso.PROVIDER_ROLE_IDP:
continue # only allows initiated login to service providers
if lp.get('idp_initiated_sso', True) is False:
continue
try:
provider, label = misc.get_provider_and_label(klp)
except KeyError:
continue
if hasattr(provider, str('getProtocolConformance')) and \
provider.getProtocolConformance() == lasso.PROTOCOL_SAML_2_0:
url = 'saml/sp/%s/login' % klp
else:
url = 'liberty/sp/%s/login' % klp
'<li><a href="%s">%s</a></li>' % (
url, htmltext(_('Log on %s') % label))
'</ul>'
def get_idp_federations_list [html] (self, identity):
if not identity.lasso_dump:
return None
'<ul class="FederationsWidget">'
server = misc.get_lasso_server()
lasso_identity = lasso.Identity.newFromDump(identity.lasso_dump)
for k in lasso_identity.providerIds:
klp = misc.get_provider_key(k)
try:
provider, label = misc.get_provider_and_label(klp)
except KeyError:
continue
if hasattr(provider, str('getProtocolConformance')) and \
provider.getProtocolConformance() == lasso.PROTOCOL_SAML_2_0:
url = 'saml/sp/%s/terminate' % klp
else:
url = 'liberty/sp/%s/terminate' % klp
'<li>%s <a href="%s" class="terminate">%s</a></li>' % (
htmltext(label), url, _('Terminate Federation'))
'</ul>'
@misc.protect_form_from_get_parameters
def change_password [html] (self):
pwd_cfg = configuration.get_configuration(str('passwords'))
if not pwd_cfg.get('can_change'):
raise errors.AccessForbiddenError()
session = get_session()
if not session or not session.user:
raise errors.AccessUnauthorizedError()
identities.get_store().load_identities()
identities.get_store().connect(session)
try:
identity = identities.get_store().get_identity(session.user)
except KeyError:
raise errors.AccessError()
for account in identity.accounts:
if hasattr(account, str('password')):
break
else:
return template.error_page(_('No password for this identity'))
pw_account = account
form = Form()
form.keep_referer()
form.add(PasswordWidget, "password", title=_("Current Password"), required=True)
form.add(PasswordWidget, "new_password", title=_("New Password"), required=True)
form.add(PasswordWidget, "new2_password", title=_("New Password (confirm)"), required=True)
form.add_submit("submit", _("Submit"))
form.add_submit("cancel", _("Cancel"))
if get_request().get_method() == 'GET':
get_request().form = {}
if form.get_submit() == 'cancel':
return misc.redirect_to_return_url() or \
misc.redirect_to_referer(form) or \
misc.redirect_home()
if form.is_submitted() and not form.has_errors():
# check current password
account = identities.PasswordAccount()
account.username = identities.get_store().get_identity(session.user).accounts[0].username
account.password = form.get_widget('password').parse()
if not identities.get_store().get_identity_for_account(account):
form.set_error('password', _('Wrong password'))
# check new password is ok
new_password = form.get_widget('new_password').parse()
new2_password = form.get_widget('new2_password').parse()
if new_password != new2_password:
form.set_error('new2_password', _('Passwords do not match'))
else:
min_length = pwd_cfg.get('min_length')
if len(new_password) < min_length:
form.set_error('new_password',
_('Password is too short. It must be at least %d characters.') % min_length)
max_length = pwd_cfg.get('max_length')
if max_length and len(new_password) > max_length:
form.set_error('new_password',
_('Password is too long. It must be at most %d characters.') % max_length)
if new_password == account.password:
form.set_error('new_password', _('New password is the same as the old one'))
if form.is_submitted() and not form.has_errors():
get_logger().info('changed password')
pw_account.password = str(form.get_widget('new_password').parse())
identities.get_store().save(identity)
return misc.redirect_to_return_url() or \
misc.redirect_to_referer(form) or \
misc.redirect_home()
if not form.is_submitted():
get_logger().info('changing password page')
else:
get_logger().info('changing password page (had_errors)')
form.get_widget('new2_password').set_value('')
template.html_top(_('Changing Password'))
get_response().breadcrumb.append( ('', _('Account Management')) )
get_response().breadcrumb.append( ('change_password', _('Changing Password')) )
vars = {
'change_password_form': str(form.render())
}
return template.process_template(
str(TextsDirectory.get_html_text('change_password')), vars)
def get_login_form(self,cancel=False):
identities_cfg = get_cfg('identities', {})
passwords_cfg = get_cfg('passwords', {})
login_cfg = get_cfg('login', {})
ssl_cfg = get_cfg('ssl', {})
form = Form(enctype="multipart/form-data", id = "login", method = 'post', use_tokens = False)
if identities_cfg.get('email-as-username', False):
# keep it as StringWidget since every identity may not be using
# their email address (example: admin account)
form.add(StringWidget, "username", title = _("Email"), size=20, required = True)
else:
form.add(StringWidget, "username", title = _("Username"), size=20, required = True)
form.add(PasswordWidget, "password", title = _("Password"), size=20, required = True)
if get_cfg('idp', {}).get('idff_proxy'):
options = [('', _('None'))]
for klp, lp in get_cfg('providers', {}).items():
if lp['role'] == lasso.PROVIDER_ROLE_SP:
continue
try:
p, label = misc.get_provider_and_label(klp)
except KeyError:
continue
options.append((p.providerId, label))
if len(options) > 1:
form.add(SingleSelectWidget, "idp", title = _('Proxies request to'), options = options)
form.add_submit('submit', _('Log in'))
if ssl_cfg.get('allow_ssl_login', False):
form.add_submit('ssl', _('Log in using SSL certificate'))
if login_cfg.get('cancel_button') and cancel:
form.add_submit('cancel', _('Cancel'))
return form
def login_local(self):
if not get_cfg('idp', {}).get('direct_proxy'):
raise errors.TraversalError()
return self.login(allow_direct = False)
def login [html] (self, allow_direct = True): # only works with PasswordAccount for now
request = get_request()
# Already logged no login token (not a ForceAuthn login)
if request.user is not None and (not get_field('LoginToken')
or login_token.LoginToken.get(get_field('LoginToken'), ignore_errors=True) is None):
return misc.redirect_home()
if allow_direct and get_cfg('idp', {}).get('direct_proxy'):
for klp, lp in get_cfg('providers', {}).items():
if lp['role'] == lasso.PROVIDER_ROLE_SP:
continue
try:
provider, label = misc.get_provider_and_label(klp)
except KeyError:
continue
get_logger().info('login request, direct proxying to %s' % provider.providerId)
if hasattr(provider, str('getProtocolConformance')) and \
provider.getProtocolConformance() == lasso.PROTOCOL_SAML_2_0:
liberty_backend = self.saml
else:
liberty_backend = self.liberty
liberty_backend.perform_proxy_login(provider.providerId,
relay_state = get_field('LoginToken'))
identities_cfg = get_cfg('identities', {})
passwords_cfg = get_cfg('passwords', {})
branding_cfg = get_cfg('branding', {})
ssl_cfg = get_cfg('ssl', {})
cancel=get_request().form.has_key('cancelURL')
save_form=None
if get_request().get_method() == 'GET':
save_form=get_request().form
get_request().form={}
form = self.get_login_form(cancel=cancel)
if form.is_submitted() and form.get_widget('idp') and form.get_widget('idp').parse():
remote_idp = form.get_widget('idp').parse()
get_logger().info('login page, proxying to %s' % remote_idp)
remote_idp_key = misc.get_provider_key(remote_idp)
provider, label = misc.get_provider_and_label(remote_idp_key)
if hasattr(provider, str('getProtocolConformance')) and \
provider.getProtocolConformance() == lasso.PROTOCOL_SAML_2_0:
return self.saml.perform_proxy_login(remote_idp)
else:
return self.liberty.perform_proxy_login(remote_idp)
if form.is_submitted() and form.get_submit() == 'cancel':
get_logger().info('login page, cancel')
return self.login_cancel()
if form.is_submitted() and form.get_submit() == 'ssl' and ssl_cfg.get('allow_ssl_login', False):
return misc.redirect_with_same_qs(str(get_request().environ['SCRIPT_NAME'] + '/login_ssl'))
authentication_failure = None
if form.is_submitted() and not form.has_errors():
try:
return self.login_submit(form)
except LoginError:
authentication_failure = _('Authentication Failure')
get_logger().info('login page (after failed attempt)')
else:
get_logger().info('login page')
template.html_top(_('Login'))
get_response().breadcrumb.append( ('login', _('Login')) )
vars = {
'login_form': str(form.render()),
'authentication_failure': authentication_failure,
'cookies': get_request().cookies,
'lost_password_behaviour': passwords_cfg.get('lost_password_behaviour', 'nothing'),
'creation_mode': identities_cfg.get('creation'),
}
get_request().form=save_form
return template.process_template(
str(TextsDirectory.get_html_text('login')), vars)
def login_ssl (self):
request = get_request()
response = get_response()
session = get_session()
environ = request.environ
self.close_header(response)
ssl_cfg = get_cfg('ssl', {})
allow_ssl_login = ssl_cfg.get('allow_ssl_login', False)
if not environ.get('HTTPS'):
return redirect(request.environ['SCRIPT_NAME'] + '/')
ssl_session_id = environ.get('SSL_SESSION_ID')
ssl_client_cert = environ.get('SSL_CLIENT_CERT')
ssl_client_s_dn = environ.get('SSL_CLIENT_S_DN')
ssl_client_verify = environ.get('SSL_CLIENT_VERIFY','')
if not allow_ssl_login:
raise errors.TraversalError()
allow_certificate_federation = ssl_cfg.get('allow_certificate_federation', False)
# Find an identity
identity = None
if ssl_client_s_dn and ssl_client_verify == 'OK':
id = ssl_client_s_dn
dn_to_id_regexp = ssl_cfg.get('dn_to_id_regexp')
if dn_to_id_regexp:
try:
result = re.search(dn_to_id_regexp, id)
id = result.group(1)
except:
get_logger().warn('Wrong dn to id regexp, must at least one subgroup, dn: %s regexp: %s' % (ssl_client_s_dn, dn_to_id_regexp))
raise errors.AccessForbiddenError()
try:
identity = identities.get_store().get_identity(id)
except KeyError:
try:
identity = identities.get_store().get_identity_for_username(id)
except KeyError:
pass
if identity:
get_logger().info('Mapped identity from certificate with dn: %s to account %s' % (ssl_client_s_dn, identity.id))
if not identity and allow_certificate_federation and ssl_client_cert: # and not ssl_client_verify.startswith('FAILED:')
account = identities.CertificateAccount(certificate = ssl_client_cert)
identity = identities.get_store().get_identity_for_account(account)
if not identity:
get_logger().warn('SSL Authentification failure')
return misc.redirect_home()
if identity.disabled:
get_logger().warn('SSL Authentification Failure: identity %s disabled' % identity.id)
return self.login_success(identity.id, 'client-certificate')
def close_header(self, response):
response.set_header('Connection', 'close')
def associate_certificate [html] (self):
request = get_request()
response = get_response()
self.close_header(response)
session = get_session()
environ = request.environ
ssl_cfg = get_cfg('ssl', {})
ssl_session_id = environ.get('SSL_SESSION_ID')
ssl_client_cert = environ.get('SSL_CLIENT_CERT')
ssl_client_s_dn = environ.get('SSL_CLIENT_S_DN')
ssl_client_verify = environ.get('SSL_CLIENT_VERIFY','')
https = environ.get('HTTPS')
if not https:
retry_url = 'https://' + request.get_server() + request.get_path()
return template.error_page(_('This action needs an HTTPS connection'),
continue_to = (retry_url , _('Home')))
if not ssl_client_cert:
return template.error_page(_('This action needs that you present an SSL certificate, maybe the server is not configured to ask one ?')
, continue_to = ('.', _('Home')))
allow_certificate_federation = ssl_cfg.get('allow_certificate_federation', False)
allow_ssl_login = ssl_cfg.get('allow_ssl_login', False)
if allow_certificate_federation and allow_ssl_login and session.user:
if ssl_client_cert:
# Support only one certificate federation
account = identities.CertificateAccount(certificate = ssl_client_cert, dn = ssl_client_s_dn)
identity = identities.get_store().get_identity(session.user)
identity.accounts = [ x for x in identity.accounts if not isinstance(x, identities.CertificateAccount) ]
identity.accounts.append(account)
identity.store()
return redirect('.')
else:
raise errors.TraversalError()
def more_login_text(self):
pass # placeholder to be subclassed XXX: inoperant now
def login_submit(self, form):
identities.get_store().load_identities()
account = identities.PasswordAccount()
account.username = form.get_widget('username').parse()
account.password = form.get_widget('password').parse()
try:
identity = identities.get_store().get_identity_for_account(account)
except identities.IdentityStoreException:
# aie
if os.path.exists(os.path.join(get_publisher().app_dir, 'emergency')):
session = get_session()
session.set_user(1)
if session.after_url:
after_url = session.after_url
session.after_url = None
return redirect(after_url)
return redirect(get_request().environ['SCRIPT_NAME'] + '/')
return template.error_page(_('Failed to connect to identities storage.'))
if identity is None:
passwords_cfg = get_cfg('passwords', {})
if passwords_cfg.get('log_wrong_passwords', False):
get_logger().warn('Authentication Failure (un: %s pw: %s)' % (
account.username.replace('\n', ' ').replace('\r', ' '),
account.password.replace('\n', ' ').replace('\r', ' ')))
else:
get_logger().warn('Authentication Failure (un: %s)' % (
account.username.replace('\n', ' ').replace('\r', ' ')))
raise LoginError()
if identity.disabled:
get_logger().warn('Authentication Failure (un: %s) (disabled)' % (
account.username.replace('\n', ' ').replace('\r', ' ')))
raise LoginError()
if get_request().environ.get('HTTPS') == 'on':
authentication_method = 'password-on-https'
else:
authentication_method = 'password'
result = self.login_success(identity.id, authentication_method)
identities.get_store().init_session(get_session(), account)
return result
def login_cancel(self):
login_token_id = get_field('LoginToken')
if login_token_id:
login_token.LoginToken.set_authentication_result(login_token_id,
False)
return misc.redirect_to_return_url('cancel') or \
misc.redirect_to_return_url() or \
misc.redirect_home()
def redirect_to_common_domain_setter_url(self):
common_domain = get_cfg('idp', {}).get('common_domain')
common_domain_setter_url = get_cfg('idp', {}).get('common_domain_setter_url')
if common_domain_setter_url and common_domain:
token = Token(expiration_delay = 600) # ten minutes
token.session_id = session.id
token.protocol = 'local'
server = misc.get_lasso_server(protocol = 'saml2')
token.provider_id = server.providerId
token.common_domain = common_domain
token.next_url = get_request().get_url(-1)
token.store()
session.lasso_login_dump = None
return redirect(common_domain_setter_url + '?tok=%s' % token.id)
return None
def login_success(self, user, method):
# Setup current session
get_session_manager().expire_session()
get_session().set_user(user)
get_session().authentication_method = method
get_request().user = user
get_session().authentication_instant = datetime.datetime.utcnow()
login_token_id = get_field('LoginToken')
if login_token_id:
login_token.LoginToken.set_authentication_result(login_token_id,
True, user = user, method = method)
get_session().__dict__.setdefault('login_tokens',[]) \
.append(login_token_id)
return misc.redirect_to_return_url('ok') or \
misc.redirect_to_return_url() or \
misc.redirect_to_after_url() or \
self.redirect_to_common_domain_setter_url() or \
misc.redirect_home()
def singleLogout(self):
return self.logout()
def logout(self):
# if a user is still present, try a liberty or saml logout
if get_session().user and get_session().lasso_session_dump:
if get_session().saml2:
return self.saml.slo_idp()
else:
return self.liberty.singleLogout()
else:
# expire session
get_logger().info('logout')
get_session_manager().expire_session()
return misc.redirect_home()
def register [html] (self):
identities_cfg = get_cfg('identities', {})
if not identities_cfg.get('creation') in ('self', 'moderated'):
raise errors.TraversalError()
form = Form(enctype="multipart/form-data")
for field in identities.get_store_class().fields:
field.add_to_form(form) and None
has_email = 'email' in [x.key for x in identities.get_store_class().fields]
if not has_email or not identities_cfg.get('email-as-username', False):
form.add(ValidatedStringWidget, "username", title=_("Username"),
size=30, required=True,
regex=identities.get_store().username_regex)
passwords_cfg = get_cfg('passwords', {})
if passwords_cfg.get('can_change', False) and not passwords_cfg.get('generate', False):
form.add(PasswordWidget, "password", title=_("Password"), size=30, required=True)
else:
form.add(HtmlWidget, '<p>%s</p>' % _('A password will be mailed to you.'))
if passwords_cfg.get('lost_password_behaviour', 'nothing') == 'dumb_question':
dumb_questions_options = [(str(x), _(identities.dumb_questions[x])) \
for x in identities.available_dumb_questions]
form.add(HtmlWidget, '<p>%s</p>' % _('If you forget your password...'))
form.add(SingleSelectWidget, 'dumb_question',
title = _('Security question'), required=True,
options = [(None, _('[Select a question]'))] + dumb_questions_options)
form.add(StringWidget, 'smart_answer', title = _('Your answer'), required=True)
form.add_submit("submit", _("Submit"))
form.add_submit("cancel", _("Cancel"))
if form.get_submit() == 'cancel':
return redirect('.')
if form.is_submitted() and not form.has_errors():
try:
return self.register_submit(form)
except RegistrationError:
pass
template.html_top(_('Registration'))
get_response().breadcrumb.append( ('register', _('Registration')) )
vars = {
'register_form': str(form.render())
}
return template.process_template(
str(TextsDirectory.get_html_text('register')), vars)
def register_submit(self, form):
identities_cfg = get_cfg('identities', {})
pwd_cfg = configuration.get_configuration('passwords')
store = identities.get_store()
account = identities.PasswordAccount()
has_email = 'email' in [x.key for x in identities.get_store_class().fields]
if identities_cfg.get('email-as-username', False):
account.username = form.get_widget('email').parse()
else:
account.username = form.get_widget('username').parse()
if pwd_cfg.get('lost_password_behaviour') == 'dumb_question':
account.dumb_question = form.get_widget('dumb_question').parse()
account.smart_answer = form.get_widget('smart_answer').parse()
if store.has_identity_with_username(account.username):
if identities_cfg.get('email-as-username', False):
form.get_widget('email').set_error(_('That address is already in use'))
else:
form.get_widget('username').set_error(_('That username is already in use'))
raise RegistrationError()
if pwd_cfg.get('can_change') and not pwd_cfg.get('generate'):
password = form.get_widget('password').parse()
if len(account.password) < min_pw_length:
form.set_error('password',
_('Password is too short. It must be at least %d characters.') % \
min_pw_length)
raise RegistrationError()
if max_pw_length and len(account.password) > max_pw_length:
form.set_error('password',
_('Password is too long. It must be at most %d characters.') % \
max_pw_length)
raise RegistrationError()
else:
password = store.create_password(for_account=account.username)
account.password = store.hash_password(password)
identity = store.get_identity_class()()
if not identity.id:
identity.id = account.username # or sequence ?
for field in identities.get_store_class().fields:
if form.get_widget(field.key):
setattr(identity, field.key, form.get_widget(field.key).parse())
identity.accounts = [account]
self.pre_registration_callback(identity)
try:
store.add(identity)
except identities.AlreadyExists, e:
if e.args:
keys = e.args[0]
for key in keys:
form.set_error(key, _('This value must be unique but it already exists for another user'))
raise RegistrationError()
if identities_cfg.get('creation') == 'moderated':
identity.disabled = True
else:
try:
self.email_password(identity, welcome = True,
password = password)
except errors.EmailError:
store.remove(identity)
get_logger().error('Error emailing password to user (%s)' % identity.email)
return template.error_page(
_('An error occured and your password could not be send. '
'Is your email address correct?'))
except ForgotPassword, e:
message = e.args[0]
get_logger().error('Registration password sending failed for %s: %s' % (account.username, message))
return template.error_page(message)
if identities_cfg.get('notify-on-register', False):
self.notify_registration(identity)
if store.count() == 0:
# first user created gets admin role
identity.roles = [identities.ROLE_ADMIN]
store.save(identity)
get_logger().info('User created new identity (%s)' % identity)
self.registration_callback(identity)
if identities_cfg.get('creation') == 'self' and not pwd_cfg.get('generate'):
return redirect(get_request().environ['SCRIPT_NAME'] + '/login')
elif identities_cfg.get('creation') == 'moderated':
return self.moderated_answer()
else:
return self.register_done_password_sent()
def pre_registration_callback(self, identity):
pass # for derivatives to override
def registration_callback(self, identity):
pass # for derivatives to override
def moderated_answer [html] (self):
template.html_top(_('Registration'))
'Registration is moderated.'
def notify_registration(self, identity):
identities_cfg = get_cfg('identities', {})
admins = identities.get_store().administrators()
admin_emails = [x.email for x in admins if x.email]
if not admin_emails:
return
data = {
'hostname': get_request().get_server(),
'identity': str(identity),
'email': identity.email,
'email_as_username': str(identities_cfg.get('email-as-username', False)),
'username': identity.accounts[0].username,
'service': get_session().service,
}
emails.custom_ezt_email('new-registration-admin-notification', data,
admin_emails, fire_and_forget = True)
def register_done_password_sent [html] (self):
template.html_top(_('Registration Completed'))
get_response().breadcrumb.append( ('registration', _('Registration Completed')) )
branding_cfg = get_cfg('branding', {})
return template.process_template(
str(TextsDirectory.get_html_text('register_completed')), {})
def email_password(self, identity, welcome = False, password = None):
store = identities.get_store()
for account in identity.accounts:
if hasattr(account, 'username'):
break
else:
account = None
if not account:
raise ForgotPassword(_('Your account has no password, it \
certainly uses another kind of authentication, contact an administrator.'))
if not identity.email:
raise ForgotPassword(_('Your account has no email, contact an \
administrator'))
identities_cfg = configuration.get_configuration('identities')
pwd_cfg = configuration.get_configuration('passwords')
if not password:
password = account.password
if password is None or identities.get_pwd_hashing_scheme() != 'clear' or \
pwd_cfg.get('generate_on_remind') or \
identities.get_hash_format(password) is not None:
password = store.create_password(for_account=account.username)
account.password = store.hash_password(password)
store.save(identity)
data = {
'hostname': get_request().get_server(),
'identity': str(identity),
'email': identity.email,
'email_as_username': str(identities_cfg.get('email-as-username')),
'username': identity.accounts[0].username,
'password': password,
'service': get_session().service,
}
if welcome:
emails.custom_ezt_email('welcome-email', data, identity.email)
else:
emails.custom_ezt_email('password-email', data, identity.email)
def forgot_password_message [html] (self):
template.html_top(_('Lost Password'))
return template.process_template(
str(TextsDirectory.get_html_text('lost_password_mailed')), vars)
def forgot_password_dumb_question [html] (self, form):
identities_cfg = get_cfg('identities', {})
username = form.get_widget('username').parse()
identity = identities.get_store().get_identity_for_username(username)
if identity:
for account in identity.accounts:
if hasattr(account, str('username')):
break
else:
raise 'XXX'
else:
account = identities.PasswordAccount()
if not get_session().question_key:
get_session().question_key = random.choice(identities.available_dumb_questions)
account.dumb_question = get_session().question_key
get_session().question_key = account.dumb_question
form = Form(enctype="multipart/form-data", use_tokens = False)
form.add_hidden("username", username)
if not hasattr(account, str('dumb_question')):
return template.error_page('No password question for this identity')
if not identities.dumb_questions.has_key(account.dumb_question):
return template.error_page('No password question for this identity')
if identities_cfg.get('email-as-username', False):
form.add(HtmlWidget, '<p>%s: %s</p>' % (_('Email'), username))
else:
form.add(HtmlWidget, '<p>%s: %s</p>' % (_('Username'), username))
form.add(StringWidget, "answer",
title = _(identities.dumb_questions[account.dumb_question]),
required = True, size = 30)
answer = form.get_widget('answer').parse()
user_answer = account.smart_answer or ''
if answer and str(answer).lower().strip() != str(user_answer).lower().strip():
return template.error_page(_('Wrong answer or inexistant user'))
form.set_error('answer', _('Wrong answer'))
if not get_request().form.has_key('answer'):
form.set_error('answer', None)
if not form.has_errors() and get_request().form.has_key('answer'):
return self.forgot_password_submit(form) or \
self.forgot_password_message()
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
vars = {
'lost_password_question_form': str(form.render())
}
return template.process_template(
str(TextsDirectory.get_html_text('lost_password_question')), vars)
def forgot_password [html] (self):
get_response().breadcrumb.append( ('forgot_password', _('Lost Password')) )
passwords_cfg = get_cfg('passwords', {})
behaviour = passwords_cfg.get('lost_password_behaviour', 'nothing')
if behaviour == 'nothing':
raise errors.AccessForbiddenError()
form = Form(enctype="multipart/form-data", use_tokens = False)
identities_cfg = get_cfg('identities', {})
if identities_cfg.get('email-as-username', False):
form.add(EmailWidget, "username", title= _('Email'), size=30, required=True)
else:
form.add(StringWidget, "username", title=_("Username"), size=30, required=True)
form.add_submit("submit", _("Submit"))
form.add_submit("cancel", _("Cancel"))
if form.get_submit() == 'cancel':
return redirect('.')
if form.is_submitted() and not form.has_errors():
if behaviour == 'dumb_question':
return self.forgot_password_dumb_question(form)
else:
return self.forgot_password_submit(form) or \
self.forgot_password_message()
else:
vars = {
'lost_password_behaviour': str(behaviour),
'lost_password_form': str(form.render()),
'generate_on_remind': passwords_cfg.get('generate_on_remind', False)
}
template.html_top(_('Lost Password'))
return template.process_template(
str(TextsDirectory.get_html_text('lost_password')), vars)
def forgot_password_submit(self, form):
username = form.get_widget('username').parse()
try:
identity = identities.get_store().get_identity_for_username(
username,throw=True)
except identities.TooMuchAccounts:
get_logger().warning('Forgot password: more than one account for identifier %r' % username)
return template.error_page(htmltext(_('There is more than one accounts for the identifier <em>%s</em>, try to \
<a href="/forgot_identifier">ask for all your accounts</a>')) % username)
if not identity:
get_logger().warning('Forgot password: no account for username %r' % username)
return template.error_page(_('Your identity %r is unknown.') % username)
try:
self.email_password(identity)
except errors.EmailError:
get_logger().error('lost password -> email password (%s) (failure)' % username)
return template.error_page(_('An error occured and your password could not be send.'))
except ForgotPassword, e:
message = e.args[0]
get_logger().error('lost password -> %s: %s' % (username, message))
return template.error_page(message)
get_logger().info('lost password -> email password of %s to %s' % (username, identity.email))
@misc.protect_form_from_get_parameters
def update_info [html] (self):
identities_cfg = get_cfg('identities', {})
if identities_cfg.get('creation') != 'self':
raise errors.TraversalError()
session = get_session()
if not session or not session.user:
raise errors.AccessForbiddenError()
form = Form(enctype="multipart/form-data")
form.keep_referer()
skip_email = identities_cfg.get('email-as-username', False)
identity = identities.get_store().get_identity(session.user)
for field in identities.get_store_class().fields:
if field.key == 'email' and skip_email:
continue
field.add_to_form(form, identity = identity) and None
form.add_submit("submit", _("Submit"))
form.add_submit("cancel", _("Cancel"))
if get_request().get_method() == 'GET':
get_request().form = {}
if form.get_submit() == 'cancel':
return misc.redirect_to_return_url() or \
misc.redirect_to_referer(form) or \
misc.redirect_home()
if form.is_submitted() and not form.has_errors():
return self.update_info_submit(form, identity)
template.html_top(_('Updating Personal Information'))
get_response().breadcrumb.append( ('', _('Account Management')) )
get_response().breadcrumb.append( ('update_info', _('Updating Personal Information')) )
vars = {
'info_form': str(form.render())
}
return template.process_template(
str(TextsDirectory.get_html_text('update_info')), vars)
def update_info_submit(self, form, identity):
identities_cfg = get_cfg('identities', {})
skip_email = identities_cfg.get('email-as-username', False)
for field in identities.get_store_class().fields:
if field.key == 'email' and skip_email:
continue
if form.get_widget(field.key):
setattr(identity, field.key, form.get_widget(field.key).parse())
identities.get_store().save(identity)
get_logger().info('Updated identity %r' % identity.id)
return misc.redirect_to_return_url() or \
misc.redirect_to_referer(form) or \
misc.redirect_home()
def forgot_identifier(self):
get_response().breadcrumb.append( ('forgot_identifier', _('Lost Account Name')) )
form = Form(enctype="multipart/form-data", use_tokens = False)
form.add(EmailWidget, "email", title= _('Email'), size=30, required=True)
form.add_submit("submit", _("Submit"))
form.add_submit("cancel", _("Cancel"))
if form.get_submit() == 'cancel':
return redirect('.')
if form.is_submitted() and not form.has_errors():
res, error = self.forgot_identifier_submit(form)
if not error:
return res
form.set_error('email', error)
vars = {
'lost_identifier_form': str(form.render())
}
template.html_top(_('Lost Account Name'))
return template.process_template(
str(TextsDirectory.get_html_text('lost_identifier')), vars)
def forgot_identifier_submit(self, form):
email = form.get_widget('email').parse()
idents = identities.get_store().get_identities_by_attributes({
'email': email })
usernames = []
for ident in idents:
for account in ident.accounts:
if hasattr(account, 'username'):
usernames.append(account.username)
if not usernames:
return None, _('There is no account with this email')
data = {
'hostname': get_request().get_server(),
'email': email,
'usernames': usernames,
'service': get_session().service,
}
get_logger().info('Forgot identifier: reminded email %r of its accounts' % email)
try:
emails.custom_ezt_email('identifier-email', data, email)
except:
return None, _('Error when sending the mail')
get_session().message = ('info', _('Your identifiers have been send to %s'))
return misc.redirect_home(), None
TextsDirectory.register('account',
N_('Account Management'),
hint = N_('Available variables: identity_label, idp_sso_list, show_federations, federations_list, certificate_list, allow_certificate_federation'),
default = N_('''\
<h2 class="identity-title">[identity_label]</h2>
[is creation_mode "self"]
<p><a href="update_info">Update Personal Information</a></p>
[end]
[is can_change_password "True"]
<p><a href="change_password">Change Password</a></p>
[end]
[idp_sso_list]
<p id="logout"><a href="singleLogout">Logout</a></p>
[is show_federations "True"]
[if-any federations_list]
<p id="federations"><a href="federations">Federations</a></p>
[end]
[if-any allow_certificate_federation_url]
<p><a href="[allow_certificate_federation_url]">Associate a certificate to this account<a/></p>
[if-any certificate_list]
[certificate_list]
[end]
[end]
[end]
[is admin "True"]
<p id="admin"><a href="admin">Administration</a></p>
[end]
'''))
TextsDirectory.register('register',
N_('Registration'),
hint = N_('Available variable: register_form'),
default = N_('''\
[register_form]
'''))
TextsDirectory.register('register_completed',
N_('Registration Completed'),
default = N_('''\
<p>Your password has been mailed to you.</p>
<p><a href="login">Login page</a></p>
'''))
TextsDirectory.register('change_password',
N_('Changing Password'),
hint = N_('Available variable: change_password_form'),
default = N_('''\
<div id="identity-content">
[change_password_form]
</div>
'''))
TextsDirectory.register('lost_password',
N_('Lost Password'),
hint = N_('Available variables: lost_password_form, lost_password_behaviour, generate_on_remind'),
default = N_('''\
[is lost_password_behaviour "dumb_question"]
[is generate_on_remind "True"]<p>Fill the form to get a <strong>new</strong> password mailed back to you.</p>
[else]<p>Fill the form to get your password mailed back to you.</p>
[end]
[end]
[is lost_password_behaviour "email_reminder"]
[is generate_on_remind "True"]<p>A <strong>new</strong> password will be mailed back to you.</p>
[else]<p>Your password will be mailed back to you.</p>
[end]
[end]
[lost_password_form]
'''))
TextsDirectory.register('lost_identifier',
N_('Lost Account Name'),
hint = N_('Available variables: lost_identifier_form'),
default = N_('''\
<p>Give your email to get back the list of your accounts.</p>
[lost_identifier_form]
'''))
TextsDirectory.register('lost_password_question',
N_('Lost Password Question'),
hint = N_('Available variable: lost_password_question_form'),
default = N_('''\
[lost_password_question_form]
'''))
TextsDirectory.register('lost_password_mailed',
N_('Lost Password (mailed)'),
default = N_('''\
<p>Your password has been mailed back to you.</p>
<p><a href="login">Login</a>
'''))
TextsDirectory.register('update_info',
N_('Updating Personal Information'),
hint = N_('Available variable: info_form'),
default = N_('''\
[info_form]
'''))
TextsDirectory.register('login',
N_('Login'),
hint = N_('Available variables: login_form, authentication_failure'),
default = N_('''\
<div id="login-form">
[if-any authentication_failure]
<div class="errornotice">
[authentication_failure]
</div>
[end]
[login_form]
[if-any cookies]
<p id="cookies" class="error-notice">
You must accept cookies to authenticate.
</p>
[end]
[is lost_password_behaviour "nothing"][else]
<p id="forgot-password"><a href="forgot_password">Forgot your password?</a></p>
[end]
<p id="forgot-identifier"><a href="forgot_identifier">Forgot your account name?</a></p>
[is creation_mode "self"]
<p id="register"><a href="register">Register a new account</a></p>
[end]
[is creation_mode "moderated"]
<p id="register"><a href="register">Register a new account</a></p>
[end]
<script type="text/javascript">
[" document.forms[0]['username'].focus(); "]
</script>
</div>
'''))
TextsDirectory.register('federations',
N_('Federations'),
hint = N_('Available variables: identity_label, federations_list'),
default = N_('''\
[if-any federations_list]
[federations_list]
[end]
<a href=".">Back to home</a>
'''))
EmailsDirectory.register('welcome-email',
N_('Welcome Email'),
N_('Available variables: %s') % 'hostname, identity, username, password, email_as_username, service',
default_subject = N_('Welcome to [hostname]'),
default_body = N_('''\
Welcome to [hostname],
Your password is: [password]
'''))
EmailsDirectory.register('password-email',
N_('Password Reminder'),
N_('Available variables: %s') % 'hostname, identity, email, username, password, email_as_username, service',
default_subject = N_('Your account on [hostname]'),
default_body = N_('''\
Hello,
You asked for your password on [hostname].
[is email_as_username "True"]
Your username is: [email]
[else]
Your username is: [username]
[end]
Your password is: [password]
'''))
EmailsDirectory.register('new-registration-admin-notification',
N_('Notification of new registration to administrators'),
N_('Available variables: %s') % 'hostname, identity, email, email_as_username, username, service',
default_subject = N_('New Registration'),
default_body = N_('''\
Hello,
A new user registered on [hostname].
- Identity: [identity]
[if-any email] - Email: [email][end]
[is email_as_username "False"] - Username: [username][end]
'''))
EmailsDirectory.register('identifier-email',
N_('Identifier Email'),
N_('Available variables: %s') % 'hostname, usernames, email, service',
default_subject = N_('List of identifiers for [email] on [hostname]'),
default_body = N_('''\
You asked for all usernames linked to address [email] on [hostname].
These usernames are: [for usernames]
- [usernames][end]
'''))