This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
univnautes-old/virtualenv/pfidp/users_admin/views.py

322 lines
14 KiB
Python

# views for user admin
# -*- encoding: utf-8 -*-
from django.conf import settings
from django.shortcuts import render_to_response, redirect
from django.contrib.auth.decorators import user_passes_test
from django.template import RequestContext, loader, Context
from django.contrib import messages
from django.http import HttpResponse
import pfusers
from .forms import UserForm, NewUserForm, ConfirmForm, UploadFileForm
import datetime
@user_passes_test(lambda user: user.is_staff, login_url='/logout')
def index(request):
    """Render the user list, optionally restricted by a filter string.

    The filter comes from the ``filter`` query parameter; when that
    parameter is absent the value of the ``filter`` cookie is used instead.
    Whenever a filter is in effect it is written back to the cookie.
    """
    # The query string takes precedence; the cookie is only a fallback.
    pattern = request.GET.get('filter')
    if pattern is None:
        pattern = request.COOKIES.get('filter')

    page = render_to_response(
        'users_admin/index.html',
        {'users': pfusers.get_all_pfusers(pattern), 'filter': pattern or ''},
        context_instance=RequestContext(request))

    if pattern is None:
        return page
    page.set_cookie('filter', pattern)
    return page
@user_passes_test(lambda user: user.is_staff, login_url='/logout')
def create(request):
    """Create one user, or a numbered series of users, from NewUserForm.

    GET renders the form pre-filled with a default expiration date:
    ``settings.IDP_UA_MAX_EXPIRES`` days from today when that setting is
    positive, seven days from today otherwise.

    POST validates the form and, when ``IDP_UA_MAX_EXPIRES`` is positive,
    rejects expiration dates further away than that many days.  It then
    calls ``pfusers.create`` and redirects back to this page.  When
    ``userset_number`` is greater than 1 the accounts are named
    ``<name>-<n>`` with n counting up from ``userset_start``; creation
    stops at the first failure and only the accounts actually created are
    reported as added.
    """
    if request.method == 'POST':
        form = NewUserForm(request.POST)
        if form.is_valid():
            expires = form.cleaned_data.get('expires')
            # check expiration date
            if settings.IDP_UA_MAX_EXPIRES > 0:
                try:
                    delta = expires - datetime.date.today()
                except TypeError:
                    # expires is not a date (e.g. None): it cannot be
                    # compared with today's date.
                    form.errors['expires'] = [u"Mauvais format de date d'expiration (jj/mm/aaaa)."]
                else:
                    if delta.days > settings.IDP_UA_MAX_EXPIRES:
                        form.errors['expires'] = [u"Mauvaise date d'expiration "
                            u"(maximum %d jours à partir d'aujourd'hui)." % settings.IDP_UA_MAX_EXPIRES]
            if form.errors.get('expires'):
                return render_to_response('users_admin/create.html',
                                          {'form': form},
                                          context_instance=RequestContext(request))

            name = form.cleaned_data.get('name')
            password = form.cleaned_data.get('password')
            descr = form.cleaned_data.get('descr')
            disabled = form.cleaned_data.get('disabled')
            multiple = form.cleaned_data.get('multiple')
            userset_number = int(form.cleaned_data.get('userset_number'))

            if userset_number == 1:
                ret, log = pfusers.create(name, password=password, descr=descr,
                                          expires=expires, disabled=disabled, multiple=multiple)
                if ret:
                    messages.success(request, u'Utilisateur <%s> ajouté.' % name)
                else:
                    messages.error(request, u'Erreur lors de la création <%s>: %s' % (name, log))
            else:  # (multiple users creation)
                userset_start = int(form.cleaned_data.get('userset_start'))
                last_created = None
                for n in range(userset_start, userset_start + userset_number):
                    username = '%s-%d' % (name, n)
                    ret, log = pfusers.create(username, password=password, descr=descr,
                                              expires=expires, disabled=disabled, multiple=multiple)
                    if not ret:
                        messages.error(request, u'Erreur lors de la création <%s>: %s' % (username, log))
                        break
                    last_created = n
                # Report only the accounts that were really created, so a
                # failure part-way through is not announced as a full success.
                if last_created is not None:
                    messages.success(request, u'Utilisateurs <%s-%d> à <%s-%d> ajoutés.' %
                                     (name, userset_start, name, last_created))
            return redirect('.')
    else:
        initial = {
            'name': '',
            'userset_number': 1,
            'userset_start': 1,
        }
        if settings.IDP_UA_MAX_EXPIRES > 0:
            dt = datetime.date.today() + datetime.timedelta(settings.IDP_UA_MAX_EXPIRES)
        else:
            dt = datetime.date.today() + datetime.timedelta(7)  # one week, by default
        initial['expires'] = dt.strftime('%d/%m/%Y')
        form = NewUserForm(initial=initial)

    # Reached on GET, and on POST when the form itself is invalid.
    return render_to_response('users_admin/create.html',
                              {'form': form},
                              context_instance=RequestContext(request))
