447 lines
16 KiB
Python
447 lines
16 KiB
Python
import json
|
|
from django.utils.translation import ugettext_lazy as _
|
|
from django.http import HttpResponseRedirect, HttpResponse
|
|
from django.shortcuts import get_object_or_404, redirect, render
|
|
from django.contrib import messages
|
|
from django.contrib.auth.views import redirect_to_login
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.db.models.query import Q
|
|
from django.core.urlresolvers import reverse, reverse_lazy
|
|
|
|
from django.views.generic import (TemplateView, UpdateView,
|
|
CreateView, DeleteView)
|
|
from django_tables2 import SingleTableView
|
|
|
|
from authentic2.constants import AUTHENTICATION_EVENTS_SESSION_KEY
|
|
from authentic2.manager.views import AjaxFormViewMixin, \
|
|
ActionMixin, OtherActionsMixin, TitleMixin, Action
|
|
from authentic2.manager.user_views import UserEditView, UserAddView
|
|
from authentic2.utils import make_url
|
|
from authentic2.attributes_ng.engine import get_attributes
|
|
|
|
from . import models, tables, forms, constants
|
|
|
|
class SearchMixin(object):
    """Narrow a view's queryset using the ``search`` GET parameter.

    Subclasses list model field names in ``search_filter``; a row is kept
    when at least one of those fields contains the searched text
    (``<field>__contains`` lookups combined with OR).
    """

    search_filter = []

    def get_queryset(self):
        """Return the parent queryset, filtered when a search term is given.

        The queryset is returned untouched when the request carries no
        ``search`` value or when no searchable field is configured.
        """
        # ``reduce`` is only a builtin on Python 2; the functools version
        # exists on both major versions.
        from functools import reduce
        import operator

        qs = super(SearchMixin, self).get_queryset()
        search = self.request.GET.get('search')
        # Guard on ``search_filter`` too: reducing an empty sequence without
        # an initial value raises TypeError.
        if search and self.search_filter:
            conditions = [Q(**{'%s__contains' % name: search})
                          for name in self.search_filter]
            qs = qs.filter(reduce(operator.or_, conditions))
        return qs
|
|
|
|
class AdminMixin(object):
    """Restrict a view to superusers and users flagged ``is_admin``."""

    def dispatch(self, request, *args, **kwargs):
        """Check the requesting user before delegating to the real view.

        Unauthenticated users are sent to the login page (coming back to the
        current URL); authenticated users who are neither superuser nor
        ``is_admin`` get a warning message and are redirected to
        ``auth_homepage``.
        """
        user = request.user
        # ``is_authenticated`` is a method on older Django releases and a
        # property on newer ones; an un-called bound method is always truthy,
        # which would silently skip the login redirect.
        authenticated = user.is_authenticated
        if callable(authenticated):
            authenticated = authenticated()
        if not authenticated:
            return redirect_to_login(request.get_full_path())

        # Users without an ``is_admin`` attribute are treated as non-admins.
        if not (user.is_superuser or getattr(user, 'is_admin', False)):
            messages.warning(request, _('You are not a super-administrator or a collectivity administrator'))
            return redirect('auth_homepage')
        return super(AdminMixin, self).dispatch(request, *args, **kwargs)
|
|
|
|
class SuperAdminMixin(object):
    """Restrict a view to superusers only."""

    def dispatch(self, request, *args, **kwargs):
        """Check the requesting user before delegating to the real view.

        Unauthenticated users are sent to the login page (coming back to the
        current URL); authenticated non-superusers get a warning message and
        are redirected to ``auth_homepage``.
        """
        user = request.user
        # ``is_authenticated`` is a method on older Django releases and a
        # property on newer ones; an un-called bound method is always truthy,
        # which would silently skip the login redirect.
        authenticated = user.is_authenticated
        if callable(authenticated):
            authenticated = authenticated()
        if not authenticated:
            return redirect_to_login(request.get_full_path())

        if not user.is_superuser:
            messages.warning(request, _('You are not a super-administrator'))
            return redirect('auth_homepage')
        return super(SuperAdminMixin, self).dispatch(request, *args, **kwargs)
