authentic/src/authentic2/views.py

1689 lines
67 KiB
Python

# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import logging
import re
import time
from email.utils import parseaddr
from django import shortcuts
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import REDIRECT_FIELD_NAME, get_user_model
from django.contrib.auth import logout as auth_logout
from django.contrib.auth.decorators import login_required
from django.contrib.auth.views import PasswordChangeView as DjPasswordChangeView
from django.core.exceptions import FieldDoesNotExist, ValidationError
from django.db.models import Count
from django.db.models.query import Q
from django.db.transaction import atomic
from django.forms import CharField
from django.http import (
Http404,
HttpResponse,
HttpResponseBadRequest,
HttpResponseForbidden,
HttpResponseRedirect,
)
from django.shortcuts import get_object_or_404, render
from django.template import loader
from django.template.loader import render_to_string
from django.urls import reverse, reverse_lazy
from django.utils import timezone
from django.utils.translation import gettext as _
from django.views.decorators.cache import never_cache
from django.views.decorators.csrf import csrf_exempt, ensure_csrf_cookie, requires_csrf_token
from django.views.defaults import permission_denied as django_permission_denied
from django.views.generic import ListView, TemplateView
from django.views.generic.base import RedirectView, View
from django.views.generic.edit import CreateView, DeleteView, FormView, UpdateView
from ratelimit.utils import is_ratelimited
from authentic2.a2_rbac.models import Role
from authentic2.custom_user.models import iter_attributes
from authentic2.forms import authentication as authentication_forms
from authentic2_idp_oidc.models import OIDCAuthorization
from . import app_settings, attribute_kinds, cbv, constants, decorators, models, validators
from .a2_rbac.models import OrganizationalUnit as OU
from .a2_rbac.utils import get_default_ou
from .forms import passwords as passwords_forms
from .forms import profile as profile_forms
from .forms import registration as registration_forms
from .models import Lock
from .utils import crypto, hooks
from .utils import misc as utils_misc
from .utils import switch_user as utils_switch_user
from .utils.evaluate import make_condition_context
from .utils.service import get_service, set_home_url
from .utils.view_decorators import enable_view_restriction
from .utils.views import csrf_token_check
# User model class used by every view below, resolved through Django's
# get_user_model() rather than imported directly.
User = get_user_model()
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class HomeURLMixin:
    """View mixin that calls ``set_home_url`` on the request before dispatching."""

    def dispatch(self, request, *args, **kwargs):
        """Call ``set_home_url(request)`` and then delegate to the next ``dispatch``."""
        set_home_url(request)
        response = super().dispatch(request, *args, **kwargs)
        return response
class EditProfile(HomeURLMixin, cbv.HookMixin, cbv.TemplateNamesMixin, UpdateView):
    """Form view letting the current user edit the editable fields of their own account."""

    model = User
    template_names = ['profiles/edit_profile.html', 'authentic2/accounts_edit.html']
    title = _('Edit account data')

    def get_template_names(self):
        """Return template candidates; a scope-specific template comes first when the URL has a scope."""
        if 'scope' not in self.kwargs:
            return list(self.template_names)
        return ['authentic2/accounts_edit_%s.html' % self.kwargs['scope'], *self.template_names]

    @classmethod
    def can_edit_profile(cls):
        """Tell whether profile edition is possible: some field is editable and the setting allows it."""
        fields, dummy_labels = cls.get_fields()
        if not fields:
            return False
        return app_settings.A2_PROFILE_CAN_EDIT_PROFILE

    @classmethod
    def get_fields(cls, scopes=None):
        """Compute the editable form fields and their labels.

        ``scopes`` is an optional iterable of scope names; when non-empty,
        only attributes declaring at least one of these scopes are kept.
        Returns a ``(fields, labels)`` pair.
        """
        editable_profile_fields = []
        for entry in app_settings.A2_PROFILE_FIELDS:
            # an entry is either a bare name or a list/tuple whose first item is the name
            name = entry[0] if isinstance(entry, (list, tuple)) else entry
            try:
                attribute = models.Attribute.objects.get(name=name)
            except models.Attribute.DoesNotExist:
                # no Attribute row with that name: the entry is kept as is
                editable_profile_fields.append(entry)
                continue
            if attribute.user_editable:
                editable_profile_fields.append(entry)

        attributes = models.Attribute.objects.filter(user_editable=True)
        if scopes:
            scopes = set(scopes)
            default_fields = []
            for attribute in attributes:
                if scopes & set(attribute.scopes.split()):
                    default_fields.append(attribute.name)
        else:
            default_fields = list(attributes.values_list('name', flat=True))

        fields, labels = utils_misc.get_fields_and_labels(editable_profile_fields, default_fields)
        if scopes:
            # restrict fields to those in the scopes
            fields = [name for name in fields if name in default_fields]
        return fields, labels

    def get_form_class(self):
        """Build the model form class for the requested scope(s), always leaving ``email`` out."""
        if 'scope' in self.kwargs:
            scopes = [self.kwargs['scope']]
        else:
            scopes = self.request.GET.get('scope', '').split()
        fields, labels = self.get_fields(scopes=scopes)
        # Email must be edited through the change email view, as it needs validation
        fields = [name for name in fields if name != 'email']
        return profile_forms.modelform_factory(
            User, fields=fields, labels=labels, form=profile_forms.EditProfileForm
        )

    def get_object(self):
        """The edited object is always the requesting user."""
        return self.request.user

    def get_form_kwargs(self, **kwargs):
        """Add the ``next_url`` keyword (defaulting to the account page) to the form arguments."""
        form_kwargs = super().get_form_kwargs(**kwargs)
        form_kwargs['next_url'] = utils_misc.select_next_url(self.request, reverse('account_management'))
        return form_kwargs

    def get_success_url(self):
        """Return the ``next_url`` found in the request (POST included) or the account page."""
        fallback = reverse('account_management')
        return utils_misc.select_next_url(
            self.request, default=fallback, field_name='next_url', include_post=True
        )

    def post(self, request, *args, **kwargs):
        """Handle the submission; a ``cancel`` key in POST skips the form and redirects."""
        if 'cancel' not in request.POST:
            return super().post(request, *args, **kwargs)
        return utils_misc.redirect(request, self.get_success_url())

    def form_valid(self, form):
        """Save through the parent class, then call the ``edit-profile`` hook and journal the edit."""
        response = super().form_valid(form)
        user = self.request.user
        hooks.call_hooks('event', name='edit-profile', user=user, form=form)
        self.request.journal.record('user.profile.edit', form=form)
        return response


# The view requires login and is wrapped by the A2_PROFILE_CAN_EDIT_PROFILE setting decorator.
edit_profile = decorators.setting_enabled('A2_PROFILE_CAN_EDIT_PROFILE')(
    login_required(EditProfile.as_view())
)
class EditRequired(EditProfile):
    """Variant of EditProfile restricted to the attributes flagged ``required_on_login``."""

    template_names = ['authentic2/accounts_edit_required.html']

    def dispatch(self, request, *args, **kwargs):
        """Show the form only when the user misses required attributes, else redirect."""
        self.missing_attributes = request.user.get_missing_required_on_login_attributes()
        if self.missing_attributes:
            return super().dispatch(request, *args, **kwargs)
        return utils_misc.redirect(request, self.get_success_url())

    @classmethod
    def get_fields(cls, scopes=None):
        """Return ``(fields, labels)`` for enabled attributes required on login; ``scopes`` is unused."""
        # only show the required fields
        required = models.Attribute.objects.filter(required_on_login=True, disabled=False)
        attribute_names = required.values_list('name', flat=True)
        fields, labels = utils_misc.get_fields_and_labels(attribute_names)
        return fields, labels


edit_required_profile = login_required(EditRequired.as_view())
class RecentAuthenticationMixin:
    """View mixin to require that the user authenticated recently.

    Views use ``has_recent_authentication()`` to test the age of the last
    authentication event and ``reauthenticate()`` to send the user through
    the login page again.
    """

    # maximum age, in seconds, of the last authentication event
    last_authentication_max_age = 600  # 10 minutes

    def reauthenticate(self, action, message):
        """Return the response of ``utils_misc.login_require`` for a re-authentication.

        The token passed along carries ``action``, ``message`` and the list
        of ``how`` values of the authentication events of the request.
        """
        methods = [event['how'] for event in utils_misc.get_authentication_events(self.request)]
        return utils_misc.login_require(
            self.request,
            token={
                'action': action,
                'message': message,
                'methods': methods,
            },
        )

    def has_recent_authentication(self):
        """Tell whether the last authentication event is younger than the maximum age.

        Returns False when there is no authentication event at all, instead
        of failing while subscripting a missing event.
        """
        last_event = utils_misc.last_authentication_event(request=self.request)
        if not last_event:
            # no known authentication event: it cannot be a recent one
            return False
        age = time.time() - last_event['when']
        return age < self.last_authentication_max_age
