This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
curie/curie/curie2supann.py

190 lines
6.4 KiB
Python
Executable File

#!env python
# -*- coding: utf-8 -*-
import sys
import unicodedata
import collections
import ldif
import ldap.dn
DC = 'curie'
O = 'Institut curie'
UAI = '{UAI}ATROUVER'
BASE_DN = 'dc=%s,dc=fr' % ldap.dn.escape_dn_chars(DC)
def lowercase_keys(d):
return dict((k.lower(), v) for k, v in d.iteritems())
def strip_accents(s):
s = unicode(s, 'utf-8')
return ''.join(c for c in unicodedata.normalize('NFD', s)
if unicodedata.category(c) != 'Mn').encode('utf-8')
class Error(Exception):
def __init__(self, dn, *args):
self.dn = dn
self.args = args
def __str__(self):
return '%s: %s' % (self.dn, ' '.join(map(str, self.args)))
class CurieLdifParser(ldif.LDIFParser):
'''Conversion LDAP Institut Curie vers SUPANN
Il faut aggréger les données venant de l'annuaire AD et de l'annuaire LDAP Sun
Attributs obligatoires:
uid <- AD employeeNumber ou LDAP uid @curie
givenname <- LDAP ICPrenomNaissance
sn <- LDAP ICNomNaissance
supannAliasLogin <- AD samAccountName ou LDAP ICLogin
userPassword <- {SASL}[samAccountName]@curie
supannListeRouge <- FALSE
Attribut optionnel:
telephoneNumber <- telephoneNumber
si LDAP.ICEntite == 'Recherche':
suppanEntiteAffectationPrincipale = LDAP.ICEquipeRecherche[0]['OU']
supannEntiteAffectation = LDAP.ICEntite
sinon si LDAP.ICEntite == 'Hopital'
supannEntiteAffectation = LDAP.ICEntite
sinon si LDAP.ICEneite == 'SI'
supannEntiteAffectation = LDAP.ICEntite
sinon:
error
Les erreurs seront disponible sur la sortie erreur.
'''
errors = None
users = None
def __init__(self, *args, **kwargs):
self.errors = []
self.users = kwargs.pop('users', None) or collections.defaultdict(lambda: {})
ldif.LDIFParser.__init__(self, *args, **kwargs)
def assert_sv_attribute(self, entry, name):
assert name in entry, 'attribut %s manquant' % name
assert len(entry[name]) == 1, 'plus d\'un attribut %s' % name
return entry[name][0]
def handle(self, dn, entry):
entry = lowercase_keys(entry)
try:
if 'employeenumber' in entry:
self.handle_ad(dn, entry)
elif 'ICPersonne' in entry['objectclass']:
self.handle_sun(dn, entry)
else:
raise Error(dn, 'entrée ignorée, car absence d\'attribut employeeNumber ou '
'objectClass=ICPersonne')
except AssertionError, e:
self.errors.append(Error(dn, str(e)))
def handle_ad(self, dn, entry):
uid = self.assert_sv_attribute(entry, 'employeenumber')
supann_alias_login = self.assert_sv_attribute(entry, 'samaccountname')
user_password = '{SASL}' + supann_alias_login + '@curie'
supann_liste_rouge = 'FALSE'
self.users[uid].update({
'uid': uid,
'supannAliasLogin': supann_alias_login,
'userPassword': user_password,
'supannListeRouge': supann_liste_rouge,
})
self.users[uid].setdefault('_source', set()).add('ad')
def handle_sun(self, dn, entry):
uid = self.assert_sv_attribute(entry, 'uid')
try:
ic_entite = self.assert_sv_attribute(entry, 'icentite')
except AssertionError:
ic_entite = None
else:
assert ic_entite in ('Recherche', 'Hopital', 'SI'), \
'valeur d\'ICEntite inconnue: %s' % ic_entite
supann_entite_affectation = ic_entite
prenom = self.assert_sv_attribute(entry, 'icprenomnaissance')
nom = self.assert_sv_attribute(entry, 'icnomnaissance')
telephone = entry.get('telephoneNumber', [])
if ic_entite == 'Recherche':
ic_equipe_recherche = self.assert_sv_attribute(entry, 'icequiperecherche')
ic_equipe_recherche_dn = ldap.dn.str2dn(ic_equipe_recherche)
assert ic_equipe_recherche[0][0][0].lower() == 'ou', ('ICEquipeRecherche ne contient '
'pas le DN d\'une OU: %s'
% ic_equipe_recherche)
supann_entite_affectation_principale = ic_equipe_recherche_dn[0][0][1]
else:
supann_entite_affectation_principale = None
d = {
'uid': uid,
'sn': nom,
'givenName': prenom,
'cn': strip_accents('%s %s' % (prenom, nom)).strip(),
}
if supann_entite_affectation:
d['supannEntiteAffectation'] = supann_entite_affectation
if telephone:
d['telephoneNumber'] = telephone
if supann_entite_affectation:
d['supannEntiteAffectationPrincipale'] = supann_entite_affectation_principale
self.users[uid].update(d)
self.users[uid].setdefault('_source', set()).add('sun')
def main():
users = None
if not sys.argv[1:]:
print 'Utilisation : curie2supann LDIF_FILE [LDIF_FILE..]'
print
print 'Le nouveau fichier LDIF est émis sur la sortie standard et les erreurs sur la sortie'
print 'erreur.'
sys.exit(1)
for path in sys.argv[1:]:
parser = CurieLdifParser(open(path), users=users)
parser.parse()
users = parser.users
if parser.errors:
print >>sys.stderr, path, 'ERRORS:', len(parser.errors)
for error in parser.errors:
print >>sys.stderr, ' -', error
else:
print >>sys.stderr, path, 'OK'
writer = ldif.LDIFWriter(sys.stdout)
for uid in users:
d = users[uid]
if d['_source'] != set(['ad', 'sun']):
msg = 'uid=%s uniquement dans l\'annuaire %s' % (uid, list(d['_source'])[0])
print >>sys.stderr, msg
else:
# make it globally unique
d['uid'] = d['uid'] + '@curie'
dn = [[('uid', d['uid'], 1)],
[('ou', 'people', 1)],
[('dc', DC, 1)],
[('dc', 'fr', 1)]]
entry = {}
for k, v in d.items():
if k.startswith('_'):
continue
if isinstance(v, list):
entry[k] = v
else:
entry[k] = [v]
writer.unparse(ldap.dn.dn2str(dn), entry)
if __name__ == '__main__':
main()