This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
mandayejs/mandayejs/mandaye/management/commands/migrate-users.py

91 lines
2.9 KiB
Python

from __future__ import absolute_import
import csv
import json
import ldif
import logging
from django.core.management.base import BaseCommand
from django.db import IntegrityError
from django.contrib.auth.models import User
from django.core.exceptions import ImproperlyConfigured
from django.utils.encoding import force_text
from mandayejs.mandaye.models import UserCredentials
from mellon.models import UserSAMLIdentifier
from mellon.utils import get_idps
logger = logging.getLogger(__name__)
def get_issuer():
idps = list(get_idps())
if not idps:
raise ImproperlyConfigured('ENTITY_ID or METADATA_URL required in settings.MELLON_IDENTITY_PROVIDERS')
idp = idps[0]
issuer = idp.get('METADATA_URL', None) or idp.get('ENTITY_ID', None)
return issuer
class Command(BaseCommand):
help = 'Migrate users from ldif file or csv file'
def add_arguments(self, parser):
parser.add_argument('filename', metavar='FILENAME', type=str,
help='name of file to import')
parser.add_argument(
'--ldap',
action='store_true',
default=False,
help='Migrate users from a ldap dump file'
)
parser.add_argument(
'--csv',
action='store_true',
default=False,
help='Migrate users from a csv file'
)
def handle(self, filename, *args, **kwargs):
if kwargs.get('csv'):
data = self.get_csv_data(filename)
self.migrate(data)
else:
data = self.get_ldif_data(filename)
data = [d[1] for d in data]
data = [
{k: b''.join(v) for k, v in d.items()} for d in data]
self.migrate(data)
def get_ldif_data(self, filename):
with open(filename, 'r') as fd:
return ldif.ParseLDIF(fd)
def get_csv_data(self, filename):
with open(filename, 'r') as fd:
fieldnames = ['idpUniqueID', 'spPostValues']
reader = csv.DictReader(fd, delimiter=';', quotechar='|', fieldnames=fieldnames)
return list(reader)
def migrate(self, parsed_data):
issuer = get_issuer()
for data in parsed_data:
try:
name_id = force_text(data.get('idpUniqueID'))
credentials = json.loads(force_text(data.get('spPostValues')))
user, created = User.objects.get_or_create(
username=name_id, last_name=force_text(data.get('spLogin', '')))
uc = UserCredentials(user=user, locators=credentials)
uc.save()
saml_id, created = UserSAMLIdentifier.objects.get_or_create(user=user, name_id=name_id, issuer=issuer)
logger.debug('{idpUniqueID} imported'.format(**data))
except (IntegrityError,) as e:
logger.debug(e)
continue