authentic/src/authentic2/manager/user_views.py

906 lines
34 KiB
Python

# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import datetime
import collections
import operator
from django.db import models
from django.utils.translation import ugettext_lazy as _, pgettext_lazy, ugettext
from django.utils.html import format_html
from django.urls import reverse
from django.core.exceptions import PermissionDenied
from django.core.mail import EmailMultiAlternatives
from django.template import loader
from django.contrib.auth import get_user_model, REDIRECT_FIELD_NAME
from django.contrib.contenttypes.models import ContentType
from django.contrib import messages
from django.views.generic import FormView, TemplateView, DetailView
from django.views.generic.edit import BaseFormView
from django.views.generic.detail import SingleObjectMixin
from django.http import Http404, FileResponse, HttpResponseRedirect
import tablib
from authentic2.models import Attribute, AttributeValue, PasswordReset
from authentic2.utils import send_password_reset_mail, redirect, select_next_url, make_url, switch_user
from authentic2.a2_rbac.utils import get_default_ou
from authentic2 import hooks
from authentic2_idp_oidc.models import OIDCAuthorization, OIDCClient
from django_rbac.utils import get_role_model, get_role_parenting_model, get_ou_model
from .views import (BaseTableView, BaseAddView, BaseEditView, ActionMixin,
OtherActionsMixin, Action, ExportMixin, BaseSubTableView,
HideOUColumnMixin, BaseDeleteView, BaseDetailView,
TitleMixin, PermissionMixin, MediaMixin, FormNeedsRequest)
from .tables import UserTable, UserRolesTable, OuUserRolesTable, UserAuthorizationsTable
from .forms import (UserSearchForm, UserAddForm, UserEditForm,
UserChangePasswordForm, ChooseUserRoleForm,
UserRoleSearchForm, UserChangeEmailForm, UserNewImportForm,
UserEditImportForm, ChooseUserAuthorizationsForm)
from .resources import UserResource
from .utils import get_ou_count, has_show_username
from . import app_settings
# User model class returned by django.contrib.auth.get_user_model().
User = get_user_model()
# Organizational unit model class returned by django_rbac.utils.get_ou_model().
OU = get_ou_model()
class UsersView(HideOUColumnMixin, BaseTableView):
    """Searchable table of the users that are not marked as deleted."""

    template_name = 'authentic2/manager/users.html'
    model = get_user_model()
    table_class = UserTable
    permissions = ['custom_user.search_user']
    search_form_class = UserSearchForm
    title = _('Users')

    def is_ou_specified(self):
        """Return the OU selected in a valid search form, else a falsy value."""
        return self.search_form.is_valid() \
            and self.search_form.cleaned_data.get('ou')

    def get_queryset(self):
        """Exclude deleted users and preload the OU and role relations."""
        qs = super(UsersView, self).get_queryset()
        qs = qs.filter(deleted__isnull=True)
        qs = qs.select_related('ou')
        qs = qs.prefetch_related('roles', 'roles__parent_relation__parent')
        return qs

    def get_search_form_kwargs(self):
        """Pass the search-related application settings to the search form."""
        kwargs = super(UsersView, self).get_search_form_kwargs()
        kwargs['minimum_chars'] = app_settings.USER_SEARCH_MINIMUM_CHARS
        kwargs['show_all_ou'] = app_settings.SHOW_ALL_OU
        return kwargs

    def filter_by_search(self, qs):
        """Apply the search; with an invalid form, restrict to the requester's OU."""
        qs = super(UsersView, self).filter_by_search(qs)
        if not self.search_form.is_valid():
            qs = qs.filter(ou=self.request.user.ou)
        return qs

    def get_table(self, **kwargs):
        """Build the table, hiding the username column when it is not shown.

        When the search form does not carry enough characters, the empty-table
        text tells how many characters are required and how many users exist
        in the selected OU.
        """
        show_username = has_show_username()
        if not show_username:
            # Evaluate the search form only once; the selected OU may still
            # enable the username column.
            ou = self.is_ou_specified()
            if ou:
                show_username = ou.show_username
        if not show_username:
            exclude = kwargs.setdefault('exclude', [])
            if 'username' not in exclude:
                exclude.append('username')
        table = super(UsersView, self).get_table(**kwargs)
        if self.search_form.not_enough_chars():
            user_qs = self.search_form.filter_by_ou(self.get_queryset())
            table.empty_text = _('Enter at least %(limit)d characters '
                                 '(%(user_count)d users)') % {
                'limit': self.search_form.minimum_chars,
                'user_count': user_qs.count(),
            }
        return table

    def get_context_data(self, **kwargs):
        """Add the OU where users can be created and the extra actions."""
        # Forward the keyword arguments so that context passed by callers
        # is not silently dropped.
        ctx = super(UsersView, self).get_context_data(**kwargs)
        if get_ou_count() < 2:
            ou = get_default_ou()
        else:
            ou = self.search_form.cleaned_data.get('ou')
        if ou and self.request.user.has_ou_perm('custom_user.add_user', ou):
            ctx['add_ou'] = ou
        extra_actions = ctx['extra_actions'] = []
        if self.request.user.has_perm('custom_user.admin_user'):
            extra_actions.append({
                'url': reverse('a2-manager-users-imports'),
                'label': _('Import users'),
            })
        return ctx


