#!/usr/bin/env python
# -*- coding: utf-8 -*-
|
|
|
|
import sys
|
|
import unicodedata
|
|
import collections
|
|
|
|
import ldif
|
|
import ldap.dn
|
|
|
|
from .tb import print_tb
|
|
|
|
# Domain component of the organisation; main() uses it to build entry DNs.
DC = 'curie'
# Organisation display name (not referenced by the code visible in this file).
O = 'Institut curie'
# NOTE(review): 'ATROUVER' looks like a placeholder for the real UAI code;
# this constant is not referenced by the code visible in this file - confirm.
UAI = '{UAI}ATROUVER'
# Directory suffix; the DC value is escaped for use inside a DN.
BASE_DN = 'dc=%s,dc=fr' % (ldap.dn.escape_dn_chars(DC),)
|
|
|
|
|
|
def lowercase_keys(d):
    '''Return a new dict holding the items of *d* with lower-cased keys.

    Values are kept untouched.  When two keys differ only by case, the one
    iterated last wins.

    ``items()`` is used instead of the Python 2-only ``iteritems()`` so that
    any mapping exposing ``items()`` is accepted, on Python 2 and 3 alike.
    '''
    return dict((key.lower(), value) for key, value in d.items())
|
|
|
|
|
|
def strip_accents(s):
    '''Remove combining diacritical marks from *s*.

    The text is decomposed with NFD normalisation and every character whose
    Unicode category is ``Mn`` (non-spacing mark) is dropped.

    A byte string is decoded as UTF-8 and the result is encoded back to
    UTF-8 bytes; a text string is returned as text.  Accepting text avoids
    the ``TypeError`` raised when an already-decoded value is decoded again.
    '''
    was_bytes = isinstance(s, bytes)
    text = s.decode('utf-8') if was_bytes else s
    stripped = ''.join(
        char for char in unicodedata.normalize('NFD', text)
        if unicodedata.category(char) != 'Mn'
    )
    # Give the caller back the same kind of string it passed in.
    return stripped.encode('utf-8') if was_bytes else stripped
|
|
|
|
|
|
class Error(Exception):
    '''Conversion problem tied to the DN of the entry that triggered it.'''

    def __init__(self, dn, *args):
        '''Remember the offending *dn* and the free-form detail *args*.'''
        self.args = args
        self.dn = dn

    def __str__(self):
        '''Render as ``<dn>: <details joined by spaces>``.'''
        details = ' '.join(str(arg) for arg in self.args)
        return '%s: %s' % (self.dn, details)
