This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
portail-citoyen2/portail_citoyen/management/commands/load-drupal-users.py

165 lines
6.0 KiB
Python

import random
import string
import locale
from optparse import make_option
import os.path
import logging
import csv
import operator
import collections
try:
from django.contrib.auth import get_user_model
except ImportError:
from django.contrib.auth.models import User
get_user_model = lambda: User
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from authentic2.saml.models import LibertyServiceProvider, LibertyFederation
from authentic2.hashers import Drupal7PasswordHasher
# Root logger (getLogger() is called without a name). Command.handle attaches
# a StreamHandler to it when --verbose or --debug is passed.
logger = logging.getLogger()
class FakeException(Exception):
    """Raised at the end of a ``--fake`` run.

    ``Command.load_drupal_users`` raises it when the ``fake`` option is set,
    and ``Command.handle`` catches and ignores it.
    """
class Command(BaseCommand):
    """Import Drupal user accounts from a CSV export.

    For every input row a user is created together with a persistent SAML
    federation towards the service provider selected with ``--provider``.
    Rows without a ``name_id`` get a freshly generated one; those generated
    identifiers are written to the output CSV (columns
    ``OUTPUT_FIELD_NAMES``).
    """

    INPUT_FIELD_NAMES = (
        'uid', 'username', 'password', 'email', 'firstname', 'name_id'
    )
    OUTPUT_FIELD_NAMES = (
        'uid', 'username', 'name_id',
    )
    # name_id_format stored on, and looked up for, every federation.
    PERSISTENT_NAME_ID_FORMAT = (
        'urn:oasis:names:tc:SAML:2.0:nameid-format:persistent'
    )
    args = 'input.csv output.csv'
    help = ('Load Drupal users\n'
            'The input file must have the following columns:\n'
            + ', '.join(INPUT_FIELD_NAMES) + '\n')
    option_list = BaseCommand.option_list + (
        make_option('--provider', action="store"),
        make_option('--list-providers', action="store_true"),
        make_option('--verbose', action="store_true"),
        make_option('--fake', action="store_true"),
        make_option('--debug', action="store_true"),
    )

    def handle(self, *args, **options):
        """Entry point of the command.

        Sets up console logging for ``--verbose``/``--debug``, prints the
        provider slugs and returns for ``--list-providers``, otherwise runs
        the import with ``args[0]`` as input CSV and ``args[1]`` as output
        CSV.

        Raises:
            CommandError: when fewer than two positional arguments are
                given, or for any error reported by ``load_drupal_users``.
        """
        locale.setlocale(locale.LC_ALL, '')
        if options['verbose'] or options['debug']:
            level = logging.DEBUG if options['debug'] else logging.INFO
            handler = logging.StreamHandler()
            handler.setLevel(level)
            logger.addHandler(handler)
            # The handler level alone is not enough: records are filtered by
            # the logger's own level first (WARNING by default on the root
            # logger). Only ever lower it, never raise it.
            if logger.getEffectiveLevel() > level:
                logger.setLevel(level)
        if options['list_providers']:
            print('Providers:')
            for service_provider in LibertyServiceProvider.objects.all():
                print('- %s' % service_provider.liberty_provider.slug)
            return
        if len(args) < 2:
            raise CommandError('expected arguments: %s' % self.args)
        try:
            self.load_drupal_users(args[0], args[1], options)
        except FakeException:
            # Deliberate: a --fake run ends with this exception so that the
            # transaction is not committed; it is not an error.
            pass

    def generate_name_id(self):
        """Return a new name identifier: ``_`` plus 32 characters of [A-Z0-9].

        Example: ``_9903A47299512211F49F9E7931183761``
        """
        alphabet = string.ascii_uppercase + string.digits
        # OS-provided randomness: these identifiers are persistent and should
        # not be predictable from the default seedable PRNG.
        rng = random.SystemRandom()
        return '_' + ''.join(rng.choice(alphabet) for _ in range(32))

    @staticmethod
    def _clean_row(row):
        """Return a copy of ``row`` with every 'NULL' cell replaced by ''."""
        return dict((key, '' if cell == 'NULL' else cell)
                    for key, cell in row.items())

    def _read_rows(self, input_csv):
        """Read the data rows (header skipped) of ``input_csv`` as dicts."""
        with open(input_csv) as infile:
            csv_in = csv.DictReader(infile, fieldnames=self.INPUT_FIELD_NAMES)
            # Skip the header line; the default keeps an empty file from
            # raising StopIteration.
            next(csv_in, None)
            return [self._clean_row(row) for row in csv_in]

    @transaction.commit_on_success
    def load_drupal_users(self, input_csv, output_csv, options):
        """Create users and federations from ``input_csv``.

        Args:
            input_csv: path of the CSV file to read; its first line is
                skipped as a header and columns are ``INPUT_FIELD_NAMES``.
            output_csv: path of the CSV file to write; receives one row per
                created account whose name_id had to be generated.
            options: the command options; ``provider`` and ``fake`` are read.

        Raises:
            CommandError: ``--provider`` is missing or unknown, or at least
                one row conflicts with an existing account.
            FakeException: ``options['fake']`` is set and no error occurred.

        Any exception leaving this method prevents the transaction opened by
        ``commit_on_success`` from being committed.
        """
        # Validate options before touching any file, so that a bad invocation
        # does not truncate an existing output file.
        if not options['provider']:
            raise CommandError('--provider option is required')
        try:
            provider = LibertyServiceProvider.objects.get(
                liberty_provider__slug=options['provider'])
        except LibertyServiceProvider.DoesNotExist:
            raise CommandError('service provider %s does not exist' % options['provider'])

        rows = self._read_rows(input_csv)
        User = get_user_model()
        hasher = Drupal7PasswordHasher()
        new_users = []
        created_account = 0
        errors = False

        with open(output_csv, 'w') as outfile:
            csv_out = csv.DictWriter(outfile, fieldnames=self.OUTPUT_FIELD_NAMES)
            # Write the header line to the output file.
            csv_out.writerow(dict(zip(self.OUTPUT_FIELD_NAMES,
                                      self.OUTPUT_FIELD_NAMES)))
            logger.info('loading drupal accounts from %r', input_csv)
            for row in rows:
                uid = row['uid']
                username = row['username']
                password = row['password']
                email = row['email']
                # Truncated to 30 characters before being stored as first_name.
                firstname = row['firstname'].decode('utf-8')[:30]
                name_id = row['name_id']

                generated = not name_id
                if generated:
                    name_id = self.generate_name_id()
                elif LibertyFederation.objects.filter(
                        user__isnull=False,
                        sp=provider,
                        name_id_format=self.PERSISTENT_NAME_ID_FORMAT,
                        name_id_content=name_id).exists():
                    logger.debug('user %r already linked with name_id %r',
                                 username, name_id)
                    continue

                if User.objects.filter(username=username).exists():
                    # Keep scanning so that every conflict gets logged; the
                    # whole import is aborted after the loop.
                    errors = True
                    if generated:
                        logger.warning('username %r already exists but does not have '
                                       'a name_id, please check manually.', username)
                    else:
                        logger.warning('user %r exist but not with name_id %r, '
                                       'aborting, check manually', username, name_id)
                    continue

                user = User.objects.create(
                    username=username,
                    email=email,
                    first_name=firstname,
                    password=hasher.from_drupal(password)
                )
                LibertyFederation.objects.create(
                    user=user,
                    sp=provider,
                    name_id_format=self.PERSISTENT_NAME_ID_FORMAT,
                    name_id_content=name_id
                )
                if generated:
                    # Only generated identifiers are reported in the output.
                    new_users.append({
                        'uid': uid,
                        'username': username,
                        'name_id': name_id
                    })
                logger.info('created account %r with name_id %r', username, name_id)
                created_account += 1

            if errors:
                raise CommandError('Too much errors, see logs.')
            if options['fake']:
                logger.info('fake run')
                raise FakeException()
            csv_out.writerows(new_users)
        logger.info('created %d accounts', created_account)