users = UsersView.as_view()
class UserAddView(BaseAddView):
    """Creation form for a user in the OU designated by the ``ou_pk`` URL argument."""

    model = get_user_model()
    title = _('Create user')
    action = _('Create')
    fields = [
        'username',
        'first_name',
        'last_name',
        'email',
        'generate_password',
        'password1',
        'password2',
        'reset_password_at_next_login',
        'send_mail',
    ]
    form_class = UserAddForm
    permissions = ['custom_user.add_user']
    template_name = 'authentic2/manager/user_add.html'
    duplicate_users = None

    def dispatch(self, request, *args, **kwargs):
        """Resolve the target OU among those where the requester may add users."""
        allowed_ous = request.user.ous_with_perm('custom_user.add_user')
        try:
            self.ou = allowed_ous.get(pk=self.kwargs['ou_pk'])
        except OU.DoesNotExist:
            raise PermissionDenied
        return super().dispatch(request, *args, **kwargs)

    def get_form_kwargs(self):
        """Give the target OU to the form."""
        form_kwargs = super().get_form_kwargs()
        form_kwargs['ou'] = self.ou
        return form_kwargs

    def get_fields(self):
        """Return the form field names.

        ``is_superuser`` (for superusers only) and the attribute names are
        spliced in just before ``generate_password``.
        """
        names = list(self.fields)
        if not self.ou.show_username:
            names.remove('username')
        position = names.index('generate_password')
        extras = []
        if self.request.user.is_superuser and 'is_superuser' not in self.fields:
            extras.append('is_superuser')
        extras.extend(attribute.name for attribute in Attribute.objects.all())
        names[position:position] = extras
        return names

    def get_success_url(self):
        """Return the next URL, defaulting to the detail page of the new user."""
        detail_url = reverse('a2-manager-user-detail', kwargs={'pk': self.object.pk})
        return select_next_url(
            self.request,
            default=detail_url,
            include_post=True,
            replace={'$UUID': self.object.uuid})

    def get_context_data(self, **kwargs):
        """Add the cancel/next URLs, the OU and any detected duplicates."""
        context = super().get_context_data(**kwargs)
        context['cancel_url'] = select_next_url(
            self.request, default='../..', field_name='cancel')
        context['next'] = select_next_url(
            self.request, default=None, include_post=True)
        context['ou'] = self.ou
        context['duplicate_users'] = self.duplicate_users
        return context

    def form_valid(self, form):
        """Create the user unless unconfirmed duplicates are found.

        When duplicate checking is enabled and duplicates exist, the form is
        redisplayed until the posted ``confirm-creation-token`` equals
        "<first name> <last name>".
        """
        if app_settings.CHECK_DUPLICATE_USERS:
            data = form.cleaned_data
            first_name = data['first_name']
            last_name = data['last_name']
            duplicates = User.objects.find_duplicates(
                first_name=first_name,
                last_name=last_name,
                birthdate=data.get('birthdate'),
            )
            posted_token = self.request.POST.get('confirm-creation-token')
            expected_token = '%s %s' % (first_name, last_name)
            confirmed = posted_token == expected_token
            if duplicates and not confirmed:
                self.duplicate_users = duplicates
                return self.form_invalid(form)
        response = super().form_valid(form)
        hooks.call_hooks(
            'event', name='manager-add-user', user=self.request.user,
            instance=form.instance, form=form)
        return response

    def get_initial(self, *args, **kwargs):
        """Seed the initial form data with the OU password policy values."""
        initial = super().get_initial(*args, **kwargs)
        initial.update(self.get_user_add_policies())
        return initial

    def get_user_add_policies(self, *args, **kwargs):
        """Return the OU ``user_add_password_policy`` entry as a dict."""
        target_ou = OU.objects.get(pk=self.kwargs['ou_pk'])
        policy = target_ou.USER_ADD_PASSWD_POLICY_VALUES[target_ou.user_add_password_policy]
        return policy._asdict()


user_add = UserAddView.as_view()
def user_add_default_ou(request):
    """Redirect to the user creation view of the default OU."""
    default_ou = get_default_ou()
    url_kwargs = {'ou_pk': default_ou.id}
    return redirect(request, 'a2-manager-user-add', kwargs=url_kwargs, keep_params=True)
