This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
dauphine-logement/appli_project/appli_socle/backends.py

204 lines
8.6 KiB
Python

# -*- encoding: utf-8 -*-
import urllib2
from urlparse import urljoin
import ldap
from ldap.filter import filter_format
import logging
from django.contrib.auth.backends import ModelBackend
from django.conf import settings
from django.utils.http import urlencode
from django.http import HttpResponseRedirect
import django_journal
from models import (ProfilRecherche, ProfilOffre)
import middleware
class ProfilMoteurAuthentification(ModelBackend):
'''Classe de base pour les moteurs d'authentification des profils.
Il renvoie des classes filles du modèle User et pas le modèle User lui
même.
'''
prefetch_related = ()
def authenticate(self, email, password, profil=None):
try:
user = self.profil_classe.objects.get(email=email)
if profil and not isinstance(user, profil):
return None
if user.check_password(password):
return user
except self.profil_classe.DoesNotExist:
return None
def get_user(self, user_id):
try:
return self.profil_classe.objects \
.prefetch_related(*self.prefetch_related) \
.get(pk=user_id)
except self.profil_classe.DoesNotExist:
return None
class ProfilOffreMoteurAuthentification(ProfilMoteurAuthentification):
profil_classe = ProfilOffre
def catch_ldap_error(function):
'''Décorateur qui transforme n'importe quelle fonction de telle manière que
si une exception du type LDAPError est levée, celle-ci est envoyé dans les
logs.
'''
def f(*args, **kwargs):
try:
return function(*args, **kwargs)
except ldap.LDAPError:
logging.exception("Call to the LDAP server failed")
return None
except TypeError:
raise
except Exception, e:
logging.exception("exception in LDAP code")
raise
return f
class ProfilRechercheAuthentification(ProfilMoteurAuthentification):
'''Moteur d'authentification pour le profil recherche.
À partir d'un ticket CAS on vérifie sur le serveur CAS que
l'authentification a réussie puis on vérifie que l'identifiant CAS
correspond à un compte LDAP existant. Si un compte CAS existe déjà pour
cet identifiant, on récupère l'email du LDAP est on les synchronise.
Si aucun compte n'existe pour cet identifiant, on vérifie qu'il
appartient aux groupes autorisés et on lui crée un compte.
Les paramètres à configurer dans le settings.py sont:
LDAP_URL: l'url du serveur LDAP
LDAP_BIND_DN: le DN pour le bind administratif
LDAP_BIND_PASSWORD: le mot de passe pour le bind administratif
LDAP_BASE: l'arbre dans lequel faire les recherches
'''
profil_classe = ProfilRecherche
_ldap_connection = None
EDU_PERSON_AFFILIATION = 'eduPersonAffiliation'
MAIL = 'mail'
STUDENT = 'student'
UID = 'uid'
SUPANN_REF_ID = 'supannRefId'
SUPANN_MAIL_PERSO = 'supannMailPerso'
attributes = (MAIL, EDU_PERSON_AFFILIATION, UID, SUPANN_REF_ID, SUPANN_MAIL_PERSO)
prefetch_related = ('type_d_offre', 'annonces_sauvegardees',
'recherches_sauvegardees')
@property
def connection_ldap(self):
'''Retourne une connection authentifiée au serveur LDAP de l'UPD
Après le premier appel la connection est mise en cache.'''
if self._ldap_connection is None:
ldap_url = settings.LDAP_URL
ldap_bind_dn = settings.LDAP_BIND_DN
ldap_bind_password = settings.LDAP_BIND_PASSWORD
self._ldap_connection = ldap.initialize(ldap_url)
self._ldap_connection.set_option(ldap.OPT_NETWORK_TIMEOUT, 5.0)
self._ldap_connection.simple_bind_s(ldap_bind_dn, ldap_bind_password)
return self._ldap_connection
def recherche_ldap(self, _filter, attributes=None):
'''Exécute une recherche LDAP sur le serveur LDAP de l'UPD'''
ldap_base = settings.LDAP_BASE
if not attributes:
attributes = self.attributes
return self.connection_ldap.search_s(ldap_base, ldap.SCOPE_SUBTREE,
_filter, attributes)
@catch_ldap_error
def recherche_utilisateur(self, identifiant):
'''Recherche une utilisateur sur le serveur LDAP de l'UPD'''
return self.recherche_ldap(filter_format('(%s=%s)',
(settings.LDAP_CAS_UID, identifiant)))
def creation_du_compte(self, identifiant, cas_url, ldap_resultat):
'''Création de compte depuis le LDAP
'''
email = ldap_resultat[0][1].get(self.MAIL, [''])[0]
email = email or ldap_resultat[0][1].get(self.SUPANN_MAIL_PERSO, [''])[0]
email = email or ''
return self.profil_classe.new_cas_user(identifiant, email, cas_url)
def creation_automatique_du_compte(self, identifiant, cas_url, ldap_resultat):
'''Si le compte LDAP pour l'identifiant CAS identifiant est un compte d'étudiant,
on lui crée automatiquement un profil recherche.
'''
if len(ldap_resultat):
attributes = ldap_resultat[0][1]
if self.STUDENT in attributes.get(self.EDU_PERSON_AFFILIATION, []) or \
(len(attributes.get(self.EDU_PERSON_AFFILIATION, [])) == 0
and len(attributes.get(self.MAIL, [])) == 0
and attributes.get(self.SUPANN_REF_ID, [''])[0].startswith('{APOGEE:OPI}')
and attributes.get(self.SUPANN_MAIL_PERSO)):
return self.creation_du_compte(identifiant, cas_url, ldap_resultat)
else:
django_journal.error_record('warning',
u'création de compte impossible pour l\'identifiant CAS {cas_user}',
cas_user=identifiant,
ldap_data=ldap_resultat)
return None
@catch_ldap_error
def authenticate(self, cas_ticket, service):
'''Valide et authentifie un ticket CAS'''
try:
cas_url = settings.CAS_URL
validation_url = '%s?%s'% (urljoin(cas_url, 'validate'),
urlencode({ 'ticket': cas_ticket, 'service': service }))
logging.debug('cas callback URL: %s', validation_url)
resultat, identifiant = urllib2.urlopen(validation_url).read().splitlines()
logging.debug('cas callback result: %s', resultat)
if resultat == 'yes':
ldap_resultat = self.recherche_utilisateur(identifiant)
try:
utilisateur_cas = self.profil_classe.objects.get(utilisateur_cas__identifiant=identifiant)
except self.profil_classe.DoesNotExist:
if not ldap_resultat:
django_journal.record('error',
u'identifiant CAS inconnu dans le LDAP {cas_user}',
cas_user=identifiant)
return None
utilisateur_cas = self.creation_automatique_du_compte(identifiant, cas_url, ldap_resultat)
if utilisateur_cas:
django_journal.record('nouveau-compte-recherche', u'nouveau compte CAS {cas_user}',
cas_user=identifiant, ldap_data=ldap_resultat)
else:
# synchronisation de l'email
if ldap_resultat is not None:
ldap_email = ldap_resultat[0][1].get(self.MAIL) \
or ldap_resultat[0][1].get(self.SUPANN_MAIL_PERSO)
if ldap_email and ldap_email[0] != utilisateur_cas.email:
django_journal.record('modifie-compte', u'synchronisation de l\'email pour {email}',
user=utilisateur_cas,
cas_user=identifiant,
email=ldap_email[0].decode('utf-8'))
utilisateur_cas.email = ldap_email[0].decode('utf-8')
utilisateur_cas.save()
return utilisateur_cas
except Exception:
logging.exception('exception in authenticate')
raise
@classmethod
def redirect_to_cas(cls, request, service=None, passif=False, force=False):
if not service:
service = request.build_absolute_uri(request.get_full_path())
params = { 'service': service }
if passif:
params['gateway'] = '1'
if force:
params['renew'] = '1'
url = '%s?%s' % (urljoin(settings.CAS_URL, 'login'),
urlencode(params))
return HttpResponseRedirect(url)