|
|
|
|
class HomepageView(AdminMixin, TemplateView):
    """Render the homepage template; access checks come from AdminMixin."""

    template_name = 'authentic2_pratic/homepage.html'
|
|
|
|
# Services
|
|
|
|
class ServicesView(SuperAdminMixin, SingleTableView):
    """Table view over ``models.Service``, rendered with ``tables.ServiceTable``.

    Access checks come from SuperAdminMixin.
    """

    model = models.Service
    table_class = tables.ServiceTable
    template_name = 'authentic2_pratic/services.html'
|
|
|
|
class ServiceAddView(SuperAdminMixin, TitleMixin, ActionMixin, AjaxFormViewMixin, CreateView):
    """Creation form for ``models.Service`` (superusers only)."""

    # What is edited and with which form.
    model = models.Service
    form_class = forms.ServiceForm

    # Presentation.
    template_name = 'authentic2_pratic/form.html'
    title = _('Add service')
    action = _('Add')

    # Relative redirect target used once the form is saved.
    success_url = '..'
|
|
|
|
class ServiceView(SuperAdminMixin, TitleMixin, OtherActionsMixin,
                  AjaxFormViewMixin, UpdateView):
    """Edit form for an existing ``models.Service`` (superusers only)."""

    # What is edited and with which form.
    model = models.Service
    form_class = forms.ServiceForm

    # Presentation.
    template_name = 'authentic2_pratic/form.html'
    title = _('Edit service')

    # Relative redirect target used once the form is saved.
    success_url = '..'
|
|
|
|
class ServiceDeleteView(SuperAdminMixin, TitleMixin, AjaxFormViewMixin, DeleteView):
    """Deletion confirmation for a ``models.Service`` (superusers only)."""

    model = models.Service
    title = _('Delete service')
    template_name = 'authentic2_pratic/delete.html'
    # Resolved lazily because URL configuration is not loaded at import time.
    success_url = reverse_lazy('a2-pratic-services')
|
|
|
|
# Collectivities
|
|
class CollectivitiesView(SuperAdminMixin, SearchMixin, SingleTableView):
    """Searchable table view over ``models.Collectivity`` (superusers only)."""

    model = models.Collectivity
    table_class = tables.CollectivityTable
    template_name = 'authentic2_pratic/collectivities.html'
    # Fields matched against the ``search`` GET parameter by SearchMixin.
    search_filter = ('name', 'slug', 'sirh_label', 'postal_code')
|
|
|
|
class CollectivityAddView(SuperAdminMixin, TitleMixin, ActionMixin, AjaxFormViewMixin, CreateView):
    """Creation form for ``models.Collectivity`` (superusers only)."""

    # What is edited and with which form.
    model = models.Collectivity
    form_class = forms.CollectivityForm

    # Presentation.
    template_name = 'authentic2_pratic/form.html'
    title = _('Add collectivity')
    action = _('Add')

    # Relative redirect target used once the form is saved.
    success_url = '..'
|
|
|
|
class CollectivityView(SuperAdminMixin, TitleMixin, OtherActionsMixin,
                       AjaxFormViewMixin, UpdateView):
    """Edit form for an existing ``models.Collectivity`` (superusers only)."""

    # What is edited and with which form.
    model = models.Collectivity
    form_class = forms.CollectivityForm
    # The object's primary key comes from the ``collectivity_pk`` URL kwarg.
    pk_url_kwarg = 'collectivity_pk'

    # Presentation.
    template_name = 'authentic2_pratic/collectivity_edit.html'
    title = _('Edit collectivity')