class UserDetailView(OtherActionsMixin, BaseDetailView):
    """Detail page of a user, with its roles and the available actions."""

    model = get_user_model()
    fields = ['username', 'ou', 'first_name', 'last_name', 'email']
    form_class = UserEditForm
    template_name = 'authentic2/manager/user_detail.html'
    slug_field = 'uuid'

    def get_queryset(self):
        """Exclude users marked as deleted."""
        qs = super(UserDetailView, self).get_queryset()
        qs = qs.filter(deleted__isnull=True)
        return qs

    @property
    def title(self):
        """Page title: the full name of the displayed user."""
        return self.object.get_full_name()

    @property
    def is_oidc_services(self):
        """True when at least one OIDC client exists."""
        return OIDCClient.objects.exists()

    def get_other_actions(self):
        """Yield the inherited actions, then those depending on the user state."""
        yield from super(UserDetailView, self).get_other_actions()
        yield Action('password_reset', _('Reset password'),
                     permission='custom_user.reset_password_user')
        if self.object.is_active:
            yield Action('deactivate', _('Suspend'),
                         permission='custom_user.activate_user')
        else:
            yield Action('activate', _('Activate'),
                         permission='custom_user.activate_user')
        if PasswordReset.objects.filter(user=self.object).exists():
            yield Action('delete_password_reset', _('Do not force password change on next login'),
                         permission='custom_user.reset_password_user')
        else:
            yield Action('force_password_change', _('Force password change on '
                                                    'next login'),
                         permission='custom_user.reset_password_user')
        yield Action('change_password', _('Change user password'),
                     url_name='a2-manager-user-change-password',
                     permission='custom_user.change_password_user')
        if self.request.user.is_superuser:
            yield Action('su', _('Impersonate this user'),
                         url_name='a2-manager-user-su')
        if self.object.ou and self.object.ou.validate_emails:
            yield Action('change_email', _('Change user email'),
                         url_name='a2-manager-user-change-email',
                         permission='custom_user.change_email_user')

    def action_force_password_change(self, request, *args, **kwargs):
        """Ensure a PasswordReset row exists for the user."""
        PasswordReset.objects.get_or_create(user=self.object)

    def action_activate(self, request, *args, **kwargs):
        """Set the user as active."""
        self.object.is_active = True
        self.object.save()

    def action_deactivate(self, request, *args, **kwargs):
        """Set the user as inactive, except when it is the requester."""
        if request.user == self.object:
            messages.warning(request, _('You cannot desactivate your own '
                                        'user'))
        else:
            self.object.is_active = False
            self.object.save()

    def action_password_reset(self, request, *args, **kwargs):
        """Send a password reset mail when the user has an email address."""
        user = self.object
        if not user.email:
            messages.info(request, _('User has no email, it\'not possible to '
                                     'send him am email to reset its '
                                     'password'))
            return
        send_password_reset_mail(user, request=request)
        messages.info(request, _('A mail was sent to %s') % self.object.email)

    def action_delete_password_reset(self, request, *args, **kwargs):
        """Delete the PasswordReset rows of the user."""
        PasswordReset.objects.filter(user=self.object).delete()

    def action_su(self, request, *args, **kwargs):
        """Redirect to logout with a switch-user URL as the next location."""
        return redirect(request, 'auth_logout',
                        params={REDIRECT_FIELD_NAME: switch_user.build_url(self.object)})

    # Copied from PasswordResetForm implementation
    def send_mail(self, subject_template_name, email_template_name,
                  context, to_email):
        """
        Sends a django.core.mail.EmailMultiAlternatives to `to_email`.
        """
        subject = loader.render_to_string(subject_template_name, context)
        # Email subject *must not* contain newlines
        subject = ''.join(subject.splitlines())
        body = loader.render_to_string(email_template_name, context)
        email_message = EmailMultiAlternatives(subject, body, to=[to_email])
        email_message.send()

    def get_fields(self):
        """Return the displayed field names, attribute names included."""
        fields = list(self.fields)
        if not self.object.username and self.object.ou and not self.object.ou.show_username:
            fields.remove('username')
        for attribute in Attribute.objects.all():
            if attribute.name == 'address_autocomplete':
                continue
            fields.append(attribute.name)
        if self.request.user.is_superuser and \
                'is_superuser' not in self.fields:
            fields.append('is_superuser')
        return fields

    def get_form(self, *args, **kwargs):
        """Show the email verification status as help text of the email field."""
        form = super(UserDetailView, self).get_form(*args, **kwargs)
        if 'email' in form.fields:
            if self.object.email_verified:
                comment = _('Email verified')
            else:
                comment = _('Email not verified')
            form.fields['email'].help_text = format_html('<b>{0}</b>', comment)
        return form

    @classmethod
    def has_perm_on_roles(cls, user, instance):
        """Tell whether `user` can manage the members of at least one role.

        When ROLE_MEMBERS_FROM_OU is set and `instance` has an OU, only the
        roles of that OU are considered.
        """
        role_qs = get_role_model().objects.all()
        if app_settings.ROLE_MEMBERS_FROM_OU and instance.ou:
            role_qs = role_qs.filter(ou=instance.ou)
        return user.filter_by_perm('a2_rbac.manage_members_role', role_qs).exists()

    def get_context_data(self, **kwargs):
        """Add the roles of the user (grouped by OU name) and hook-provided data."""
        kwargs['default_ou'] = get_default_ou
        roles = self.object.roles_and_parents().order_by('ou__name', 'name')
        role_qs = get_role_model().objects.all()
        if app_settings.ROLE_MEMBERS_FROM_OU and self.object.ou:
            role_qs = role_qs.filter(ou=self.object.ou)
        # Collect the visible primary keys once so that the per-role
        # visibility test below is a set lookup instead of a queryset scan.
        visible_role_ids = set(
            self.request.user.filter_by_perm('a2_rbac.view_role', role_qs)
            .values_list('pk', flat=True))
        roles_by_ou = collections.OrderedDict()
        for role in roles:
            role.user_visible = role.pk in visible_role_ids
            roles_by_ou.setdefault(role.ou.name if role.ou else '', []).append(role)
        kwargs['roles'] = roles
        kwargs['roles_by_ou'] = roles_by_ou
        kwargs['have_roles_on_multiple_ou'] = len(roles_by_ou) > 1
        # show modify roles button only if something is possible
        kwargs['can_change_roles'] = self.has_perm_on_roles(self.request.user, self.object)
        # Each hook returns an iterable of items; flatten them in one list.
        kwargs['user_data'] = [
            data
            for datas in hooks.call_hooks('manager_user_data', self, self.object)
            for data in datas]
        ctx = super(UserDetailView, self).get_context_data(**kwargs)
        return ctx