@user_passes_test(lambda user: user.is_staff, login_url='/logout')
def read(request, name=None):
    """Display a single user.

    The user is looked up by ``name`` in the mapping returned by
    ``pfusers.get_all_pfusers(with_password=True)``.  An unknown name
    produces an error message and a redirect to the parent URL.
    """
    user = pfusers.get_all_pfusers(with_password=True).get(name)
    if user is None:
        messages.error(request, u'Utilisateur <%s> inconnu.' % name)
        return redirect('..')
    return render_to_response('users_admin/read.html',
                              {'user': user},
                              context_instance=RequestContext(request))
@user_passes_test(lambda user: user.is_staff, login_url='/logout')
def update(request, name=None):
    """Edit an existing user through UserForm.

    An unknown ``name`` produces an error message and a redirect to the
    parent URL.  GET renders the form pre-filled from the stored user.
    POST validates the form; when ``settings.IDP_UA_MAX_EXPIRES`` is
    positive and the expiration date differs from the stored one, dates
    further away than that many days are rejected.  The user is then
    updated through ``pfusers.update`` and the client is redirected to the
    parent URL whether or not the update succeeded.
    """
    user = pfusers.get_all_pfusers().get(name)
    if user is None:
        messages.error(request, u'Utilisateur <%s> inconnu.' % name)
        return redirect('..')

    if request.method == 'POST':
        form = UserForm(request.POST)
        if form.is_valid():
            # check expiration date, but only if it changed
            expires = form.cleaned_data.get('expires')
            if settings.IDP_UA_MAX_EXPIRES > 0 and expires != user['expires']:
                try:
                    delta = expires - datetime.date.today()
                except TypeError:
                    # expires is not a date (e.g. None): it cannot be
                    # compared with today's date.
                    form.errors['expires'] = [u"Mauvais format de date d'expiration (jj/mm/aaaa)."]
                else:
                    if delta.days > settings.IDP_UA_MAX_EXPIRES:
                        form.errors['expires'] = [u"Mauvaise date d'expiration "
                            u"(maximum %d jours à partir d'aujourd'hui)." % settings.IDP_UA_MAX_EXPIRES]
            if form.errors.get('expires'):
                return render_to_response('users_admin/update.html',
                                          {'form': form, 'user': user},
                                          context_instance=RequestContext(request))

            # modify the user
            password = form.cleaned_data.get('password')
            descr = form.cleaned_data.get('descr')
            disabled = form.cleaned_data.get('disabled')
            multiple = form.cleaned_data.get('multiple')
            ret, log = pfusers.update(name, password=password, descr=descr, expires=expires,
                                      disabled=disabled, multiple=multiple)
            if ret:
                messages.success(request, u'Utilisateur <%s> modifié.' % name)
            else:
                messages.error(request, u'Erreur lors de la modification de <%s>: %s' % (name, log))
            return redirect('..')
    else:
        initial = {
            'name': user['name'],
            'descr': user['descr'],
            'multiple': 'univnautes-idp-multiple' in user['priv'],
            'disabled': user['disabled'],
        }
        # Only pre-fill the date when the stored value really is a date.
        if isinstance(user['expires'], datetime.date):
            initial['expires'] = user['expires'].strftime('%d/%m/%Y')
        form = UserForm(initial=initial)

    # Reached on GET, and on POST when the form itself is invalid.
    return render_to_response('users_admin/update.html',
                              {'form': form, 'user': user},
                              context_instance=RequestContext(request))
