This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
authentic2-pratic/src/authentic2_pratic/management/commands/load-pratic-ldif.py

270 lines
10 KiB
Python

# -*- coding: utf-8 -*-
import logging
from optparse import make_option
from datetime import datetime
from itertools import chain
import ldif
from django.core.management.base import BaseCommand
from django.core.exceptions import ValidationError
from django.db import transaction
from django.utils.timezone import get_default_timezone, make_aware
from django_rbac.managers import defer_update_transitive_closure
from authentic2.hashers import olap_password_to_dj
from authentic2_pratic.models import (Service, Collectivity, ServiceInstance,
User, Access)
class PraticLDIFParser(ldif.LDIFParser):
def __init__(self, *args, **kwargs):
self.services = {}
for s in Service.objects.all():
self.services[s.slug] = s
self.service_instances = {}
for si in ServiceInstance.objects.all():
self.service_instances[(si.collectivity.slug, si.slug)] = si
self.accesses = []
self.collectivities = {}
for col in Collectivity.objects.all():
self.collectivities['ou=%s,dc=pratic59,dc=fr' % col.slug] = col
self.users = []
self.log = logging.getLogger(__name__)
ldif.LDIFParser.__init__(self, *args, **kwargs)
def convert_to_utf8(self, entry):
for key in entry:
new_values = []
for value in entry[key]:
try:
value = unicode(value, 'utf-8')
except UnicodeDecodeError:
pass
new_values.append(value)
entry[key] = new_values
def handle(self, dn, entry):
objectclass = entry['objectClass']
self.convert_to_utf8(entry)
if 'cdg59serviceInstance' in objectclass:
self.handle_service_instance(dn, entry)
elif 'cdg59service' in objectclass:
self.handle_service(dn, entry)
elif 'cdg59collectivity' in objectclass:
self.handle_collectivity(dn, entry)
elif 'cdg59agent' in objectclass:
self.handle_user(dn, entry)
def resolve_attribute(self, entry, key):
return entry.get(key, [''])[0]
def resolve_mapping(self, mapping, entry):
d = {}
for key in mapping:
ldap_name = mapping[key]
if ldap_name in entry:
d[key] = entry[ldap_name][0]
return d
def handle_collectivity(self, dn, entry):
slug = entry['ou'][0]
name = entry.get('cn', ['slug'])[0]
mapping = {
'collectivity_id': 'cdg59collectivityId',
'sirh_code': 'cdg59collectivitySirhCode',
'sirh_label': 'cdg59collectivitySirhLabel',
'siret_code': 'cdg59siretCode',
'postal_address': 'postalAddress',
'street_number': 'cdg59streetNumber',
'street': 'street',
'postal_code': 'postalCode',
'address_complementary': 'cdg59addressCompl',
'address_mention': 'cdg59addressMention',
'arrondissement_code': 'cdg59arrondissementCode',
'canton_code': 'cdg59cantonCode',
'departement_code': 'cdg59departementCode',
'dist_office': 'cdg59distOffice',
'region_code': 'cdg59regionCode',
'phone': 'telephoneNumber',
'fax': 'facsimileTelephoneNumber',
'email': 'email',
'url': 'cdg59URL',
}
others = self.resolve_mapping(mapping, entry)
collectivity = Collectivity(name=name, slug=slug, **others)
try:
collectivity.clean()
except ValidationError, e:
print 'Impossible de charger la collectivité', dn, ':', e
raise SystemExit
self.collectivities[dn] = collectivity
def handle_service(self, dn, entry):
# services
# dn: cdg59sid=agirhe,dc=pratic59,dc=fr
# cn: Agirhe
# cdg59isGlobal: TRUE
# objectClass: cdg59service
# cdg59metadataURL: https://agirhe.pratic59.fr/liberty/agirhe/saml/metadata.xml
# cdg59URL: https://agirhe.pratic59.fr/
# cdg59sid: agirhe
# structuralObjectClass: cdg59service
# entryUUID: c8a4a0fc-7813-102d-97eb-a7725ef3fb45
# creatorsName: uid=cdg59,ou=admin,dc=pratic59,dc=fr
# createTimestamp: 20090116122019Z
# entryCSN: 20090116122019.000000Z#000000#000#000000
# modifiersName: uid=cdg59,ou=admin,dc=pratic59,dc=fr
# modifyTimestamp: 20090116122019Z
slug = entry['cdg59sid'][0]
name = entry.get('cn', [slug])[0]
service_url = entry.get('cdg59URL', [''])[0]
metadata_url = entry.get('cdg59metadataURL', [''])[0]
is_global = entry.get('cdg59isGlobal', ['FALSE'])[0] == 'TRUE'
service = Service(
name=name,
slug=slug,
is_global=is_global,
service_url=service_url,
metadata_url=metadata_url)
try:
service.clean()
except ValidationError, e:
print 'Impossible de charger le service', dn, ':', e
raise SystemExit
self.services[slug] = service
def handle_service_instance(self, dn, entry):
collectivity_dn = dn.split(',', 1)[1]
collectivity = self.collectivities[collectivity_dn]
mapping = {
'slug': 'cdg59siid',
'service_url': 'cdg59URL',
'metadata_url': 'cdg59metadataURL',
}
others = self.resolve_mapping(mapping, entry)
def resolve():
service = self.services[entry['cdg59serviceType'][0]]
if not service.is_global and 'service_url' not in others:
others['service_url'] = 'http://missing-url-%s.com' % others['slug']
service_instance = ServiceInstance(
collectivity=collectivity,
service=service,
**others)
try:
service_instance.clean()
except ValidationError, e:
print 'Impossible de charger l\'instance de service', dn, ':', e
raise SystemExit
return service_instance
self.service_instances[(collectivity.slug, others['slug'])] = resolve
def handle_user(self, dn, entry):
collectivity_dn = dn.split(',', 1)[1]
collectivity = self.collectivities[collectivity_dn]
is_active = entry.get('cdg59isDisabled', ['FALSE'])[0] == 'FALSE'
mapping = {
'first_name': 'givenName',
'last_name': 'sn',
'email': 'mail',
'uid': 'uid',
'sirh_code': 'cdg59agentSirhCode',
'direction': 'cdg59direction',
'employee_type': 'employeeType',
'postal_address': 'postalAddress',
'fax': 'facsimileTelephoneNumber',
'mobile': 'mobile',
'phone': 'telephoneNumber',
}
others = self.resolve_mapping(mapping, entry)
last_login_duration = int(entry.get('cdg59lastConnectionDuration', [0])[0])
is_admin = entry.get('cdg59isAdmin', ['FALSE'])[0] == 'TRUE'
user = User(
collectivity=collectivity,
is_active=is_active,
last_login_duration=last_login_duration,
is_admin=is_admin,
**others)
if 'cdg59lastConnectionTime' in entry:
last_login = datetime.fromtimestamp(int(entry['cdg59lastConnectionTime'][0]))
last_login = make_aware(last_login, get_default_timezone())
user.last_login = last_login
if 'userPassword' in entry:
password = olap_password_to_dj(entry['userPassword'][0])
user.password = password
user.clean()
self.users.append(user)
#accesses
for siid in entry.get('cdg59serviceAccesses', []):
def g(siid):
def f()
try:
service_instance = self.service_instances[(collectivity.slug, siid)]
access = Access(
user=user,
service_instance=service_instance)
return access
except KeyError:
pass
return f
self.accesses.append(g(siid))
class Command(BaseCommand):
'''Load LDAP ldif file'''
can_import_django_settings = True
requires_model_validation = True
option_list = BaseCommand.option_list + (
make_option('--fake',
action='store_true',
help='file to store a JSON log of created users'),
)
args = '<ldif_file...>'
help = 'Load/update LDIF files as users'
def save_object(self, obj):
# set fk
if not obj:
return
if obj.pk:
return
for attr in ('collectivity', 'service', 'service_instance', 'user'):
if hasattr(obj, attr):
setattr(obj, attr, getattr(obj, attr))
try:
# link to existing object
already = obj.__class__.objects.get_by_natural_key(*obj.natural_key())
obj.pk = already.pk
except obj.__class__.DoesNotExist:
pass
obj.save()
def handle(self, *args, **options):
options['verbosity'] = int(options['verbosity'])
for arg in args:
f = file(arg)
parser = PraticLDIFParser(f)
parser.parse()
print 'Parsed:'
print '', '-', len(parser.collectivities), 'collectivities'
print '', '-', len(parser.services), 'services'
print '', '-', len(parser.users), 'users'
print '', '-', len(parser.service_instances), 'service instances'
print '', '-', len(parser.accesses), 'accesses'
for key in parser.service_instances:
if callable(parser.service_instances[key]):
parser.service_instances[key] = parser.service_instances[key]()
parser.accesses = [resolve() for resolve in parser.accesses]
with transaction.atomic():
with defer_update_transitive_closure():
for obj in chain(parser.collectivities.itervalues(),
parser.services.itervalues(),
parser.service_instances.itervalues(),
parser.users,
parser.accesses):
self.save_object(obj)
if options['fake']:
raise Exception('Fake execution')