Manage LDAP extra attributes (#19365)

This extra attributes are retreived by making other LDAP queries
with parameters composed by looping on an user object's attribute
values. A mapping will be apply on corresponding objects's attributes
and resulting informations are compiled in a list and serialize in
JSON (configurable, but JSON is the only format available for now)
This commit is contained in:
Benjamin Renard 2017-12-11 19:56:26 +01:00 committed by Benjamin Dauvergne
parent 43d20aafa1
commit 7ac1fbcb4b
2 changed files with 146 additions and 1 deletions

View File

@ -33,6 +33,7 @@ import logging
import random
import base64
import os
import json
# code originaly copied from by now merely inspired by
# http://www.amherst.k12.oh.us/django-ldap.html
@ -449,6 +450,8 @@ class LDAPBackend(object):
'mandatory_attributes_values': {},
# mapping from LDAP attributes name to other names
'attribute_mappings': [],
# extra attributes retrieve by making other LDAP search using user object informations
'extra_attributes': {},
# realm for selecting an ldap configuration or formatting usernames
'realm': 'ldap',
# template for building username
@ -961,6 +964,13 @@ class LDAPBackend(object):
from_ldap = mapping.get('from_ldap')
if from_ldap:
attributes.add(from_ldap)
for extra_at in block.get('extra_attributes', {}):
if 'loop_over_attribute' in block['extra_attributes'][extra_at]:
attributes.add(block['extra_attributes'][extra_at]['loop_over_attribute'])
at_mapping = block['extra_attributes'][extra_at].get('mapping', {})
for key in at_mapping:
if at_mapping[key] != 'dn':
attributes.add(at_mapping[key])
return list(set(attribute.lower() for attribute in attributes))
@classmethod
@ -992,6 +1002,66 @@ class LDAPBackend(object):
new = set(old) | set(attribute_map[from_attribute])
attribute_map[to_attribute] = list(new)
attribute_map['dn'] = force_text(dn)
# extra attributes
attribute_map = cls.get_ldap_extra_attributes(block, conn, dn, attribute_map)
return attribute_map
@classmethod
def get_ldap_extra_attributes(cls, block, conn, dn, attribute_map):
'''Retrieve extra attributes from LDAP'''
ldap_scopes = {
'base': ldap.SCOPE_BASE,
'one': ldap.SCOPE_ONELEVEL,
'sub': ldap.SCOPE_SUBTREE,
}
log.debug('Attrs before extra attributes : %s', attribute_map)
for extra_attribute_name in block.get('extra_attributes', {}):
extra_attribute_config = block['extra_attributes'][extra_attribute_name]
extra_attribute_values = []
if 'loop_over_attribute' in extra_attribute_config:
extra_attribute_config['loop_over_attribute'] = extra_attribute_config['loop_over_attribute'].lower()
if extra_attribute_config['loop_over_attribute'] not in attribute_map:
log.debug('loop_over_attribute %s not present (or empty) in user object attributes retreived. Pass.' % extra_attribute_config['loop_over_attribute'])
continue
if 'filter' not in extra_attribute_config and 'basedn' not in extra_attribute_config:
log.warning('Extra attribute %s not correctly configured : you need to defined at least one of filter or basedn parameters' % extra_attribute_name)
for item in attribute_map[extra_attribute_config['loop_over_attribute']]:
ldap_filter = extra_attribute_config.get('filter', 'objectClass=*').format(item=item, **attribute_map)
ldap_basedn = extra_attribute_config.get('basedn', block.get('basedn')).format(item=item, **attribute_map)
ldap_scope = ldap_scopes.get(extra_attribute_config.get('scope', 'sub'), ldap.SCOPE_SUBTREE)
ldap_attributes_mapping = extra_attribute_config.get('mapping', {})
ldap_attributes_names = list(filter(lambda a: a != 'dn', ldap_attributes_mapping.values()))
try:
results = conn.search_s(ldap_basedn, ldap_scope, ldap_filter, ldap_attributes_names)
except ldap.LDAPError:
log.exception('unable to retrieve extra attribute %s for item %s' % (extra_attribute_name, item))
continue
item_value = {}
for obj in results:
log.debug(u'Object retrieved for extra attr %s with item %s : %s' % (extra_attribute_name, item, obj))
obj_attributes = cls.normalize_ldap_results(obj[1])
obj_attributes[dn] = obj[0]
log.debug(u'Object attributes normalized for extra attr %s with item %s : %s' % (extra_attribute_name, item, obj_attributes))
for key in ldap_attributes_mapping:
item_value[key] = obj_attributes.get(ldap_attributes_mapping[key].lower())
log.debug(u'Object attribute %s value retrieved for extra attr %s with item %s : %s' % (ldap_attributes_mapping[key], extra_attribute_name, item, item_value[key]))
if not item_value[key]:
del item_value[key]
elif len(item_value[key]) == 1:
item_value[key] = item_value[key][0]
extra_attribute_values.append(item_value)
else:
log.warning('loop_over_attribute not defined for extra attribute %s' % extra_attribute_name)
extra_attribute_serialization = extra_attribute_config.get('serialization', None)
if extra_attribute_serialization is None:
attribute_map[extra_attribute_name] = extra_attribute_values
elif extra_attribute_serialization == 'json':
attribute_map[extra_attribute_name] = json.dumps(extra_attribute_values)
else:
log.warning('Invalid serialization type "%s" for extra attribute %s' % (extra_attribute_serialization, extra_attribute_name))
return attribute_map
@classmethod
@ -1141,6 +1211,7 @@ class LDAPBackend(object):
for block in cls.get_config():
names.update(cls.get_ldap_attributes_names(block))
names.update(map_text(block['mandatory_attributes_values']).keys())
names.update(map_text(block['extra_attributes']).keys())
return [(a, '%s (LDAP)' % a) for a in sorted(names)]
@classmethod

View File

@ -15,6 +15,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import os
import pytest
@ -54,6 +55,17 @@ DN = 'cn=%s,o=ôrga' % escape_dn_chars(CN)
PASS = 'passé'
UPASS = u'passé'
EMAIL = 'etienne.michu@example.net'
CARLICENSE = '123445ABC'
EO_O = "EO"
EO_STREET = "169 rue du Chateau"
EO_POSTALCODE = "75014"
EO_CITY = "PARIS"
EE_O = "EE"
EE_STREET = "44 rue de l'Ouest"
EE_POSTALCODE = "75014"
EE_CITY = "PARIS"
base_dir = os.path.dirname(__file__)
key_file = os.path.join(base_dir, 'key.pem')
@ -90,12 +102,32 @@ gn: Étienne
l: Paris
mail: etienne.michu@example.net
jpegPhoto:: ACOE
carLicense: {cl}
o: EO
o: EE
dn: cn=group1,o=ôrga
objectClass: groupOfNames
member: {dn}
'''.format(dn=DN, uid=UID, password=PASS))
dn: o={eo_o},o=ôrga
objectClass: organization
o: {eo_o}
postalAddress: {eo_street}
postalCode: {eo_postalcode}
l: {eo_city}
dn: o={ee_o},o=ôrga
objectClass: organization
o: {ee_o}
postalAddress: {ee_street}
postalCode: {ee_postalcode}
l: {ee_city}
'''.format(dn=DN, uid=UID, password=PASS, cl=CARLICENSE,
eo_o=EO_O, eo_street=EO_STREET, eo_postalcode=EO_POSTALCODE, eo_city=EO_CITY,
ee_o=EE_O, ee_street=EE_STREET, ee_postalcode=EE_POSTALCODE, ee_city=EE_CITY
))
for i in range(5):
slapd.add_ldif('''dn: uid=mïchu{i},o=ôrga
objectClass: inetOrgPerson
@ -949,6 +981,7 @@ def test_get_attributes(slapd, settings, db, rf):
'url': [slapd.ldap_url],
'basedn': u'o=ôrga',
'use_tls': False,
'attributes': ['uid', 'carLicense'],
}]
user = authenticate(username=USERNAME, password=UPASS)
assert user
@ -958,6 +991,7 @@ def test_get_attributes(slapd, settings, db, rf):
'mail': [u'etienne.michu@example.net'],
'sn': [u'Michu'],
'uid': [u'etienne.michu'],
'carlicense': ['123445ABC'],
}
# simulate LDAP down
slapd.stop()
@ -967,6 +1001,7 @@ def test_get_attributes(slapd, settings, db, rf):
'mail': [u'etienne.michu@example.net'],
'sn': [u'Michu'],
'uid': [u'etienne.michu'],
'carlicense': ['123445ABC'],
}
assert not user.check_password(UPASS)
# simulate LDAP come back up
@ -982,4 +1017,43 @@ def test_get_attributes(slapd, settings, db, rf):
'mail': [u'etienne.michu@example.net'],
'sn': [u'Micho'],
'uid': [u'etienne.michu'],
'carlicense': ['123445ABC'],
}
@pytest.mark.django_db
def test_get_extra_attributes(slapd, settings, client):
settings.LDAP_AUTH_SETTINGS = [{
'url': [slapd.ldap_url],
'basedn': 'o=ôrga',
'use_tls': False,
'groupstaff': ['cn=group1,o=ôrga'],
'attributes': ['uid'],
'extra_attributes': {
'orga': {
'loop_over_attribute': 'o',
'filter': '(&(objectclass=organization)(o={item}))',
'basedn': 'o=ôrga',
'scope': 'sub',
'mapping': {
'id': 'o',
'street': 'postalAddress',
'city': 'l',
'postal_code': 'postalCode',
},
'serialization': 'json'
}
},
}]
response = client.post('/login/', {'login-password-submit': '1',
'username': 'etienne.michu',
'password': PASS}, follow=True)
user = response.context['user']
fetched_attrs = user.get_attributes(object(), {})
assert UID in fetched_attrs.get('uid')
assert 'orga' in fetched_attrs
orgas = json.loads(fetched_attrs.get('orga'))
assert isinstance(orgas, list)
assert len(orgas) == 2
assert {'id': EO_O, 'street': EO_STREET, 'city': EO_CITY, 'postal_code': EO_POSTALCODE} in orgas
assert {'id': EE_O, 'street': EE_STREET, 'city': EE_CITY, 'postal_code': EE_POSTALCODE} in orgas