|
|
|
|
class CollectivityDeleteView(SuperAdminMixin, TitleMixin, AjaxFormViewMixin, DeleteView):
    """Deletion confirmation for a ``models.Collectivity`` (superusers only)."""

    model = models.Collectivity
    # The object's primary key comes from the ``collectivity_pk`` URL kwarg.
    pk_url_kwarg = 'collectivity_pk'
    title = _('Delete collectivity')
    template_name = 'authentic2_pratic/delete.html'
    # Resolved lazily because URL configuration is not loaded at import time.
    success_url = reverse_lazy('a2-pratic-collectivities')
|
|
|
|
class CollectivityMixin(object):
    """Scope a view to the collectivity named by the ``collectivity_pk`` URL kwarg.

    Access is granted to superusers and to users flagged ``is_admin`` whose
    own ``collectivity`` equals the requested one. The resolved object is
    stored on ``self.collectivity`` and exposed to templates as
    ``collectivity``.
    """

    def dispatch(self, request, *args, **kwargs):
        """Authenticate, resolve the collectivity, authorize, then delegate.

        Unauthenticated users are redirected to the login page before any
        lookup is made; an unknown ``collectivity_pk`` yields a 404;
        unauthorized users get a warning message and are redirected to
        ``auth_homepage``.
        """
        user = request.user
        # ``is_authenticated`` is a method on older Django releases and a
        # property on newer ones; an un-called bound method is always truthy,
        # which would silently skip the login redirect.
        authenticated = user.is_authenticated
        if callable(authenticated):
            authenticated = authenticated()
        if not authenticated:
            return redirect_to_login(request.get_full_path())

        # Looked up only after authentication so anonymous visitors cannot
        # tell existing collectivities from missing ones (404 vs redirect).
        self.collectivity = get_object_or_404(models.Collectivity,
                                              pk=kwargs['collectivity_pk'])

        # Users without an ``is_admin`` attribute are treated as non-admins.
        allowed = user.is_superuser or (
            getattr(user, 'is_admin', False)
            and user.collectivity == self.collectivity)
        if not allowed:
            messages.warning(request, _('You are not a super-administrator or an administrator of %s') % self.collectivity)
            return redirect('auth_homepage')
        return super(CollectivityMixin, self).dispatch(request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """Add the current collectivity to the template context."""
        ctx = super(CollectivityMixin, self).get_context_data(**kwargs)
        ctx['collectivity'] = self.collectivity
        return ctx
|
|
|
|
class CollectivityChildMixin(CollectivityMixin):
    """CollectivityMixin for models that carry a ``collectivity`` relation.

    Listings are restricted to objects of the current collectivity and new
    objects are pre-bound to it.
    """

    def get_queryset(self):
        """Return only the objects attached to the current collectivity."""
        # super() must name *this* class; naming CollectivityMixin would skip
        # that class in the MRO should it ever define the method.
        qs = super(CollectivityChildMixin, self).get_queryset()
        return qs.filter(collectivity=self.collectivity)

    def get_form_kwargs(self):
        """Give the form an instance already bound to the collectivity.

        Only applies when the parent supplies no (truthy) ``instance``,
        i.e. on creation.
        """
        kwargs = super(CollectivityChildMixin, self).get_form_kwargs()
        if not kwargs.get('instance'):
            kwargs['instance'] = self.model(collectivity=self.collectivity)
        return kwargs
|
|
|
|
# users
|
|
class UsersView(CollectivityChildMixin, SingleTableView):
    """Table view over the ``models.User`` objects of one collectivity."""

    model = models.User
    table_class = tables.UserTable
    template_name = 'authentic2_pratic/users.html'
|
|
|
|
# NOTE: this class deliberately reuses the name of the imported
# ``authentic2.manager.user_views.UserAddView``; the base-class list is
# evaluated before the name is rebound, so the import is the parent.
class UserAddView(CollectivityChildMixin, UserAddView):
    """Creation form for a ``models.User`` inside the current collectivity."""

    permissions = []
    model = models.User
    title = _('Add agent')
    action = _('Add')
    form_class = forms.UserAddForm
    template_name = 'authentic2_pratic/form.html'
    # Superseded by get_success_url() below; kept for compatibility.
    success_url = '..'
    fields = (
        'uid',
        'first_name',
        'last_name',
        'email',
        'is_admin',
        'direction',
        'employee_type',
        'postal_address',
        'fax',
        'mobile',
        'phone',
        'generate_password',
        'password1',
        'password2',
        'send_mail',
        'certificate_issuer_dn',
        'certificate_subject_dn',
        'certificate',
    )

    def get_success_url(self):
        """Redirect to the edit page of the user that was just created."""
        return reverse('a2-pratic-user-edit', kwargs={'collectivity_pk': self.object.collectivity.pk, 'pk': self.object.pk})

    def get_fields(self):
        """Return the parent's field names without ``is_superuser``.

        The parent's value is copied into a new list first: it may be a
        tuple (no ``remove``) or a list shared with the parent class, which
        must not be mutated in place.
        """
        fields = list(super(UserAddView, self).get_fields())
        if 'is_superuser' in fields:
            fields.remove('is_superuser')
        return fields
|
|
|
|
class UserView(CollectivityChildMixin, UserEditView):
    """Edit form for a ``models.User`` of the current collectivity."""

    permissions = []
    model = models.User
    title = _('Edit agent')
    form_class = forms.UserEditForm
    template_name = 'authentic2_pratic/user_edit.html'
    fields = (
        'uid',
        'first_name',
        'last_name',
        'email',
        'is_admin',
        'direction',
        'employee_type',
        'postal_address',
        'fax',
        'mobile',
        'phone',
        'certificate_issuer_dn',
        'certificate_subject_dn',
        'certificate',
    )

    def get_other_actions(self):
        """Return the parent's actions with the change-password link re-targeted.

        The action whose ``url_name`` is ``a2-manager-user-change-password``
        gets an explicit ``url`` built with this user's collectivity and
        primary key, and its ``url_name`` cleared.
        """
        import copy

        actions = []
        for action in super(UserView, self).get_other_actions():
            if action.url_name == 'a2-manager-user-change-password':
                # Work on a copy: if the parent hands out shared Action
                # objects, mutating them would leak one user's URL into
                # later requests.
                action = copy.copy(action)
                action.url_name = None
                # NOTE(review): the URL name passed to reverse() is the
                # manager one but is given a ``collectivity_pk`` kwarg --
                # confirm against the URL configuration.
                action.url = reverse('a2-manager-user-change-password',
                                     kwargs={'collectivity_pk':
                                             self.object.collectivity.pk,
                                             'pk': self.object.pk})
            actions.append(action)
        return actions

    def get_fields(self):
        """Return the parent's field names without ``is_superuser``.

        The parent's value is copied into a new list first: it may be a
        tuple (no ``remove``) or a list shared with the parent class, which
        must not be mutated in place.
        """
        fields = list(super(UserView, self).get_fields())
        if 'is_superuser' in fields:
            fields.remove('is_superuser')
        return fields
|
|
|
|
class UserDeleteView(CollectivityChildMixin, TitleMixin, AjaxFormViewMixin,
                     DeleteView):
    """Deletion confirmation for a ``models.User`` of the current collectivity."""

    model = models.User
    title = _('Delete agent')
    template_name = 'authentic2_pratic/delete.html'

    def get_success_url(self):
        """Go back to the user list of the collectivity taken from the URL."""
        url_kwargs = {'collectivity_pk': self.kwargs['collectivity_pk']}
        return reverse('a2-pratic-users', kwargs=url_kwargs)
|
|
|
|
# service instances
|
|
class ServiceInstancesView(CollectivityChildMixin, SingleTableView):
    """Table view over the ``models.ServiceInstance`` objects of one collectivity."""

    model = models.ServiceInstance
    table_class = tables.ServiceInstanceTable
    template_name = 'authentic2_pratic/service_instances.html'
|
|
|
|
class ServiceInstanceAddView(CollectivityChildMixin, TitleMixin, ActionMixin,
                             AjaxFormViewMixin, CreateView):
    """Creation form for a ``models.ServiceInstance`` in the current collectivity."""

    # What is edited and with which form.
    model = models.ServiceInstance
    form_class = forms.ServiceInstanceForm

    # Presentation.
    template_name = 'authentic2_pratic/form.html'
    title = _('Add service instance')
    action = _('Add')

    # Relative redirect target used once the form is saved.
    success_url = '..'
|
|
|
|
|
|
class ServiceInstanceView(CollectivityChildMixin, TitleMixin,
                          OtherActionsMixin, AjaxFormViewMixin, UpdateView):
    """Edit form for a ``models.ServiceInstance`` of the current collectivity."""

    # What is edited and with which form.
    model = models.ServiceInstance
    form_class = forms.ServiceInstanceForm

    # Presentation.
    template_name = 'authentic2_pratic/form.html'
    title = _('Edit service instance')

    # Relative redirect target used once the form is saved.
    success_url = '..'
|
|
|
|
class ServiceInstanceDeleteView(CollectivityChildMixin, TitleMixin,
                                AjaxFormViewMixin, DeleteView):
    """Deletion confirmation for a ``models.ServiceInstance`` of the current collectivity."""

    model = models.ServiceInstance
    title = _('Delete service instance')
    template_name = 'authentic2_pratic/delete.html'
    # Relative redirect target used once the object is deleted.
    success_url = '../..'
|
|
|
|
# accesses
|
|
class AccessMixin(object):
    """Helpers for views working on access objects of one collectivity.

    Expects the host view to provide ``self.model`` and
    ``self.collectivity``.
    """

    def get_queryset(self):
        """Return the model's objects whose user *and* service instance both
        belong to the current collectivity."""
        qs = self.model.objects.all()
        return qs.filter(user__collectivity=self.collectivity,
                         service_instance__collectivity=self.collectivity)

    def get_form_kwargs(self):
        """Pass the current collectivity to the form as ``collectivity``."""
        # super() must name this class. Naming CollectivityMixin (which this
        # class does not inherit from) raises TypeError when that mixin is
        # absent from the MRO, and can recurse forever when it precedes
        # AccessMixin there.
        kwargs = super(AccessMixin, self).get_form_kwargs()
        kwargs['collectivity'] = self.collectivity
        return kwargs
|
|
|
|
class AccessesView(CollectivityMixin, TemplateView):
    """User x service-instance access matrix of one collectivity.

    GET renders the matrix; POST synchronises ``models.Access`` rows with
    the submitted ``access-<user pk>-<service instance pk>`` checkboxes.
    """

    template_name = 'authentic2_pratic/accesses.html'

    def dispatch(self, request, *args, **kwargs):
        """Prepare the user and service-instance querysets, then delegate.

        NOTE(review): the collectivity is fetched here and again by
        CollectivityMixin.dispatch(); kept as is because ``self.users`` and
        ``self.service_instances`` must exist before the handler runs.
        """
        self.collectivity = get_object_or_404(models.Collectivity,
                                              pk=kwargs['collectivity_pk'])
        self.users = models.User.objects.filter(
            collectivity=self.collectivity).order_by('first_name', 'last_name')
        self.service_instances = models.ServiceInstance.objects.filter(
            collectivity=self.collectivity)
        return super(AccessesView, self).dispatch(request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """Add ``users``, ``service_instances`` and ``accesses`` to the context.

        ``accesses`` is a list of ``(user, [(service_instance_pk, granted),
        ...])`` pairs, one per user, in the order of ``self.users``.
        """
        ctx = super(AccessesView, self).get_context_data(**kwargs)
        ctx['users'] = self.users
        ctx['service_instances'] = self.service_instances
        # Materialised as a set so each membership test in the nested loops
        # below is a hash lookup instead of a scan over the result rows.
        on = set(models.Access.objects.filter(
            user__collectivity=self.collectivity,
            service_instance__collectivity=self.collectivity)
            .values_list('user_id', 'service_instance_id'))
        ctx['accesses'] = [(user, [(si.pk, (user.pk, si.pk) in on)
                                   for si in self.service_instances])
                           for user in self.users]
        return ctx

    def post(self, request, *args, **kwargs):
        """Apply the submitted matrix, then redirect back to the table.

        Every (user, service instance) pair of the collectivity whose
        checkbox is not ``'on'`` has its access deleted; every checked pair
        without an existing access gets one created.
        """
        # ``reduce`` is only a builtin on Python 2.
        from functools import reduce
        import operator

        on = []
        tobuild = []
        offs = []
        for user in self.users:
            for si in self.service_instances:
                key = 'access-{user.pk}-{si.pk}'.format(user=user, si=si)
                t = Q(user__pk=user.pk, service_instance__pk=si.pk)
                if request.POST.get(key) == 'on':
                    on.append(t)
                    tobuild.append((user.pk, si.pk))
                else:
                    offs.append(t)
        if offs:
            models.Access.objects.filter(reduce(operator.or_, offs)).delete()
        if on:
            # Set for O(1) membership tests while skipping existing rows.
            found = set(models.Access.objects.filter(reduce(operator.or_, on))
                        .values_list('user_id', 'service_instance_id'))
            for t in tobuild:
                if t in found:
                    continue
                models.Access.objects.get_or_create(user_id=t[0],
                                                    service_instance_id=t[1])
        return HttpResponseRedirect('#accesses-table')
|
|
|
|
# general views
|
|
# Module-level view callables built from the class-based views above
# (presumably the names referenced by the URL configuration -- confirm there).

# general views
homepage = HomepageView.as_view()

# services
services = ServicesView.as_view()
service_add = ServiceAddView.as_view()
service_edit = ServiceView.as_view()
service_delete = ServiceDeleteView.as_view()

# collectivities
collectivities = CollectivitiesView.as_view()
collectivity_add = CollectivityAddView.as_view()
collectivity_edit = CollectivityView.as_view()
collectivity_delete = CollectivityDeleteView.as_view()

# collectivity users
collectivity_users = UsersView.as_view()
user_add = UserAddView.as_view()
user_edit = UserView.as_view()
user_delete = UserDeleteView.as_view()

# service instances
collectivity_service_instances = ServiceInstancesView.as_view()
service_instance_add = ServiceInstanceAddView.as_view()
service_instance_edit = ServiceInstanceView.as_view()
service_instance_delete = ServiceInstanceDeleteView.as_view()

# accesses
collectivity_accesses = AccessesView.as_view()
|
|
|
|
|
|
def get_service_links(request):
    """List the service links the requesting user has access to.

    Returns a list of ``(name, url, slug, needed_authent)`` tuples, one per
    service instance of the user's collectivity for which the user has an
    access. ``needed_authent`` is an empty string when one of the
    authentication methods recorded in the session satisfies the instance,
    otherwise the comma-separated labels of the levels the instance accepts.
    Users without a ``collectivity`` attribute get an empty list.
    """
    # Text type on both Python 2 and 3 (``unicode`` only exists on 2).
    try:
        text_type = unicode  # noqa: F821
    except NameError:
        text_type = str

    events = request.session.get(AUTHENTICATION_EVENTS_SESSION_KEY, [])
    authentication_levels = set(event['how'] for event in events)

    if not hasattr(request.user, 'collectivity'):
        return []

    # Loop-invariant mapping from level identifier to its label.
    levels = dict(constants.AUTHENTICATION_LEVELS)

    service_links = []
    qs = request.user.collectivity.service_instances.select_related()
    qs = qs.filter(access__user=request.user)
    for service_instance in qs:
        service = service_instance.service
        accepted = service_instance.authentication_level

        needed_authent = ''
        if not set(accepted) & authentication_levels:
            needed_authent = u', '.join(
                text_type(levels[level]) for level in accepted)

        # The displayed name always comes from the service; only the slug
        # differs between global and per-collectivity services.
        name = service.name
        slug = service.slug if service.is_global else service_instance.slug

        if service_instance.metadata_url:
            url = make_url('a2-idp-saml2-idp-sso',
                           params={'provider_id': service_instance.metadata_url})
        elif service_instance.cas_service_url:
            url = make_url('a2-idp-cas-login',
                           params={'service': service_instance.cas_service_url})
        else:
            url = service_instance.service_url
        service_links.append((name, url, slug, needed_authent))
    return service_links
|
|
|
|
|
|
def return_json(request, data):
    """Serialize ``data`` to JSON and return it as an HTTP response.

    When the query string carries ``jsonpCallback`` or ``callback`` (checked
    in that order) the payload is wrapped as ``<callback>(<json>);`` and
    served as JavaScript. The callback must be a plain, optionally dotted,
    JavaScript identifier; anything else is answered with HTTP 400 so that
    attacker-chosen script cannot be reflected into the response.
    """
    import re

    json_str = json.dumps(data)
    for variable in ('jsonpCallback', 'callback'):
        if variable in request.GET:
            callback = request.GET[variable]
            # Untrusted input: accept identifiers such as ``cb`` or
            # ``jQuery123_456`` / ``ns.handler`` only.
            if not re.match(
                    r'^[A-Za-z_$][0-9A-Za-z_$]*(\.[A-Za-z_$][0-9A-Za-z_$]*)*\Z',
                    callback):
                return HttpResponse('invalid JSONP callback name',
                                    status=400, content_type='text/plain')
            # JSONP is executed as a script, so it must not be labelled as
            # JSON (browsers enforcing nosniff would refuse to run it).
            return HttpResponse('%s(%s);' % (callback, json_str),
                                content_type='application/javascript')
    return HttpResponse(json_str, content_type='application/json')
|
|
|
|
|
|
def flatten(o):
    """Recursively reduce ``o`` to dicts, sequences and simple scalars.

    * dict values are flattened (keys are kept as they are) into a new dict;
    * tuples, lists and sets are rebuilt with the same type from their
      flattened items;
    * booleans, integers and text strings are returned unchanged;
    * anything else (``None`` and floats included) is converted to text.

    NOTE(review): sets are returned as sets, which ``json.dumps`` cannot
    serialize -- confirm no attribute source yields sets.
    """
    # ``unicode``/``long`` only exist on Python 2; fall back to the Python 3
    # equivalents so the function works on both.
    try:
        text_type = unicode  # noqa: F821
        scalar_types = (bool, unicode, int, long)  # noqa: F821
    except NameError:
        text_type = str
        scalar_types = (bool, str, int)

    if isinstance(o, dict):
        return dict((k, flatten(v)) for k, v in o.items())
    if isinstance(o, (tuple, list, set)):
        return o.__class__(flatten(v) for v in o)
    if isinstance(o, scalar_types):
        return o
    return text_type(o)
|
|
|
|
|
|
@login_required
def user_info(request):
    """Return the attributes resolved for the logged-in user as JSON."""
    context = {'user': request.user}
    payload = flatten(get_attributes(context))
    return return_json(request, payload)
|
|
|
|
|
|
@login_required
def agent_homepage_jsonp(request):
    """Return the logged-in user's service links as a JSON list.

    Each entry has the absolute ``url``, the ``label``, a boolean
    ``authentication_level_is_enough`` and ``authentication_levels``, the
    list of level labels still required (empty when the level is enough).
    """
    data = []
    for name, url, slug, needed_authent in get_service_links(request):
        data.append({
            'url': request.build_absolute_uri(url),
            'label': name,
            'authentication_level_is_enough': not needed_authent,
            # A real list: ``filter()`` is lazy on Python 3 and json.dumps
            # cannot serialize the resulting filter object.
            'authentication_levels': [level
                                      for level in needed_authent.split(', ')
                                      if level],
        })
    return return_json(request, data)
|