user_detail = UserDetailView.as_view()
class UserEditView(OtherActionsMixin, ActionMixin, BaseEditView):
    """Edition form for an existing user."""

    model = get_user_model()
    template_name = 'authentic2/manager/user_edit.html'
    form_class = UserEditForm
    permissions = ['custom_user.change_user']
    fields = ['username', 'ou', 'first_name', 'last_name']
    slug_field = 'uuid'
    action = _('Change')
    title = _('Edit user')

    def get_fields(self):
        """Return the editable field names.

        ``email`` is only editable here when the user has no OU or its OU
        does not validate emails.
        """
        names = list(self.fields)
        user = self.object
        if not user.username and user.ou and not user.ou.show_username:
            names.remove('username')
        if not user.ou or not user.ou.validate_emails:
            names.append('email')
        names.extend(attribute.name for attribute in Attribute.objects.all())
        if self.request.user.is_superuser and 'is_superuser' not in self.fields:
            names.append('is_superuser')
        return names

    def _get_next_url(self):
        """Return the next URL, defaulting to the detail page of the user."""
        detail_url = reverse('a2-manager-user-detail', kwargs={'pk': self.object.pk})
        return select_next_url(self.request, default=detail_url, include_post=True)

    def get_context_data(self, **kwargs):
        """Use the same URL for the ``next`` and ``cancel_url`` entries."""
        context = super().get_context_data(**kwargs)
        target = self._get_next_url()
        context['next'] = target
        context['cancel_url'] = target
        return context

    def get_success_url(self):
        """Return the URL computed by ``_get_next_url``."""
        return self._get_next_url()

    def form_valid(self, form):
        """Save the user; a changed email resets its verified flag first."""
        email_changed = 'email' in form.changed_data
        if email_changed:
            self.object.email_verified = False
            self.object.save()
        response = super().form_valid(form)
        hooks.call_hooks(
            'event', name='manager-edit-user', user=self.request.user,
            instance=form.instance, form=form)
        return response