|
|
|
|
|
|
class CurieLdifParser(ldif.LDIFParser):
    '''Aggregate Institut Curie LDIF exports into SUPANN-style user records.

    Entries coming from two directories are merged, per ``uid``, into
    ``self.users``:

    * entries carrying an ``employeeNumber`` attribute are treated as Active
      Directory entries (see ``handle_ad``);
    * entries whose ``objectClass`` contains ``ICPersonne`` are treated as
      Sun LDAP entries (see ``handle_sun``).

    Attribute mapping implemented by this class:

        uid                               <- AD employeeNumber / LDAP uid
        supannAliasLogin                  <- AD sAMAccountName
        userPassword                      <- {SASL}<sAMAccountName>@curie
        eduPersonPrincipalName            <- <sAMAccountName>@curie.fr
        supannListeRouge                  <- FALSE
        givenName                         <- LDAP ICPrenomNaissance, else givenName
        sn                                <- LDAP ICNomNaissance, else sn
        cn                                <- LDAP cn, else "<givenName> <sn>"
                                             with accents stripped
        mail                              <- LDAP mail
        supannEntiteAffectationPrincipale <- top ``ou`` of LDAP ICEquipeRecherche
        supannEntiteAffectation           <- top ``ou`` of LDAP ICEquipeRecherche
                                             and of LDAP ICUniteFonctionnelle
        supanncivilite                    <- 'M.' / 'Mme' from LDAP ICSexe
        icListeRouge                      <- upper-cased LDAP ICListeRouge

    telephoneNumber, icLibelleEntite, icUniteFonctionnelle, icEquipeRecherche
    and icEmploi are copied as-is (under lower-cased names) when present.

    Problems are collected in ``self.errors`` as ``Error`` instances instead
    of being raised out of ``handle``.
    '''

    # Both attributes are replaced by per-instance objects in __init__.
    errors = None
    users = None

    def __init__(self, *args, **kwargs):
        '''Create the parser.

        The optional ``users`` keyword is a mapping ``uid -> record`` to merge
        into, which lets several parsers share their results; a fresh mapping
        is created when it is omitted or ``None``.  All other arguments are
        forwarded to ``ldif.LDIFParser``.
        '''
        self.errors = []
        users = kwargs.pop('users', None)
        # Test against None explicitly: an empty mapping supplied by the
        # caller must be kept and filled, not silently replaced.
        if users is None:
            users = collections.defaultdict(dict)
        self.users = users
        ldif.LDIFParser.__init__(self, *args, **kwargs)

    def _user_record(self, uid):
        '''Return the record stored for *uid*, creating it when missing.'''
        # setdefault works for a plain dict as well as for a defaultdict.
        return self.users.setdefault(uid, {})

    def assert_sv_attribute(self, entry, name):
        '''Return the single value of attribute *name* in *entry*.

        Raises ``AssertionError`` when the attribute is missing or does not
        hold exactly one value.  The exception is raised explicitly rather
        than through ``assert`` so the check survives ``python -O``.
        '''
        if name not in entry:
            raise AssertionError('attribut %s manquant' % name)
        values = entry[name]
        if len(values) != 1:
            raise AssertionError('plus d\'un attribut %s' % name)
        return values[0]

    def _sv_with_fallback(self, entry, preferred, fallback):
        '''Single value of *preferred* if present in *entry*, else of *fallback*.'''
        name = preferred if preferred in entry else fallback
        return self.assert_sv_attribute(entry, name)

    def handle(self, dn, entry):
        '''Dispatch one parsed LDIF record to the AD or Sun handler.

        Attribute names are lower-cased first.  Records carrying a ``ref``,
        ``control`` or ``result`` key are skipped silently.  Any
        ``AssertionError`` raised while handling the record is stored in
        ``self.errors`` together with *dn*.
        '''
        entry = lowercase_keys(entry)
        if 'ref' in entry or 'control' in entry or 'result' in entry:
            return
        try:
            if 'employeenumber' in entry:
                self.handle_ad(dn, entry)
            # .get(): an entry without objectClass is reported as ignored
            # below instead of aborting the whole parse with a KeyError.
            elif 'ICPersonne' in entry.get('objectclass', []):
                self.handle_sun(dn, entry)
            else:
                raise AssertionError(
                    'entrée ignorée, car absence d\'attribut employeeNumber ou '
                    'objectClass=ICPersonne')
        except AssertionError as e:
            self.errors.append(Error(dn, str(e)))

    def handle_ad(self, dn, entry):
        '''Merge the account attributes of an Active Directory entry.

        Requires single-valued ``employeenumber`` (used as uid) and
        ``samaccountname``; raises ``AssertionError`` otherwise.
        '''
        uid = self.assert_sv_attribute(entry, 'employeenumber')
        supann_alias_login = self.assert_sv_attribute(entry, 'samaccountname')

        record = self._user_record(uid)
        record.update({
            'objectClass': ['person', 'supannPerson', 'organizationalPerson',
                            'eduPerson', 'inetOrgPerson', 'icPerson'],
            'uid': uid,
            'supannAliasLogin': supann_alias_login,
            'userPassword': '{SASL}' + supann_alias_login + '@curie',
            'supannListeRouge': 'FALSE',
            'eduPersonPrincipalName': supann_alias_login + '@curie.fr',
        })
        # Private bookkeeping key: the directories this uid was seen in.
        record.setdefault('_source', set()).add('ad')

    def extract_top_rdn(self, dn, name='ou'):
        '''Return the value of the first RDN of *dn*.

        The first RDN must be single-valued and its attribute type must be
        *name* (compared lower-cased); ``AssertionError`` is raised otherwise.
        '''
        parsed = ldap.dn.str2dn(dn)
        # NOTE(review): a DN made of a single RDN is rejected here as well,
        # with the same 'dn is empty' message - confirm this is intended.
        if len(parsed) <= 1:
            raise AssertionError('dn is empty')
        top = parsed[0]
        if len(top) != 1:
            raise AssertionError('rdn has more than one part %r' % top)
        if top[0][0].lower() != name:
            raise AssertionError('top rdn is not %s: %r' % (name, top[0]))
        return top[0][1]

    def _optional_top_ou(self, dn, entry, name):
        '''Top ``ou`` of the DN held by single-valued attribute *name*.

        When the attribute is missing or not single-valued, the problem is
        recorded in ``self.errors`` and ``None`` is returned.  A DN that
        ``extract_top_rdn`` rejects still raises ``AssertionError``.
        '''
        try:
            value = self.assert_sv_attribute(entry, name)
        except AssertionError as e:
            self.errors.append(Error(dn, str(e)))
            return None
        return self.extract_top_rdn(value)

    def handle_sun(self, dn, entry):
        '''Merge the identity attributes of a Sun LDAP entry.

        Requires a single-valued ``uid`` plus single-valued name attributes
        (birth names preferred, ``givenname`` / ``sn`` as fallback); raises
        ``AssertionError`` otherwise.
        '''
        uid = self.assert_sv_attribute(entry, 'uid')
        prenom = self._sv_with_fallback(entry, 'icprenomnaissance', 'givenname')
        nom = self._sv_with_fallback(entry, 'icnomnaissance', 'sn')
        if 'cn' in entry:
            cn = self.assert_sv_attribute(entry, 'cn')
        else:
            cn = strip_accents('%s %s' % (prenom, nom)).strip()

        supann_entite_affectation = []
        supann_entite_affectation_principale = []

        # The research team is both an affectation and the main one.
        team_ou = self._optional_top_ou(dn, entry, 'icequiperecherche')
        if team_ou is not None:
            supann_entite_affectation.append(team_ou)
            supann_entite_affectation_principale.append(team_ou)

        unit_ou = self._optional_top_ou(dn, entry, 'icunitefonctionnelle')
        if unit_ou is not None:
            supann_entite_affectation.append(unit_ou)

        sexe = entry.get('icsexe', [])
        if sexe == ['M']:
            supann_civilite = ['M.']
        elif sexe == ['F']:
            supann_civilite = ['Mme']
        else:
            supann_civilite = []

        d = {
            'uid': uid,
            'sn': nom,
            'givenName': prenom,
            'cn': cn,
            'mail': entry.get('mail', []),
            'supannEntiteAffectation': supann_entite_affectation,
            'supannEntiteAffectationPrincipale': supann_entite_affectation_principale,
            'supanncivilite': supann_civilite,
            'supannEtablissement': '{UAI}0753172R',
        }
        liste_rouge = entry.get('iclisterouge')
        if liste_rouge and liste_rouge[0]:
            d['icListeRouge'] = [liste_rouge[0].upper()]

        # Attributes copied verbatim; entry keys were lower-cased by handle().
        for to_copy in ('telephoneNumber', 'icLibelleEntite', 'icUniteFonctionnelle',
                        'icEquipeRecherche', 'icEmploi'):
            to_copy = to_copy.lower()
            if to_copy in entry:
                d[to_copy] = entry[to_copy]

        record = self._user_record(uid)
        record.update(d)
        # Private bookkeeping key: the directories this uid was seen in.
        record.setdefault('_source', set()).add('sun')