def _confirm_user_action(request, name, action, title, success_msg, error_msg):
    """Shared body of the single-user delete/desactivate/activate views.

    Looks the user up by ``name``; an unknown name produces an error
    message and a redirect to the parent URL.  On a POST carrying a valid
    ConfirmForm, ``action(name)`` is called; it must return a
    ``(ok, log)`` pair, which selects the success or error message before
    redirecting to the parent URL.  In every other case the confirmation
    page is rendered with a fresh, unbound ConfirmForm.

    ``title`` and ``success_msg`` are format strings taking the user name;
    ``error_msg`` takes ``(name, log)``.
    """
    user = pfusers.get_all_pfusers().get(name)
    if user is None:
        messages.error(request, u'Utilisateur <%s> inconnu.' % name)
        return redirect('..')

    if request.method == 'POST':
        form = ConfirmForm(request.POST)
        if form.is_valid():
            ret, log = action(name)
            if ret:
                messages.success(request, success_msg % name)
            else:
                messages.error(request, error_msg % (name, log))
            return redirect('..')

    form = ConfirmForm()
    return render_to_response('users_admin/confirm.html',
                              {'form': form, 'users': [user],
                               'title': title % name},
                              context_instance=RequestContext(request))


@user_passes_test(lambda user: user.is_staff, login_url='/logout')
def delete(request, name=None):
    """Ask for confirmation, then delete the user through pfusers.delete."""
    return _confirm_user_action(
        request, name, pfusers.delete,
        u"Supprimer le compte <%s> ?",
        u'Utilisateur <%s> supprimé.',
        u'Erreur lors de la suppression de <%s>: %s')


@user_passes_test(lambda user: user.is_staff, login_url='/logout')
def desactivate(request, name=None):
    """Ask for confirmation, then disable the user through pfusers.desactivate."""
    return _confirm_user_action(
        request, name, pfusers.desactivate,
        u"Désactiver le compte <%s> ?",
        u'Utilisateur <%s> désactivé.',
        u'Erreur lors de la désactivation de <%s>: %s')


@user_passes_test(lambda user: user.is_staff, login_url='/logout')
def activate(request, name=None):
    """Ask for confirmation, then enable the user through pfusers.activate."""
    return _confirm_user_action(
        request, name, pfusers.activate,
        u"Activer le compte <%s> ?",
        u'Utilisateur <%s> activé.',
        u"Erreur lors de l'activation de <%s>: %s")