user_edit = UserEditView.as_view()
class UsersExportView(ExportMixin, UsersView):
    """CSV export of the user listing, attribute values included."""

    permissions = ['custom_user.view_user']
    resource_class = UserResource
    export_prefix = 'users-'

    @property
    def csv(self):
        """CSV rendering of the dataset built by ``get_dataset``."""
        if hasattr(self._dataset, 'csv'):
            # compatibility for tablib < 0.11
            return self._dataset.csv
        return self._dataset.export('csv')

    def get_dataset(self):
        """Build the tablib dataset in ``self._dataset`` and return ``self``.

        Columns are the resource export order, three extra user fields, then
        one ``attribute_<name>`` column per attribute.
        """
        user_resource = UserResource()
        fields = user_resource._meta.export_order + ('email_verified', 'is_active', 'modified')
        # Load the attributes once; the column names and the id -> name
        # mapping are both derived from this single query.
        attribute_objects = list(Attribute.objects.all())
        attributes = [attr.name for attr in attribute_objects]
        headers = fields + tuple('attribute_%s' % attr for attr in attributes)
        attribute_names = {attr.id: attr.name for attr in attribute_objects}
        avs = AttributeValue.objects.filter(
            content_type=ContentType.objects.get_for_model(get_user_model()))\
            .filter(attribute__disabled=False).values()
        # object_id -> {attribute name: content}
        user_attrs = collections.defaultdict(dict)
        for av in avs:
            user_attrs[av['object_id']][attribute_names[av['attribute_id']]] = av['content']

        def iso(rec):
            """Render None/{} as '' and dates/datetimes as ISO-like strings."""
            if rec is None or rec == {}:
                return ''
            if hasattr(rec, 'strftime'):
                if isinstance(rec, datetime.datetime):
                    _format = "%Y-%m-%d %H:%M:%S"
                else:
                    _format = "%Y-%m-%d"
                return rec.strftime(_format)
            return rec

        def create_record(user):
            """Return the row of `user`, in the same order as `headers`."""
            record = []
            for field in fields:
                if field == 'roles':
                    value = user_resource.dehydrate_roles(user)
                else:
                    value = getattr(user, field)
                record.append(value)
            attr_d = user_attrs[user.pk]
            for attr in attributes:
                record.append(attr_d.get(attr))
            return [iso(x) for x in record]

        self._dataset = tablib.Dataset(headers=headers)
        for user in self.get_data():
            self._dataset.append(create_record(user))
        return self


users_export = UsersExportView.as_view()
class UserChangePasswordView(BaseEditView):
    """Form letting a manager set the password of a user."""

    template_name = 'authentic2/manager/form.html'
    model = get_user_model()
    form_class = UserChangePasswordForm
    permissions = ['custom_user.change_password_user']
    title = _('Change user password')
    success_url = '..'
    slug_field = 'uuid'

    def get_success_message(self, cleaned_data):
        """Return the confirmation text, depending on the ``send_mail`` choice."""
        if not cleaned_data.get('send_mail'):
            return ugettext('New password set')
        return ugettext('New password sent to %s') % self.object.email

    def form_valid(self, form):
        """Save the form, then emit the ``manager-change-password`` event."""
        result = super().form_valid(form)
        hooks.call_hooks(
            'event', name='manager-change-password', user=self.request.user,
            instance=form.instance, form=form)
        return result


user_change_password = UserChangePasswordView.as_view()
class UserChangeEmailView(BaseEditView):
    """Form letting a manager request an email change for a user."""

    template_name = 'authentic2/manager/user_change_email.html'
    model = get_user_model()
    form_class = UserChangeEmailForm
    permissions = ['custom_user.change_email_user']
    success_url = '..'
    slug_field = 'uuid'
    title = _('Change user email')

    def get_success_message(self, cleaned_data):
        """Return a message only when the new email differs from the current one."""
        requested = cleaned_data['new_email']
        if requested == self.object.email:
            return None
        return ugettext('A mail was sent to %s to verify it.') % cleaned_data['new_email']

    def form_valid(self, form):
        """Save the form, then emit the ``manager-change-email-request`` event."""
        result = super().form_valid(form)
        requested_email = form.cleaned_data['new_email']
        hooks.call_hooks(
            'event',
            name='manager-change-email-request',
            user=self.request.user,
            instance=form.instance,
            form=form,
            email=requested_email)
        return result


