# authentic/src/authentic2/views.py

# 1444 lines
# 59 KiB
# Python

# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import logging
import re
from email.utils import parseaddr
from django import shortcuts
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import REDIRECT_FIELD_NAME, get_user_model
from django.contrib.auth import logout as auth_logout
from django.contrib.auth.decorators import login_required
from django.contrib.auth.views import PasswordChangeView as DjPasswordChangeView
from django.core import signing
from django.core.exceptions import ValidationError
from django.db.models.fields import FieldDoesNotExist
from django.db.models.query import Q
from django.forms import CharField
from django.http import (
Http404,
HttpResponse,
HttpResponseBadRequest,
HttpResponseForbidden,
HttpResponseRedirect,
)
from django.shortcuts import get_object_or_404, render
from django.template import loader
from django.template.loader import render_to_string
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import ugettext as _
from django.views.decorators.cache import never_cache
from django.views.decorators.csrf import csrf_exempt, ensure_csrf_cookie
from django.views.generic import TemplateView
from django.views.generic.base import View
from django.views.generic.edit import CreateView, FormView, UpdateView
from ratelimit.utils import is_ratelimited
from authentic2.custom_user.models import iter_attributes
from . import app_settings, attribute_kinds, cbv, constants, decorators, hooks, models, utils, validators
from .a2_rbac.models import OrganizationalUnit as OU
from .a2_rbac.utils import get_default_ou
from .forms import passwords as passwords_forms
from .forms import profile as profile_forms
from .forms import registration as registration_forms
from .utils import switch_user
from .utils.evaluate import HTTPHeaders
from .utils.service import get_service_from_request, get_service_from_token, set_service_ref
# Active user model, resolved once at import time through get_user_model().
User = get_user_model()
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class EditProfile(cbv.HookMixin, cbv.TemplateNamesMixin, UpdateView):
    """Form view where the current user edits its own profile fields.

    A ``scope`` URL keyword argument, or a ``scope`` query parameter,
    restricts the form to the attributes whose scopes match.
    """

    model = User
    template_names = ['profiles/edit_profile.html', 'authentic2/accounts_edit.html']
    title = _('Edit account data')

    def get_template_names(self):
        """Return the candidate templates, scope-specific one first if any."""
        candidates = list(self.template_names)
        if 'scope' in self.kwargs:
            candidates.insert(0, 'authentic2/accounts_edit_%s.html' % self.kwargs['scope'])
        return candidates

    @classmethod
    def can_edit_profile(cls):
        """Tell whether there is something to edit and edition is enabled."""
        fields, _labels = cls.get_fields()
        if not fields:
            return False
        return app_settings.A2_PROFILE_CAN_EDIT_PROFILE

    @classmethod
    def get_fields(cls, scopes=None):
        """Return ``(fields, labels)`` for the profile edition form.

        Entries of ``A2_PROFILE_FIELDS`` are kept unless they name an
        ``Attribute`` which is not user editable. When ``scopes`` is given,
        the result is restricted to editable attributes sharing at least one
        of those scopes.
        """
        configured_fields = []
        for entry in app_settings.A2_PROFILE_FIELDS:
            # an entry is either a bare name or a sequence starting with the name
            name = entry[0] if isinstance(entry, (list, tuple)) else entry
            try:
                attribute = models.Attribute.objects.get(name=name)
            except models.Attribute.DoesNotExist:
                # no Attribute with that name: the entry is kept as is
                attribute = None
            if attribute is None or attribute.user_editable:
                configured_fields.append(entry)

        editable_attributes = models.Attribute.objects.filter(user_editable=True)
        if scopes:
            scopes = set(scopes)
            default_fields = [
                attribute.name
                for attribute in editable_attributes
                if scopes & set(attribute.scopes.split())
            ]
        else:
            default_fields = list(editable_attributes.values_list('name', flat=True))

        fields, labels = utils.get_fields_and_labels(configured_fields, default_fields)
        if scopes:
            # restrict fields to those in the scopes
            fields = [name for name in fields if name in default_fields]
        return fields, labels

    def get_form_class(self):
        """Build the model form class for the requested scope(s)."""
        if 'scope' not in self.kwargs:
            scopes = self.request.GET.get('scope', '').split()
        else:
            scopes = [self.kwargs['scope']]
        fields, labels = self.get_fields(scopes=scopes)
        # Email must be edited through the change email view, as it needs validation
        fields = [name for name in fields if name != 'email']
        return profile_forms.modelform_factory(
            User, fields=fields, labels=labels, form=profile_forms.EditProfileForm
        )

    def get_object(self):
        """The edited object is always the requesting user."""
        return self.request.user

    def get_form_kwargs(self, **kwargs):
        """Add the ``next_url`` computed from the request to the form kwargs."""
        form_kwargs = super(EditProfile, self).get_form_kwargs(**kwargs)
        form_kwargs['next_url'] = utils.select_next_url(self.request, reverse('account_management'))
        return form_kwargs

    def get_success_url(self):
        """Return the submitted ``next_url`` or the account management page."""
        return utils.select_next_url(
            self.request, default=reverse('account_management'), field_name='next_url', include_post=True
        )

    def post(self, request, *args, **kwargs):
        """Handle the cancel button, otherwise process the form normally."""
        if 'cancel' not in request.POST:
            return super(EditProfile, self).post(request, *args, **kwargs)
        return utils.redirect(request, self.get_success_url())

    def form_valid(self, form):
        """Save the form, then notify hooks and the journal of the edition."""
        response = super(EditProfile, self).form_valid(form)
        hooks.call_hooks('event', name='edit-profile', user=self.request.user, form=form)
        self.request.journal.record('user.profile.edit', form=form)
        return response


# Public view: requires a logged-in user and the A2_PROFILE_CAN_EDIT_PROFILE setting.
edit_profile = decorators.setting_enabled('A2_PROFILE_CAN_EDIT_PROFILE')(
    login_required(EditProfile.as_view())
)
class EmailChangeView(cbv.TemplateNamesMixin, FormView):
    """Ask the user for a new email address and send it a validation mail."""

    template_names = ['profiles/email_change.html', 'authentic2/change_email.html']
    title = _('Email Change')
    success_url = '..'

    def get_form_class(self):
        """Use the password-less form when the user has no usable password."""
        if not self.request.user.has_usable_password():
            return profile_forms.EmailChangeFormNoPassword
        return profile_forms.EmailChangeForm

    def get_form_kwargs(self):
        """Pass the current user to the form."""
        form_kwargs = super(EmailChangeView, self).get_form_kwargs()
        form_kwargs['user'] = self.request.user
        return form_kwargs

    def post(self, request, *args, **kwargs):
        """Handle the cancel button, otherwise process the form normally."""
        if 'cancel' not in request.POST:
            return super(EmailChangeView, self).post(request, *args, **kwargs)
        return utils.redirect(request, 'account_management')

    def form_valid(self, form):
        """Send the validation mail, notify hooks and inform the user."""
        user = self.request.user
        new_email = form.cleaned_data['email']
        utils.send_email_change_email(user, new_email, request=self.request)
        hooks.call_hooks('event', name='change-email', user=user, email=new_email)
        notice = _(
            'Your request for changing your email '
            'is received. An email of validation '
            'was sent to you. Please click on the '
            'link contained inside.'
        )
        messages.info(self.request, notice)
        logger.info('email change request')
        return super(EmailChangeView, self).form_valid(form)