class EmailChangeView(HomeURLMixin, RecentAuthenticationMixin, cbv.TemplateNamesMixin, FormView):
    """Ask for a new email address and send a validation email for it.

    The freshness check ``has_recent_authentication()`` is inherited from
    RecentAuthenticationMixin; it is deliberately not redefined here so that
    there is a single implementation.
    """

    template_names = ['profiles/email_change.html', 'authentic2/change_email.html']
    title = _('Email Change')
    success_url = '..'

    def can_validate_with_password(self):
        """Tell whether the last authentication event used ``password-on-https``.

        The result is falsy when there is no authentication event.
        """
        last_event = utils_misc.last_authentication_event(self.request)
        return last_event and last_event['how'] == 'password-on-https'

    def get_form_class(self):
        """Use the password-checking form when possible, the password-less form otherwise."""
        if self.can_validate_with_password():
            return profile_forms.EmailChangeForm
        return profile_forms.EmailChangeFormNoPassword

    def get_form_kwargs(self):
        """Give the current user to the form."""
        kwargs = super().get_form_kwargs()
        kwargs['user'] = self.request.user
        return kwargs

    def dispatch(self, request, *args, **kwargs):
        """Force a re-authentication when neither the password nor a recent login can vouch for the user."""
        if not self.can_validate_with_password() and not self.has_recent_authentication():
            return self.reauthenticate(
                action='email-change',
                message=_('You must re-authenticate to change your email address.'),
            )
        return super().dispatch(request, *args, **kwargs)

    def post(self, request, *args, **kwargs):
        """Handle the submission; a ``cancel`` key in POST redirects to the account page."""
        if 'cancel' in request.POST:
            return utils_misc.redirect(request, 'account_management')
        return super().post(request, *args, **kwargs)

    def form_valid(self, form):
        """Send the validation email, notify hooks, the user and the journal, then redirect."""
        email = form.cleaned_data['email']
        utils_misc.send_email_change_email(self.request.user, email, request=self.request)
        hooks.call_hooks('event', name='change-email', user=self.request.user, email=email)
        messages.info(
            self.request,
            _(
                'Your request for changing your email is received. An email of validation was sent to you.'
                ' Please click on the link contained inside.'
            ),
        )
        logger.info('email change request')
        self.request.journal.record(
            'user.email.change.request', user=self.request.user, session=self.request.session, new_email=email
        )
        return super().form_valid(form)


email_change = decorators.setting_enabled('A2_PROFILE_CAN_CHANGE_EMAIL')(
    login_required(EmailChangeView.as_view())
)
class EmailChangeVerifyView(TemplateView):
    """Apply an email change described by the signed ``token`` query parameter."""

    def get(self, request, *args, **kwargs):
        """Validate the token and update the user's email.

        Redirects to ``account_management`` on success and to ``email-change``
        when the token is missing or when any check fails (an error message is
        queued in the latter case).
        """
        if 'token' not in request.GET:
            return shortcuts.redirect('email-change')
        try:
            payload = crypto.loads(request.GET['token'], max_age=app_settings.A2_EMAIL_CHANGE_TOKEN_LIFETIME)
            user_pk = payload['user_pk']
            email = payload['email']
            user = User.objects.get(pk=user_pk)

            # uniqueness is checked globally or inside the user's OU, depending on configuration
            if app_settings.A2_EMAIL_IS_UNIQUE:
                same_email = User.objects.filter(email__iexact=email)
            elif user.ou and user.ou.email_is_unique:
                same_email = User.objects.filter(email__iexact=email, ou=user.ou)
            else:
                same_email = None
            if same_email is not None and same_email.exclude(pk=user_pk).exists():
                raise ValidationError(_('This email is already used by another account.'))

            old_email = user.email
            user.email = email
            user.set_email_verified(True)
            user.save()
            messages.info(request, _('your request for changing your email for {0} is successful').format(email))
            logger.info('user %s changed its email from %s to %s', user, old_email, email)
            hooks.call_hooks('event', name='change-email-confirm', user=user, email=email)
            request.journal.record(
                'user.email.change',
                user=user,
                session=request.session,
                old_email=old_email,
                new_email=user.email,
            )
        except crypto.SignatureExpired:
            messages.error(request, _('your request for changing your email is too old, try again'))
        except crypto.BadSignature:
            messages.error(request, _('your request for changing your email is invalid, try again'))
        except ValueError:
            messages.error(request, _('your request for changing your email was not on this site, try again'))
        except User.DoesNotExist:
            messages.error(request, _('your request for changing your email is for an unknown user, try again'))
        except ValidationError as error:
            messages.error(request, error.message)
        else:
            return shortcuts.redirect('account_management')
        return shortcuts.redirect('email-change')


email_change_verify = EmailChangeVerifyView.as_view()
# NOTE(review): the view is CSRF-exempt at middleware level; login_password_login
# calls csrf_token_check() itself on POST — confirm other authenticators do too.
@csrf_exempt
@ensure_csrf_cookie
@never_cache
def login(request, template_name='authentic2/login.html', redirect_field_name=REDIRECT_FIELD_NAME):
    """Display the login page and run the login block of each authenticator.

    Args:
        request: the current HTTP request; ``request.login_token`` is set here.
        template_name: template rendered when no authenticator short-circuits.
        redirect_field_name: name of the GET parameter holding the URL to go
            to after login; it is also used as a context key.

    Returns:
        An HTTP response: a redirect for already-connected users (if
        configured), the response of an authenticator block whose status code
        is not 200, the result of ``autorun`` of a lone authenticator, or the
        rendered login template.
    """

    # Optional signed token; its content is exposed as request.login_token.
    # A bad token is logged and otherwise ignored.
    request.login_token = token = {}
    if 'token' in request.GET:
        try:
            token.update(crypto.loads(request.GET['token']))
            logger.debug('login: got token %s', token)
        except (crypto.SignatureExpired, crypto.BadSignature, ValueError):
            logger.warning('login: bad token')
    # when non-empty, only authenticators sharing one of these methods are shown
    methods = token.get('methods', [])

    # redirect user to homepage if already connected, if setting
    # A2_LOGIN_REDIRECT_AUTHENTICATED_USERS_TO_HOMEPAGE is True
    if request.user.is_authenticated and app_settings.A2_LOGIN_REDIRECT_AUTHENTICATED_USERS_TO_HOMEPAGE:
        return utils_misc.redirect(request, 'auth_homepage')

    redirect_to = request.GET.get(redirect_field_name)

    # missing target or target containing a space: use the default
    if not redirect_to or ' ' in redirect_to:
        redirect_to = settings.LOGIN_REDIRECT_URL
    # Heavier security check -- redirects to http://example.com should
    # not be allowed, but things like /view/?param=http://example.com
    # should be allowed. This regex checks if there is a '//' *before* a
    # question mark.
    elif '//' in redirect_to and re.match(r'[^\?]*//', redirect_to):
        redirect_to = settings.LOGIN_REDIRECT_URL
    nonce = request.GET.get(constants.NONCE_FIELD_NAME)

    authenticators = utils_misc.get_authenticators()

    blocks = []

    registration_url = utils_misc.get_registration_url(request)

    # base template context, also handed to every authenticator block
    context = {
        'cancel': app_settings.A2_LOGIN_DISPLAY_A_CANCEL_BUTTON and nonce is not None,
        'can_reset_password': app_settings.A2_USER_CAN_RESET_PASSWORD is not False,
        'registration_authorized': getattr(settings, 'REGISTRATION_OPEN', True),
        'registration_url': registration_url,
    }

    # Cancel button
    if request.method == "POST" and constants.CANCEL_FIELD_NAME in request.POST:
        return utils_misc.continue_to_next_url(request, params={'cancel': 1})

    # Create blocks
    for authenticator in authenticators:
        # skip authenticators not matching the methods requested by the token
        if methods and not set(authenticator.how) & set(methods):
            continue
        auth_blocks = []
        parameters = {'request': request, 'context': context}
        # context given to the authenticator's show condition
        login_hint = set(request.session.get('login-hint', []))
        show_ctx = make_condition_context(request=request, login_hint=login_hint)
        service = get_service(request)
        if service:
            show_ctx['service_ou_slug'] = service.ou and service.ou.slug
            show_ctx['service_slug'] = service.slug
            show_ctx['service'] = service
        else:
            show_ctx['service_ou_slug'] = ''
            show_ctx['service_slug'] = ''
            show_ctx['service'] = None
        if authenticator.shown(ctx=show_ctx):
            # position the block will get in the page; read back by the blocks
            context['block_index'] = len(blocks)
            auth_blocks.append(utils_misc.get_authenticator_method(authenticator, 'login', parameters))
        # If a login frontend method returns an HttpResponse with a status code != 200
        # this response is returned.
        for block in auth_blocks:
            if block:
                if block['status_code'] != 200:
                    return block['response']
                blocks.append(block)

    # run the only available authenticator if able to autorun
    if len(blocks) == 1:
        block = blocks[0]
        authenticator = block['authenticator']
        if hasattr(authenticator, 'autorun'):
            if 'message' in token:
                messages.info(request, token['message'])
            return authenticator.autorun(request, block.get('id'))

    context.update(
        {
            'blocks': collections.OrderedDict((block['id'], block) for block in blocks),
            redirect_field_name: redirect_to,
        }
    )
    # message carried by the token (e.g. the reason for a re-authentication)
    if 'message' in token:
        messages.info(request, token['message'])
    return render(request, template_name, context)