user_change_email = UserChangeEmailView.as_view()
class UserRolesView(HideOUColumnMixin, BaseSubTableView):
    """Table of the roles of a user, with a form to add or remove a role."""

    model = get_user_model()
    form_class = ChooseUserRoleForm
    search_form_class = UserRoleSearchForm
    success_url = '.'
    slug_field = 'uuid'

    @property
    def template_name(self):
        """Template depending on whether the roles are viewed by OU."""
        if self.is_ou_specified():
            return 'authentic2/manager/user_ou_roles.html'
        else:
            return 'authentic2/manager/user_roles.html'

    @property
    def table_pagination(self):
        """False (no pagination) in the by-OU view, None otherwise."""
        if self.is_ou_specified():
            return False
        return None

    @property
    def table_class(self):
        """Table class depending on whether the roles are viewed by OU."""
        if self.is_ou_specified():
            return OuUserRolesTable
        else:
            return UserRolesTable

    def is_ou_specified(self):
        '''Differentiate view of all user's roles from view of roles by OU'''
        return (self.search_form.is_valid()
                and self.search_form.cleaned_data.get('ou_filter') != 'all')

    def get_table_queryset(self):
        """Return the roles to display.

        In the by-OU view every role is returned (manager roles excluded),
        with the parenting relations from the user's roles in ``via``, the
        user itself in ``member`` when it is a direct member, and a
        ``has_perm`` column telling whether the requester can manage the
        members of the role. Otherwise the roles of the user and their
        parents are returned.
        """
        if self.is_ou_specified():
            roles = self.object.roles.all()
            User = get_user_model()
            Role = get_role_model()
            RoleParenting = get_role_parenting_model()
            rp_qs = RoleParenting.objects.filter(child__in=roles)
            qs = Role.objects.all()
            qs = qs.prefetch_related(models.Prefetch(
                'child_relation', queryset=rp_qs, to_attr='via'))
            qs = qs.prefetch_related(models.Prefetch(
                'members', queryset=User.objects.filter(pk=self.object.pk),
                to_attr='member'))
            qs2 = self.request.user.filter_by_perm('a2_rbac.manage_members_role', qs)
            managable_ids = [str(pk) for pk in qs2.values_list('pk', flat=True)]
            if managable_ids:
                has_perm_sql = 'a2_rbac_role.id in (%s)' % ', '.join(managable_ids)
            else:
                # "id in ()" is not valid SQL; fall back to a constant false
                # predicate when no role is manageable.
                has_perm_sql = '1 = 0'
            qs = qs.extra(select={'has_perm': has_perm_sql})
            qs = qs.exclude(slug__startswith='_a2-managers-of-role')
            return qs
        else:
            return self.object.roles_and_parents()

    def get_table_data(self):
        """Return the table data, as a list in the by-OU view."""
        qs = super(UserRolesView, self).get_table_data()
        if self.is_ou_specified():
            qs = list(qs)
        return qs

    def authorize(self, request, *args, **kwargs):
        """Redirect to the user detail page when no role can be managed.

        Returns the response of the parent check when there is one, a
        redirect when the requester cannot manage any role, else None.
        """
        response = super(UserRolesView, self).authorize(request, *args, **kwargs)
        if response is not None:
            return response
        if not UserDetailView.has_perm_on_roles(request.user, self.object):
            return redirect(request, 'a2-manager-user-detail', kwargs={'pk': self.object.pk})
        return None

    def form_valid(self, form):
        """Add or remove the chosen role, emitting the matching event."""
        user = self.object
        role = form.cleaned_data['role']
        action = form.cleaned_data['action']
        if action == 'add':
            # exists() only asks the database for a boolean instead of
            # loading the matching rows.
            if user.roles.filter(pk=role.pk).exists():
                messages.warning(
                    self.request,
                    _('User {user} has already the role {role}.')
                    .format(user=user, role=role))
            else:
                user.roles.add(role)
                hooks.call_hooks('event', name='manager-add-role-member',
                                 user=self.request.user, role=role, member=user)
        elif action == 'remove':
            user.roles.remove(role)
            hooks.call_hooks('event', name='manager-remove-role-member', user=self.request.user,
                             role=role, member=user)
        return super(UserRolesView, self).form_valid(form)

    def get_search_form_kwargs(self):
        """Configure the search form for the displayed user."""
        kwargs = super(UserRolesView, self).get_search_form_kwargs()
        kwargs['all_ou_label'] = u''
        kwargs['user'] = self.object
        kwargs['role_members_from_ou'] = app_settings.ROLE_MEMBERS_FROM_OU
        kwargs['show_all_ou'] = app_settings.SHOW_ALL_OU
        kwargs['queryset'] = self.request.user.filter_by_perm(
            'a2_rbac.view_role', get_role_model().objects.all())
        if self.object.ou_id:
            initial = kwargs.setdefault('initial', {})
            initial['ou'] = str(self.object.ou_id)
        return kwargs

    def get_form_kwargs(self):
        """Pass the user's OU to the form when roles are restricted by OU."""
        kwargs = super(UserRolesView, self).get_form_kwargs()
        # if role members can only be from the same OU, we filter roles based on the user's ou
        if app_settings.ROLE_MEMBERS_FROM_OU and self.object.ou_id:
            kwargs['ou'] = self.object.ou
        return kwargs


roles = UserRolesView.as_view()
class UserDeleteView(BaseDeleteView):
    """Confirmation page marking a user as deleted."""

    model = get_user_model()
    title = _('Delete user')
    template_name = 'authentic2/manager/user_delete.html'

    def get_success_url(self):
        """Return to the user listing."""
        return reverse('a2-manager-users')

    def delete(self, request, *args, **kwargs):
        """Mark the user as deleted and emit the ``manager-delete-user`` event."""
        # Bind the fetched instance so that the hook receives the very object
        # that was marked as deleted, without relying on a base class having
        # set self.object beforehand.
        self.object = self.get_object()
        self.object.mark_as_deleted()
        hooks.call_hooks('event', name='manager-delete-user', user=request.user,
                         instance=self.object)
        return HttpResponseRedirect(self.get_success_url())


