script pour complétion des attributs pslBadgeCsn (#33401)
This commit is contained in:
parent
f5a80149fb
commit
effd9d678a
|
@ -0,0 +1,282 @@
|
|||
#!/usr/bin/env python
|
||||
import logging
|
||||
import ldap
|
||||
import ldap.sasl
|
||||
import ldap.dn
|
||||
import ldap.modlist
|
||||
import sys
|
||||
|
||||
from ldaptools import paged
|
||||
|
||||
base = 'o=meta'
|
||||
|
||||
logger = logging.getLogger()
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
|
||||
def split_name(attribute):
|
||||
parts = attribute.split(';')
|
||||
return parts[0].lower(), set(map(str.lower, parts[1:]))
|
||||
|
||||
ATTRIBUTE = 'pslbadgecsn'
|
||||
|
||||
FORMATS = {
|
||||
'x-csn-msb56hex': {'fmt': 'hex', 'msb_or_lsb': 'msb'},
|
||||
'x-csn-lsb56hex': {'fmt': 'hex', 'msb_or_lsb': 'lsb'},
|
||||
'x-csn-msb56dec': {'fmt': 'dec', 'msb_or_lsb': 'msb'},
|
||||
'x-csn-lsb56dec': {'fmt': 'dec', 'msb_or_lsb': 'lsb'},
|
||||
'x-csn-msb56bin': {'fmt': 'bin', 'msb_or_lsb': 'msb'},
|
||||
'x-csn-lsb56bin': {'fmt': 'bin', 'msb_or_lsb': 'lsb'},
|
||||
}
|
||||
|
||||
|
||||
class PSLError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def check_format(fmt, value):
|
||||
if FORMATS[fmt]['fmt'] == 'dec':
|
||||
try:
|
||||
int(value)
|
||||
except ValueError:
|
||||
return False
|
||||
if FORMATS[fmt]['fmt'] == 'hex':
|
||||
value = value.strip()
|
||||
try:
|
||||
int(value, 16)
|
||||
except ValueError:
|
||||
return False
|
||||
if len(value) != 14:
|
||||
return False
|
||||
if FORMATS[fmt]['fmt'] == 'bin':
|
||||
value = value.strip()
|
||||
try:
|
||||
int(value, 2)
|
||||
except ValueError:
|
||||
return False
|
||||
if len(value) != 56:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def to_mifare(fmt, value):
|
||||
return Mifare(value, **FORMATS[fmt])
|
||||
|
||||
|
||||
def to_fmt(fmt, value):
|
||||
return value.fmt(**FORMATS[fmt])
|
||||
|
||||
|
||||
def codes_by_format(record):
|
||||
'''Return pslBadgeCsn values sorted by format'''
|
||||
codes = {}
|
||||
for attribute in record:
|
||||
name, tags = split_name(attribute)
|
||||
if name != ATTRIBUTE:
|
||||
continue
|
||||
formats = [tag for tag in tags if tag in FORMATS]
|
||||
if not formats:
|
||||
raise PSLError('missing x-csn- tag')
|
||||
if len(formats) > 1:
|
||||
raise PSLError('too many tags %s' % tags)
|
||||
values = record[attribute]
|
||||
if not values:
|
||||
continue
|
||||
if len(values) > 1:
|
||||
raise PSLError('too many values for attribute %s' % attribute)
|
||||
fmt = formats[0]
|
||||
value = values[0]
|
||||
if not check_format(fmt, value):
|
||||
raise PSLError('value has wrong format for attribute %s, %r' % (attribute, value))
|
||||
codes[fmt] = value
|
||||
return codes
|
||||
|
||||
|
||||
def add_missing_codes(conn, dn, codes):
|
||||
missing_formats = set(FORMATS) - set(codes)
|
||||
assert missing_formats, 'no missing formats'
|
||||
|
||||
fmt = codes.keys()[0]
|
||||
value = to_mifare(fmt, codes[fmt])
|
||||
|
||||
for other_fmt in codes:
|
||||
if other_fmt == fmt:
|
||||
continue
|
||||
assert codes[other_fmt] == value.fmt(**FORMATS[other_fmt]), dn
|
||||
|
||||
new_codes = {fmt: to_fmt(fmt, value) for fmt in missing_formats}
|
||||
modlist = []
|
||||
for fmt in new_codes:
|
||||
modlist.append((ldap.MOD_ADD, '%s;%s' % (ATTRIBUTE, fmt), [new_codes[fmt]]))
|
||||
if modlist:
|
||||
conn.modify_s(dn, modlist)
|
||||
return len(new_codes)
|
||||
|
||||
|
||||
class Mifare(object):
|
||||
def revert_hex(s):
|
||||
n = ''
|
||||
while s:
|
||||
n += s[-2:]
|
||||
s = s[:-2]
|
||||
return n
|
||||
|
||||
def revert_bin(s):
|
||||
n = ''
|
||||
while s:
|
||||
n += s[-8:]
|
||||
s = s[:-8]
|
||||
return n
|
||||
|
||||
def dec_to_hex(d):
|
||||
h = hex(int(d)).lstrip('0x')
|
||||
h = '0' * (14 - len(h)) + h
|
||||
return h
|
||||
|
||||
def hex_to_dec(s):
|
||||
return str(int(s, 16))
|
||||
|
||||
def dec_to_bin(d):
|
||||
b = bin(int(d)).lstrip('0b')
|
||||
b = '0' * (56 - len(b)) + b
|
||||
return b
|
||||
|
||||
def bin_to_dec(s):
|
||||
return str(int(s, 2))
|
||||
|
||||
# test vectors
|
||||
msb_dec = '1284410151620224'
|
||||
lsb_dec = '36115473924526084'
|
||||
msb_bin = '00000100100100000010101000001010110101010100111010000000'
|
||||
lsb_bin = '10000000010011101101010100001010001010101001000000000100'
|
||||
msb_hex = '04902a0ad54e80'
|
||||
lsb_hex = '804ed50a2a9004'
|
||||
|
||||
assert dec_to_hex(msb_dec) == msb_hex
|
||||
assert dec_to_hex(lsb_dec) == lsb_hex
|
||||
|
||||
assert dec_to_bin(msb_dec) == msb_bin
|
||||
assert dec_to_bin(lsb_dec) == lsb_bin
|
||||
|
||||
assert revert_hex(msb_hex) == lsb_hex
|
||||
assert revert_bin(msb_bin) == lsb_bin
|
||||
|
||||
assert hex_to_dec(msb_hex) == msb_dec
|
||||
assert hex_to_dec(lsb_hex) == lsb_dec
|
||||
|
||||
assert bin_to_dec(msb_bin) == msb_dec
|
||||
assert bin_to_dec(lsb_bin) == lsb_dec
|
||||
|
||||
assert hex_to_dec(revert_hex(dec_to_hex(msb_dec))) == lsb_dec
|
||||
assert bin_to_dec(revert_bin(dec_to_bin(msb_dec))) == lsb_dec
|
||||
|
||||
assert bin_to_dec(revert_bin(msb_bin)) == lsb_dec
|
||||
|
||||
revert_hex = staticmethod(revert_hex)
|
||||
revert_bin = staticmethod(revert_bin)
|
||||
dec_to_hex = staticmethod(dec_to_hex)
|
||||
hex_to_dec = staticmethod(hex_to_dec)
|
||||
dec_to_bin = staticmethod(dec_to_bin)
|
||||
bin_to_dec = staticmethod(bin_to_dec)
|
||||
|
||||
# default storage is hex-msb
|
||||
def __init__(self, value, fmt, msb_or_lsb):
|
||||
assert fmt in ('dec', 'hex', 'bin')
|
||||
assert msb_or_lsb in ('msb', 'lsb')
|
||||
|
||||
if fmt == 'dec':
|
||||
value = self.dec_to_hex(value)
|
||||
elif fmt == 'hex':
|
||||
pass
|
||||
elif fmt == 'bin':
|
||||
value = self.dec_to_hex(self.bin_to_dec(value))
|
||||
|
||||
if msb_or_lsb == 'lsb':
|
||||
value = self.revert_hex(value)
|
||||
self.value = value
|
||||
|
||||
def fmt(self, fmt, msb_or_lsb):
|
||||
assert fmt in ('dec', 'hex', 'bin')
|
||||
assert msb_or_lsb in ('msb', 'lsb')
|
||||
|
||||
value = self.value
|
||||
if msb_or_lsb == 'lsb':
|
||||
value = self.revert_hex(value)
|
||||
|
||||
if fmt == 'hex':
|
||||
return value
|
||||
value = self.hex_to_dec(value)
|
||||
if fmt == 'dec':
|
||||
return value
|
||||
value = self.dec_to_bin(value)
|
||||
return value
|
||||
|
||||
@classmethod
|
||||
def test(cls):
|
||||
for fmt1 in ('dec', 'hex', 'bin'):
|
||||
for dir1 in ('msb', 'lsb'):
|
||||
for fmt2 in ('dec', 'hex', 'bin'):
|
||||
for dir2 in ('msb', 'lsb'):
|
||||
value1 = getattr(cls, '%s_%s' % (dir1, fmt1))
|
||||
value2 = getattr(cls, '%s_%s' % (dir2, fmt2))
|
||||
assert Mifare(value1, fmt1, dir1).fmt(fmt2, dir2) == value2
|
||||
|
||||
Mifare.test()
|
||||
|
||||
def show_help():
|
||||
print 'slapd-supann psl-badge-csn [complete|convert dc=univ,dc=fr [%s]' % '|'.join(FORMATS)
|
||||
raise SystemExit(1)
|
||||
|
||||
if not sys.argv[1:]:
|
||||
show_help()
|
||||
|
||||
if sys.argv[1] == 'complete':
|
||||
conn = paged.PagedLDAPObject(sys.argv[2] if sys.argv[2:] else 'ldapi://', retry_max=3, retry_delay=2)
|
||||
conn.sasl_interactive_bind_s("", ldap.sasl.external())
|
||||
result = conn.paged_search_ext_s(sys.argv[3] if sys.argv[3:] else base, ldap.SCOPE_SUBTREE, '%s=*' % ATTRIBUTE, [ATTRIBUTE])
|
||||
for dn, record in result:
|
||||
try:
|
||||
codes = codes_by_format(record)
|
||||
except PSLError as e:
|
||||
logger.error('%s error : %s', dn, str(e))
|
||||
continue
|
||||
|
||||
if not codes:
|
||||
logger.error('%s error: it has no readable pslBadgeCsn', dn)
|
||||
continue
|
||||
|
||||
if len(codes) == len(FORMATS):
|
||||
# ok all is good, continue
|
||||
continue
|
||||
|
||||
if len(codes) > 1:
|
||||
logger.warning('%s: entry ignored but it has less than %d formats (it has %d)',
|
||||
dn, len(FORMATS), len(codes))
|
||||
continue
|
||||
count = add_missing_codes(conn, dn, codes)
|
||||
if count:
|
||||
logger.info('%s: added %d new codes', dn, count)
|
||||
elif sys.argv[1] == 'convert':
|
||||
if len(sys.argv[2:]) < 2:
|
||||
show_help()
|
||||
base = sys.argv[2]
|
||||
fmt = sys.argv[3]
|
||||
print 3
|
||||
if fmt not in FORMATS:
|
||||
show_help()
|
||||
conn = paged.PagedLDAPObject(sys.argv[4] if sys.argv[4:] else 'ldapi://', retry_max=3, retry_delay=2)
|
||||
conn.sasl_interactive_bind_s("", ldap.sasl.external())
|
||||
result = conn.paged_search_ext_s(base, ldap.SCOPE_SUBTREE, '%s=*' % ATTRIBUTE, [ATTRIBUTE])
|
||||
for dn, record in result:
|
||||
for attribute in record:
|
||||
name, tags = split_name(attribute)
|
||||
if tags:
|
||||
continue
|
||||
modlist = [
|
||||
(ldap.MOD_DELETE, ATTRIBUTE, None),
|
||||
(ldap.MOD_ADD, ATTRIBUTE + ';' + fmt, record[attribute]),
|
||||
]
|
||||
conn.modify_s(dn, modlist)
|
||||
logger.info('%s: converting pslBadgeCsn to fmt %s', dn, fmt)
|
||||
else:
|
||||
show_help()
|
|
@ -0,0 +1,20 @@
|
|||
manipule les attributs pslBadgeCsn
|
||||
|
||||
slapd-supann complete [ldapi://] [o=meta]
|
||||
|
||||
Ajoute les formats manquants aux attributs pslBadgeCsn, les attributs existants
|
||||
doivent déjà avoir une option de format ';x-csn-' et doivent être cohérents
|
||||
(toutes les valeurs doivent correspondre) sinon une erreur sera levé pour le DN
|
||||
concerné.
|
||||
|
||||
Le premier argument optionnel est l'adresse du serveur OpenLDAP local via socket unix.
|
||||
|
||||
Le deuxième argument optionnel est la racine de l'arbre LDAP meta.
|
||||
|
||||
slapd-supann convert dc=univ-psl,dc=fr x-csn-msb56hex [ldapi://]
|
||||
|
||||
Convertit une base existante n'utilisant pas les options de format en l'ajoutant à tous les attributs pslBadgeCsn n'ayant pas d'option de format.
|
||||
|
||||
Le premier argument doit être la racine de l'arbre LDAP.
|
||||
|
||||
Le deuxième argument doit être le format utilisé.
|
Reference in New Issue