# Public view: requires a logged-in user and the A2_PROFILE_CAN_CHANGE_EMAIL setting.
email_change = decorators.setting_enabled('A2_PROFILE_CAN_CHANGE_EMAIL')(
    login_required(EmailChangeView.as_view())
)
class EmailChangeVerifyView(TemplateView):
    """Validate an email change token and apply the new address."""

    def get(self, request, *args, **kwargs):
        """Apply the email change carried by the signed ``token`` parameter.

        Redirects to ``account_management`` on success and to
        ``email-change`` when the token is missing or rejected.
        """
        if 'token' not in request.GET:
            return shortcuts.redirect('email-change')
        try:
            payload = signing.loads(
                request.GET['token'], max_age=app_settings.A2_EMAIL_CHANGE_TOKEN_LIFETIME
            )
            user_pk = payload['user_pk']
            email = payload['email']
            user = User.objects.get(pk=user_pk)

            # accounts which could conflict with the new address; None when
            # no uniqueness rule applies
            conflicts = None
            if app_settings.A2_EMAIL_IS_UNIQUE:
                conflicts = User.objects.filter(email=email)
            elif user.ou and user.ou.email_is_unique:
                conflicts = User.objects.filter(email=email, ou=user.ou)
            if conflicts is not None and conflicts.exclude(pk=user_pk).exists():
                raise ValidationError(_('This email is already used by another account.'))

            old_email = user.email
            user.email = email
            user.email_verified = True
            user.save()
            messages.info(
                request, _('your request for changing your email for {0} is successful').format(email)
            )
            logger.info('user %s changed its email from %s to %s', user, old_email, email)
            hooks.call_hooks('event', name='change-email-confirm', user=user, email=email)
        except signing.SignatureExpired:
            # listed before BadSignature so expiry gets its own message
            messages.error(request, _('your request for changing your email is too old, try again'))
        except signing.BadSignature:
            messages.error(request, _('your request for changing your email is invalid, try again'))
        except ValueError:
            messages.error(
                request, _('your request for changing your email was not on this site, try again')
            )
        except User.DoesNotExist:
            messages.error(
                request, _('your request for changing your email is for an unknown user, try again')
            )
        except ValidationError as error:
            messages.error(request, error.message)
        else:
            return shortcuts.redirect('account_management')
        # every handled failure ends up back on the email change form
        return shortcuts.redirect('email-change')


email_change_verify = EmailChangeVerifyView.as_view()
@csrf_exempt
@ensure_csrf_cookie
@never_cache
def login(request, template_name='authentic2/login.html', redirect_field_name=REDIRECT_FIELD_NAME):
    """Displays the login form and handles the login action.

    One block is built per authentication frontend, through either the
    legacy form-based API or the newer ``login`` method API. Any frontend
    may short-circuit the page by returning its own response.

    :param request: the current HTTP request.
    :param template_name: template used to render the login page.
    :param redirect_field_name: query parameter carrying the URL to go back
        to after login.
    :returns: an HTTP response (rendered page, redirect, or a response
        produced by one of the frontends).
    """
    # redirect user to homepage if already connected, if setting
    # A2_LOGIN_REDIRECT_AUTHENTICATED_USERS_TO_HOMEPAGE is True
    if request.user.is_authenticated and app_settings.A2_LOGIN_REDIRECT_AUTHENTICATED_USERS_TO_HOMEPAGE:
        return utils.redirect(request, 'auth_homepage')

    redirect_to = request.GET.get(redirect_field_name)
    service = get_service_from_request(request)

    if not redirect_to or ' ' in redirect_to:
        redirect_to = settings.LOGIN_REDIRECT_URL
    # Heavier security check -- redirects to http://example.com should
    # not be allowed, but things like /view/?param=http://example.com
    # should be allowed. This regex checks if there is a '//' *before* a
    # question mark.
    elif '//' in redirect_to and re.match(r'[^\?]*//', redirect_to):
        redirect_to = settings.LOGIN_REDIRECT_URL

    nonce = request.GET.get(constants.NONCE_FIELD_NAME)
    authenticators = utils.get_backends('AUTH_FRONTENDS')
    blocks = []
    registration_url = utils.get_registration_url(request, service=service)
    context = {
        'cancel': app_settings.A2_LOGIN_DISPLAY_A_CANCEL_BUTTON and nonce is not None,
        'can_reset_password': app_settings.A2_USER_CAN_RESET_PASSWORD is not False,
        'registration_authorized': getattr(settings, 'REGISTRATION_OPEN', True),
        'registration_url': registration_url,
    }

    # Cancel button
    if request.method == "POST" and constants.CANCEL_FIELD_NAME in request.POST:
        return utils.continue_to_next_url(request, params={'cancel': 1})

    # Create blocks
    for authenticator in authenticators:
        # Legacy API
        if not hasattr(authenticator, 'login'):
            fid = authenticator.id
            name = authenticator.name
            form_class = authenticator.form()
            submit_name = 'submit-%s' % fid
            block = {'id': fid, 'name': name, 'authenticator': authenticator}
            if request.method == 'POST' and submit_name in request.POST:
                form = form_class(data=request.POST)
                if form.is_valid():
                    return authenticator.post(request, form, nonce, redirect_to)
                block['form'] = form
            else:
                block['form'] = form_class()
            blocks.append(block)
        else:  # New frontends API
            auth_blocks = []
            parameters = {'request': request, 'context': context}
            remote_addr = request.META.get('REMOTE_ADDR')
            login_hint = set(request.session.get('login-hint', []))
            show_ctx = dict(remote_addr=remote_addr, login_hint=login_hint, headers=HTTPHeaders(request))
            if service:
                show_ctx['service_ou_slug'] = service.ou and service.ou.slug
                show_ctx['service_slug'] = service.slug
                show_ctx['service'] = service
            # check if the authenticator has multiple instances
            if hasattr(authenticator, 'instances'):
                for instance_id, instance in authenticator.instances(**parameters):
                    parameters['instance'] = instance
                    parameters['instance_id'] = instance_id
                    if not authenticator.shown(instance_id=instance_id, ctx=show_ctx):
                        continue
                    block = utils.get_authenticator_method(authenticator, 'login', parameters)
                    # Falsy blocks are dropped by the filtering loop below
                    # anyway; skip them here so that subscripting them to
                    # rewrite the id cannot raise.
                    if not block:
                        continue
                    # update block id in order to separate instances
                    block['id'] = '%s_%s' % (block['id'], instance_id)
                    auth_blocks.append(block)
            else:
                if authenticator.shown(ctx=show_ctx):
                    auth_blocks.append(utils.get_authenticator_method(authenticator, 'login', parameters))
            # If a login frontend method returns an HttpResponse with a status code != 200
            # this response is returned.
            for block in auth_blocks:
                if block:
                    if block['status_code'] != 200:
                        return block['response']
                    blocks.append(block)

    # run the only available authenticator if able to autorun
    if len(blocks) == 1:
        block = blocks[0]
        authenticator = block['authenticator']
        if hasattr(authenticator, 'autorun'):
            return authenticator.autorun(request, block['id'])

    # Old frontends API
    for block in blocks:
        fid = block['id']
        if 'form' not in block:
            continue
        authenticator = block['authenticator']
        context.update(
            {'submit_name': 'submit-%s' % fid, redirect_field_name: redirect_to, 'form': block['form']}
        )
        if hasattr(authenticator, 'get_context'):
            context.update(authenticator.get_context())
        sub_template_name = authenticator.template()
        block['content'] = render_to_string(sub_template_name, context, request=request)

    # legacy context variable
    rendered_forms = [(block['name'], block['content']) for block in blocks]

    context.update(
        {
            'methods': rendered_forms,
            # new definition
            'blocks': collections.OrderedDict((block['id'], block) for block in blocks),
            redirect_field_name: redirect_to,
        }
    )
    return render(request, template_name, context)
def service_list(request):
    """Return the services to show on the user homepage.

    The list is accumulated from the backends' ``service_list`` results.
    """
    services = utils.accumulate_from_backends(request, 'service_list')
    return services