user_delete = UserDeleteView.as_view()
class UserImportsView(MediaMixin, PermissionMixin, FormView):
    """List of the user imports, with a form to create a new one."""

    form_class = UserNewImportForm
    permissions = ['custom_user.admin_user']
    permissions_global = True
    template_name = 'authentic2/manager/user_imports.html'

    def post(self, request, *args, **kwargs):
        """Delete the import named by the ``delete`` POST key, else handle the form."""
        from authentic2.manager import user_import

        if 'delete' in request.POST:
            uuid = request.POST['delete']
            user_import.UserImport(uuid).delete()
            return redirect(self.request, 'a2-manager-users-imports')
        return super(UserImportsView, self).post(request, *args, **kwargs)

    def form_valid(self, form):
        """Save the new import, record its author and redirect to its page."""
        user_import = form.save()
        with user_import.meta_update as meta:
            meta['user'] = self.request.user.get_full_name()
            meta['user_pk'] = self.request.user.pk
        return redirect(self.request, 'a2-manager-users-import', kwargs={'uuid': user_import.uuid})

    def get_context_data(self, **kwargs):
        """Add the existing imports and the description of the CSV columns.

        The help text of the ``import_file`` field gets a link to an example
        CSV header line, embedded as a base64 ``data:`` URL.
        """
        from authentic2.manager import user_import

        ctx = super(UserImportsView, self).get_context_data(**kwargs)
        ctx['imports'] = sorted(user_import.UserImport.all(), key=operator.attrgetter('created'), reverse=True)
        help_columns = []
        field_columns = ['username', 'email', 'first_name', 'last_name']
        key = 'username'
        if not has_show_username():
            field_columns.remove('username')
            key = 'email'
        # Load the attributes once: they are needed to skip the model fields
        # shadowed by an attribute and to list the attribute columns.
        attributes = list(Attribute.objects.all())
        attribute_names = {attribute.name for attribute in attributes}
        for field_column in field_columns:
            field = User._meta.get_field(field_column)
            if field.name in attribute_names:
                continue
            help_columns.append({
                'label': field.verbose_name,
                'name': field.name,
                'key': field.name == key,
            })
        for attribute in attributes:
            kind = attribute.get_kind()
            if not kind.get('csv_importable', True):
                continue
            help_columns.append({
                'label': attribute.label,
                'name': attribute.name,
                'key': attribute.name == key,
            })
        ctx['help_columns'] = help_columns
        example_data = u','.join(
            column['name'] + (' key' if column['key'] else '')
            for column in help_columns) + '\n'
        example_url = 'data:text/csv;base64,%s' % base64.b64encode(example_data.encode('utf-8')).decode('ascii')
        ctx['form'].fields['import_file'].help_text = format_html(
            _('{0}. {1} <a download="{3}" href="{2}">{3}</a>'),
            ctx['form'].fields['import_file'].help_text,
            _('ex.:'),
            example_url,
            _('users.csv'))
        return ctx


user_imports = UserImportsView.as_view()
class UserImportView(MediaMixin, PermissionMixin, FormView):
    """Page of one user import: settings form, file download and reports."""

    form_class = UserEditImportForm
    permissions = ['custom_user.admin_user']
    permissions_global = True
    template_name = 'authentic2/manager/user_import.html'

    def dispatch(self, request, uuid, **kwargs):
        """Load the import designated by `uuid`, or answer with a 404."""
        from authentic2.manager.user_import import UserImport

        loaded = UserImport(uuid)
        self.user_import = loaded
        if not loaded.exists():
            raise Http404
        return super().dispatch(request, uuid, **kwargs)

    def get(self, request, uuid, filename=None):
        """Serve the imported file when a filename is given, else the page."""
        if not filename:
            return super().get(request, uuid=uuid, filename=filename)
        return FileResponse(self.user_import.import_file, content_type='text/csv')

    def get_form_kwargs(self):
        """Give the current import to the form."""
        form_kwargs = super().get_form_kwargs()
        form_kwargs['user_import'] = self.user_import
        return form_kwargs

    def post(self, request, *args, **kwargs):
        """Handle report deletion, simulation/execution, or the settings form."""
        from authentic2.manager import user_import

        posted = request.POST
        if 'delete' in posted:
            report_uuid = posted['delete']
            try:
                report = self.user_import.reports[report_uuid]
            except KeyError:
                # unknown report: nothing to delete
                pass
            else:
                report.delete()
            return redirect(
                request, 'a2-manager-users-import',
                kwargs={'uuid': self.user_import.uuid})

        simulate = 'simulate' in posted
        execute = 'execute' in posted
        if not (simulate or execute):
            return super().post(request, *args, **kwargs)

        new_report = user_import.Report.new(self.user_import)
        new_report.run(simulate=simulate)
        return redirect(
            request, 'a2-manager-users-import',
            kwargs={'uuid': self.user_import.uuid})

    def form_valid(self, form):
        """Save the form before the standard redirect."""
        form.save()
        return super().form_valid(form)

    def get_success_url(self):
        """Stay on the page of the current import."""
        return reverse('a2-manager-users-import', kwargs={'uuid': self.user_import.uuid})

    def get_context_data(self, **kwargs):
        """Add the import and its reports, most recently created first."""
        context = super().get_context_data(**kwargs)
        context['user_import'] = self.user_import
        by_creation = operator.attrgetter('created')
        context['reports'] = sorted(self.user_import.reports, key=by_creation, reverse=True)
        return context