|
|
|
|
|
|
def main():
    '''Command-line entry point: merge LDIF files and emit one LDIF stream.

    Every path in ``sys.argv[1:]`` is parsed with ``CurieLdifParser``; the
    parsers share a single ``users`` mapping so records from all files are
    merged.  A per-file status line (and the list of errors, if any) goes to
    standard error.

    Users seen in both the 'ad' and the 'sun' source are written to standard
    output under ``uid=<uid>@curie,ou=people,dc=<DC>,dc=fr``; the others are
    only reported on standard error.

    Without any path, a usage message is printed and the process exits with
    status 1.
    '''
    paths = sys.argv[1:]
    if not paths:
        usage = (
            'Utilisation : curie2supann LDIF_FILE [LDIF_FILE..]',
            '',
            'Le nouveau fichier LDIF est émis sur la sortie standard et les erreurs sur la sortie',
            'erreur.',
        )
        sys.stdout.write('\n'.join(usage) + '\n')
        sys.exit(1)

    users = None
    for path in paths:
        # Close each input as soon as it has been parsed instead of leaking
        # one file handle per argument.
        with open(path) as ldif_file:
            parser = CurieLdifParser(ldif_file, users=users)
            parser.parse()
        users = parser.users

        if parser.errors:
            sys.stderr.write('%s ERRORS: %d\n' % (path, len(parser.errors)))
            for error in parser.errors:
                sys.stderr.write(' - %s\n' % (error,))
        else:
            sys.stderr.write('%s OK\n' % (path,))

    writer = ldif.LDIFWriter(sys.stdout)
    both_sources = set(['ad', 'sun'])
    for uid in users:
        d = users[uid]
        if d['_source'] != both_sources:
            msg = 'uid=%s uniquement dans l\'annuaire %s' % (uid, list(d['_source'])[0])
            sys.stderr.write(msg + '\n')
            continue

        # make it globally unique
        d['uid'] = d['uid'] + '@curie'
        dn = [[('uid', d['uid'], 1)],
              [('ou', 'people', 1)],
              [('dc', DC, 1)],
              [('dc', 'fr', 1)]]
        entry = {}
        for k, v in d.items():
            # Keys starting with '_' are internal bookkeeping, not attributes.
            if k.startswith('_'):
                continue
            if isinstance(v, list):
                # Build a real list: filter() is lazy on Python 3.
                entry[k] = [item for item in v if item]
            elif v:
                entry[k] = [v]
        writer.unparse(ldap.dn.dn2str(dn), entry)
|
|
|
|
if __name__ == '__main__':
    try:
        main()
    except Exception:
        # Catch Exception rather than everything: SystemExit (raised by
        # sys.exit() in main() for the usage message) and KeyboardInterrupt
        # must propagate instead of being reported as a crash.
        # print_tb is a project helper; presumably it prints the traceback.
        print_tb()
        # Signal the failure to the calling shell.
        sys.exit(1)
|