155 lines
5.8 KiB
Python
155 lines
5.8 KiB
Python
import csv
|
|
import os.path
|
|
from optparse import make_option
|
|
import unicodedata
|
|
import random
|
|
import sys
|
|
|
|
from django.core.management.base import BaseCommand, CommandError
|
|
from django.contrib.auth import models as auth_models
|
|
from django.utils.encoding import force_text
|
|
|
|
from ... import models
|
|
|
|
|
|
def strip_accents(s):
    """Return *s* with its accents removed.

    The text is decomposed with Unicode NFD normalization, then every
    character whose Unicode category is 'Mn' is dropped.
    """
    kept = []
    for char in unicodedata.normalize('NFD', s):
        if unicodedata.category(char) == 'Mn':
            continue
        kept.append(char)
    return ''.join(kept)
|
|
|
|
|
|
def keep_letters(s):
    """Return the alphabetic characters of *s*, in their original order."""
    letters = []
    for char in s:
        if char.isalpha():
            letters.append(char)
    return ''.join(letters)
|
|
|
|
|
|
def unicode_csv_reader(utf8_csv_data, dialect=csv.excel, **kwargs):
    """Yield each row of a CSV source as a list of text cells.

    ``utf8_csv_data``, ``dialect`` and any extra keyword arguments are
    handed to ``csv.reader`` unchanged. Every cell of every row is then
    passed through ``force_text(cell, 'utf-8')`` before being yielded.
    """
    for raw_row in csv.reader(utf8_csv_data, dialect=dialect, **kwargs):
        text_row = []
        for cell in raw_row:
            text_row.append(force_text(cell, 'utf-8'))
        yield text_row
|
|
|
|
|
|
def csv_to_list(s):
    """Split a comma-separated string into a list of non-empty items.

    Each item has its surrounding whitespace stripped; items that are
    empty after stripping are dropped.

    A real list is always returned, so callers can concatenate it and
    test it for emptiness. Each item's own ``strip`` method is used
    rather than ``str.strip``, so both byte and text strings work.
    """
    stripped_items = (item.strip() for item in s.split(u','))
    return [item for item in stripped_items if item]
|
|
|
|
|
|
# Only uppercase letters and digits are used, leaving out the look-alike
# characters I, L and 1, O and 0.
__pwd_alphabet = 'ABCDEFGHJKMNPQRSTUVWXYZ23456789'


def create_password(pwd_length=8):
    """Return a random password of ``pwd_length`` characters.

    Characters are drawn from ``__pwd_alphabet``. The draw uses
    ``random.SystemRandom`` (operating-system entropy) instead of the
    default Mersenne Twister generator, whose output is predictable and
    therefore unsuitable for credentials.
    """
    rng = random.SystemRandom()
    return ''.join(rng.choice(__pwd_alphabet) for _ in range(pwd_length))