def service_list(request):
    """Return the service list shown on the user homepage, accumulated from the backends."""
    services = utils_misc.accumulate_from_backends(request, 'service_list')
    return services
class Homepage(cbv.TemplateNamesMixin, TemplateView):
    """User homepage, or a redirection to an externally configured homepage."""

    template_names = ['idp/homepage.html', 'authentic2/homepage.html']

    def dispatch(self, request, *args, **kwargs):
        """Redirect to the configured home URL if any, else show the page to logged-in users."""
        if not app_settings.A2_HOMEPAGE_URL:
            return login_required(super().dispatch)(request, *args, **kwargs)
        home_url = app_settings.A2_HOMEPAGE_URL
        # the home URL of the user's OU, when set, takes precedence over the global one
        if request.user.is_authenticated and request.user.ou and request.user.ou.home_url:
            home_url = request.user.ou.home_url
        return utils_misc.redirect(request, home_url, resolve=False)

    def get_context_data(self, **kwargs):
        """Add the account management URL name and the service list to the context."""
        ctx = super().get_context_data(**kwargs)
        ctx.update(
            account_management='account_management',
            authorized_services=service_list(self.request),
        )
        return ctx


homepage = enable_view_restriction(Homepage.as_view())
class ProfileView(HomeURLMixin, cbv.TemplateNamesMixin, TemplateView):
    """Account page: displays the user's attributes and the authenticators' profile blocks."""

    template_names = ['idp/account_management.html', 'authentic2/accounts.html']
    title = _('Your account')

    def dispatch(self, request, *args, **kwargs):
        """Redirect to A2_ACCOUNTS_URL when it is set, else dispatch normally."""
        if app_settings.A2_ACCOUNTS_URL:
            return utils_misc.redirect(request, app_settings.A2_ACCOUNTS_URL)
        return super().dispatch(request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """Build the template context of the account page.

        Context keys set here: ``frontends_block``, ``frontends_block_by_id``,
        ``profile``, ``attributes``, ``federation_management`` and the
        ``allow_*`` flags. The ``modify_context_data`` hook is called last.
        """
        context = super().get_context_data(**kwargs)
        frontends = utils_misc.get_authenticators()

        request = self.request

        # NOTE(review): this branch returns the result of frontend.post() from
        # get_context_data; no post() handler is defined on this class in the
        # visible code, so it looks unreachable — confirm before relying on it.
        if request.method == "POST":
            for frontend in frontends:
                if 'submit-%s' % frontend.get_identifier() in request.POST:
                    form = frontend.form()(data=request.POST)
                    if form.is_valid():
                        return frontend.post(request, form, None, '/profile')
        # User attributes management
        profile = []
        field_names = app_settings.A2_PROFILE_FIELDS
        if not field_names:
            # no explicit list configured: start from a copy of the registration
            # fields, then add the user's USER_PROFILE names and every attribute
            # that is editable or visible by the user
            field_names = list(app_settings.A2_REGISTRATION_FIELDS)
            for field_name in getattr(request.user, 'USER_PROFILE', []):
                if field_name not in field_names:
                    field_names.append(field_name)
            qs = models.Attribute.objects.filter(Q(user_editable=True) | Q(user_visible=True))
            qs = qs.values_list('name', flat=True)
            for field_name in qs:
                if field_name not in field_names:
                    field_names.append(field_name)
        attributes = []
        for field_name in field_names:
            title = None
            # an entry may be a (name, title) list/tuple instead of a bare name
            if isinstance(field_name, (list, tuple)):
                if len(field_name) > 1:
                    title = field_name[1]
                field_name = field_name[0]
            try:
                attribute = models.Attribute.objects.get(name=field_name)
            except models.Attribute.DoesNotExist:
                attribute = None
            if attribute:
                if not attribute.user_visible:
                    continue
                # the attribute kind may provide an HTML renderer; default is the raw value
                html_value = attribute.get_kind().get('html_value', lambda a, b: b)
                qs = models.AttributeValue.objects.with_owner(request.user)
                qs = qs.filter(attribute=attribute)
                qs = qs.select_related()
                value = [at_value.to_python() for at_value in qs]
                # drop falsy values before rendering
                value = filter(None, value)
                value = [html_value(attribute, at_value) for at_value in value]
                if not title:
                    title = str(attribute)
            else:
                # fallback to model attributes
                try:
                    field = request.user._meta.get_field(field_name)
                except FieldDoesNotExist:
                    continue
                if not title:
                    title = field.verbose_name
                value = getattr(self.request.user, field_name, None)
                # unsaved Attribute instance built only to carry name and label
                attribute = models.Attribute(name=field_name, label=title)

            raw_value = None
            if value:
                if callable(value):
                    value = value()
                # normalize to a sequence
                if not isinstance(value, (list, tuple)):
                    value = (value,)
                raw_value = value
                value = [str(v) for v in value]
            if value or app_settings.A2_PROFILE_DISPLAY_EMPTY_FIELDS:
                profile.append((title, value))
                attributes.append({'attribute': attribute, 'values': raw_value})

        # Credentials management
        parameters = {'request': request, 'context': context}
        profiles = [
            utils_misc.get_authenticator_method(frontend, 'profile', parameters) for frontend in frontends
        ]
        # Old frontends data structure for templates
        blocks = [block['content'] for block in profiles if block]
        # New frontends data structure for templates
        blocks_by_id = collections.OrderedDict((block['id'], block) for block in profiles if block)

        idp_backends = utils_misc.get_backends()
        # Get actions for federation management
        federation_management = []
        if app_settings.A2_PROFILE_CAN_MANAGE_FEDERATION:
            for idp_backend in idp_backends:
                if hasattr(idp_backend, 'federation_management'):
                    federation_management.extend(idp_backend.federation_management(request))

        context.update(
            {
                'frontends_block': blocks,
                'frontends_block_by_id': blocks_by_id,
                'profile': profile,
                'attributes': attributes,
                'allow_account_deletion': app_settings.A2_REGISTRATION_CAN_DELETE_ACCOUNT,
                'allow_profile_edit': EditProfile.can_edit_profile(),
                'allow_email_change': app_settings.A2_PROFILE_CAN_CHANGE_EMAIL,
                'allow_authorization_management': False,
                # TODO: deprecated should be removed when publik-base-theme is updated
                'allow_password_change': utils_misc.user_can_change_password(request=request),
                'federation_management': federation_management,
            }
        )

        # authorization management is offered only if the OIDC IdP app is installed,
        # the setting allows it and some client uses the by-service authorization mode
        if (
            'authentic2_idp_oidc' in settings.INSTALLED_APPS
            and app_settings.A2_PROFILE_CAN_MANAGE_SERVICE_AUTHORIZATIONS
        ):
            from authentic2_idp_oidc.models import OIDCClient

            context['allow_authorization_management'] = OIDCClient.objects.filter(
                authorization_mode=OIDCClient.AUTHORIZATION_MODE_BY_SERVICE
            ).exists()

        hooks.call_hooks('modify_context_data', self, context)
        return context


profile = enable_view_restriction(login_required(ProfileView.as_view()))
def logout_list(request):
    """Return the logout links accumulated from the IdP backends."""
    links = utils_misc.accumulate_from_backends(request, 'logout_list')
    return links
def redirect_logout_list(request):
    """Return the redirect logout URLs given by the ``redirect_logout_list`` hooks, flattened."""
    hook_results = hooks.call_hooks('redirect_logout_list', request=request)
    # falsy hook results (None, empty list) contribute nothing
    return [url for urls in hook_results if urls for url in urls]
def logout(request, next_url=None, do_local=True, check_referer=True):
    """Log the user out, locally and from the services known to the backends.

    Logout first checks if a logout request is authorized, i.e.
    that logout was done using a POST with CSRF token or with a GET
    from the same site.

    Logout endpoints of IdP modules must reuse this view by setting
    check_referer and do_local to False.

    Args:
        request: the current HTTP request.
        next_url: URL to go to once logged out; defaults to the next URL
            selected from the request or ``settings.LOGIN_REDIRECT_URL``.
        do_local: when true, the local logout is done directly only if the
            request has a ``local`` GET parameter.
        check_referer: when true, a request failing
            ``utils_misc.check_referer`` gets a confirmation page instead.

    Returns:
        A rendered page (confirmation or iframe logout) or a redirect.
    """
    next_url = next_url or utils_misc.select_next_url(request, settings.LOGIN_REDIRECT_URL)

    if request.user.is_authenticated:
        if check_referer and not utils_misc.check_referer(request):
            return render(request, 'authentic2/logout_confirm.html', {'next_url': next_url})
        fragments = logout_list(request)
        do_local = do_local and 'local' in request.GET
        if not do_local and fragments:
            # Full logout with iframes
            # the page comes back to this view with local=ok to finish the local logout
            local_logout_next_url = utils_misc.make_url(
                'auth_logout', params={'local': 'ok'}, next_url=next_url, sign_next_url=True
            )
            ctx = {}
            ctx['next_url'] = local_logout_next_url
            ctx['redir_timeout'] = 60
            ctx['logout_list'] = fragments
            ctx['message'] = _('Logging out from all your services')
            return render(request, 'authentic2/logout.html', ctx)
        # Get redirection targets for full logout with redirections
        # (needed before local logout)
        targets = redirect_logout_list(request)
        # Last redirection will be the current next_url
        targets.append(next_url)
        # Local logout
        request.journal.record('user.logout')
        auth_logout(request)
        if targets:
            # Full logout with redirections
            next_url = targets.pop(0)
            if targets:
                # Put redirection targets in session
                request.session['logout_redirections'] = targets
        response = shortcuts.redirect(next_url)
        # short-lived marker cookie set right after the logout
        response.set_cookie('a2_just_logged_out', 1, max_age=60, samesite='Lax')
        return response
    else:
        # continue redirections after logout
        targets = request.session.pop('logout_redirections', None)
        if targets:
            # Full logout with redirections
            next_url = targets.pop(0)
            request.session['logout_redirections'] = targets
        return shortcuts.redirect(next_url)
def login_password_login(request, authenticator, *args, **kwargs):
    """Render and process the login block of the password authenticator.

    Args:
        request: the current HTTP request; ``request.login_token`` must be set
            (the ``login`` view does it) and ``request.failed_logins`` is
            reset here.
        authenticator: the password authenticator; ``include_ou_selector`` and
            ``remember_me`` are read from it.
        **kwargs: may contain ``context``, the template context dict to fill.

    Returns:
        The login response on success (or a redirect to the password change
        view when a change is required), else the rendered form.
    """

    def get_service_ous(service):
        """Return the OUs of the users holding a role giving access to ``service``."""
        roles = Role.objects.filter(allowed_services=service).children()
        if not roles:
            return []
        service_ou_ids = []
        # OU ids of those users, most represented first
        qs = (
            User.objects.filter(roles__in=roles)
            .values_list('ou')
            .annotate(count=Count('ou'))
            .order_by('-count')
        )
        for ou_id, dummy_count in qs:
            if not ou_id:
                continue
            service_ou_ids.append(ou_id)
        if not service_ou_ids:
            return []
        return OU.objects.filter(pk__in=service_ou_ids)

    def get_preferred_ous(request):
        """Return the OUs to pre-select: remembered ones first, then the service's."""
        service = get_service(request)
        preferred_ous_cookie = utils_misc.get_remember_cookie(request, 'preferred-ous')
        preferred_ous = []
        if preferred_ous_cookie:
            preferred_ous.extend(OU.objects.filter(pk__in=preferred_ous_cookie))
        # for the special case of services open to only one OU, pre-select it
        if service:
            for ou in get_service_ous(service):
                if ou in preferred_ous:
                    continue
                preferred_ous.append(ou)
        return preferred_ous

    context = kwargs.get('context', {})
    # the form is bound only when this very block was submitted
    is_post = request.method == 'POST' and 'login-password-submit' in request.POST
    data = request.POST if is_post else None
    initial = {}
    preferred_ous = []
    # presumably filled during form validation with per-user failure data — verify
    request.failed_logins = {}

    # Special handling when the form contains an OU selector
    if authenticator.include_ou_selector:
        preferred_ous = get_preferred_ous(request)
        if preferred_ous:
            initial['ou'] = preferred_ous[0]

    form = authentication_forms.AuthenticationForm(
        request=request, data=data, initial=initial, preferred_ous=preferred_ous, authenticator=authenticator
    )
    if request.user.is_authenticated and request.login_token.get('action'):
        # re-authentication for an action: the identifier is fixed, only the password is asked
        form.initial['username'] = request.user.username or request.user.email
        form.fields['username'].widget.attrs['readonly'] = True
        form.fields['password'].widget.attrs['autofocus'] = True
    else:
        # autofocus only when block_index is falsy (first block, or index not set)
        form.fields['username'].widget.attrs['autofocus'] = not (bool(context.get('block_index')))
    if app_settings.A2_ACCEPT_EMAIL_AUTHENTICATION:
        form.fields['username'].label = _('Username or email')
    if app_settings.A2_USERNAME_LABEL:
        form.fields['username'].label = app_settings.A2_USERNAME_LABEL
    context['submit_name'] = 'login-password-submit'
    context['authenticator'] = authenticator
    if is_post:
        csrf_token_check(request, form)
        if form.is_valid():
            # is_secure is a method: it must be called, the bound method itself
            # is always truthy and would mark every login as done over HTTPS
            how = 'password-on-https' if request.is_secure() else 'password'
            if form.cleaned_data.get('remember_me'):
                request.session['remember_me'] = True
                request.session.set_expiry(authenticator.remember_me)
            response = utils_misc.login(request, form.get_user(), how)
            if 'ou' in form.fields:
                utils_misc.prepend_remember_cookie(
                    request, response, 'preferred-ous', form.cleaned_data['ou'].pk
                )

            if hasattr(request, 'needs_password_change'):
                # force a password change first, then continue to the login target
                del request.needs_password_change
                return utils_misc.redirect(
                    request, 'password_change', params={'next': response.url}, resolve=True
                )

            return response
        else:
            # journal the failure, per user when the failures are known
            username = form.cleaned_data.get('username', '').strip()
            if request.failed_logins:
                for user, failure_data in request.failed_logins.items():
                    request.journal.record(
                        'user.login.failure',
                        user=user,
                        reason=failure_data.get('reason', None),
                        username=username,
                    )
            elif username:
                request.journal.record('user.login.failure', username=username)
    context['form'] = form
    return render(request, 'authentic2/login_password_form.html', context)
def login_password_profile(request, *args, **kwargs):
    """Render the password block of the account page and return it as a string.

    The ``context`` keyword argument, if given, is completed with
    ``can_change_password`` and ``has_usable_password``.
    """
    context = kwargs.pop('context', {})
    context.update(
        can_change_password=utils_misc.user_can_change_password(request=request),
        has_usable_password=request.user.has_usable_password(),
    )
    templates = ['auth/login_password_profile.html', 'authentic2/login_password_profile.html']
    return render_to_string(templates, context, request=request)
class LoggedInView(View):
    """JSONP web service to detect if a user is logged in.

    The response body is ``<callback>(<0 or 1>)`` where ``callback`` is the
    value of the ``callback`` query parameter.
    """

    http_method_names = ['get']

    # The callback is echoed verbatim in the response, so it is restricted to
    # a (possibly dotted) JavaScript identifier made of ASCII characters.
    _callback_re = re.compile(r'[A-Za-z_$][\w$]*(?:\.[A-Za-z_$][\w$]*)*', re.ASCII)

    def check_referrer(self):
        """Check if the Referer header starts with one of the authorized referers."""
        referer = self.request.headers.get('Referer', '')
        return any(referer.startswith(valid_referer) for valid_referer in app_settings.VALID_REFERERS)

    def get(self, request, *args, **kwargs):
        """Return the JSONP payload.

        Answers 403 when the referer is not authorized and 400 when the
        ``callback`` parameter is missing or is not a valid identifier.
        """
        if not self.check_referrer():
            return HttpResponseForbidden()
        callback = request.GET.get('callback', '')
        if not self._callback_re.fullmatch(callback):
            # never reflect arbitrary request content into the response
            return HttpResponseBadRequest('invalid callback', content_type='text/plain')
        content = f'{callback}({int(request.user.is_authenticated)})'
        return HttpResponse(content, content_type='application/json')


logged_in = never_cache(LoggedInView.as_view())
def csrf_failure_view(request, reason=""):
    """On CSRF failure, queue a warning and redirect to the same URL; ``reason`` is unused."""
    messages.warning(request, _('The page is out of date, it was reloaded for you'))
    current_url = request.get_full_path()
    return HttpResponseRedirect(current_url)
class PasswordResetView(FormView):
    """Ask for an email and send a password reset link by mail"""

    form_class = passwords_forms.PasswordResetForm
    title = _('Password Reset')

    def get_success_url(self):
        """Return the URL of the instructions page."""
        return reverse('password_reset_instructions')

    def get_template_names(self):
        """Return the template candidates, in lookup order."""
        return [
            'authentic2/password_reset_form.html',
            'registration/password_reset_form.html',
        ]

    def get_form_kwargs(self, **kwargs):
        """Put the next URL selected from the request (default '') in the initial form data."""
        kwargs = super().get_form_kwargs(**kwargs)
        initial = kwargs.setdefault('initial', {})
        initial['next_url'] = utils_misc.select_next_url(self.request, '')
        return kwargs

    def get_context_data(self, **kwargs):
        """Build the context; raises Http404 when A2_USER_CAN_RESET_PASSWORD is exactly False."""
        ctx = super().get_context_data(**kwargs)
        # NOTE(review): the check is done only here; form_valid() does not call
        # get_context_data(), so a valid POST does not seem to go through it — confirm.
        if app_settings.A2_USER_CAN_RESET_PASSWORD is False:
            raise Http404('Password reset is not allowed.')
        ctx['title'] = _('Password reset')
        return ctx

    def form_valid(self, form):
        """Send the reset email unless the request is a robot's, a duplicate or rate-limited.

        Returns a redirect to the success URL, or the invalid-form response
        with an error on the email field.
        """
        # robots get the success redirect, but nothing is saved
        if form.is_robot():
            return utils_misc.redirect(
                self.request,
                self.get_success_url(),
                params={
                    'robot': 'on',
                },
            )
        # the form field holding the identifier depends on the configuration
        email_field = 'email_or_username' if app_settings.A2_USER_CAN_RESET_PASSWORD_BY_USERNAME else 'email'
        email = form.cleaned_data.get(email_field)

        # if an email has already been sent, warn once before allowing resend
        token = models.Token.objects.filter(
            kind='pw-reset', content__email__iexact=email, expires__gt=timezone.now()
        ).exists()
        resend_key = 'pw-reset-allow-resend'
        if app_settings.A2_TOKEN_EXISTS_WARNING and token and not self.request.session.get(resend_key):
            # remember the warning was shown: the next submission will go through
            self.request.session[resend_key] = True
            form.add_error(
                email_field,
                _(
                    'An email has already been sent to %s. Click "Validate" again if you really want it to be'
                    ' sent again.'
                )
                % email,
            )
            return self.form_invalid(form)
        self.request.session[resend_key] = False

        # rate limit keyed on the posted `email` value; the call also increments the counter
        if is_ratelimited(
            self.request,
            key='post:email',
            group='pw-reset-email',
            rate=app_settings.A2_EMAILS_ADDRESS_RATELIMIT,
            increment=True,
        ):
            self.request.journal.record('user.password.reset.failure', email=email)
            form.add_error(
                email_field,
                _(
                    'Multiple emails have already been sent to this address. Further attempts are blocked,'
                    ' please check your spam folder or try again later.'
                ),
            )
            return self.form_invalid(form)
        # rate limit keyed on the client IP, same group
        if is_ratelimited(
            self.request,
            key='ip',
            group='pw-reset-email',
            rate=app_settings.A2_EMAILS_IP_RATELIMIT,
            increment=True,
        ):
            self.request.journal.record('user.password.reset.failure', email=email)
            form.add_error(
                email_field,
                _(
                    'Multiple password reset attempts have already been made from this IP address. No further'
                    ' email will be sent, please check your spam folder or try again later.'
                ),
            )
            return self.form_invalid(form)
        form.save()
        # kept in session, presumably for the instructions page — verify
        self.request.session['reset_email'] = email
        return super().form_valid(form)


password_reset = PasswordResetView.as_view()
class PasswordResetInstructionsView(TemplateView):
    """Page shown after a password reset request."""

    template_name = 'registration/password_reset_instructions.html'

    def get_context_data(self, **kwargs):
        """Add the bare address part of DEFAULT_FROM_EMAIL to the context."""
        ctx = super().get_context_data(**kwargs)
        dummy_realname, address = parseaddr(settings.DEFAULT_FROM_EMAIL)
        ctx['from_email_address'] = address
        return ctx


password_reset_instructions = PasswordResetInstructionsView.as_view()
class TokenLoginView(RedirectView):
    """Log a user in from a ``login`` token found in the URL, then go to the homepage."""

    def get_redirect_url(self, *args, **kwargs):
        """Authenticate the user designated by the token and return the homepage URL.

        The homepage URL is returned in every case; when the token is unknown,
        expired or invalid, or when its user does not exist, a warning message
        is queued and nobody is logged in.
        """
        homepage_url = reverse('auth_homepage')
        # spaces are removed from the token found in the URL
        raw_token = kwargs['token'].replace(' ', '')
        try:
            token = models.Token.use('login', raw_token, delete=False)
        except models.Token.DoesNotExist:
            messages.warning(self.request, _('Login token is unknown or expired'))
            return homepage_url
        except (TypeError, ValueError):
            messages.warning(self.request, _('Login token is invalid'))
            return homepage_url

        uid = token.content['user']
        try:
            user = User.objects.get(pk=uid)
        except (TypeError, ValueError, OverflowError, User.DoesNotExist):
            # the account may have been deleted since the token was issued:
            # warn instead of failing with a server error
            messages.warning(self.request, _('User not found'))
            return homepage_url
        utils_misc.simulate_authentication(self.request, user, 'token', record=True)
        return homepage_url


token_login = TokenLoginView.as_view()
class PasswordResetConfirmView(cbv.RedirectToNextURLViewMixin, FormView):
    """Check a password reset link, show the new password form, then log the user in."""

    form_class = passwords_forms.SetPasswordForm
    title = _('Password Reset')

    def get_template_names(self):
        """Return the template candidates, in lookup order."""
        return [
            'registration/password_reset_confirm.html',
            'authentic2/password_reset_confirm.html',
        ]

    def _reject(self, request, message):
        """Queue ``message`` as a warning and redirect to the success URL."""
        messages.warning(request, message)
        return utils_misc.redirect(request, self.get_success_url())

    def dispatch(self, request, *args, **kwargs):
        """Resolve the token and its user; refuse the request if the reset is not permitted."""
        raw_token = kwargs['token'].replace(' ', '')
        try:
            self.token = models.Token.use('pw-reset', raw_token, delete=False)
        except models.Token.DoesNotExist:
            return self._reject(request, _('Password reset token is unknown or expired'))
        except (TypeError, ValueError):
            return self._reject(request, _('Password reset token is invalid'))

        user_pk = self.token.content['user']
        try:
            # use authenticate to eventually get an LDAPUser
            self.user = utils_misc.authenticate(request, user=User._default_manager.get(pk=user_pk))
        except (TypeError, ValueError, OverflowError, User.DoesNotExist):
            return self._reject(request, _('User not found'))

        can_reset_password = utils_misc.get_user_flag(
            user=self.user, name='can_reset_password', default=self.user.has_usable_password()
        )
        # an explicit False always forbids; None defers to the global setting
        forbidden = can_reset_password is False or (
            can_reset_password is None and not app_settings.A2_USER_CAN_RESET_PASSWORD
        )
        if forbidden:
            return self._reject(
                request, _("It's not possible to reset your password. Please contact an administrator.")
            )
        return super().dispatch(request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """Add ``title`` and ``validlink`` to the context."""
        ctx = super().get_context_data(**kwargs)
        # compatibility with existing templates !
        ctx.update(title=_('Enter new password'), validlink=True)
        return ctx

    def get_form_kwargs(self):
        """Give the resolved user to the form."""
        form_kwargs = super().get_form_kwargs()
        form_kwargs['user'] = self.user
        return form_kwargs

    def form_valid(self, form):
        """Save the new password, call the hook, delete the token and finish."""
        # Changing password by mail validate the email
        form.user.set_email_verified(True)
        form.save()
        hooks.call_hooks('event', name='password-reset-confirm', user=form.user, token=self.token, form=form)
        logger.info('password reset for user %s with token %r', self.user, self.token.uuid)
        self.token.delete()
        return self.finish()

    def finish(self):
        """Simulate an ``email`` authentication for the user and journal the reset."""
        response = utils_misc.simulate_authentication(self.request, self.user, 'email')
        self.request.journal.record('user.password.reset')
        return response


password_reset_confirm = PasswordResetConfirmView.as_view()
class BaseRegistrationView(HomeURLMixin, FormView):
    """Registration form asking for an email and sending the registration mail."""

    form_class = registration_forms.RegistrationForm
    template_name = 'registration/registration_form.html'
    title = _('Registration')

    def dispatch(self, request, *args, **kwargs):
        """Load the optional pre-filling token and compute the next URL.

        Raises Http404 when the REGISTRATION_OPEN setting is false, and answers
        with a plain text 400 response when the `token` query parameter cannot
        be decoded.
        """
        if not getattr(settings, 'REGISTRATION_OPEN', True):
            raise Http404('Registration is not open.')

        self.token = {}
        self.ou = get_default_ou()

        # load pre-filled values
        signed_token = request.GET.get('token')
        if signed_token:
            try:
                self.token = crypto.loads(signed_token, max_age=settings.ACCOUNT_ACTIVATION_DAYS * 3600 * 24)
            except (TypeError, ValueError, crypto.BadSignature) as e:
                logger.warning('registration_view: invalid token: %s', e)
                return HttpResponseBadRequest('invalid token', content_type='text/plain')
            if 'ou' in self.token:
                self.ou = OU.objects.get(pk=self.token['ou'])

        # the next URL stored in the token wins over the one of the request
        request_next_url = utils_misc.select_next_url(request, None)
        self.next_url = self.token.pop(REDIRECT_FIELD_NAME, request_next_url)
        set_home_url(request, self.next_url)
        return super().dispatch(request, *args, **kwargs)

    def form_valid(self, form):
        """Send the registration mail unless a robot, a pending token or a rate limit stops it."""
        if form.is_robot():
            robot_params = {REDIRECT_FIELD_NAME: self.next_url, 'robot': 'on'}
            return utils_misc.redirect(self.request, 'registration_complete', params=robot_params)

        email = form.cleaned_data.pop('email')
        session = self.request.session

        # if an email has already been sent, warn once before allowing resend
        already_sent = models.Token.objects.filter(
            kind='registration', content__email__iexact=email, expires__gt=timezone.now()
        ).exists()
        resend_key = 'registration-allow-resend'
        if app_settings.A2_TOKEN_EXISTS_WARNING and already_sent and not session.get(resend_key):
            session[resend_key] = True
            form.add_error(
                'email',
                _(
                    'An email has already been sent to %s. Click "Validate" again if you really want it to be'
                    ' sent again.'
                )
                % email,
            )
            return self.form_invalid(form)
        session[resend_key] = False

        # checked in order; the first limit reached aborts the registration
        rate_limits = (
            (
                'post:email',
                app_settings.A2_EMAILS_ADDRESS_RATELIMIT,
                _(
                    'Multiple emails have already been sent to this address. Further attempts are blocked,'
                    ' please check your spam folder or try again later.'
                ),
            ),
            (
                'ip',
                app_settings.A2_EMAILS_IP_RATELIMIT,
                _(
                    'Multiple registration attempts have already been made from this IP address. No further'
                    ' email will be sent, please check your spam folder or try again later.'
                ),
            ),
        )
        for limit_key, limit_rate, limit_error in rate_limits:
            if is_ratelimited(
                self.request,
                key=limit_key,
                group='registration-email',
                rate=limit_rate,
                increment=True,
            ):
                form.add_error('email', limit_error)
                return self.form_invalid(form)

        # remaining form values travel in the registration mail token
        self.token.update(form.cleaned_data)
        self.token.pop(REDIRECT_FIELD_NAME, None)
        self.token.pop('email', None)

        utils_misc.send_registration_mail(
            self.request, email, next_url=self.next_url, ou=self.ou, **self.token
        )
        session['registered_email'] = email
        return utils_misc.redirect(
            self.request, 'registration_complete', params={REDIRECT_FIELD_NAME: self.next_url}
        )

    def get_context_data(self, **kwargs):
        """Add the registration blocks of the authenticators as `frontends`, keyed by block id."""
        context = super().get_context_data(**kwargs)
        parameters = {'request': self.request, 'context': context}
        frontends = collections.OrderedDict()
        for authenticator in utils_misc.get_authenticators():
            block = utils_misc.get_authenticator_method(authenticator, 'registration', parameters)
            if block:
                frontends[block['id']] = block
        context['frontends'] = frontends
        return context
class RegistrationView(cbv.ValidateCSRFMixin, BaseRegistrationView):
    # BaseRegistrationView combined with cbv.ValidateCSRFMixin; adds nothing of its own.
    pass
class RegistrationCompletionView(CreateView):
    """Finish a registration started with a ``registration`` token.

    The token content drives the whole view: it carries the email, the
    optional organizational unit, pre-filled fields and the next URL.
    Depending on the accounts already bound to the email, the user is logged
    in, offered the account chooser template, or shown the creation form.
    """

    model = get_user_model()
    success_url = 'auth_homepage'

    def get_template_names(self):
        """Pick the chooser template when accounts exist, else the creation form."""
        if self.users and 'create' not in self.request.GET:
            return ['registration/registration_completion_choose.html']
        else:
            return ['registration/registration_completion_form.html']

    def get_success_url(self):
        """Compute the URL to reach once the registration is completed.

        A2_REGISTRATION_REDIRECT is either a URL or a ``(url, next_field)``
        pair. When the token carries a next URL it is the destination, wrapped
        as the `next_field` parameter of the redirect URL if one is configured.
        """
        redirect_setting = app_settings.A2_REGISTRATION_REDIRECT
        redirect_url, next_field = redirect_setting, REDIRECT_FIELD_NAME
        # A plain string is a URL and must never be unpacked: a two-character
        # URL such as '/a' would otherwise be split into ('/', 'a').
        if not isinstance(redirect_setting, str):
            try:
                redirect_url, next_field = redirect_setting
            except (TypeError, ValueError):
                # not a pair: keep the value as the redirect URL
                pass

        if self.token and self.token.get(REDIRECT_FIELD_NAME):
            url = self.token[REDIRECT_FIELD_NAME]
            if redirect_url:
                url = utils_misc.make_url(redirect_url, params={next_field: url})
        else:
            if redirect_url:
                url = redirect_url
            else:
                url = utils_misc.make_url(self.success_url)
        return url

    @atomic(savepoint=False)
    def dispatch(self, request, *args, **kwargs):
        """Load the registration token and the state shared by GET and POST.

        An unknown, expired or malformed token ends with a warning message and
        a redirect to the registration form.
        """
        registration_token = kwargs['registration_token'].replace(' ', '')
        try:
            token = models.Token.use('registration', registration_token, delete=False)
        except models.Token.DoesNotExist:
            messages.warning(request, _('Your activation key is unknown or expired'))
            return utils_misc.redirect(request, 'registration_register')
        except (TypeError, ValueError):
            messages.warning(request, _('Activation failed'))
            return utils_misc.redirect(request, 'registration_register')
        self.token_obj = token
        self.token = token.content
        # allow access to token content from external authentication backends
        request.token = self.token
        self.authentication_method = self.token.get('authentication_method', 'email')
        self.email = self.token['email']
        Lock.lock_email(self.email)
        if 'ou' in self.token:
            self.ou = OU.objects.get(pk=self.token['ou'])
        else:
            self.ou = get_default_ou()
        # existing accounts for this email, oldest first, restricted to the OU if any
        self.users = User.objects.filter(email__iexact=self.email).order_by('date_joined')
        if self.ou:
            self.users = self.users.filter(ou=self.ou)
        self.email_is_unique = app_settings.A2_EMAIL_IS_UNIQUE or app_settings.A2_REGISTRATION_EMAIL_IS_UNIQUE
        if self.ou:
            self.email_is_unique |= self.ou.email_is_unique
        self.init_fields_labels_and_help_texts()
        set_home_url(request, self.get_success_url())
        return super().dispatch(request, *args, **kwargs)

    def init_fields_labels_and_help_texts(self):
        """Compute the form fields, labels, required fields and help texts.

        The result is stored on `self.fields`, `self.labels`, `self.required`
        and `self.help_texts`. The email and the fields listed under the
        token's `skip_fields` key are removed from the fields.
        """
        attributes = models.Attribute.objects.filter(asked_on_registration=True)
        default_fields = attributes.values_list('name', flat=True)
        # single queryset, used for the field computation and for `required`
        required_fields = models.Attribute.objects.filter(required=True).values_list('name', flat=True)
        fields, labels = utils_misc.get_fields_and_labels(
            app_settings.A2_REGISTRATION_FIELDS,
            default_fields,
            app_settings.A2_REGISTRATION_REQUIRED_FIELDS,
            app_settings.A2_REQUIRED_FIELDS,
            required_fields,
        )
        help_texts = {}
        if app_settings.A2_REGISTRATION_FORM_USERNAME_LABEL:
            labels['username'] = app_settings.A2_REGISTRATION_FORM_USERNAME_LABEL
        if app_settings.A2_REGISTRATION_FORM_USERNAME_HELP_TEXT:
            help_texts['username'] = app_settings.A2_REGISTRATION_FORM_USERNAME_HELP_TEXT
        required = list(app_settings.A2_REGISTRATION_REQUIRED_FIELDS) + list(required_fields)
        if 'email' in fields:
            fields.remove('email')
        for field in self.token.get('skip_fields') or []:
            if field in fields:
                fields.remove(field)
        self.fields = fields
        self.labels = labels
        self.required = required
        self.help_texts = help_texts

    def get_form_class(self):
        """Build the model form class matching the token and the settings."""
        if not self.token.get('valid_email', True):
            # the token flags its email as not valid: ask for it in the form.
            # The appends are guarded so that calling this method more than
            # once does not duplicate the field.
            if 'email' not in self.fields:
                self.fields.append('email')
            if 'email' not in self.required:
                self.required.append('email')
        form_class = registration_forms.RegistrationCompletionForm
        if self.token.get('no_password', False):
            form_class = registration_forms.RegistrationCompletionFormNoPassword
        form_class = profile_forms.modelform_factory(
            self.model,
            form=form_class,
            fields=self.fields,
            labels=self.labels,
            required=self.required,
            help_texts=self.help_texts,
        )
        if 'username' in self.fields and app_settings.A2_REGISTRATION_FORM_USERNAME_REGEX:
            # Keep existing field label and help_text
            old_field = form_class.base_fields['username']
            field = CharField(
                max_length=256,
                label=old_field.label,
                help_text=old_field.help_text,
                validators=[validators.UsernameValidator()],
            )
            form_class = type('RegistrationForm', (form_class,), {'username': field})
        return form_class

    def get_form_kwargs(self, **kwargs):
        """Build the form keyword arguments, with the user instance to complete.

        The instance is the user named by the token's `user_id` key when set,
        else a new user initialized from the token and the backends' prefilling.
        """
        kwargs = super().get_form_kwargs(**kwargs)
        if 'ou' in self.token:
            ou = get_object_or_404(OU, id=self.token['ou'])
        else:
            ou = get_default_ou()

        attributes = {'email': self.email, 'ou': ou}
        for key in self.token:
            if key in app_settings.A2_PRE_REGISTRATION_FIELDS:
                attributes[key] = self.token[key]
        logger.debug('attributes %s', attributes)

        prefilling_list = utils_misc.accumulate_from_backends(self.request, 'registration_form_prefill')
        logger.debug('prefilling_list %s', prefilling_list)
        # Build a single meaningful prefilling with sets of values
        prefilling = {}
        for p in prefilling_list:
            for name, values in p.items():
                if name in self.fields:
                    prefilling.setdefault(name, set()).update(values)
        logger.debug('prefilling %s', prefilling)

        for name, values in prefilling.items():
            # sorted so that the result does not depend on set iteration order
            attributes[name] = ' '.join(sorted(values))
        logger.debug('attributes with prefilling %s', attributes)

        if self.token.get('user_id'):
            kwargs['instance'] = User.objects.get(id=self.token.get('user_id'))
        else:
            init_kwargs = {}
            for key in ('email', 'first_name', 'last_name', 'ou'):
                if key in attributes:
                    init_kwargs[key] = attributes[key]
            kwargs['instance'] = get_user_model()(**init_kwargs)

        return kwargs

    def get_form(self, form_class=None):
        """Return the form after the `front_modify_form` hooks have seen it."""
        form = super().get_form(form_class=form_class)
        hooks.call_hooks('front_modify_form', self, form)
        return form

    def get_context_data(self, **kwargs):
        """Expose the token, the matching users and the email to the template."""
        ctx = super().get_context_data(**kwargs)
        ctx['token'] = self.token
        ctx['users'] = self.users
        ctx['email'] = self.email
        ctx['email_is_unique'] = self.email_is_unique
        ctx['create'] = 'create' in self.request.GET
        return ctx

    def get(self, request, *args, **kwargs):
        """Log in the only matching user, create the account directly when the
        token already holds every field, else display the page.
        """
        if len(self.users) == 1 and self.email_is_unique:
            # Found one user, EMAIL is unique, log her in
            utils_misc.simulate_authentication(request, self.users[0], method=self.authentication_method)
            return utils_misc.redirect(request, self.get_success_url())
        confirm_data = self.token.get('confirm_data', False)

        if confirm_data == 'required':
            fields_to_confirm = self.required
        else:
            fields_to_confirm = self.fields

        if all(field in self.token for field in fields_to_confirm) and (
            not confirm_data or confirm_data == 'required'
        ):
            # We already have every field
            form_kwargs = self.get_form_kwargs()
            form_class = self.get_form_class()
            # NOTE: `data` is the token content itself, the password keys are
            # rewritten in place
            data = self.token
            if 'password' in data:
                data['password1'] = data['password']
                data['password2'] = data['password']
                del data['password']
            form_kwargs['data'] = data

            form = form_class(**form_kwargs)
            if form.is_valid():
                user = form.save()
                self.process_registration(request, user, form)
                return self.registration_success(request, user)
            # display the bound form with its errors instead of a fresh one
            self.get_form = lambda *args, **kwargs: form
        return super().get(request, *args, **kwargs)

    def post(self, request, *args, **kwargs):
        """Handle the choice of an existing account or the creation form."""
        if self.users and self.email_is_unique:
            # email is unique, users already exist, creating a new one is forbidden !
            return utils_misc.redirect(
                request, request.resolver_match.view_name, args=self.args, kwargs=self.kwargs
            )
        if 'uid' in request.POST:
            uid = request.POST['uid']
            # only accounts matching the token's email can be chosen
            for user in self.users:
                if str(user.id) == uid:
                    utils_misc.simulate_authentication(request, user, method=self.authentication_method)
                    return utils_misc.redirect(request, self.get_success_url())
        return super().post(request, *args, **kwargs)

    def form_valid(self, form):
        """Create the account, or send a new registration mail when the
        submitted email differs from the one in the token.
        """
        # remove verified fields from form, this allows an authentication
        # method to provide verified data fields and to present it to the user,
        # while preventing the user to modify them.
        for av in models.AttributeValue.objects.with_owner(form.instance):
            if av.verified and av.attribute.name in form.fields:
                del form.fields[av.attribute.name]

        if (
            'email' in self.request.POST
            and ('email' not in self.token or self.request.POST['email'] != self.token['email'])
            and not self.token.get('skip_email_check')
        ):
            # If an email is submitted it must be validated or be the same as in the token
            data = form.cleaned_data.copy()
            # handle complex attributes
            for attribute in iter_attributes():
                if attribute.name not in data:
                    continue
                kind = attribute.get_kind()
                if kind['serialize'] is attribute_kinds.identity:
                    continue
                data[attribute.name] = kind['serialize'](data[attribute.name])

            data['no_password'] = self.token.get('no_password', False)
            utils_misc.send_registration_mail(
                self.request, ou=self.ou, next_url=self.get_success_url(), **data
            )
            self.token_obj.delete()
            self.request.session['registered_email'] = form.cleaned_data['email']
            return utils_misc.redirect(self.request, 'registration_complete')

        count, dummy = self.token_obj.delete()
        # prevent duplicate user creations in case several requests are processed in parallel
        if count:
            super().form_valid(form)  # user creation happens here
            user = form.instance
            self.process_registration(self.request, user, form)
        else:
            # the token was already consumed; look for the account it created
            try:
                user = User.objects.get(email=self.email)
            except (User.DoesNotExist, User.MultipleObjectsReturned):
                # no account, or no way to tell which of several accounts
                # sharing this email is the right one: fail gracefully
                messages.warning(self.request, _('An error occured during account creation.'))
                return utils_misc.redirect(self.request, 'registration_register')
        return self.registration_success(self.request, user)

    def process_registration(self, request, user, form):
        """Journal the registration, call the event hooks and send the success mail."""
        request.journal.record('user.registration', user=user, session=None, how=self.authentication_method)
        hooks.call_hooks(
            'event',
            name='registration',
            user=user,
            form=form,
            view=self,
            authentication_method=self.authentication_method,
            token=self.token,
            service=get_service(request),
        )
        self.send_registration_success_email(user)

    def registration_success(self, request, user):
        """Log the user in, queue the success message and redirect to the success URL."""
        utils_misc.simulate_authentication(request, user, method=self.authentication_method)
        message_template = loader.get_template('authentic2/registration_success_message.html')
        messages.info(self.request, message_template.render(request=request))
        return utils_misc.redirect(request, self.get_success_url())

    def send_registration_success_email(self, user):
        """Send the registration success mail; does nothing for a user without email."""
        if not user.email:
            return

        template_names = ['authentic2/registration_success']
        login_url = self.request.build_absolute_uri(settings.LOGIN_URL)
        utils_misc.send_templated_mail(
            user,
            template_names=template_names,
            context={
                'user': user,
                'email': user.email,
                'site': self.request.get_host(),
                'login_url': login_url,
                'method': self.authentication_method,
            },
            request=self.request,
        )


registration_completion = RegistrationCompletionView.as_view()
class AccountDeleteView(HomeURLMixin, RecentAuthenticationMixin, TemplateView):
    """Page where a user asks for the deletion of their own account."""

    template_name = 'authentic2/accounts_delete_request.html'
    title = _('Request account deletion')

    def dispatch(self, request, *args, **kwargs):
        """Check that deletion is enabled and that the user may proceed.

        A user without a verified email must have authenticated recently,
        otherwise a re-authentication is requested.
        """
        if not app_settings.A2_REGISTRATION_CAN_DELETE_ACCOUNT:
            return utils_misc.redirect(request, '..')

        needs_reauthentication = not (
            self.request.user.email_verified or self.has_recent_authentication()
        )
        if needs_reauthentication:
            return self.reauthenticate(
                action='account-delete', message=_('You must re-authenticate to delete your account.')
            )
        return super().dispatch(request, *args, **kwargs)

    def post(self, request, *args, **kwargs):
        """Start the deletion: by email when it is verified, else through a
        logout chained to the deletion URL.
        """
        if 'cancel' in request.POST:
            return utils_misc.redirect(request, 'account_management')

        user = self.request.user
        if not user.email_verified:
            # no verified email to send a code to: go through a logout whose
            # next URL is the deletion URL built with prompt=False
            deletion_url = utils_misc.build_deletion_url(request, prompt=False)
            return logout(request, next_url=deletion_url, check_referer=False)

        utils_misc.send_account_deletion_code(self.request, user)
        messages.info(request, _("An account deletion validation email has been sent to your email address."))
        return utils_misc.redirect(request, 'account_management')

    def get_context_data(self, **kwargs):
        """Add the email of the logged-in user to the context."""
        context = super().get_context_data(**kwargs)
        context['email'] = self.request.user.email
        return context
class ValidateDeletionView(TemplateView):
    """Confirmation step of an account deletion, reached through a signed token."""

    template_name = 'authentic2/accounts_delete_validation.html'
    title = _('Confirm account deletion')
    user = None
    prompt = True

    def _load_deletion_request(self, signed_token):
        """Decode the token and load the targeted user.

        Sets `self.prompt` and `self.user`; returns an error message when the
        request cannot be honoured, None otherwise.
        """
        try:
            payload = crypto.loads(signed_token, max_age=app_settings.A2_DELETION_REQUEST_LIFETIME)
            self.prompt = payload.get('prompt', self.prompt)
            self.user = get_user_model().objects.get(pk=payload['user_pk'])
            # an inactive account is not deactivated a second time
            if not self.user.is_active:
                raise ValidationError(_('This account is inactive, it cannot be deleted.'))
            logger.info('user %s confirmed the deletion of their own account', self.user)
        except crypto.SignatureExpired:
            return _('The account deletion request is too old, try again')
        except crypto.BadSignature:
            return _('The account deletion request is invalid, try again')
        except ValueError:
            return _('The account deletion request was not on this site, try again')
        except ValidationError as e:
            return e.message
        except get_user_model().DoesNotExist:
            return _('This account has previously been deleted.')
        return None

    def dispatch(self, request, *args, **kwargs):
        """Validate the deletion token; on error, show it and go to the homepage."""
        error = self._load_deletion_request(kwargs['deletion_token'])
        if error:
            messages.error(request, error)
            return utils_misc.redirect(request, 'auth_homepage')
        return super().dispatch(request, *args, **kwargs)

    def get(self, request, *args, **kwargs):
        """Show the confirmation page, or delete right away when no prompt is wanted."""
        if self.prompt:
            return super().get(request, *args, **kwargs)
        return self.delete_account(request)

    def post(self, request, *args, **kwargs):
        """Delete the account unless the form was cancelled."""
        if 'cancel' in request.POST:
            return utils_misc.redirect(request, 'auth_homepage')
        return self.delete_account(request)

    def delete_account(self, request):
        """Notify, journal and delete the account, then leave the page."""
        account = self.user
        utils_misc.send_account_deletion_mail(self.request, account)
        logger.info('deletion of account %s performed', account)
        hooks.call_hooks('event', name='delete-account', user=account)
        request.journal.record('user.deletion', user=account)
        # compared before the deletion of the account
        deleted_own_session = account == request.user
        account.delete()
        messages.info(request, _('Deletion performed.'))
        # No real use for cancel_url or next_url here, assuming the link
        # has been received by email. We instead redirect the user to the
        # homepage.
        if not deleted_own_session:
            return utils_misc.redirect(request, 'auth_homepage')
        return logout(request, check_referer=False)

    def get_context_data(self, **kwargs):
        """Add the account being deleted to the context."""
        context = super().get_context_data(**kwargs)
        context['user'] = self.user  # Not necessarily the user in request
        return context
class RegistrationCompleteView(TemplateView):
    """Page displayed once the registration mail has been requested."""

    template_name = 'registration/registration_complete.html'

    def get_context_data(self, **kwargs):
        """Add the next URL, the sender address and the activation delay to the context."""
        sender = settings.DEFAULT_FROM_EMAIL
        # parseaddr returns (real name, address); only the address is kept
        sender_address = parseaddr(sender)[1]
        kwargs.update(
            next_url=utils_misc.select_next_url(self.request, settings.LOGIN_REDIRECT_URL),
            from_email=sender,
            from_email_address=sender_address,
        )
        return super().get_context_data(account_activation_days=settings.ACCOUNT_ACTIVATION_DAYS, **kwargs)


registration_complete = RegistrationCompleteView.as_view()
class PasswordChangeView(HomeURLMixin, DjPasswordChangeView):
    """Password change page for the logged-in user."""

    title = _('Password Change')
    do_not_call_in_templates = True

    def dispatch(self, request, *args, **kwargs):
        """Compute the redirect target and check that the change is allowed."""
        posted_next_url = request.POST.get('next_url')
        if posted_next_url:
            redirect_target = posted_next_url
        elif REDIRECT_FIELD_NAME in request.GET:
            redirect_target = request.GET[REDIRECT_FIELD_NAME]
        else:
            redirect_target = reverse('account_management')
        self.post_change_redirect = redirect_target

        if not utils_misc.user_can_change_password(request=request):
            messages.warning(request, _('Password change is forbidden'))
            return utils_misc.redirect(request, self.post_change_redirect)

        hooks.call_hooks('password_change_view', request=self.request)
        return super().dispatch(request, *args, **kwargs)

    def post(self, request, *args, **kwargs):
        """Process the form, unless it was cancelled."""
        if 'cancel' not in request.POST:
            return super().post(request, *args, **kwargs)
        return utils_misc.redirect(request, self.post_change_redirect)

    def form_valid(self, form):
        """Change the password, reporting a PasswordChangeError as an error message."""
        current_request = self.request
        hooks.call_hooks('event', name='change-password', user=current_request.user, request=current_request)
        models.PasswordReset.objects.filter(user=current_request.user).delete()
        try:
            response = super().form_valid(form)
        except utils_misc.PasswordChangeError as e:
            messages.error(current_request, e.message)
            return utils_misc.redirect(current_request, self.post_change_redirect)
        messages.info(current_request, _('Password changed'))
        current_request.journal.record('user.password.change', session=current_request.session)
        return response

    def get_form_class(self):
        """Use the set password form for users without a usable password."""
        if not self.request.user.has_usable_password():
            return passwords_forms.SetPasswordForm
        return passwords_forms.PasswordChangeForm

    def get_context_data(self, **kwargs):
        """Add the redirect target to the context."""
        context = super().get_context_data(**kwargs)
        context[REDIRECT_FIELD_NAME] = self.post_change_redirect
        return context


password_change = decorators.setting_enabled('A2_REGISTRATION_CAN_CHANGE_PASSWORD')(
    PasswordChangeView.as_view()
)
class SuView(View):
    """Open a session as the user designated by a switch-user token."""

    def get(self, request, uuid):
        """Resolve `uuid` to a user and authenticate as them with the 'su' method.

        Raises Http404 when the token does not resolve to a user.
        """
        target = utils_switch_user.resolve_token(uuid)
        if not target:
            raise Http404
        # LDAP ad-hoc behaviour
        has_external_id = target.userexternalid_set.exists()
        if has_external_id:
            target = utils_misc.authenticate(request, user=target)
        return utils_misc.simulate_authentication(request, target, 'su')


su = SuView.as_view()
class Consents(HomeURLMixin, ListView):
    """List the service authorizations granted by the logged-in user."""

    template_name = 'authentic2/consents.html'
    title = _('Consent Management')
    model = OIDCAuthorization
    context_object_name = 'consents'

    def get_queryset(self):
        """Restrict the authorizations to those of the requesting user."""
        authorizations = super().get_queryset()
        return authorizations.filter(user=self.request.user)


consents = decorators.setting_enabled('A2_PROFILE_CAN_MANAGE_SERVICE_AUTHORIZATIONS')(Consents.as_view())
class ConsentDelete(DeleteView):
    """Let the logged-in user revoke one of their own service authorizations."""

    title = _('Consent Delete')
    model = OIDCAuthorization
    success_url = reverse_lazy('consents')

    def get_queryset(self):
        """Restrict deletable authorizations to those of the requesting user.

        Without this filter any authorization could be deleted by guessing its
        primary key; an authorization owned by someone else now yields a 404.
        The filter is the same as the one used by the Consents list view.
        """
        return super().get_queryset().filter(user=self.request.user)

    def get(self, request, *args, **kwargs):
        """Skip the confirmation page: a GET only redirects to the success URL."""
        return HttpResponseRedirect(self.get_success_url())

    def delete(self, request, *args, **kwargs):
        """Delete the authorization and journal the unauthorization of its client."""
        response = super().delete(request, *args, **kwargs)
        self.request.journal.record('user.service.sso.unauthorization', service=self.object.client)
        return response


consent_delete = decorators.setting_enabled('A2_PROFILE_CAN_MANAGE_SERVICE_AUTHORIZATIONS')(
    ConsentDelete.as_view()
)
def old_view_redirect(request, to, message=None):
    """Redirect an old URL to `to`, queueing `message` as an info message when given."""
    if not message:
        return utils_misc.redirect(request, to=to)
    messages.info(request, message)
    return utils_misc.redirect(request, to=to)
class DisplayMessageAndContinueView(TemplateView):
    """Intermediate page displaying the pending messages before the next URL."""

    template_name = 'authentic2/display_message_and_continue.html'

    def get(self, request, *args, **kwargs):
        """Redirect at once when there is no message, else render the page."""
        self.url = utils_misc.select_next_url(self.request, reverse('account_management'))
        self.only_info = True

        pending = messages.get_messages(request)
        if not pending:
            return utils_misc.redirect(request, self.url, resolve=False)

        harmless_levels = (messages.INFO, messages.SUCCESS)
        # If there are warning or error messages, the intermediate page must not redirect
        # automatically but should ask for an user confirmation
        self.only_info = all(message.level in harmless_levels for message in pending)
        # iterating marked the storage as used; undo it so the messages are
        # not treated as already consumed
        pending.used = False
        return super().get(request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """Add the next URL and the `only_info` flag to the context."""
        context = super().get_context_data(**kwargs)
        context.update(url=self.url, only_info=self.only_info)
        return context


display_message_and_continue = DisplayMessageAndContinueView.as_view()
@requires_csrf_token
def permission_denied(request, exception):
    """Handle permission errors, delegating ``/manage/`` paths to the manager's handler."""
    if not request.path.startswith('/manage/'):
        return django_permission_denied(request, exception)

    # NOTE(review): imported locally, presumably to avoid a circular import — confirm
    from authentic2.manager.views import permission_denied as manager_permission_denied

    return manager_permission_denied(request, exception=exception)