123 lines
4.1 KiB
Python
123 lines
4.1 KiB
Python
"""CAS authentication backend"""
|
|
|
|
try:
|
|
import ldap
|
|
import ldap.filter
|
|
except ImportError:
|
|
ldap = None
|
|
|
|
from django_cas.backends import CASBackend, _verify
|
|
from django.contrib.auth.backends import ModelBackend
|
|
from django.core.cache import cache
|
|
|
|
from models.user import User, PolynumProfile
|
|
|
|
import app_settings
|
|
|
|
class PolynumBackendMixin(object):
    """Mixin giving authentication backends a primary-key ``User`` lookup."""

    def get_user(self, user_id):
        """Return the ``User`` whose primary key is ``user_id``.

        ``None`` is returned when no such row exists.
        """
        try:
            found = User.objects.get(pk=user_id)
        except User.DoesNotExist:
            found = None
        return found
|
|
|
|
class CASBackend(PolynumBackendMixin, CASBackend):
    """CAS authentication backend that enriches users with LDAP data.

    The CAS ticket is verified first; the matching ``User`` (created on
    first login when the directory returns an entry for the username) is
    then populated from the LDAP attributes listed in ``ATTRIBUTES``.
    """

    # LDAP attribute name -> ``User`` attribute it is copied to.
    USER_ATTRIBUTES = {
        'mail': 'email',
        'givenName': 'first_name',
        'sn': 'last_name',
    }
    # LDAP attribute name -> ``PolynumProfile`` attribute it is copied to.
    PROFILE_ATTRIBUTES = {
        'telephoneNumber': 'phone',
    }

    # Attributes requested from the directory on every search.
    ATTRIBUTES = ['supannEntiteAffectationPrincipale',
                  'supannEntiteAffectation', 'mail', 'telephoneNumber',
                  'supannAutreTelephone', 'roomNumber', 'givenName', 'sn']

    def query_ldap(self, username):
        """Return the LDAP search result for ``username``.

        The raw ``search_s`` result is cached for 600 seconds under a
        per-username key.  The lookup is best effort: when the ``ldap``
        module is unavailable or the directory raises ``LDAPError``, an
        empty list is returned instead of propagating the error.

        :param username: login used to fill ``LDAP_USER_QUERY``.
        :returns: the list returned by ``search_s`` (possibly empty).
        """
        # Function-scope import so the module import block stays untouched.
        import logging

        if not ldap:
            return []
        result = None
        try:
            cache_key = 'ldap_query_%s' % username
            result = cache.get(cache_key)
            if result is None:
                connection = ldap.initialize(app_settings.LDAP_URL)
                try:
                    connection.simple_bind_s(app_settings.LDAP_BIND_DN,
                                             app_settings.LDAP_BIND_PASSWORD)
                    # filter_format escapes the username before it is
                    # substituted into the search filter.
                    ldap_filter = ldap.filter.filter_format(
                        app_settings.LDAP_USER_QUERY, (username,))
                    result = connection.search_s(app_settings.LDAP_BASE,
                                                 ldap.SCOPE_SUBTREE,
                                                 ldap_filter,
                                                 self.ATTRIBUTES)
                finally:
                    # Always release the connection; a failure while
                    # closing must not mask the search outcome.
                    try:
                        connection.unbind_s()
                    except ldap.LDAPError:
                        pass
                cache.set(cache_key, result, 600)
        except ldap.LDAPError:
            logging.getLogger(__name__).warning(
                'LDAP query for %r failed', username, exc_info=True)
            result = None
        # Callers only test truthiness; always hand back a list.
        return result or []

    def authenticate(self, ticket, service):
        """Verify a CAS ticket and return the corresponding user.

        :param ticket: CAS ticket passed to ``_verify``.
        :param service: service URL passed to ``_verify``.
        :returns: the saved ``User`` populated from LDAP, or ``None`` when
            the ticket yields no username or when the user does not exist
            locally and the LDAP query returns nothing.
        """
        username, _attributes = _verify(ticket, service)
        if not username:
            return None
        try:
            user = User.objects.get(username=username)
        except User.DoesNotExist:
            # Only create a local account when the directory has an entry.
            if not self.query_ldap(username):
                return None
            # user will have an "unusable" password
            user = User.objects.create_user(username, '')
            user.save()
        user = self.populate_from_ldap(user)
        user.save()
        return user

    def get_user(self, user_id):
        """Return the user with primary key ``user_id``, populated from LDAP.

        ``None`` is returned when no such user exists.
        """
        user = PolynumBackendMixin.get_user(self, user_id)
        if user is None:
            return None
        self.populate_from_ldap(user)
        return user

    def ldap_attributes_to_unicode(self, ldap_attributes):
        """Decode every value of an LDAP attribute mapping from UTF-8.

        :param ldap_attributes: mapping of attribute name to a list of
            byte strings.
        :returns: a new dict mapping each name to a list of ``unicode``.
        """
        return dict(
            (name, [unicode(value, 'utf-8') for value in values])
            for name, values in ldap_attributes.iteritems())

    def get_ldap_attributes(self, user):
        """Return the decoded attributes of the first LDAP entry for ``user``.

        An empty dict is returned when the query yields nothing.
        """
        entries = self.query_ldap(user.username)
        if not entries:
            return {}
        # Each entry is indexed as (dn, attributes); keep the attributes.
        return self.ldap_attributes_to_unicode(entries[0][1])

    def populate_from_ldap(self, user):
        """Copy LDAP attributes onto ``user`` and its ``PolynumProfile``.

        The first value of every returned attribute is set on ``user``
        under the LDAP attribute name, and additionally under the mapped
        name from ``USER_ATTRIBUTES`` / ``PROFILE_ATTRIBUTES``.  Two
        helpers, ``get_ldap_attributes`` and ``get_preferred_entities``,
        are attached to the instance.  The profile is created if missing
        and saved; ``user`` itself is not saved here.

        :param user: the user to populate; falsy values are returned as is.
        :returns: the same ``user`` object.
        """
        if not user:
            return user

        def get_preferred_entities():
            # Union of the principal and secondary affectation entities.
            attributes = self.get_ldap_attributes(user)
            principal = attributes.get('supannEntiteAffectationPrincipale', [])
            others = attributes.get('supannEntiteAffectation', [])
            return list(set(principal) | set(others))

        user.get_ldap_attributes = lambda: self.get_ldap_attributes(user)
        user.get_preferred_entities = get_preferred_entities
        profile, _created = PolynumProfile.objects.get_or_create(user=user)
        for key, values in self.get_ldap_attributes(user).iteritems():
            if not values:
                # Nothing to copy from an attribute without values.
                continue
            value = values[0]
            setattr(user, key, value)
            if key in self.USER_ATTRIBUTES:
                setattr(user, self.USER_ATTRIBUTES[key], value)
            if key in self.PROFILE_ATTRIBUTES:
                setattr(profile, self.PROFILE_ATTRIBUTES[key], value)
        profile.save()
        return user
|
|
|
|
class ModelBackend(PolynumBackendMixin, ModelBackend):
    """Django's ``ModelBackend`` combined with ``PolynumBackendMixin``."""
|