|
|
|
|
|
|
class Command(BaseCommand):
    """Create or update users from a CSV file.

    The first CSV row names the columns and may only use keys of
    ``headers``. For every following row a ``User`` is fetched or created
    by username, its mapped attributes, mailing lists, groups and password
    are set, and a CSV summary of all processed users is written to
    standard output.
    """

    args = '[--profile default_profile1,default_profile2] [--group defaut_group1,default_group2] [--password default_password] [--generate-password] file.csv'
    help = 'Load a CSV file containg user definitions'

    option_list = BaseCommand.option_list + (
        make_option("--profile", action='append', default=[]),
        make_option("--group", action='append', default=[]),
        make_option("--password", action='store'),
        make_option("--activate", action='store_true'),
        make_option("--generate-password", action='store_true', dest='generate_password'),
    )

    # Accepted CSV column names. A non-None value is the User attribute the
    # column is copied to as-is; None marks columns handled separately.
    headers = {
        'nom': 'last_name',
        'prenom': 'first_name',
        'email': 'email',
        'profil': None,
        'username': None,
        'groupe': None,
        'password': None,
    }

    def synthesis(self, data, **options):
        """Complete one CSV row in place with derived and default values.

        * ``username`` defaults to ``<prenom>.<nom>``, lowercased, with
          accents and non-letters removed.
        * ``profil`` and ``groupe`` become lists, taken from the row when
          the column exists, otherwise from the ``--profile`` and
          ``--group`` options.
        * ``password`` falls back to ``--password``, then to a generated
          one when ``--generate-password`` is set.

        Raises:
            CommandError: the row has no username and lacks ``nom`` or
                ``prenom`` to build one from.
        """
        if not data.get('username'):
            if not ('nom' in data and 'prenom' in data):
                raise CommandError('Username or nom/prenom must be given')
            prenom = keep_letters(strip_accents(data['prenom'])).lower()
            nom = keep_letters(strip_accents(data['nom'])).lower()
            data['username'] = "%s.%s" % (prenom, nom)

        # list() materializes the result so that later emptiness checks and
        # repeated iteration behave the same whatever csv_to_list returns.
        if 'profil' not in data:
            default_profiles = ','.join(map(force_text, options.get('profile', [])))
            data['profil'] = list(csv_to_list(default_profiles))
        else:
            data['profil'] = list(csv_to_list(data['profil']))

        if 'groupe' not in data:
            default_groups = ','.join(map(force_text, options.get('group', [])))
            data['groupe'] = list(csv_to_list(default_groups))
        else:
            data['groupe'] = list(csv_to_list(data['groupe']))

        if not data.get('password'):
            if options.get('password'):
                data['password'] = force_text(options['password'], 'utf8')
            elif options.get('generate_password', False):
                data['password'] = force_text(create_password())

    def handle(self, *args, **options):
        """Load the CSV file named by ``args[0]`` and apply it.

        Raises:
            CommandError: no filename was given, the file does not exist,
                the file is empty, or its header row contains a column
                that is not a key of ``headers``.
        """
        if not args:
            raise CommandError("missing filename")
        filename = args[0]
        if not os.path.exists(filename):
            raise CommandError("%s not found" % filename)

        all_users = []
        # The reader is lazy, so every row is consumed inside the ``with``
        # block; the file is closed even when a CommandError is raised.
        with open(filename) as csv_file:
            tuples = unicode_csv_reader(csv_file, dialect='excel')

            first = next(tuples, None)
            if first is None:
                raise CommandError("%s is empty" % filename)
            allowed_headers = set(self.headers)
            if not set(first) <= allowed_headers:
                msg = "Bad headers %s, only those are permitted: %s" % (first, allowed_headers)
                raise CommandError(msg)

            for line in tuples:
                d = dict(zip(first, line))
                self.synthesis(d, **options)
                all_users.append(d)

        # Collect every profile name used; stays empty when there are no rows.
        all_profiles = set()
        for user in all_users:
            all_profiles.update(user['profil'])

        profiles = {}
        for profile in all_profiles:
            if not profile:
                continue
            profiles[profile], _created = models.MailingList.objects.get_or_create(name=profile)
            profiles[profile].save()

        for user in all_users:
            user_instance, _created = auth_models.User.objects.get_or_create(
                username=user['username']
            )
            for header, map_to in self.headers.items():
                if map_to and header in user:
                    setattr(user_instance, map_to, user[header])
            if user['profil']:
                user_instance.mailing_lists = [profiles.get(name) for name in user['profil']]
            if user['groupe']:
                user_instance.groups = [
                    auth_models.Group.objects.get_or_create(name=name)[0]
                    for name in user['groupe']
                ]
                # Staff status is granted when any group name contains
                # 'Administrateur'.
                user_instance.is_staff = any('Administrateur' in name for name in user['groupe'])
            password = user.get('password')
            if password:
                if password.startswith('sha1$'):
                    # Stored verbatim instead of going through set_password.
                    user_instance.password = password
                else:
                    user_instance.set_password(password)
            if options.get('activate'):
                user_instance.is_active = True
            user_instance.save()

        header = ['username', 'prenom', 'nom', 'email', 'password']
        csv_writer = csv.DictWriter(sys.stdout, header)
        csv_writer.writerow(dict(zip(header, header)))
        # The csv module needs byte strings on Python 2 only; on Python 3
        # encoded cells would be written as their b'...' repr.
        encode_cells = str is bytes
        for user in all_users:
            d = {}
            for key in header:
                value = user.get(key, '')
                if encode_cells:
                    value = value.encode('utf8')
                d[key] = value
            csv_writer.writerow(d)
|