user_import = UserImportView.as_view()
class UserImportReportView(MediaMixin, PermissionMixin, TemplateView):
    """Page showing one report of a user import."""

    form_class = UserEditImportForm
    permissions = ['custom_user.admin_user']
    permissions_global = True
    template_name = 'authentic2/manager/user_import_report.html'

    def dispatch(self, request, import_uuid, report_uuid):
        """Load the import and its report; answer with a 404 if either is missing."""
        from authentic2.manager.user_import import UserImport

        loaded = UserImport(import_uuid)
        self.user_import = loaded
        if not loaded.exists():
            raise Http404
        try:
            self.report = loaded.reports[report_uuid]
        except KeyError:
            raise Http404
        return super().dispatch(request, import_uuid, report_uuid)

    def get_context_data(self, **kwargs):
        """Add the import, the report and a title depending on the report kind."""
        context = super().get_context_data(**kwargs)
        context['user_import'] = self.user_import
        context['report'] = self.report
        context['report_title'] = _('Simulation') if self.report.simulate else _('Execution')
        return context


user_import_report = UserImportReportView.as_view()
def me(request):
    """Redirect the requester to its own manager page or to account management."""
    requester = request.user
    if not requester.has_perm('custom_user.change_user', requester):
        return redirect(request, 'account_management')
    return redirect(request, 'a2-manager-user-detail', kwargs={'pk': requester.pk})
class UserSuView(MediaMixin, TitleMixin, PermissionMixin, DetailView):
    """Superuser-only page exposing a switch-user URL for the selected user."""

    model = User
    template_name = 'authentic2/manager/user_su.html'
    title = _('Switch user')
    duration = 30  # seconds

    class Media:
        js = (
            'authentic2/js/js_seconds_until.js',
        )

    def dispatch(self, request, *args, **kwargs):
        """Refuse access to anyone who is not a superuser."""
        if request.user.is_superuser:
            return super().dispatch(request, *args, **kwargs)
        raise PermissionDenied

    def get_context_data(self, **kwargs):
        """Add the absolute switch-user URL and the duration passed to it."""
        context = super().get_context_data(**kwargs)
        switch_url = switch_user.build_url(self.object, self.duration)
        context['su_url'] = make_url(
            'auth_logout',
            params={REDIRECT_FIELD_NAME: switch_url},
            request=self.request,
            absolute=True)
        context['duration'] = self.duration
        return context


su = UserSuView.as_view()
class UserAuthorizationsView(FormNeedsRequest, BaseFormView, SingleObjectMixin,
                             BaseTableView, PermissionMixin):
    """Table of the OIDC authorizations of a user, with a form to delete one."""

    permissions = ['custom_user.view_user']
    template_name = 'authentic2/manager/user_authorizations.html'
    title = pgettext_lazy('manager', 'Consent Management')
    model = get_user_model()
    table_class = UserAuthorizationsTable
    form_class = ChooseUserAuthorizationsForm
    success_url = '.'
    object_list = None

    @property
    def can_manage_authorizations(self):
        """Whether the requester has the manage-authorizations permission on the user."""
        target = self.get_object()
        return self.request.user.has_perm(
            'custom_user.manage_authorizations_user', target)

    def get_table_data(self):
        """Return the OIDC authorizations of the displayed user."""
        return OIDCAuthorization.objects.filter(user=self.get_object())

    def form_valid(self, form):
        """Delete the chosen authorization when the requester is allowed to."""
        response = super().form_valid(form)
        authorization = form.cleaned_data['authorization']
        if self.can_manage_authorizations:
            user_authorizations_qs = OIDCAuthorization.objects.filter(user=self.get_object())
            user_authorizations_qs.filter(id=authorization.pk).delete()
        return response


user_authorizations = UserAuthorizationsView.as_view()