# Bulk actions offered by the `multiple` view, mapped to their display label.
# Apart from 'csv', `multiple` resolves each key with getattr(pfusers, key),
# so a key has to match the name of the pfusers function it triggers.
ACTION_NAME = dict(
    delete=u'Suppression',
    desactivate=u'Désactivation',
    activate=u'Activation',
    csv=u'Export CSV',
)
@user_passes_test(lambda user: user.is_staff, login_url='/logout')
def multiple(request):
    """Apply one action to several users, in two POST steps.

    Step 1: the POST carries ``action`` and a ``users`` list.  The action
    must be a key of ACTION_NAME and every name must be a known user.
    ``csv`` is served immediately as a download; any other action is stored
    in the session together with the names, and a confirmation page is
    rendered.

    Step 2: the POST carries no ``action``; it is treated as the
    confirmation form.  When valid, the action and names are read back
    from the session, ``getattr(pfusers, action)(name)`` is called for each
    name, the outcome is reported, and the session entries are removed.

    Every path that does not render a page or a download redirects to
    ``'.'``.
    """
    if request.method == 'POST':
        action = request.POST.get('action')
        if action:
            # we need a confirmation
            if action not in ACTION_NAME:
                messages.warning(request, u'Choisissez une action...')
                return redirect('.')
            title = '%s de ces comptes ?' % ACTION_NAME[action]

            names = request.POST.getlist('users')
            if not names:
                messages.warning(request, u'Sélectionnez au moins un utilisateur.')
                return redirect('.')

            # Passwords are only fetched for the CSV export.
            all_pfusers = pfusers.get_all_pfusers(with_password=(action == "csv"))
            try:
                users = [all_pfusers[name] for name in names]
            except KeyError:
                messages.error(request, u'Au moins un utilisateur inconnu dans la liste.')
                return redirect('.')

            if action == "csv":
                return csv(users)

            # Remember the request so the confirmation POST can replay it.
            request.session['univnautes_idpua_action'] = action
            request.session['univnautes_idpua_names'] = names
            form = ConfirmForm()
            return render_to_response('users_admin/confirm.html',
                                      {'form': form,
                                       'users': users,
                                       'title': title},
                                      context_instance=RequestContext(request))
        else:
            # normally, it's a confirmation
            form = ConfirmForm(request.POST)
            if form.is_valid():
                try:
                    names = request.session['univnautes_idpua_names']
                    action = request.session['univnautes_idpua_action']
                except KeyError:
                    messages.error(request, u'Erreur dans la session !')
                    return redirect('.')
                if action not in ACTION_NAME:
                    messages.error(request, u'Action invalide')
                    return redirect('.')

                success = []
                errors = []
                for name in names:
                    ret, log = getattr(pfusers, action)(name)
                    if ret:
                        success.append(name)
                    else:
                        errors.append('%s (%s)' % (name, log))
                if success:
                    messages.success(request, u'%s: %s' % (ACTION_NAME[action], ', '.join(success)))
                if errors:
                    messages.error(request, u'ERREUR %s: %s' % (ACTION_NAME[action], ', '.join(errors)))

                del request.session['univnautes_idpua_names']
                del request.session['univnautes_idpua_action']
            else:
                # messages.error() needs the request as its first argument;
                # calling it with the text alone raises TypeError.
                messages.error(request, 'Erreur lors de la confirmation.')
    return redirect('.')
def csv(users):
    """Return the given users as a ``users.csv`` download.

    The body is produced by rendering the ``users_admin/users.csv``
    template with ``users`` in its context.
    """
    template = loader.get_template('users_admin/users.csv')
    body = template.render(Context({'users': users}))

    download = HttpResponse(content_type='text/csv')
    download['Content-Disposition'] = 'attachment; filename="users.csv"'
    download.write(body)
    return download
@user_passes_test(lambda user: user.is_staff, login_url='/logout')
def csv_import(request):
    """Upload a CSV file of users and hand it to import_csv_file.

    GET shows an empty upload form.  A valid POST passes the uploaded
    ``file`` to ``import_csv_file``, reports either the number of imported
    users or the error that was raised, and redirects back to this page.
    An invalid POST re-renders the bound form.
    """
    if request.method != 'POST':
        form = UploadFileForm()
    else:
        form = UploadFileForm(request.POST, request.FILES)
        if form.is_valid():
            try:
                imported = import_csv_file(request.FILES['file'])
            except Exception as err:
                messages.error(request, u'Import du fichier impossible, erreur : %s' % err)
            else:
                messages.success(request, u'%d utilisateurs importés' % imported)
            return redirect('.')

    return render_to_response('users_admin/import.html',
                              {'form': form},
                              context_instance=RequestContext(request))
def import_csv_file(f, filename='/var/tmp/users-import.csv'):
    """Save an uploaded CSV file to disk; the import itself is not written yet.

    Args:
        f: object exposing ``chunks()``, an iterable of byte strings
            (the caller passes the uploaded file from ``request.FILES``).
        filename: destination path.  Defaults to the historical fixed
            location.

    Returns:
        Nothing yet: the function always raises.  The caller expects the
        number of imported users once the import is implemented.

    Raises:
        NotImplementedError: always, after the file has been written.
            It is a subclass of Exception, so callers catching Exception
            keep working and still see the message "TBD".
    """
    # NOTE(review): the default path is a predictable name in a shared
    # temporary directory and is overwritten on every upload; consider the
    # tempfile module once the parsing step exists.
    with open(filename, 'wb+') as destination:
        for chunk in f.chunks():
            destination.write(chunk)
    raise NotImplementedError("TBD")