class Homepage(cbv.TemplateNamesMixin, TemplateView):
    """User homepage, or a redirect to ``A2_HOMEPAGE_URL`` when it is set."""

    template_names = ['idp/homepage.html', 'authentic2/homepage.html']

    def dispatch(self, request, *args, **kwargs):
        """Redirect to the configured homepage, else require a login."""
        homepage_url = app_settings.A2_HOMEPAGE_URL
        if homepage_url:
            return utils.redirect(request, homepage_url)
        protected_dispatch = login_required(super(Homepage, self).dispatch)
        return protected_dispatch(request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """Add the account management URL name and the service list."""
        ctx = super(Homepage, self).get_context_data(**kwargs)
        ctx.update(
            {
                'account_management': 'account_management',
                'authorized_services': service_list(self.request),
            }
        )
        return ctx


homepage = Homepage.as_view()
class ProfileView(cbv.TemplateNamesMixin, TemplateView):
    """Account management page.

    Shows the user's visible profile values, the blocks contributed by the
    authentication frontends and the federation management actions.
    """

    template_names = ['idp/account_management.html', 'authentic2/accounts.html']
    title = _('Your account')

    def dispatch(self, request, *args, **kwargs):
        """Redirect to ``A2_ACCOUNTS_URL`` when it is set, else serve the page."""
        if app_settings.A2_ACCOUNTS_URL:
            return utils.redirect(request, app_settings.A2_ACCOUNTS_URL)
        return super(ProfileView, self).dispatch(request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """Build the template context of the account page.

        Keys added: ``profile`` (list of ``(title, values)``), ``attributes``,
        ``frontends_block``, ``frontends_block_by_id``,
        ``federation_management`` and the ``allow_*`` flags. The
        ``modify_context_data`` hook is called last with the final context.
        """
        context = super(ProfileView, self).get_context_data(**kwargs)
        frontends = utils.get_backends('AUTH_FRONTENDS')

        request = self.request

        # NOTE(review): this branch returns the frontend's result from
        # get_context_data(); this class defines no post() handler, so it is
        # presumably never reached - confirm before relying on it.
        if request.method == "POST":
            for frontend in frontends:
                if 'submit-%s' % frontend.id in request.POST:
                    form = frontend.form()(data=request.POST)
                    if form.is_valid():
                        return frontend.post(request, form, None, '/profile')
        # User attributes management
        profile = []
        field_names = app_settings.A2_PROFILE_FIELDS
        # NOTE(review): nesting reconstructed from a de-indented source; the
        # fallback list is assumed to be completed only when A2_PROFILE_FIELDS
        # is empty, which also avoids mutating the setting - confirm.
        if not field_names:
            # copy, since names are appended below
            field_names = list(app_settings.A2_REGISTRATION_FIELDS)
            for field_name in getattr(request.user, 'USER_PROFILE', []):
                if field_name not in field_names:
                    field_names.append(field_name)
            qs = models.Attribute.objects.filter(Q(user_editable=True) | Q(user_visible=True))
            qs = qs.values_list('name', flat=True)
            for field_name in qs:
                if field_name not in field_names:
                    field_names.append(field_name)
        attributes = []
        for field_name in field_names:
            title = None
            # an entry is either a bare name or a (name[, title]) sequence
            if isinstance(field_name, (list, tuple)):
                if len(field_name) > 1:
                    title = field_name[1]
                field_name = field_name[0]
            try:
                attribute = models.Attribute.objects.get(name=field_name)
            except models.Attribute.DoesNotExist:
                attribute = None

            if attribute:
                if not attribute.user_visible:
                    continue
                # the attribute kind may provide an HTML renderer; the default
                # returns the value unchanged
                html_value = attribute.get_kind().get('html_value', lambda a, b: b)
                qs = models.AttributeValue.objects.with_owner(request.user)
                qs = qs.filter(attribute=attribute)
                qs = qs.select_related()
                value = [at_value.to_python() for at_value in qs]
                # drop falsy values
                value = filter(None, value)
                value = [html_value(attribute, at_value) for at_value in value]
                if not title:
                    title = str(attribute)
            else:
                # fallback to model attributes
                try:
                    field = request.user._meta.get_field(field_name)
                except FieldDoesNotExist:
                    continue
                if not title:
                    title = field.verbose_name
                value = getattr(self.request.user, field_name, None)
                # unsaved Attribute instance, built only for the template
                attribute = models.Attribute(name=field_name, label=title)

            raw_value = None
            if value:
                if callable(value):
                    value = value()
                # normalize to a sequence of values
                if not isinstance(value, (list, tuple)):
                    value = (value,)
                raw_value = value
                value = [str(v) for v in value]
            if value or app_settings.A2_PROFILE_DISPLAY_EMPTY_FIELDS:
                profile.append((title, value))
                attributes.append({'attribute': attribute, 'values': raw_value})

        # Credentials management
        parameters = {'request': request, 'context': context}
        profiles = [utils.get_authenticator_method(frontend, 'profile', parameters) for frontend in frontends]
        # Old frontends data structure for templates
        blocks = [block['content'] for block in profiles if block]
        # New frontends data structure for templates
        blocks_by_id = collections.OrderedDict((block['id'], block) for block in profiles if block)

        idp_backends = utils.get_backends()
        # Get actions for federation management
        federation_management = []
        if app_settings.A2_PROFILE_CAN_MANAGE_FEDERATION:
            for idp_backend in idp_backends:
                if hasattr(idp_backend, 'federation_management'):
                    federation_management.extend(idp_backend.federation_management(request))

        context.update(
            {
                'frontends_block': blocks,
                'frontends_block_by_id': blocks_by_id,
                'profile': profile,
                'attributes': attributes,
                'allow_account_deletion': app_settings.A2_REGISTRATION_CAN_DELETE_ACCOUNT,
                'allow_profile_edit': EditProfile.can_edit_profile(),
                'allow_email_change': app_settings.A2_PROFILE_CAN_CHANGE_EMAIL,
                'allow_authorization_management': False,
                # TODO: deprecated should be removed when publik-base-theme is updated
                'allow_password_change': utils.user_can_change_password(request=request),
                'federation_management': federation_management,
            }
        )

        if (
            'authentic2_idp_oidc' in settings.INSTALLED_APPS
            and app_settings.A2_PROFILE_CAN_MANAGE_SERVICE_AUTHORIZATIONS
        ):
            # imported here because the OIDC IdP application is optional
            from authentic2_idp_oidc.models import OIDCClient

            context['allow_authorization_management'] = OIDCClient.objects.filter(
                authorization_mode=OIDCClient.AUTHORIZATION_MODE_BY_SERVICE
            ).exists()

        hooks.call_hooks('modify_context_data', self, context)
        return context


# Public view: the account page requires a logged-in user.
profile = login_required(ProfileView.as_view())
def logout_list(request):
    """Return the logout links accumulated from the idp backends."""
    links = utils.accumulate_from_backends(request, 'logout_list')
    return links
def redirect_logout_list(request):
    """Return the redirect logout links accumulated from the idp backends."""
    links = utils.accumulate_from_backends(request, 'redirect_logout_list')
    return links
def logout(request, next_url=None, do_local=True, check_referer=True):
    """Logout first check if a logout request is authorized, i.e.
    that logout was done using a POST with CSRF token or with a GET
    from the same site.

    Logout endpoints of IdP module must reuse the view by setting
    check_referer and do_local to False.

    :param request: the current HTTP request.
    :param next_url: final URL to redirect to; computed from the request
        (defaulting to ``settings.LOGIN_REDIRECT_URL``) when not given.
    :param do_local: when true, the local logout is only done directly if
        the request carries a ``local`` query parameter.
    :param check_referer: when true, a request failing
        ``utils.check_referer`` gets a confirmation page instead.
    :returns: a rendered page (confirmation or iframe logout) or a redirect.
    """
    next_url = next_url or utils.select_next_url(request, settings.LOGIN_REDIRECT_URL)
    ctx = {}
    ctx['next_url'] = next_url
    ctx['redir_timeout'] = 60
    local_logout_done = False
    if request.user.is_authenticated:
        if check_referer and not utils.check_referer(request):
            return render(request, 'authentic2/logout_confirm.html', ctx)
        do_local = do_local and 'local' in request.GET
        if not do_local:
            fragments = logout_list(request)
            if fragments:
                # Full logout with iframes
                # the page comes back here with local=ok to do the local logout
                next_url = utils.make_url(
                    'auth_logout', params={'local': 'ok'}, next_url=next_url, sign_next_url=True
                )
                ctx['next_url'] = next_url
                ctx['logout_list'] = fragments
                ctx['message'] = _('Logging out from all your services')
                return render(request, 'authentic2/logout.html', ctx)
        # Get redirection targets for full logout with redirections
        # (needed before local logout)
        targets = redirect_logout_list(request)
        logger.debug('Accumulated redirections : {}'.format(targets))
        # Local logout
        request.journal.record('user.logout')
        auth_logout(request)
        logger.info('Logged out')
        local_logout_done = True
        # Last redirection will be the current next_url
        targets.append(next_url)
        # Put redirection targets in session (after local logout)
        request.session['logout_redirections'] = targets
        logger.debug('All planned redirections : {}'.format(targets))
    # Full logout by redirections if any
    targets = request.session.pop('logout_redirections', None)
    if targets:
        # Full logout with redirections
        logger.debug('Redirections queue: {}'.format(targets))
        # consume the first target and keep the rest for the next requests
        next_url = targets.pop(0)
        request.session['logout_redirections'] = targets
    logger.debug('Next redirection : {}'.format(next_url))
    response = shortcuts.redirect(next_url)
    if local_logout_done:
        # short-lived (60 seconds) marker cookie set after a local logout
        response.set_cookie('a2_just_logged_out', 1, max_age=60)
    return response
def login_password_profile(request, *args, **kwargs):
    """Render the password block of the account page.

    The ``context`` keyword argument, when given, is completed with the
    ``can_change_password`` and ``has_usable_password`` flags.
    """
    context = kwargs.pop('context', {})
    context['can_change_password'] = utils.user_can_change_password(request=request)
    context['has_usable_password'] = request.user.has_usable_password()
    template_names = ['auth/login_password_profile.html', 'authentic2/login_password_profile.html']
    return render_to_string(template_names, context, request=request)
class LoggedInView(View):
    '''JSONP web service to detect if an user is logged'''

    http_method_names = [u'get']
    # Only dotted JavaScript identifiers are accepted as JSONP callback, the
    # value being copied verbatim into the response body.
    callback_re = re.compile(r'[A-Za-z_$][A-Za-z0-9_$]*(?:\.[A-Za-z_$][A-Za-z0-9_$]*)*\Z')

    def check_referrer(self):
        '''Check if the given referer is authorized, i.e. starts with one
        of the prefixes listed in the VALID_REFERERS setting.'''
        referer = self.request.META.get('HTTP_REFERER', '')
        return any(referer.startswith(valid_referer) for valid_referer in app_settings.VALID_REFERERS)

    def get(self, request, *args, **kwargs):
        '''Return ``<callback>(0)`` or ``<callback>(1)`` depending on whether
        the user is authenticated.

        Responds 403 when the referer is not authorized and 400 when the
        ``callback`` parameter is missing or is not a plain identifier.
        '''
        if not self.check_referrer():
            return HttpResponseForbidden()
        callback = request.GET.get('callback', '')
        # refuse anything which could inject script content in the response
        if not self.callback_re.match(callback):
            return HttpResponseBadRequest('invalid callback', content_type='text/plain')
        content = u'{0}({1})'.format(callback, int(request.user.is_authenticated))
        return HttpResponse(content, content_type='application/json')


logged_in = never_cache(LoggedInView.as_view())
def csrf_failure_view(request, reason=""):
    """Warn the user and reload the same URL after a CSRF check failure."""
    warning = _('The page is out of date, it was reloaded for you')
    messages.warning(request, warning)
    same_url = request.get_full_path()
    return HttpResponseRedirect(same_url)
class PasswordResetView(FormView):
    '''Ask for an email and send a password reset link by mail'''

    form_class = passwords_forms.PasswordResetForm
    title = _('Password Reset')

    def get_success_url(self):
        '''The instructions page is shown once the request is accepted.'''
        return reverse('password_reset_instructions')

    def get_template_names(self):
        '''Candidate templates, in lookup order.'''
        return ['authentic2/password_reset_form.html', 'registration/password_reset_form.html']

    def get_form_kwargs(self, **kwargs):
        '''Seed the form initial data with the next URL of the request.'''
        form_kwargs = super(PasswordResetView, self).get_form_kwargs(**kwargs)
        initial = form_kwargs.setdefault('initial', {})
        initial['next_url'] = utils.select_next_url(self.request, '')
        return form_kwargs

    def get_context_data(self, **kwargs):
        '''Set the page title; raise Http404 when password reset is disabled.'''
        ctx = super(PasswordResetView, self).get_context_data(**kwargs)
        if app_settings.A2_USER_CAN_RESET_PASSWORD is False:
            raise Http404('Password reset is not allowed.')
        ctx['title'] = _('Password reset')
        return ctx

    def form_valid(self, form):
        '''Send the reset mail unless a robot, a pending token or a rate
        limit gets in the way.'''
        request = self.request
        if form.is_robot():
            return utils.redirect(request, self.get_success_url(), params={'robot': 'on'})

        cleaned_data = form.cleaned_data
        email = cleaned_data.get('email') or cleaned_data.get('email_or_username')

        # if an email has already been sent, warn once before allowing resend
        already_sent = models.Token.objects.filter(
            kind='pw-reset', content__email=email, expires__gt=timezone.now()
        ).exists()
        resend_key = 'pw-reset-allow-resend'
        if app_settings.A2_TOKEN_EXISTS_WARNING and already_sent and not request.session.get(resend_key):
            request.session[resend_key] = True
            form.add_error(
                'email',
                _(
                    'An email has already been sent to %s. Click "Validate" again if '
                    'you really want it to be sent again.'
                )
                % email,
            )
            return self.form_invalid(form)
        request.session[resend_key] = False

        # checked in order; the first limit reached stops the processing
        limits = (
            (
                'post:email',
                'A2_EMAILS_ADDRESS_RATELIMIT',
                _(
                    'Multiple emails have already been sent to this address. Further attempts are '
                    'blocked, please check your spam folder or try again later.'
                ),
            ),
            (
                'ip',
                'A2_EMAILS_IP_RATELIMIT',
                _(
                    'Multiple password reset attempts have already been made from this IP address. No '
                    'further email will be sent, please check your spam folder or try again later.'
                ),
            ),
        )
        for key, rate_setting, error in limits:
            limited = is_ratelimited(
                request,
                key=key,
                group='pw-reset-email',
                rate=getattr(app_settings, rate_setting),
                increment=True,
            )
            if limited:
                request.journal.record('user.password.reset.failure', email=email)
                form.add_error('email', error)
                return self.form_invalid(form)

        form.save()
        request.session['reset_email'] = email
        return super(PasswordResetView, self).form_valid(form)


password_reset = PasswordResetView.as_view()
class PasswordResetInstructionsView(TemplateView):
    """Page shown after a password reset request was accepted."""

    template_name = 'registration/password_reset_instructions.html'

    def get_context_data(self, **kwargs):
        """Expose the bare address part of ``DEFAULT_FROM_EMAIL``."""
        ctx = super(PasswordResetInstructionsView, self).get_context_data(**kwargs)
        _realname, address = parseaddr(settings.DEFAULT_FROM_EMAIL)
        ctx['from_email_address'] = address
        return ctx


password_reset_instructions = PasswordResetInstructionsView.as_view()
class PasswordResetConfirmView(cbv.RedirectToNextURLViewMixin, FormView):
    """Validate password reset link, show a set password form and login
    the user.
    """

    form_class = passwords_forms.SetPasswordForm
    title = _('Password Reset')

    def get_template_names(self):
        """Candidate templates, in lookup order."""
        return ['registration/password_reset_confirm.html', 'authentic2/password_reset_confirm.html']

    def dispatch(self, request, *args, **kwargs):
        """Resolve the token and its user before serving the form.

        Any failure shows a warning and redirects to the success URL.
        """

        def refuse(warning):
            # common exit for all the rejection cases
            messages.warning(request, warning)
            return utils.redirect(request, self.get_success_url())

        # spaces are stripped from the token found in the URL
        raw_token = kwargs['token'].replace(' ', '')
        try:
            self.token = models.Token.use('pw-reset', raw_token, delete=False)
        except models.Token.DoesNotExist:
            return refuse(_('Password reset token is unknown or expired'))
        except (TypeError, ValueError):
            return refuse(_('Password reset token is invalid'))

        user_id = self.token.content['user']
        try:
            # use authenticate to eventually get an LDAPUser
            self.user = utils.authenticate(request, user=User._default_manager.get(pk=user_id))
        except (TypeError, ValueError, OverflowError, User.DoesNotExist):
            return refuse(_('User not found'))

        allowed = utils.get_user_flag(
            user=self.user, name='can_reset_password', default=self.user.has_usable_password()
        )
        if not allowed:
            return refuse(
                _('It\'s not possible to reset your password. Please contact an administrator.')
            )
        return super(PasswordResetConfirmView, self).dispatch(request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """Add the title and the ``validlink`` flag to the context."""
        ctx = super(PasswordResetConfirmView, self).get_context_data(**kwargs)
        # compatibility with existing templates !
        ctx.update({'title': _('Enter new password'), 'validlink': True})
        return ctx

    def get_form_kwargs(self):
        """Pass the resolved user to the form."""
        form_kwargs = super(PasswordResetConfirmView, self).get_form_kwargs()
        form_kwargs['user'] = self.user
        return form_kwargs

    def form_valid(self, form):
        """Save the new password, then delete the token and log the user in."""
        # Changing password by mail validate the email
        form.user.email_verified = True
        form.save()
        hooks.call_hooks('event', name='password-reset-confirm', user=form.user, token=self.token, form=form)
        logger.info(u'password reset for user %s with token %r', self.user, self.token.uuid)
        self.token.delete()
        return self.finish()

    def finish(self):
        """Authenticate the user and record the reset in the journal."""
        response = utils.simulate_authentication(self.request, self.user, 'email')
        self.request.journal.record('user.password.reset')
        return response


password_reset_confirm = PasswordResetConfirmView.as_view()
class BaseRegistrationView(FormView):
    """First registration step: ask for an email and send the activation mail.

    Values found in an optional signed ``token`` query parameter are
    forwarded, with the submitted form data, to the registration mail.
    """

    form_class = registration_forms.RegistrationForm
    template_name = 'registration/registration_form.html'
    title = _('Registration')

    def dispatch(self, request, *args, **kwargs):
        """Initialize ``self.token``, ``self.ou`` and ``self.next_url``.

        Raises Http404 when the ``REGISTRATION_OPEN`` setting is false and
        answers 400 when the ``token`` query parameter cannot be loaded.
        """
        if not getattr(settings, 'REGISTRATION_OPEN', True):
            raise Http404('Registration is not open.')

        self.token = {}
        self.ou = get_default_ou()
        # load pre-filled values
        if request.GET.get('token'):
            try:
                self.token = signing.loads(
                    request.GET.get('token'), max_age=settings.ACCOUNT_ACTIVATION_DAYS * 3600 * 24
                )
            except (TypeError, ValueError, signing.BadSignature) as e:
                logger.warning(u'registration_view: invalid token: %s', e)
                return HttpResponseBadRequest('invalid token', content_type='text/plain')
            # the token may designate another organizational unit than the default
            if 'ou' in self.token:
                self.ou = OU.objects.get(pk=self.token['ou'])

        # the next URL of the token wins over the one computed from the request
        self.next_url = self.token.pop(REDIRECT_FIELD_NAME, utils.select_next_url(request, None))
        return super(BaseRegistrationView, self).dispatch(request, *args, **kwargs)

    def form_valid(self, form):
        """Send the registration mail unless a robot, a pending token or a
        rate limit gets in the way, then redirect to ``registration_complete``.
        """
        if form.is_robot():
            return utils.redirect(
                self.request,
                'registration_complete',
                params={
                    REDIRECT_FIELD_NAME: self.next_url,
                    'robot': 'on',
                },
            )
        # popped so that the email is not copied into the token below
        email = form.cleaned_data.pop('email')

        # if an email has already been sent, warn once before allowing resend
        token = models.Token.objects.filter(
            kind='registration', content__email=email, expires__gt=timezone.now()
        ).exists()
        resend_key = 'registration-allow-resend'
        if app_settings.A2_TOKEN_EXISTS_WARNING and token and not self.request.session.get(resend_key):
            self.request.session[resend_key] = True
            form.add_error(
                'email',
                _(
                    'An email has already been sent to %s. Click "Validate" again if '
                    'you really want it to be sent again.'
                )
                % email,
            )
            return self.form_invalid(form)
        self.request.session[resend_key] = False

        # limit keyed on the submitted email; increment=True counts this attempt
        if is_ratelimited(
            self.request,
            key='post:email',
            group='registration-email',
            rate=app_settings.A2_EMAILS_ADDRESS_RATELIMIT,
            increment=True,
        ):
            form.add_error(
                'email',
                _(
                    'Multiple emails have already been sent to this address. Further attempts are '
                    'blocked, please check your spam folder or try again later.'
                ),
            )
            return self.form_invalid(form)
        # limit keyed on the client IP, only checked when the first one passed
        if is_ratelimited(
            self.request,
            key='ip',
            group='registration-email',
            rate=app_settings.A2_EMAILS_IP_RATELIMIT,
            increment=True,
        ):
            form.add_error(
                'email',
                _(
                    'Multiple registration attempts have already been made from this IP address. No '
                    'further email will be sent, please check your spam folder or try again later.'
                ),
            )
            return self.form_invalid(form)

        # submitted values override the pre-filled ones from the token
        for field in form.cleaned_data:
            self.token[field] = form.cleaned_data[field]

        # propagate service to the registration completion view
        service = get_service_from_request(self.request)
        if service:
            set_service_ref(self.token, service)

        # both are passed explicitly to send_registration_mail() below
        self.token.pop(REDIRECT_FIELD_NAME, None)
        self.token.pop('email', None)

        utils.send_registration_mail(self.request, email, next_url=self.next_url, ou=self.ou, **self.token)
        self.request.session['registered_email'] = email
        return utils.redirect(
            self.request, 'registration_complete', params={REDIRECT_FIELD_NAME: self.next_url}
        )

    def get_context_data(self, **kwargs):
        """Add the ``frontends`` registration blocks, keyed by block id."""
        context = super(BaseRegistrationView, self).get_context_data(**kwargs)
        parameters = {'request': self.request, 'context': context}
        blocks = [
            utils.get_authenticator_method(authenticator, 'registration', parameters)
            for authenticator in utils.get_backends('AUTH_FRONTENDS')
        ]
        # falsy blocks are left out
        context['frontends'] = collections.OrderedDict((block['id'], block) for block in blocks if block)
        return context
class RegistrationView(cbv.ValidateCSRFMixin, BaseRegistrationView):
    """Registration view combining BaseRegistrationView with ValidateCSRFMixin."""
class RegistrationCompletionView(CreateView):
model = get_user_model()
success_url = 'auth_homepage'
def get_template_names(self):
    """Pick the account chooser when matching users exist and the request
    has no ``create`` parameter, the creation form otherwise."""
    choose_existing = self.users and 'create' not in self.request.GET
    if choose_existing:
        return ['registration/registration_completion_choose.html']
    return ['registration/registration_completion_form.html']
def get_success_url(self):
    """Return the URL to go to once the registration is completed.

    ``A2_REGISTRATION_REDIRECT`` is either a URL or a
    ``(URL, next field name)`` pair. When the token carries a next URL, it
    is used directly, or passed to the redirect URL through the next field
    when a redirect URL is configured. Without a next URL, the redirect URL
    or, failing that, ``success_url`` is used.
    """
    redirect_url = app_settings.A2_REGISTRATION_REDIRECT
    next_field = REDIRECT_FIELD_NAME
    # A plain string is always a URL: it must not be unpacked, otherwise a
    # two-character URL would be split into (url, field name).
    if not isinstance(redirect_url, str):
        try:
            redirect_url, next_field = redirect_url
        except (TypeError, ValueError):
            # not a pair (e.g. None): the setting is used as is
            pass

    if self.token and self.token.get(REDIRECT_FIELD_NAME):
        url = self.token[REDIRECT_FIELD_NAME]
        if redirect_url:
            url = utils.make_url(redirect_url, params={next_field: url})
    elif redirect_url:
        url = redirect_url
    else:
        url = utils.make_url(self.success_url)
    return url
def dispatch(self, request, *args, **kwargs):
    """Resolve the registration token and prepare the per-request state.

    An unknown/expired token, or one raising TypeError/ValueError, adds a
    warning message and redirects to 'registration_register'.  Otherwise
    the token content, the authentication method, the email, the OU, the
    users already using that email and the email-uniqueness policy are
    stored on the view before the regular dispatch happens.

    Raises Http404 when the OU referenced by the token no longer exists,
    exactly as get_form_kwargs() does for the same lookup, instead of
    letting OU.DoesNotExist escape as a server error.
    """
    # Spaces are stripped from the key received in the URL.
    registration_token = kwargs['registration_token'].replace(' ', '')
    try:
        # delete=False: the token object is deleted later by this view.
        token = models.Token.use('registration', registration_token, delete=False)
    except models.Token.DoesNotExist:
        messages.warning(request, _('Your activation key is unknown or expired'))
        return utils.redirect(request, 'registration_register')
    except (TypeError, ValueError):
        messages.warning(request, _('Activation failed'))
        return utils.redirect(request, 'registration_register')
    self.token_obj = token
    self.token = token.content
    # allow access to token content from external authentication backends
    request.token = self.token
    self.authentication_method = self.token.get('authentication_method', 'email')
    self.email = self.token['email']
    if 'ou' in self.token:
        self.ou = get_object_or_404(OU, pk=self.token['ou'])
    else:
        self.ou = get_default_ou()
    self.users = User.objects.filter(email__iexact=self.email).order_by('date_joined')
    self.email_is_unique = app_settings.A2_EMAIL_IS_UNIQUE or app_settings.A2_REGISTRATION_EMAIL_IS_UNIQUE
    if self.ou:
        # Restrict candidates to the OU and honour its own uniqueness flag.
        self.users = self.users.filter(ou=self.ou)
        self.email_is_unique |= self.ou.email_is_unique
    self.init_fields_labels_and_help_texts()
    # if registration is done during an SSO add the service to the registration event
    self.service = get_service_from_token(self.token)
    return super(RegistrationCompletionView, self).dispatch(request, *args, **kwargs)
def init_fields_labels_and_help_texts(self):
    """Compute the form fields, labels, required names and help texts.

    Results are stored on ``self.fields``, ``self.labels``,
    ``self.required`` and ``self.help_texts``.  'email' and every name
    listed in the token's 'skip_fields' are removed from the fields.
    """
    asked_on_registration = models.Attribute.objects.filter(asked_on_registration=True)
    default_fields = asked_on_registration.values_list('name', flat=True)
    # Built once and shared by both consumers below, instead of building
    # the same query a second time.
    required_fields = models.Attribute.objects.filter(required=True).values_list('name', flat=True)
    fields, labels = utils.get_fields_and_labels(
        app_settings.A2_REGISTRATION_FIELDS,
        default_fields,
        app_settings.A2_REGISTRATION_REQUIRED_FIELDS,
        app_settings.A2_REQUIRED_FIELDS,
        required_fields,
    )
    help_texts = {}
    if app_settings.A2_REGISTRATION_FORM_USERNAME_LABEL:
        labels['username'] = app_settings.A2_REGISTRATION_FORM_USERNAME_LABEL
    if app_settings.A2_REGISTRATION_FORM_USERNAME_HELP_TEXT:
        help_texts['username'] = app_settings.A2_REGISTRATION_FORM_USERNAME_HELP_TEXT
    required = list(app_settings.A2_REGISTRATION_REQUIRED_FIELDS) + list(required_fields)
    # The email comes from the token; get_form_class() adds it back when
    # the token flags it as not valid.
    if 'email' in fields:
        fields.remove('email')
    for field in self.token.get('skip_fields') or []:
        if field in fields:
            fields.remove(field)
    self.fields = fields
    self.labels = labels
    self.required = required
    self.help_texts = help_texts
def get_form_class(self):
    """Build the form class used to complete the registration.

    * When the token has a false 'valid_email', 'email' becomes a required
      form field.
    * When the token has a true 'no_password', the password-less base form
      is used.
    * When 'username' is a field and A2_REGISTRATION_FORM_USERNAME_REGEX is
      set, the username field is replaced by a CharField validated by
      ``validators.UsernameValidator``, keeping label and help text.
    """
    if not self.token.get('valid_email', True):
        # Guarded so that calling this method more than once does not pile
        # up duplicate 'email' entries in the view's lists.
        for names in (self.fields, self.required):
            if 'email' not in names:
                names.append('email')
    form_class = registration_forms.RegistrationCompletionForm
    if self.token.get('no_password', False):
        form_class = registration_forms.RegistrationCompletionFormNoPassword
    form_class = profile_forms.modelform_factory(
        self.model,
        form=form_class,
        fields=self.fields,
        labels=self.labels,
        required=self.required,
        help_texts=self.help_texts,
    )
    if 'username' in self.fields and app_settings.A2_REGISTRATION_FORM_USERNAME_REGEX:
        # Keep existing field label and help_text
        old_field = form_class.base_fields['username']
        field = CharField(
            max_length=256,
            label=old_field.label,
            help_text=old_field.help_text,
            validators=[validators.UsernameValidator()],
        )
        # Subclass on the fly so that the new field overrides the old one.
        form_class = type('RegistrationForm', (form_class,), {'username': field})
    return form_class
def get_form_kwargs(self, **kwargs):
    """Return the form kwargs with a pre-populated ``instance``.

    Initial attributes are the token's email, the OU, the token keys
    listed in A2_PRE_REGISTRATION_FIELDS and the values gathered from the
    backends' 'registration_form_prefill' (several values for one field
    are joined with a space).  The instance is the user designated by the
    token's 'user_id' when there is one, else a new unsaved user built
    from the email, first_name, last_name and ou attributes.
    """
    kwargs = super().get_form_kwargs(**kwargs)
    if 'ou' in self.token:
        ou = get_object_or_404(OU, id=self.token['ou'])
    else:
        ou = get_default_ou()

    attributes = {'email': self.email, 'ou': ou}
    attributes.update(
        {key: self.token[key] for key in self.token if key in app_settings.A2_PRE_REGISTRATION_FIELDS}
    )
    logger.debug(u'attributes %s', attributes)

    prefilling_list = utils.accumulate_from_backends(self.request, 'registration_form_prefill')
    logger.debug(u'prefilling_list %s', prefilling_list)
    # Build a single meaningful prefilling with sets of values
    prefilling = {}
    for backend_values in prefilling_list:
        for name, values in backend_values.items():
            if name not in self.fields:
                continue
            prefilling.setdefault(name, set()).update(values)
    logger.debug(u'prefilling %s', prefilling)
    for name, values in prefilling.items():
        attributes[name] = ' '.join(values)
    logger.debug(u'attributes with prefilling %s', attributes)

    user_id = self.token.get('user_id')
    if user_id:
        kwargs['instance'] = User.objects.get(id=user_id)
    else:
        init_kwargs = {
            key: attributes[key] for key in ('email', 'first_name', 'last_name', 'ou') if key in attributes
        }
        kwargs['instance'] = get_user_model()(**init_kwargs)
    return kwargs
def get_form(self, form_class=None):
    """Build the form, then hand it to the 'front_modify_form' hooks."""
    built_form = super().get_form(form_class=form_class)
    hooks.call_hooks('front_modify_form', self, built_form)
    return built_form
def get_context_data(self, **kwargs):
    """Expose the token, matching users, email and flags to the template."""
    ctx = super().get_context_data(**kwargs)
    ctx.update(
        {
            'token': self.token,
            'users': self.users,
            'email': self.email,
            'email_is_unique': self.email_is_unique,
            'create': 'create' in self.request.GET,
        }
    )
    return ctx
def get(self, request, *args, **kwargs):
    """Handle GET: log in, register straight from the token, or show the page.

    * Exactly one matching user and a unique-email policy: that user is
      authenticated and redirected.
    * The token already holds every field to confirm (only the required
      ones when 'confirm_data' is 'required') and 'confirm_data' is falsy
      or 'required': the form is bound to the token and, if valid, saved.
    * Otherwise the regular page is rendered; an invalid token-bound form
      is the one displayed.
    """
    if len(self.users) == 1 and self.email_is_unique:
        # Found one user, EMAIL is unique, log her in
        utils.simulate_authentication(
            request, self.users[0], method=self.authentication_method, service=self.service
        )
        return utils.redirect(request, self.get_success_url())

    confirm_data = self.token.get('confirm_data', False)
    only_required = confirm_data == 'required'
    fields_to_confirm = self.required if only_required else self.fields
    token_is_complete = all(name in self.token for name in fields_to_confirm)
    if token_is_complete and (only_required or not confirm_data):
        # We already have every fields
        form_kwargs = self.get_form_kwargs()
        form_class = self.get_form_class()
        # NOTE: data is the token content itself, not a copy; the password
        # key rewrite below is therefore visible through self.token.
        data = self.token
        if 'password' in data:
            password = data.pop('password')
            data['password1'] = password
            data['password2'] = password
        form_kwargs['data'] = data
        form = form_class(**form_kwargs)
        if form.is_valid():
            return self.registration_success(request, form.save(), form)
        # Make the parent view render this bound, invalid form.
        self.get_form = lambda *_args, **_kwargs: form
    return super().get(request, *args, **kwargs)
def post(self, request, *args, **kwargs):
    """Handle POST: pick an existing account or create a new one.

    With a unique-email policy and existing users, nothing is created and
    the request is redirected to this same view.  A posted 'uid' matching
    one of the users logs that user in; anything else goes through the
    regular form processing.
    """
    if self.users and self.email_is_unique:
        # email is unique, users already exist, creating a new one is forbidden !
        return utils.redirect(
            request, request.resolver_match.view_name, args=self.args, kwargs=self.kwargs
        )
    if 'uid' in request.POST:
        wanted_uid = request.POST['uid']
        chosen = next((candidate for candidate in self.users if str(candidate.id) == wanted_uid), None)
        if chosen is not None:
            utils.simulate_authentication(
                request, chosen, method=self.authentication_method, service=self.service
            )
            return utils.redirect(request, self.get_success_url())
    return super().post(request, *args, **kwargs)
def form_valid(self, form):
    """Save the user, or send a new registration mail for a changed email.

    When the posted email differs from the token's (or the token has
    none) and the token does not set 'skip_email_check', the cleaned data
    are sent in a new registration mail, the current token is deleted and
    the user is redirected to 'registration_complete'.  Otherwise the form
    is saved and registration_success() takes over.
    """
    # remove verified fields from form, this allows an authentication
    # method to provide verified data fields and to present it to the user,
    # while preventing the user to modify them.
    for attribute_value in models.AttributeValue.objects.with_owner(form.instance):
        name = attribute_value.attribute.name
        if attribute_value.verified and name in form.fields:
            del form.fields[name]

    posted = self.request.POST
    email_differs = 'email' in posted and (
        'email' not in self.token or posted['email'] != self.token['email']
    )
    if email_differs and not self.token.get('skip_email_check'):
        # If an email is submitted it must be validated or be the same as in the token
        data = form.cleaned_data.copy()
        # handle complex attributes
        for attribute in iter_attributes():
            if attribute.name in data:
                serialize = attribute.get_kind()['serialize']
                if serialize != attribute_kinds.identity:
                    data[attribute.name] = serialize(data[attribute.name])
        data['no_password'] = self.token.get('no_password', False)
        utils.send_registration_mail(self.request, ou=self.ou, next_url=self.get_success_url(), **data)
        self.token_obj.delete()
        self.request.session['registered_email'] = form.cleaned_data['email']
        return utils.redirect(self.request, 'registration_complete')

    # The parent's response is dropped; registration_success() builds ours.
    super().form_valid(form)
    return self.registration_success(self.request, form.instance, form)
def registration_success(self, request, user, form):
    """Finalize a registration and return the redirection response.

    In order: journal entry, 'registration' event hooks, token deletion,
    authentication of the user, info message, success email, redirect to
    get_success_url().
    """
    method = self.authentication_method
    request.journal.record('user.registration', user=user, session=None, how=method)
    event = {
        'name': 'registration',
        'user': user,
        'form': form,
        'view': self,
        'authentication_method': method,
        'token': self.token,
        # the slug when a service is known, else the falsy service itself
        'service': self.service and self.service.slug,
    }
    hooks.call_hooks('event', **event)
    self.token_obj.delete()
    utils.simulate_authentication(request, user, method=method, service=self.service)
    success_message = loader.get_template('authentic2/registration_success_message.html')
    messages.info(self.request, success_message.render(request=request))
    self.send_registration_success_email(user)
    return utils.redirect(request, self.get_success_url())
def send_registration_success_email(self, user):
    """Send the 'authentic2/registration_success' mail to ``user``.

    Nothing is sent when the user has no email.
    """
    if not user.email:
        return
    mail_context = {
        'user': user,
        'email': user.email,
        'site': self.request.get_host(),
        'login_url': self.request.build_absolute_uri(settings.LOGIN_URL),
        'method': self.authentication_method,
    }
    utils.send_templated_mail(
        user,
        template_names=['authentic2/registration_success'],
        context=mail_context,
        request=self.request,
    )
# View callable built from RegistrationCompletionView.
registration_completion = RegistrationCompletionView.as_view()
class DeleteView(TemplateView):
    """Page from which the current user asks for the deletion of their account."""

    template_name = 'authentic2/accounts_delete_request.html'
    title = _('Request account deletion')

    def dispatch(self, request, *args, **kwargs):
        """Redirect to '..' when A2_REGISTRATION_CAN_DELETE_ACCOUNT is off."""
        if app_settings.A2_REGISTRATION_CAN_DELETE_ACCOUNT:
            return super().dispatch(request, *args, **kwargs)
        return utils.redirect(request, '..')

    def post(self, request, *args, **kwargs):
        """Send the deletion code unless cancelled; always go back to the account page."""
        if 'cancel' not in request.POST:
            utils.send_account_deletion_code(self.request, self.request.user)
            messages.info(
                request, _("An account deletion validation email has been sent to your email address.")
            )
        return utils.redirect(request, 'account_management')

    def get_context_data(self, **kwargs):
        """Add the current user's email to the template context."""
        context = super().get_context_data(**kwargs)
        context['email'] = self.request.user.email
        return context
class ValidateDeletionView(TemplateView):
    """Confirmation page reached through a signed account deletion token."""

    template_name = 'authentic2/accounts_delete_validation.html'
    title = _('Confirm account deletion')
    # Account designated by the token; set by dispatch().
    user = None

    def dispatch(self, request, *args, **kwargs):
        """Check the signed token and load the targeted user.

        Any failure (expired or bad signature, ValueError, inactive or
        missing account) is reported as an error message followed by a
        redirect to 'auth_homepage'.
        """
        error = None
        try:
            deletion_token = signing.loads(
                kwargs['deletion_token'], max_age=app_settings.A2_DELETION_REQUEST_LIFETIME
            )
            self.user = get_user_model().objects.get(pk=deletion_token['user_pk'])
            # An inactive account cannot go through deletion again
            if not self.user.is_active:
                raise ValidationError(_('This account is inactive, it cannot be deleted.'))
            logger.info('user %s confirmed the deletion of their own account', self.user)
        except signing.SignatureExpired:
            error = _('The account deletion request is too old, try again')
        except signing.BadSignature:
            error = _('The account deletion request is invalid, try again')
        except ValueError:
            error = _('The account deletion request was not on this site, try again')
        except ValidationError as exc:
            error = exc.message
        except get_user_model().DoesNotExist:
            error = _('This account has previously been deleted.')

        if error is None:
            # Runs outside the try block: its exceptions are not intercepted.
            return super().dispatch(request, *args, **kwargs)
        messages.error(request, error)
        return utils.redirect(request, 'auth_homepage')

    def post(self, request, *args, **kwargs):
        """Delete the account unless the request carries 'cancel'."""
        if 'cancel' in request.POST:
            return utils.redirect(request, 'auth_homepage')

        account = self.user
        utils.send_account_deletion_mail(self.request, account)
        logger.info(u'deletion of account %s performed', account)
        hooks.call_hooks('event', name='delete-account', user=account)
        request.journal.record('user.deletion', user=account)
        # Must be computed before the account is deleted.
        is_deleted_user_logged = account == request.user
        account.delete()
        messages.info(request, _('Deletion performed.'))
        # No real use for cancel_url or next_url here, assuming the link
        # has been received by email. We instead redirect the user to the
        # homepage.
        if is_deleted_user_logged:
            return logout(request, check_referer=False)
        return utils.redirect(request, 'auth_homepage')

    def get_context_data(self, **kwargs):
        """Expose the targeted account to the template as 'user'."""
        context = super().get_context_data(**kwargs)
        context['user'] = self.user  # Not necessarily the user in request
        return context
class RegistrationCompleteView(TemplateView):
    """Page displayed once the registration mail has been requested."""

    template_name = 'registration/registration_complete.html'

    def get_context_data(self, **kwargs):
        """Add the next URL, sender details and activation delay to the context."""
        next_url = utils.select_next_url(self.request, settings.LOGIN_REDIRECT_URL)
        sender = settings.DEFAULT_FROM_EMAIL
        kwargs.update(
            next_url=next_url,
            from_email=sender,
            # bare address part of the sender header
            from_email_address=parseaddr(sender)[1],
        )
        return super().get_context_data(account_activation_days=settings.ACCOUNT_ACTIVATION_DAYS, **kwargs)
# View callable built from RegistrationCompleteView.
registration_complete = RegistrationCompleteView.as_view()
class PasswordChangeView(DjPasswordChangeView):
    """Password change page built on Django's PasswordChangeView."""

    title = _('Password Change')
    do_not_call_in_templates = True

    def dispatch(self, request, *args, **kwargs):
        """Compute the return URL and refuse users who may not change their password.

        The return URL is, by priority: a non-empty POSTed 'next_url', the
        REDIRECT_FIELD_NAME query parameter, the 'account_management' page.
        """
        # NOTE(review): the target is taken from the request as is; whether
        # utils.redirect() validates it cannot be told from here - confirm.
        posted_next_url = request.POST.get('next_url')
        if posted_next_url:
            self.post_change_redirect = posted_next_url
        elif REDIRECT_FIELD_NAME in request.GET:
            self.post_change_redirect = request.GET[REDIRECT_FIELD_NAME]
        else:
            self.post_change_redirect = reverse('account_management')

        if utils.user_can_change_password(request=request):
            return super().dispatch(request, *args, **kwargs)
        messages.warning(request, _('Password change is forbidden'))
        return utils.redirect(request, self.post_change_redirect)

    def post(self, request, *args, **kwargs):
        """Go straight back to the return URL on 'cancel'."""
        if 'cancel' not in request.POST:
            return super().post(request, *args, **kwargs)
        return utils.redirect(request, self.post_change_redirect)

    def form_valid(self, form):
        """Fire the event, drop pending password resets and journal the change."""
        current_user = self.request.user
        hooks.call_hooks('event', name='change-password', user=current_user, request=self.request)
        messages.info(self.request, _('Password changed'))
        models.PasswordReset.objects.filter(user=self.request.user).delete()
        response = super().form_valid(form)
        self.request.journal.record('user.password.change', session=self.request.session)
        return response

    def get_form_class(self):
        """Ask for the old password only when the user has a usable one."""
        has_password = self.request.user.has_usable_password()
        return passwords_forms.PasswordChangeForm if has_password else passwords_forms.SetPasswordForm

    def get_context_data(self, **kwargs):
        """Expose the return URL under REDIRECT_FIELD_NAME."""
        context = super().get_context_data(**kwargs)
        context[REDIRECT_FIELD_NAME] = self.post_change_redirect
        return context
# Password change view, wrapped by the setting_enabled decorator for
# 'A2_REGISTRATION_CAN_CHANGE_PASSWORD'.
password_change = decorators.setting_enabled('A2_REGISTRATION_CAN_CHANGE_PASSWORD')(
    PasswordChangeView.as_view()
)
class SuView(View):
    """Open a session as the user designated by a switch-user token."""

    def get(self, request, uuid):
        """Resolve ``uuid`` into a user and authenticate as that user.

        Raises Http404 when the token resolves to no user.
        """
        target = switch_user.resolve_token(uuid)
        if not target:
            raise Http404
        # LDAP ad-hoc behaviour
        has_external_id = target.userexternalid_set.exists()
        if has_external_id:
            target = utils.authenticate(request, user=target)
        return utils.simulate_authentication(request, target, 'su')
# View callable built from SuView.
su = SuView.as_view()
class AuthorizedOauthServicesView(TemplateView):
    """Page listing, and allowing removal of, the user's OIDC authorizations."""

    template_name = 'authentic2/accounts_authorized_oauth_services.html'
    title = _('Consent Management')

    def get_context_data(self, **kwargs):
        """Add the current user's OIDC authorizations to the context."""
        # imported here rather than at module level, as in the original code
        from authentic2_idp_oidc.models import OIDCAuthorization

        context = super().get_context_data(**kwargs)
        user_authorizations = OIDCAuthorization.objects.filter(user=self.request.user)
        context['authorized_oauth_services'] = user_authorizations
        return context

    def post(self, request, *args, **kwargs):
        """Delete the authorization designated by the POSTed 'auth_id'.

        Only authorizations of the current user are considered; without an
        'auth_id' nothing is deleted.
        """
        from authentic2_idp_oidc.models import OIDCAuthorization

        user_authorizations = OIDCAuthorization.objects.filter(user=request.user)
        auth_id = request.POST.get('auth_id')
        if auth_id:
            user_authorizations.filter(id=auth_id).delete()
        return HttpResponseRedirect(reverse('authorized-oauth-services'))
# Consent management view, wrapped by the setting_enabled decorator for
# 'A2_PROFILE_CAN_MANAGE_SERVICE_AUTHORIZATIONS'.
authorized_oauth_services = decorators.setting_enabled('A2_PROFILE_CAN_MANAGE_SERVICE_AUTHORIZATIONS')(
    AuthorizedOauthServicesView.as_view()
)
def old_view_redirect(request, to, message=None):
    """Redirect an old URL to its new location, optionally showing a message.

    ``message``, when truthy, is queued as an info message before the
    redirect to ``to`` is returned.
    """
    if not message:
        return utils.redirect(request, to=to)
    messages.info(request, message)
    return utils.redirect(request, to=to)
class DisplayMessageAndContinueView(TemplateView):
    """Intermediate page showing the pending messages before moving on."""

    template_name = 'authentic2/display_message_and_continue.html'

    def get(self, request, *args, **kwargs):
        """Redirect at once when no message is pending, else render the page.

        ``self.only_info`` ends up False as soon as one pending message has
        a level other than INFO.
        """
        self.url = utils.select_next_url(self.request, reverse('account_management'))
        self.only_info = True
        pending = messages.get_messages(request)
        if len(pending) == 0:
            return utils.redirect(request, self.url, resolve=False)
        levels = [message.level for message in pending]
        if any(level != messages.INFO for level in levels):
            # If there are warning or error messages, the intermediate page must not redirect
            # automatically but should ask for an user confirmation
            self.only_info = False
        # Reading the storage marked it as used; reset the flag so that the
        # messages remain available when the page is rendered.
        pending.used = False
        return super().get(request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """Expose the continuation URL and the info-only flag."""
        context = super().get_context_data(**kwargs)
        context.update({'url': self.url, 'only_info': self.only_info})
        return context
# View callable built from DisplayMessageAndContinueView.
display_message_and_continue = DisplayMessageAndContinueView.as_view()