ldap_backend: replace dn lookup by an external_id lookup

The external_id template can be specified using external_id_tuples. Each
tuple list the attributes to concatenante to build the external id.
Attributes are urlencode then joined using a space character. If you add
the ':unquote' suffix to an attribute name it will not be urlencoded,
but you must be sure it's always an ASCII string without any space.

The new setting clean_external_id_on_update indicate to clean all other
existing external id for an user after linking the user to an external
id.

All use of UserExternalId is supported by the default configuration of:

	external_id_tuples=(('dn:unquote',),),

to migrate to a new way of building the external id just define:

	external_id_tuple=(('my', 'new', 'tuple), ('dn:unquote',)),

The first tuple is used to canonicalize the external id of a found or
newly created user. The other tuples are only used to lookup existing
users, so that you can safely migrate from an old way of building the
external_id to a new one.

On AD the following configuration gives a permanent external id:

 external_id_tuple=(('objectGUID',),)

On OpenLDAP:

 external_id_tuple=(('entryUUID',),)
This commit is contained in:
Benjamin Dauvergne 2014-07-07 16:07:21 +02:00
parent 2383d81f73
commit 6c20a1a064
1 changed files with 94 additions and 22 deletions

View File

@ -10,8 +10,10 @@ import random
import pickle
import base64
import hashlib
import urllib
# code copied from http://www.amherst.k12.oh.us/django-ldap.html
# code originaly copied from by now merely inspired by
# http://www.amherst.k12.oh.us/django-ldap.html
log = logging.getLogger(__name__)
@ -87,9 +89,11 @@ _DEFAULTS = {
# update username on all login, use with CAUTION !! only if you know that
# generated username are unique
'update_username': False,
# lookup existing user with dn
'lookups': ('username', 'dn'),
# lookup existing user with an external id build with attributes
'lookups': ('external_id', 'username'),
'external_id_tuples': (('dn:noquote',),),
# keep password around so that Django authentification also work
'clean_external_id_on_update': True,
'keep_password': True,
'limit_to_realm': True,
}
@ -533,11 +537,14 @@ class LDAPBackend():
def get_ldap_attributes(self, uri, dn, conn, block):
'''Retrieve some attributes from LDAP, add mandatory values then apply
defined mappings between atrribute names'''
attributes = map(str, block['attributes'])
attributes = set()
attributes.update(map(str, block['attributes']))
for external_id_tuple in block['external_id_tuples']:
attributes.update(external_id_tuple)
attribute_mappings = block['attribute_mappings']
mandatory_attributes_values = block['mandatory_attributes_values']
try:
results = conn.search_s(dn, ldap.SCOPE_BASE, '(objectclass=*)', attributes)
results = conn.search_s(dn, ldap.SCOPE_BASE, '(objectclass=*)', list(attributes))
except ldap.LDAPError:
log.exception('unable to retrieve attributes of dn %r', dn)
return
@ -561,22 +568,74 @@ class LDAPBackend():
attribute_map['dn'] = dn
return attribute_map
def lookup_existing_user(self, uri, dn, username, password, conn, block, attributes):
def external_id_to_filter(self, external_id, external_id_tuple):
'''Split the external id, decode it and build an LDAP filter from it
and the external_id_tuple.
'''
splitted = external_id.split()
if len(splitted) != len(external_id_tuple):
return
filters = zip(external_id_tuple, splitted)
decoded = []
for attribute, value in filters:
quote = True
if ':' in attribute:
attribute, param = attribute.split(':')
quote = not 'noquote' in param.split(',')
if quote:
decoded.append((attribute, urllib.unquote(value)))
else:
decoded.append((attribute, value.encode('utf-8')))
filters = [filter_format('(%s=%s)', (a,b)) for a, b in decoded]
return '(&{0})'.format(''.join(filters))
def build_external_id(self, external_id_tuple, attributes):
'''Build the exernal id for the user, use attribute that eventually never change like GUID or UUID'''
l = []
for attribute in external_id_tuple:
quote = True
if ':' in attribute:
attribute, param = attribute.split(':')
quote = not 'noquote' in param.split(',')
v = attributes[attribute]
if isinstance(v, list):
v = v[0]
if isinstance(v, unicode):
v = v.encode('utf-8')
if quote:
v = urllib.quote(v)
l.append(v)
return ' '.join(v for v in l)
def lookup_by_username(self, username):
User = get_user_model()
try:
log.debug('lookup using username %r', username)
return LDAPUser.objects.get(**{User.USERNAME_FIELD: username})
except User.DoesNotExist:
return
def lookup_by_external_id(self, block, attributes):
User = get_user_model()
for eid_tuple in block['external_id_tuples']:
external_id = self.build_external_id(eid_tuple, attributes)
if not external_id:
continue
try:
log.debug('lookup using external_id %r: %r', eid_tuple,
external_id)
return LDAPUser.objects.get(
userexternalid__external_id=external_id,
userexternalid__source=block['realm'])
except User.DoesNotExist:
pass
def lookup_existing_user(self, uri, dn, username, password, conn, block, attributes):
for lookup_type in block['lookups']:
if lookup_type == 'username':
try:
log.debug('lookup using username %r', username)
return LDAPUser.objects.get(**{User.USERNAME_FIELD: username})
except User.DoesNotExist:
pass
elif lookup_type == 'dn':
try:
log.debug('lookup using dn %r', dn)
return LDAPUser.objects.get(userexternalid__external_id=dn,
userexternalid__source=block['realm'])
except User.DoesNotExist:
pass
return self.lookup_by_username(username)
elif lookup_type == 'external_id':
return self.lookup_by_external_id(block, attributes)
def update_user_identifiers(self, user, uri, dn, username, conn,
block, attributes):
@ -589,12 +648,25 @@ class LDAPBackend():
log.debug(log_msg, user.username, username)
user.username = username
user.save()
# if dn lookup is used, update it
if 'dn' in block['lookups']:
# if external_id lookup is used, update it
if 'external_id' in block['lookups'] \
and block.get('external_id_tuples') \
and block['external_id_tuples'][0]:
if not user.pk:
user.save()
UserExternalId.objects.get_or_create(user=user, external_id=dn,
source=block['realm'])
external_id = self.build_external_id(
block['external_id_tuples'][0],
attributes)
if external_id:
new, created = UserExternalId.objects.get_or_create(
user=user,
external_id=external_id,
source=block['realm'])
if block['clean_external_id_on_update']:
UserExternalId.objects \
.exclude(id=new.id) \
.filter(user=user, source=block['realm']) \
.delete()
def _return_user(self, uri, dn, username, password, conn, block):
attributes = self.get_ldap_attributes(uri, dn, conn, block)