authentic/src/authentic2/views.py

1211 lines
51 KiB
Python

# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
from email.utils import parseaddr
import logging
import random
import re
from django.conf import settings
from django.shortcuts import render, get_object_or_404
from django.template.loader import render_to_string
from django.views.generic.edit import UpdateView, FormView
from django.views.generic import TemplateView
from django.views.generic.base import View
from django.contrib.auth import SESSION_KEY
from django import http, shortcuts
from django.core import signing
from django.core.urlresolvers import reverse
from django.core.exceptions import ValidationError
from django.contrib import messages
from django.utils import six
from django.utils.translation import ugettext as _
from django.contrib.auth import logout as auth_logout
from django.contrib.auth import REDIRECT_FIELD_NAME
from django.contrib.auth.views import password_change as dj_password_change
from django.http import (HttpResponseRedirect, HttpResponseForbidden, HttpResponse)
from django.views.decorators.csrf import csrf_exempt, ensure_csrf_cookie
from django.views.decorators.cache import never_cache
from django.views.decorators.debug import sensitive_post_parameters
from django.contrib.auth.decorators import login_required
from django.db.models.fields import FieldDoesNotExist
from django.db.models.query import Q
from django.contrib.auth import get_user_model
from django.http import Http404
from django.utils.http import urlsafe_base64_decode
from django.views.generic.edit import CreateView
from django.forms import CharField, Form
from django.core.urlresolvers import reverse_lazy
from django.http import HttpResponseBadRequest
from django.template import loader
from . import (utils, app_settings, compat, decorators, constants,
models, cbv, hooks, validators)
from .a2_rbac.utils import get_default_ou
from .a2_rbac.models import OrganizationalUnit as OU
from .forms import (
passwords as passwords_forms,
registration as registration_forms,
profile as profile_forms)
# User model class returned by Django's get_user_model(), resolved once at
# import time and shared by every view below.
User = get_user_model()
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def server_error(request, template_name='500.html'):
    """
    500 error handler.

    Renders ``template_name`` with an empty context and answers with HTTP
    status 500, so that clients, proxies and monitoring see the failure
    instead of a regular 200 page.

    Templates: `500.html`
    Context: None
    """
    return render(request, template_name, status=500)
class EditProfile(cbv.HookMixin, cbv.TemplateNamesMixin, UpdateView):
    """Form view letting the logged-in user edit their own account data.

    The edited object is always ``request.user``. The set of fields comes
    from ``A2_PROFILE_FIELDS`` and from the user-editable ``Attribute`` rows,
    optionally narrowed to one or more scopes.
    """
    model = User
    template_names = ['profiles/edit_profile.html',
                      'authentic2/accounts_edit.html']
    title = _('Edit account data')

    def get_template_names(self):
        """Return the template candidates, scope-specific template first."""
        if 'scope' not in self.kwargs:
            return list(self.template_names)
        scoped = 'authentic2/accounts_edit_%s.html' % self.kwargs['scope']
        return [scoped] + list(self.template_names)

    @classmethod
    def can_edit_profile(cls):
        """Tell whether there is something to edit and edition is enabled."""
        fields = cls.get_fields()[0]
        if not fields:
            return False
        return app_settings.A2_PROFILE_CAN_EDIT_PROFILE

    @classmethod
    def get_fields(cls, scopes=None):
        """Compute the editable field names and their labels.

        ``scopes`` is an optional iterable of scope names; when non-empty,
        only attributes declaring at least one of them are kept.
        Returns a ``(fields, labels)`` pair as produced by
        ``utils.get_fields_and_labels``.
        """
        configured = []
        for entry in app_settings.A2_PROFILE_FIELDS:
            # an entry is either a bare name or a sequence starting with the name
            name = entry[0] if isinstance(entry, (list, tuple)) else entry
            try:
                attribute = models.Attribute.objects.get(name=name)
            except models.Attribute.DoesNotExist:
                # no matching Attribute row: the entry is kept untouched
                configured.append(entry)
                continue
            if attribute.user_editable:
                configured.append(entry)

        editable_attributes = models.Attribute.objects.filter(user_editable=True)
        if scopes:
            scopes = set(scopes)
            default_fields = [
                attribute.name for attribute in editable_attributes
                if scopes.intersection(attribute.scopes.split())]
        else:
            default_fields = list(editable_attributes.values_list('name', flat=True))

        fields, labels = utils.get_fields_and_labels(configured, default_fields)
        if scopes:
            # restrict fields to those in the scopes
            fields = [name for name in fields if name in default_fields]
        return fields, labels

    def get_form_class(self):
        """Build the model form class for the requested scope(s)."""
        if 'scope' in self.kwargs:
            wanted_scopes = [self.kwargs['scope']]
        else:
            wanted_scopes = self.request.GET.get('scope', '').split()
        fields, labels = self.get_fields(scopes=wanted_scopes)
        # Email must be edited through the change email view, as it needs validation
        without_email = [name for name in fields if name != 'email']
        return profile_forms.modelform_factory(
            User,
            fields=without_email,
            labels=labels,
            form=profile_forms.EditProfileForm)

    def get_object(self):
        """The edited object is the current user."""
        return self.request.user

    def get_form_kwargs(self, **kwargs):
        """Add the form prefix and the URL to come back to after edition."""
        form_kwargs = super(EditProfile, self).get_form_kwargs(**kwargs)
        form_kwargs.update({
            'prefix': 'edit-profile',
            'next_url': utils.select_next_url(self.request, reverse('account_management')),
        })
        return form_kwargs

    def get_success_url(self):
        """Resolve the next URL, also looking at the posted form field."""
        fallback = reverse('account_management')
        return utils.select_next_url(
            self.request,
            default=fallback,
            field_name='edit-profile-next_url',
            include_post=True)

    def post(self, request, *args, **kwargs):
        """Handle the cancel button before regular form processing."""
        if 'cancel' not in request.POST:
            return super(EditProfile, self).post(request, *args, **kwargs)
        return utils.redirect(request, self.get_success_url())

    def form_valid(self, form):
        """Save the form, then emit the ``edit-profile`` event hook."""
        result = super(EditProfile, self).form_valid(form)
        hooks.call_hooks('event', name='edit-profile', user=self.request.user, form=form)
        return result


# Exposed view: disabled by setting A2_PROFILE_CAN_EDIT_PROFILE, login required.
edit_profile = decorators.setting_enabled('A2_PROFILE_CAN_EDIT_PROFILE')(
    login_required(EditProfile.as_view()))
class EmailChangeView(cbv.TemplateNamesMixin, FormView):
    """Ask the logged-in user for a new email and send a validation mail."""
    template_names = [
        'profiles/email_change.html',
        'authentic2/change_email.html'
    ]
    title = _('Email Change')
    success_url = '..'

    def get_form_class(self):
        """Pick the form variant depending on the user having a usable password."""
        if not self.request.user.has_usable_password():
            return profile_forms.EmailChangeFormNoPassword
        return profile_forms.EmailChangeForm

    def get_form_kwargs(self):
        """Give the current user to the form."""
        form_kwargs = super(EmailChangeView, self).get_form_kwargs()
        form_kwargs['user'] = self.request.user
        return form_kwargs

    def post(self, request, *args, **kwargs):
        """Handle the cancel button before regular form processing."""
        if 'cancel' not in request.POST:
            return super(EmailChangeView, self).post(request, *args, **kwargs)
        return utils.redirect(request, 'account_management')

    def form_valid(self, form):
        """Send the validation mail, emit the event and notify the user."""
        user = self.request.user
        new_email = form.cleaned_data['email']
        utils.send_email_change_email(user, new_email, request=self.request)
        hooks.call_hooks('event', name='change-email', user=self.request.user, email=new_email)
        notice = _('Your request for changing your email '
                   'is received. An email of validation '
                   'was sent to you. Please click on the '
                   'link contained inside.')
        messages.info(self.request, notice)
        logger.info('email change request')
        return super(EmailChangeView, self).form_valid(form)


# Exposed view: disabled by setting A2_PROFILE_CAN_CHANGE_EMAIL, login required.
email_change = decorators.setting_enabled('A2_PROFILE_CAN_CHANGE_EMAIL')(
    login_required(EmailChangeView.as_view()))
class EmailChangeVerifyView(TemplateView):
    """Apply an email change from the signed token received by mail."""

    def get(self, request, *args, **kwargs):
        """Validate the ``token`` query parameter and update the user email.

        Redirects to ``account_management`` on success and to ``email-change``
        when the token is missing or when any check fails (an error message is
        queued in that case).
        """
        if 'token' not in request.GET:
            return shortcuts.redirect('email-change')
        try:
            payload = signing.loads(request.GET['token'],
                                    max_age=app_settings.A2_EMAIL_CHANGE_TOKEN_LIFETIME)
            user_pk = payload['user_pk']
            email = payload['email']
            user = User.objects.get(pk=user_pk)
            # uniqueness is checked globally, or inside the user's OU only
            already_used = False
            if app_settings.A2_EMAIL_IS_UNIQUE:
                same_email = User.objects.filter(email=email)
                already_used = same_email.exclude(pk=user_pk).exists()
            elif user.ou and user.ou.email_is_unique:
                same_email = User.objects.filter(email=email, ou=user.ou)
                already_used = same_email.exclude(pk=user_pk).exists()
            if already_used:
                raise ValidationError(_('This email is already used by another account.'))
            previous_email = user.email
            user.email = email
            user.email_verified = True
            user.save()
            messages.info(
                request,
                _('your request for changing your email for {0} is successful').format(email))
            logger.info('user %s changed its email from %s to %s', user, previous_email, email)
            hooks.call_hooks('event', name='change-email-confirm', user=user, email=email)
        except signing.SignatureExpired:
            messages.error(
                request,
                _('your request for changing your email is too old, try again'))
        except signing.BadSignature:
            messages.error(
                request,
                _('your request for changing your email is invalid, try again'))
        except ValueError:
            messages.error(
                request,
                _('your request for changing your email was not on this site, try again'))
        except User.DoesNotExist:
            messages.error(
                request,
                _('your request for changing your email is for an unknown user, try again'))
        except ValidationError as error:
            messages.error(request, error.message)
        else:
            return shortcuts.redirect('account_management')
        return shortcuts.redirect('email-change')


email_change_verify = EmailChangeVerifyView.as_view()
@csrf_exempt
@ensure_csrf_cookie
@never_cache
def login(request, template_name='authentic2/login.html',
          redirect_field_name=REDIRECT_FIELD_NAME):
    """Displays the login form and handles the login action.

    Each backend returned by ``utils.get_backends('AUTH_FRONTENDS')``
    contributes login "blocks":

    * legacy frontends (no ``login`` attribute) expose ``form()``, ``post()``
      and ``template()``; their form is instantiated and rendered here;
    * new frontends expose ``login`` (called through
      ``utils.get_authenticator_method``) and optionally ``instances``.

    The first block whose ``status_code`` is not 200 short-circuits the view
    with its own response; otherwise ``template_name`` is rendered with all
    collected blocks.
    """
    # redirect user to homepage if already connected, if setting
    # A2_LOGIN_REDIRECT_AUTHENTICATED_USERS_TO_HOMEPAGE is True
    if (request.user.is_authenticated()
            and app_settings.A2_LOGIN_REDIRECT_AUTHENTICATED_USERS_TO_HOMEPAGE):
        return utils.redirect(request, 'auth_homepage')

    redirect_to = request.GET.get(redirect_field_name)
    if not redirect_to or ' ' in redirect_to:
        redirect_to = settings.LOGIN_REDIRECT_URL
    # Heavier security check -- redirects to http://example.com should
    # not be allowed, but things like /view/?param=http://example.com
    # should be allowed. This regex checks if there is a '//' *before* a
    # question mark.
    elif '//' in redirect_to and re.match(r'[^\?]*//', redirect_to):
        redirect_to = settings.LOGIN_REDIRECT_URL
    nonce = request.GET.get(constants.NONCE_FIELD_NAME)

    authenticators = utils.get_backends('AUTH_FRONTENDS')

    blocks = []

    registration_url = utils.get_registration_url(
        request, service_slug=request.GET.get(constants.SERVICE_FIELD_NAME))

    context = {
        'cancel': nonce is not None,
        'can_reset_password': app_settings.A2_USER_CAN_RESET_PASSWORD is not False,
        'registration_authorized': getattr(settings, 'REGISTRATION_OPEN', True),
        'registration_url': registration_url,
    }

    # Cancel button
    if request.method == "POST" \
            and constants.CANCEL_FIELD_NAME in request.POST:
        return utils.continue_to_next_url(request, params={'cancel': 1})

    # Create blocks
    for authenticator in authenticators:
        # Legacy API
        if not hasattr(authenticator, 'login'):
            fid = authenticator.id
            name = authenticator.name
            form_class = authenticator.form()
            submit_name = 'submit-%s' % fid
            block = {
                'id': fid,
                'name': name,
                'authenticator': authenticator
            }
            if request.method == 'POST' and submit_name in request.POST:
                form = form_class(data=request.POST)
                if form.is_valid():
                    if request.session.test_cookie_worked():
                        request.session.delete_test_cookie()
                    return authenticator.post(request, form, nonce, redirect_to)
                block['form'] = form
            else:
                block['form'] = form_class()
            blocks.append(block)
        else:  # New frontends API
            auth_blocks = []
            parameters = {'request': request,
                          'context': context}
            # check if the authenticator has multiple instances
            if hasattr(authenticator, 'instances'):
                for instance_id, instance in authenticator.instances(**parameters):
                    parameters['instance'] = instance
                    parameters['instance_id'] = instance_id
                    block = utils.get_authenticator_method(authenticator, 'login', parameters)
                    # The loop below treats falsy blocks as "nothing to show",
                    # so do not index into one here.
                    if block:
                        # update block id in order to separate instances
                        block['id'] = '%s_%s' % (block['id'], instance_id)
                    auth_blocks.append(block)
            else:
                auth_blocks.append(utils.get_authenticator_method(authenticator, 'login', parameters))
            # If a login frontend method returns an HttpResponse with a status code != 200
            # this response is returned.
            for block in auth_blocks:
                if block:
                    if block['status_code'] != 200:
                        return block['response']
                    # TODO: remove attribute below after cleaning up the templates
                    block['is_hidden'] = False
                    blocks.append(block)

    # Old frontends API
    for block in blocks:
        fid = block['id']
        if 'form' not in block:
            continue
        authenticator = block['authenticator']
        context.update({
            'submit_name': 'submit-%s' % fid,
            redirect_field_name: redirect_to,
            'form': block['form']
        })
        if hasattr(authenticator, 'get_context'):
            context.update(authenticator.get_context())
        sub_template_name = authenticator.template()
        block['content'] = render_to_string(sub_template_name, context, request=request)

    request.session.set_test_cookie()

    # legacy context variable
    rendered_forms = [(block['name'], block['content']) for block in blocks]

    context.update({
        'methods': rendered_forms,
        # new definition
        'blocks': collections.OrderedDict((block['id'], block) for block in blocks),
        # legacy blocks carry no 'is_hidden' key: count them as visible
        'visible_blocks_count': len([x for x in blocks if not x.get('is_hidden', False)]),
        redirect_field_name: redirect_to,
    })
    return render(request, template_name, context)
def service_list(request):
    '''Collect, from every backend, the services to show on the user homepage'''
    services = utils.accumulate_from_backends(request, 'service_list')
    return services
class Homepage(cbv.TemplateNamesMixin, TemplateView):
    """User homepage; redirects to A2_HOMEPAGE_URL when that setting is set."""
    template_names = ['idp/homepage.html', 'authentic2/homepage.html']

    def dispatch(self, request, *args, **kwargs):
        """Redirect to the configured homepage, else require a logged-in user."""
        external_homepage = app_settings.A2_HOMEPAGE_URL
        if not external_homepage:
            protected_dispatch = login_required(super(Homepage, self).dispatch)
            return protected_dispatch(request, *args, **kwargs)
        return utils.redirect(request, external_homepage)

    def get_context_data(self, **kwargs):
        """Add the account management view name and the service list."""
        ctx = super(Homepage, self).get_context_data(**kwargs)
        ctx.update({
            'account_management': 'account_management',
            'authorized_services': service_list(self.request),
        })
        return ctx


homepage = Homepage.as_view()
class ProfileView(cbv.TemplateNamesMixin, TemplateView):
    """Account management page: profile values, credentials and federations."""
    template_names = ['idp/account_management.html', 'authentic2/accounts.html']
    title = _('Your account')

    def dispatch(self, request, *args, **kwargs):
        """Redirect to A2_ACCOUNTS_URL when that setting is set."""
        if app_settings.A2_ACCOUNTS_URL:
            return utils.redirect(request, app_settings.A2_ACCOUNTS_URL)
        return super(ProfileView, self).dispatch(request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """Build the template context of the account page.

        Adds the legacy ``profile`` list of ``(title, values)`` pairs, the
        ``attributes`` list of ``{'attribute', 'values'}`` dicts, the frontend
        blocks, federation actions and the feature flags read from settings.
        Value sequences are real lists so templates may iterate them more
        than once on Python 3 as well as Python 2.
        """
        context = super(ProfileView, self).get_context_data(**kwargs)
        frontends = utils.get_backends('AUTH_FRONTENDS')

        request = self.request

        # NOTE(review): this returns a frontend response from
        # get_context_data(); kept as is, confirm it is still reachable.
        if request.method == "POST":
            for frontend in frontends:
                if 'submit-%s' % frontend.id in request.POST:
                    form = frontend.form()(data=request.POST)
                    if form.is_valid():
                        if request.session.test_cookie_worked():
                            request.session.delete_test_cookie()
                        return frontend.post(request, form, None, '/profile')
        # User attributes management
        profile = []
        field_names = app_settings.A2_PROFILE_FIELDS
        if not field_names:
            # no explicit configuration: registration fields, then the user
            # class USER_PROFILE, then every editable or visible attribute
            field_names = list(app_settings.A2_REGISTRATION_FIELDS)
            for field_name in getattr(request.user, 'USER_PROFILE', []):
                if field_name not in field_names:
                    field_names.append(field_name)
            qs = models.Attribute.objects.filter(Q(user_editable=True) | Q(user_visible=True))
            qs = qs.values_list('name', flat=True)
            for field_name in qs:
                if field_name not in field_names:
                    field_names.append(field_name)
        attributes = []
        for field_name in field_names:
            title = None
            # an entry is either a name or a (name, title, ...) sequence
            if isinstance(field_name, (list, tuple)):
                if len(field_name) > 1:
                    title = field_name[1]
                field_name = field_name[0]

            try:
                attribute = models.Attribute.objects.get(name=field_name)
            except models.Attribute.DoesNotExist:
                attribute = None

            if attribute:
                if not attribute.user_visible:
                    continue
                html_value = attribute.get_kind().get('html_value', lambda a, b: b)
                qs = models.AttributeValue.objects.with_owner(request.user)
                qs = qs.filter(attribute=attribute)
                qs = qs.select_related()
                value = [at_value.to_python() for at_value in qs]
                # drop empty values; a list, not a lazy filter() object
                value = [at_value for at_value in value if at_value]
                value = [html_value(attribute, at_value) for at_value in value]
                if not title:
                    title = six.text_type(attribute)
            else:
                # fallback to model attributes
                try:
                    field = request.user._meta.get_field(field_name)
                except FieldDoesNotExist:
                    continue
                if not title:
                    title = field.verbose_name
                value = getattr(self.request.user, field_name, None)
                attribute = models.Attribute(name=field_name, label=title)

            raw_value = None
            if value:
                if callable(value):
                    value = value()
                if not isinstance(value, (list, tuple)):
                    value = (value,)
                raw_value = value
                # a list, not a one-shot map() iterator (Python 3)
                value = [six.text_type(item) for item in value]
            if value or app_settings.A2_PROFILE_DISPLAY_EMPTY_FIELDS:
                profile.append((title, value))
                attributes.append({'attribute': attribute, 'values': raw_value})

        # Credentials management
        parameters = {'request': request,
                      'context': context}
        profiles = [utils.get_authenticator_method(frontend, 'profile', parameters)
                    for frontend in frontends]
        # Old frontends data structure for templates
        blocks = [block['content'] for block in profiles if block]
        # New frontends data structure for templates
        blocks_by_id = collections.OrderedDict((block['id'], block) for block in profiles if block)

        idp_backends = utils.get_backends()
        # Get actions for federation management
        federation_management = []
        if app_settings.A2_PROFILE_CAN_MANAGE_FEDERATION:
            for idp_backend in idp_backends:
                if hasattr(idp_backend, 'federation_management'):
                    federation_management.extend(idp_backend.federation_management(request))
        context.update({
            'frontends_block': blocks,
            'frontends_block_by_id': blocks_by_id,
            'profile': profile,
            'attributes': attributes,
            'allow_account_deletion': app_settings.A2_REGISTRATION_CAN_DELETE_ACCOUNT,
            'allow_profile_edit': EditProfile.can_edit_profile(),
            'allow_email_change': app_settings.A2_PROFILE_CAN_CHANGE_EMAIL,
            # TODO: deprecated should be removed when publik-base-theme is updated
            'allow_password_change': utils.user_can_change_password(request=request),
            'federation_management': federation_management,
        })
        hooks.call_hooks('modify_context_data', self, context)
        return context


profile = login_required(ProfileView.as_view())
def logout_list(request):
    '''Collect the logout links exposed by the idp backends'''
    links = utils.accumulate_from_backends(request, 'logout_list')
    return links
def redirect_logout_list(request):
    '''Collect the redirect logout links exposed by the idp backends'''
    links = utils.accumulate_from_backends(request, 'redirect_logout_list')
    return links
def logout(request,
           next_url=None,
           do_local=True,
           check_referer=True):
    '''Logout first check if a logout request is authorized, i.e.
       that logout was done using a POST with CSRF token or with a GET
       from the same site.

       Logout endpoints of IdP module must reuse the view by setting
       check_referer and do_local to False.

       Parameters:
         next_url -- URL to end on; defaults to the URL selected by
                     utils.select_next_url(), falling back to
                     settings.LOGIN_REDIRECT_URL
         do_local -- allow the local logout; it is only performed directly
                     when the 'local' query parameter is also present,
                     otherwise the iframe logout page is tried first
         check_referer -- when true and utils.check_referer() fails, a
                     confirmation page is rendered instead of logging out

       Returns a rendered page (confirmation or iframe logout) or a
       redirect response.
    '''
    next_url = next_url or utils.select_next_url(request, settings.LOGIN_REDIRECT_URL)
    ctx = {}
    ctx['next_url'] = next_url
    ctx['redir_timeout'] = 60
    local_logout_done = False
    if request.user.is_authenticated():
        if check_referer and not utils.check_referer(request):
            return render(request, 'authentic2/logout_confirm.html', ctx)
        do_local = do_local and 'local' in request.GET
        if not do_local:
            fragments = logout_list(request)
            if fragments:
                # Full logout with iframes
                # the page comes back to this view with local=ok to finish
                next_url = utils.make_url('auth_logout', params={'local': 'ok'},
                                          next_url=next_url,
                                          sign_next_url=True)
                ctx['next_url'] = next_url
                ctx['logout_list'] = fragments
                ctx['message'] = _('Logging out from all your services')
                return render(request, 'authentic2/logout.html', ctx)
        # Get redirection targets for full logout with redirections
        # (needed before local logout)
        targets = redirect_logout_list(request)
        logger.debug('Accumulated redirections : {}'.format(targets))
        # Local logout
        auth_logout(request)
        logger.info('Logged out')
        local_logout_done = True
        # Last redirection will be the current next_url
        targets.append(next_url)
        # Put redirection targets in session (after local logout)
        request.session['logout_redirections'] = targets
        logger.debug('All planned redirections : {}'.format(targets))
    # Full logout by redirections if any
    targets = request.session.pop('logout_redirections', None)
    if targets:
        # Full logout with redirections
        logger.debug('Redirections queue: {}'.format(targets))
        # consume the first target and store the remaining ones back
        next_url = targets.pop(0)
        request.session['logout_redirections'] = targets
    logger.debug('Next redirection : {}'.format(next_url))
    response = shortcuts.redirect(next_url)
    if local_logout_done:
        # short-lived (60 s) cookie set only when the local logout just happened
        response.set_cookie('a2_just_logged_out', 1, max_age=60)
    return response
def login_password_profile(request, *args, **kwargs):
    """Render the password block of the account page as a string.

    The ``context`` keyword argument, when given, is updated in place with
    the ``can_change_password`` and ``has_usable_password`` flags.
    """
    context = kwargs.pop('context', {})
    context.update({
        'can_change_password': utils.user_can_change_password(request=request),
        'has_usable_password': request.user.has_usable_password(),
    })
    templates = ['auth/login_password_profile.html',
                 'authentic2/login_password_profile.html']
    return render_to_string(templates, context, request=request)
class LoggedInView(View):
    '''JSONP web service to detect if an user is logged'''
    http_method_names = [u'get']

    def check_referrer(self):
        '''Check if the given referer is authorized'''
        referer = self.request.META.get('HTTP_REFERER', '')
        return any(referer.startswith(valid_referer)
                   for valid_referer in app_settings.VALID_REFERERS)

    def get(self, request, *args, **kwargs):
        '''Answer ``callback(0)`` or ``callback(1)``.

        Returns 403 when the referer is not in ``VALID_REFERERS`` and 400
        when the ``callback`` parameter is missing or is not a plain
        JavaScript identifier.
        '''
        if not self.check_referrer():
            return HttpResponseForbidden()
        callback = request.GET.get('callback')
        # The callback is echoed verbatim into executable script: only accept
        # a bare identifier so arbitrary code cannot be injected.
        if not callback or not re.match(r'^[$a-zA-Z_][0-9a-zA-Z_$]*$', callback):
            return HttpResponseBadRequest('invalid callback parameter',
                                          content_type='text/plain')
        content = u'{0}({1})'.format(callback, int(request.user.is_authenticated()))
        return HttpResponse(content, content_type='application/json')


logged_in = never_cache(LoggedInView.as_view())
def csrf_failure_view(request, reason=""):
    """On CSRF failure, queue a warning and redirect to the same URL."""
    same_page = request.get_full_path
    messages.warning(request, _('The page is out of date, it was reloaded for you'))
    return HttpResponseRedirect(same_page())
class PasswordResetView(cbv.NextURLViewMixin, FormView):
    '''Ask for an email and send a password reset link by mail'''
    form_class = passwords_forms.PasswordResetForm
    title = _('Password Reset')
    next_url_default = '/'

    def get_template_names(self):
        """Templates tried in order."""
        candidates = [
            'authentic2/password_reset_form.html',
            'registration/password_reset_form.html',
        ]
        return candidates

    def get_form_kwargs(self, **kwargs):
        """Seed the form's initial ``next_url`` from the request."""
        form_kwargs = super(PasswordResetView, self).get_form_kwargs(**kwargs)
        initial_values = form_kwargs.setdefault('initial', {})
        initial_values['next_url'] = utils.select_next_url(self.request, '')
        return form_kwargs

    def get_context_data(self, **kwargs):
        """Raise 404 when password reset is disabled, else set the title."""
        ctx = super(PasswordResetView, self).get_context_data(**kwargs)
        reset_disabled = app_settings.A2_USER_CAN_RESET_PASSWORD is False
        if reset_disabled:
            raise Http404('Password reset is not allowed.')
        ctx['title'] = _('Password reset')
        return ctx

    def form_valid(self, form):
        """Save the form and show the same notice whether the email exists or not."""
        form.save()
        # return to next URL
        notice = _('If your email address exists in our '
                   'database, you will receive an email '
                   'containing instructions to reset '
                   'your password')
        messages.info(self.request, notice)
        return super(PasswordResetView, self).form_valid(form)


password_reset = PasswordResetView.as_view()
class PasswordResetConfirmView(cbv.RedirectToNextURLViewMixin, FormView):
    '''Validate password reset link, show a set password form and login
       the user.
    '''
    form_class = passwords_forms.SetPasswordForm
    title = _('Password Reset')

    def get_template_names(self):
        """Templates tried in order."""
        candidates = [
            'registration/password_reset_confirm.html',
            'authentic2/password_reset_confirm.html',
        ]
        return candidates

    def dispatch(self, request, *args, **kwargs):
        """Check the reset link before handing over to the form view.

        Redirects to the success URL, with a warning message, when the user
        cannot be found, when the token is rejected or when the user is not
        allowed to reset their password.
        """
        uidb64 = kwargs['uidb64']
        token = kwargs['token']
        self.token = token

        UserModel = get_user_model()
        # checked by URLconf
        assert uidb64 is not None and token is not None
        link_is_valid = True
        try:
            uid = urlsafe_base64_decode(uidb64)
            # use authenticate to eventually get an LDAPUser
            self.user = utils.authenticate(request, user=UserModel._default_manager.get(pk=uid))
        except (TypeError, ValueError, OverflowError,
                UserModel.DoesNotExist):
            link_is_valid = False
            messages.warning(request, _('User not found'))
        else:
            if not compat.default_token_generator.check_token(self.user, token):
                link_is_valid = False
                messages.warning(request, _('You reset password link is invalid or has expired'))

        if not link_is_valid:
            return utils.redirect(request, self.get_success_url())

        # the per-user flag defaults to "has a usable password"
        may_reset = utils.get_user_flag(user=self.user,
                                        name='can_reset_password',
                                        default=self.user.has_usable_password())
        if may_reset:
            return super(PasswordResetConfirmView, self).dispatch(request, *args,
                                                                  **kwargs)
        messages.warning(
            request,
            _('It\'s not possible to reset your password. Please contact an administrator.'))
        return utils.redirect(request, self.get_success_url())

    def get_context_data(self, **kwargs):
        """Add the title and the ``validlink`` flag expected by templates."""
        ctx = super(PasswordResetConfirmView, self).get_context_data(**kwargs)
        # compatibility with existing templates !
        ctx.update({
            'title': _('Enter new password'),
            'validlink': True,
        })
        return ctx

    def get_form_kwargs(self):
        """Give the user found by dispatch() to the form."""
        form_kwargs = super(PasswordResetConfirmView, self).get_form_kwargs()
        form_kwargs['user'] = self.user
        return form_kwargs

    def form_valid(self, form):
        """Save the new password, emit the event, then log the user in."""
        # Changing password by mail validate the email
        form.user.email_verified = True
        form.save()
        hooks.call_hooks('event', name='password-reset-confirm', user=form.user,
                         token=self.token, form=form)
        # only the first characters of the token are logged
        token_prefix = self.token[:9]
        logger.info(u'password reset for user %s with token %r',
                    self.user, token_prefix)
        return self.finish()

    def finish(self):
        """Log the user in with the ``email`` authentication method."""
        return utils.simulate_authentication(self.request, self.user, 'email')


password_reset_confirm = PasswordResetConfirmView.as_view()
def valid_token(method):
    """Decorate a view so that it only runs with a valid registration token.

    The wrapper reads the ``registration_token`` keyword argument, strips
    spaces from it, checks its signature and age (at most
    ``settings.ACCOUNT_ACTIVATION_DAYS`` days) and stores the decoded payload
    in ``request.token``. On an expired or invalid token a warning message is
    queued and the user is redirected to ``registration_register``.

    The wrapper keeps the metadata of the decorated view (name, docstring,
    module) so that logs and debugging tools show the real view.
    """
    import functools

    # copy only the attributes the decorated callable really has, so that
    # callables without e.g. __name__ are still accepted on Python 2
    assigned = tuple(attr for attr in functools.WRAPPER_ASSIGNMENTS
                     if hasattr(method, attr))

    @functools.wraps(method, assigned=assigned)
    def f(request, *args, **kwargs):
        try:
            request.token = signing.loads(kwargs['registration_token'].replace(' ', ''),
                                          max_age=settings.ACCOUNT_ACTIVATION_DAYS * 3600 * 24)
        except signing.SignatureExpired:
            messages.warning(request, _('Your activation key is expired'))
            return utils.redirect(request, 'registration_register')
        except signing.BadSignature:
            messages.warning(request, _('Activation failed'))
            return utils.redirect(request, 'registration_register')
        return method(request, *args, **kwargs)
    return f
class BaseRegistrationView(FormView):
    """First registration step: ask for an email and send the activation mail."""
    form_class = registration_forms.RegistrationForm
    template_name = 'registration/registration_form.html'
    title = _('Registration')

    def dispatch(self, request, *args, **kwargs):
        """Check registration is open and load the optional pre-filled token.

        Raises Http404 when ``settings.REGISTRATION_OPEN`` is false and
        answers 400 when the ``token`` query parameter cannot be decoded.
        """
        registration_open = getattr(settings, 'REGISTRATION_OPEN', True)
        if not registration_open:
            raise Http404('Registration is not open.')

        self.token = {}
        self.ou = get_default_ou()

        # load pre-filled values
        raw_token = request.GET.get('token')
        if raw_token:
            try:
                self.token = signing.loads(
                    request.GET.get('token'),
                    max_age=settings.ACCOUNT_ACTIVATION_DAYS * 3600 * 24)
            except (TypeError, ValueError, signing.BadSignature) as error:
                logger.warning(u'registration_view: invalid token: %s', error)
                return HttpResponseBadRequest('invalid token', content_type='text/plain')
            if 'ou' in self.token:
                self.ou = OU.objects.get(pk=self.token['ou'])

        # the next URL from the token wins over the one from the request
        request_next_url = utils.select_next_url(request, None)
        self.next_url = self.token.pop(REDIRECT_FIELD_NAME, request_next_url)
        return super(BaseRegistrationView, self).dispatch(request, *args, **kwargs)

    def form_valid(self, form):
        """Send the registration mail and go to the completion notice page."""
        email = form.cleaned_data.pop('email')
        # every other cleaned value is carried inside the token
        self.token.update(form.cleaned_data)

        # propagate service to the registration completion view
        service_field = constants.SERVICE_FIELD_NAME
        if service_field in self.request.GET:
            self.token[service_field] = self.request.GET[service_field]

        for key in (REDIRECT_FIELD_NAME, 'email'):
            self.token.pop(key, None)

        utils.send_registration_mail(self.request, email, next_url=self.next_url,
                                     ou=self.ou, **self.token)
        self.request.session['registered_email'] = email
        return utils.redirect(
            self.request, 'registration_complete',
            params={REDIRECT_FIELD_NAME: self.next_url})

    def get_context_data(self, **kwargs):
        """Add the registration blocks of the frontends, keyed by block id."""
        context = super(BaseRegistrationView, self).get_context_data(**kwargs)
        parameters = {'request': self.request,
                      'context': context}
        frontends = collections.OrderedDict()
        for authenticator in utils.get_backends('AUTH_FRONTENDS'):
            block = utils.get_authenticator_method(authenticator, 'registration', parameters)
            if block:
                frontends[block['id']] = block
        context['frontends'] = frontends
        return context
class RegistrationView(cbv.ValidateCSRFMixin, BaseRegistrationView):
    """BaseRegistrationView combined with cbv.ValidateCSRFMixin."""
class RegistrationCompletionView(CreateView):
model = get_user_model()
success_url = 'auth_homepage'
def get_template_names(self):
    """Pick the account chooser when accounts exist and creation was not asked."""
    if not self.users or 'create' in self.request.GET:
        return ['registration/registration_completion_form.html']
    return ['registration/registration_completion_choose.html']
def get_success_url(self):
    """Compute where to go once registration is done.

    ``A2_REGISTRATION_REDIRECT`` is either a URL or a ``(url, field_name)``
    pair. When the token carries a next URL, it is returned directly, or
    passed to the configured redirect URL as the ``field_name`` parameter.
    Without a next URL, the configured redirect URL or ``success_url`` is
    used.
    """
    setting = app_settings.A2_REGISTRATION_REDIRECT
    # Explicit type check: blind tuple-unpacking would also "succeed" on a
    # two-character string such as '/x' and split it into two characters.
    if isinstance(setting, (list, tuple)) and len(setting) == 2:
        redirect_url, next_field = setting
    else:
        redirect_url = setting
        next_field = REDIRECT_FIELD_NAME

    if self.token and self.token.get(REDIRECT_FIELD_NAME):
        url = self.token[REDIRECT_FIELD_NAME]
        if redirect_url:
            url = utils.make_url(redirect_url, params={next_field: url})
    else:
        if redirect_url:
            url = redirect_url
        else:
            url = utils.make_url(self.success_url)
    return url
def dispatch(self, request, *args, **kwargs):
    """Initialise the view state from ``request.token`` before dispatching.

    Sets the token, authentication method, email, organizational unit,
    the existing accounts sharing this email, the email uniqueness flag,
    the form fields and the service.
    """
    token = request.token
    self.token = token
    self.authentication_method = token.get('authentication_method', 'email')
    self.email = request.token['email']
    self.ou = OU.objects.get(pk=token['ou']) if 'ou' in token else get_default_ou()

    same_email_users = User.objects.filter(email__iexact=self.email)
    self.users = same_email_users.order_by('date_joined')
    self.email_is_unique = (app_settings.A2_EMAIL_IS_UNIQUE
                            or app_settings.A2_REGISTRATION_EMAIL_IS_UNIQUE)
    if self.ou:
        # with an OU, only its users and its own uniqueness rule also count
        self.users = self.users.filter(ou=self.ou)
        self.email_is_unique |= self.ou.email_is_unique

    self.init_fields_labels_and_help_texts()
    # if registration is done during an SSO add the service to the registration event
    self.service = token.get(constants.SERVICE_FIELD_NAME)
    parent = super(RegistrationCompletionView, self)
    return parent.dispatch(request, *args, **kwargs)
def init_fields_labels_and_help_texts(self):
    """Compute ``self.fields``, ``labels``, ``required`` and ``help_texts``.

    Fields come from the registration settings and from the attributes
    asked on registration. ``email`` and the token's ``skip_fields`` are
    removed. The username label and help text can be overridden by
    settings.
    """
    attributes = models.Attribute.objects.filter(
        asked_on_registration=True)
    default_fields = attributes.values_list('name', flat=True)
    # built once and used both for the field computation and for
    # ``self.required`` (the same query used to be built twice)
    required_fields = models.Attribute.objects.filter(required=True) \
        .values_list('name', flat=True)
    fields, labels = utils.get_fields_and_labels(
        app_settings.A2_REGISTRATION_FIELDS,
        default_fields,
        app_settings.A2_REGISTRATION_REQUIRED_FIELDS,
        app_settings.A2_REQUIRED_FIELDS,
        required_fields)
    help_texts = {}
    if app_settings.A2_REGISTRATION_FORM_USERNAME_LABEL:
        labels['username'] = app_settings.A2_REGISTRATION_FORM_USERNAME_LABEL
    if app_settings.A2_REGISTRATION_FORM_USERNAME_HELP_TEXT:
        help_texts['username'] = \
            app_settings.A2_REGISTRATION_FORM_USERNAME_HELP_TEXT
    required = list(app_settings.A2_REGISTRATION_REQUIRED_FIELDS) + \
        list(required_fields)
    # the email comes from the token, it is not asked again by default
    if 'email' in fields:
        fields.remove('email')
    for field in self.token.get('skip_fields') or []:
        if field in fields:
            fields.remove(field)
    self.fields = fields
    self.labels = labels
    self.required = required
    self.help_texts = help_texts
def get_form_class(self):
    """Build the completion form class from the computed fields.

    The email field is added back, as required, when the token marks the
    email as not valid. A form without password is used when the token
    asks for it. The username field is replaced by a validated CharField
    when a username regex is configured.
    """
    email_is_valid = self.token.get('valid_email', True)
    if not email_is_valid:
        self.fields.append('email')
        self.required.append('email')

    if self.token.get('no_password', False):
        base_form = registration_forms.RegistrationCompletionFormNoPassword
    else:
        base_form = registration_forms.RegistrationCompletionForm

    form_class = profile_forms.modelform_factory(
        self.model,
        form=base_form,
        fields=self.fields,
        labels=self.labels,
        required=self.required,
        help_texts=self.help_texts)

    if not ('username' in self.fields and app_settings.A2_REGISTRATION_FORM_USERNAME_REGEX):
        return form_class

    # Keep existing field label and help_text
    previous = form_class.base_fields['username']
    username_field = CharField(
        max_length=256,
        label=previous.label,
        help_text=previous.help_text,
        validators=[validators.UsernameValidator()])
    return type('RegistrationForm', (form_class,), {'username': username_field})
def get_form_kwargs(self, **kwargs):
    '''Initialize mail from token'''
    form_kwargs = super(RegistrationCompletionView, self).get_form_kwargs(**kwargs)
    if 'ou' in self.token:
        ou = get_object_or_404(OU, id=self.token['ou'])
    else:
        ou = get_default_ou()

    attributes = {'email': self.email, 'ou': ou}
    # token values are only trusted for the pre-registration fields
    attributes.update(
        (key, self.token[key]) for key in self.token
        if key in app_settings.A2_PRE_REGISTRATION_FIELDS)
    logger.debug(u'attributes %s', attributes)

    prefilling_list = utils.accumulate_from_backends(self.request, 'registration_form_prefill')
    logger.debug(u'prefilling_list %s', prefilling_list)
    # Build a single meaningful prefilling with sets of values
    prefilling = {}
    for backend_prefill in prefilling_list:
        for name, values in backend_prefill.items():
            if name not in self.fields:
                continue
            prefilling.setdefault(name, set()).update(values)
    logger.debug(u'prefilling %s', prefilling)
    for name, values in prefilling.items():
        attributes[name] = ' '.join(values)
    logger.debug(u'attributes with prefilling %s', attributes)

    user_id = self.token.get('user_id')
    if user_id:
        form_kwargs['instance'] = User.objects.get(id=self.token.get('user_id'))
        return form_kwargs

    # new account: only these keys are given to the model constructor
    init_kwargs = {
        key: attributes[key]
        for key in ('email', 'first_name', 'last_name', 'ou')
        if key in attributes}
    form_kwargs['instance'] = get_user_model()(**init_kwargs)
    return form_kwargs
def get_form(self, form_class=None):
    '''Return the parent view's form after passing it to the 'front_modify_form' hooks.'''
    built_form = super(RegistrationCompletionView, self).get_form(form_class=form_class)
    hooks.call_hooks('front_modify_form', self, built_form)
    return built_form
def get_context_data(self, **kwargs):
    '''Extend the parent context with the token, the matching users and the email state.

    'create' is true when the request query string contains a 'create'
    parameter.
    '''
    context = super(RegistrationCompletionView, self).get_context_data(**kwargs)
    context.update({
        'token': self.token,
        'users': self.users,
        'email': self.email,
        'email_is_unique': self.email_is_unique,
        'create': 'create' in self.request.GET,
    })
    return context
def get(self, request, *args, **kwargs):
    '''Handle GET: log in, register directly from the token, or display the form.

    - exactly one existing user and unique emails: that user is logged in;
    - the token already holds every field to confirm and no confirmation
      (or only confirmation of required fields) is requested: the form is
      bound to the token and, when valid, the registration is completed
      without displaying anything;
    - otherwise the parent view renders the form.
    '''
    if len(self.users) == 1 and self.email_is_unique:
        # Found one user, EMAIL is unique, log her in
        utils.simulate_authentication(
            request, self.users[0],
            method=self.authentication_method,
            service_slug=self.service)
        return utils.redirect(request, self.get_success_url())

    confirm_data = self.token.get('confirm_data', False)
    fields_to_confirm = self.required if confirm_data == 'required' else self.fields
    token_is_complete = all(field in self.token for field in fields_to_confirm)
    if not (token_is_complete and (not confirm_data or confirm_data == 'required')):
        return super(RegistrationCompletionView, self).get(request, *args, **kwargs)

    # We already have every field, try to register right away
    form_kwargs = self.get_form_kwargs()
    form_class = self.get_form_class()
    data = self.token
    if 'password' in data:
        # The form takes the password and its confirmation as two inputs;
        # note that this rewrites self.token in place.
        password = data.pop('password')
        data['password1'] = password
        data['password2'] = password
    form_kwargs['data'] = data
    form = form_class(**form_kwargs)
    if form.is_valid():
        return self.registration_success(request, form.save(), form)
    # Make the parent view render this already bound form
    self.get_form = lambda *args, **kwargs: form
    return super(RegistrationCompletionView, self).get(request, *args, **kwargs)
def post(self, request, *args, **kwargs):
    '''Handle POST: refuse duplicates, log in a chosen account, or process the form.

    When users already exist and emails must be unique, the request is
    redirected back to the current view. When a 'uid' is posted and matches
    one of the known users, that user is logged in. Any other case is
    handled by the parent view.
    '''
    if self.users and self.email_is_unique:
        # email is unique, users already exist, creating a new one is forbidden !
        return utils.redirect(
            request,
            request.resolver_match.view_name,
            args=self.args,
            kwargs=self.kwargs)

    if 'uid' in request.POST:
        uid = request.POST['uid']
        chosen = next((user for user in self.users if str(user.id) == uid), None)
        if chosen is not None:
            utils.simulate_authentication(
                request, chosen,
                method=self.authentication_method,
                service_slug=self.service)
            return utils.redirect(request, self.get_success_url())

    return super(RegistrationCompletionView, self).post(request, *args, **kwargs)
def form_valid(self, form):
    '''Complete the registration, or send a new validation mail if the email changed.

    A posted email that differs from the one in the token (or that has no
    counterpart in the token) triggers a new registration mail and a
    redirect to 'registration_complete', unless the token sets
    'skip_email_check'. Otherwise the parent form_valid() is run and the
    registration is finalised.
    '''
    # Drop verified fields from the form: an authentication method may
    # provide verified data and show it to the user, but the user must not
    # be able to modify it.
    for av in models.AttributeValue.objects.with_owner(form.instance):
        if not av.verified:
            continue
        form.fields.pop(av.attribute.name, None)

    posted = self.request.POST
    email_differs = (
        'email' in posted
        and ('email' not in self.token or posted['email'] != self.token['email']))
    if email_differs and not self.token.get('skip_email_check'):
        # If an email is submitted it must be validated or be the same as in the token
        data = form.cleaned_data
        data['no_password'] = self.token.get('no_password', False)
        utils.send_registration_mail(
            self.request,
            ou=self.ou,
            next_url=self.get_success_url(),
            **data)
        self.request.session['registered_email'] = data['email']
        return utils.redirect(self.request, 'registration_complete')

    super(RegistrationCompletionView, self).form_valid(form)
    return self.registration_success(self.request, form.instance, form)
def registration_success(self, request, user, form):
    '''Finalise a registration and redirect to the success URL.

    In order: emits the 'registration' event through the hooks, logs the
    user in, queues the success message, sends the success email and
    returns the redirect response.
    '''
    hooks.call_hooks(
        'event',
        name='registration',
        user=user,
        form=form,
        view=self,
        authentication_method=self.authentication_method,
        token=request.token,
        service=self.service)
    utils.simulate_authentication(
        request,
        user,
        method=self.authentication_method,
        service_slug=self.service)
    success_message = loader.get_template(
        'authentic2/registration_success_message.html').render(request=request)
    messages.info(self.request, success_message)
    self.send_registration_success_email(user)
    return utils.redirect(request, self.get_success_url())
def send_registration_success_email(self, user):
    '''Send the 'authentic2/registration_success' mail to `user`.

    Nothing is sent when the user has no email address.
    '''
    if user.email:
        login_url = self.request.build_absolute_uri(settings.LOGIN_URL)
        mail_context = {
            'user': user,
            'email': user.email,
            'site': self.request.get_host(),
            'login_url': login_url,
        }
        utils.send_templated_mail(
            user,
            template_names=['authentic2/registration_success'],
            context=mail_context,
            request=self.request)
# Module-level view callable: RegistrationCompletionView wrapped by valid_token.
# NOTE(review): valid_token is defined outside this excerpt; presumably it
# checks the registration token and exposes it as request.token - confirm.
registration_completion = valid_token(RegistrationCompletionView.as_view())
class DeleteView(TemplateView):
    '''Page where a user asks for the deletion of their own account.'''
    template_name = 'authentic2/accounts_delete_request.html'
    title = _('Request account deletion')

    def dispatch(self, request, *args, **kwargs):
        '''Serve the view only when A2_REGISTRATION_CAN_DELETE_ACCOUNT is set, else redirect to '..'.'''
        if app_settings.A2_REGISTRATION_CAN_DELETE_ACCOUNT:
            return super(DeleteView, self).dispatch(request, *args, **kwargs)
        return utils.redirect(request, '..')

    def post(self, request, *args, **kwargs):
        '''Send the deletion code unless 'cancel' was posted, then go back to account management.'''
        if 'cancel' not in request.POST:
            utils.send_account_deletion_code(self.request, self.request.user)
            messages.info(
                request,
                _("An account deletion validation email has been sent to your email address."))
        return utils.redirect(request, 'account_management')
class ValidateDeletionView(TemplateView):
    '''Confirmation page reached with a signed account deletion token.

    dispatch() decodes the `deletion_token` URL keyword argument and loads
    the targeted user into self.user; post() performs the deletion.
    '''
    template_name = 'authentic2/accounts_delete_validation.html'
    title = _('Confirm account deletion')
    # User targeted by the deletion token, set by dispatch()
    user = None

    def dispatch(self, request, *args, **kwargs):
        '''Validate the deletion token before serving the view.

        On any failure (expired, invalid or foreign token, token without a
        user reference, unknown or already deactivated user) an error
        message is queued and the request is redirected to 'auth_homepage'.
        '''
        try:
            deletion_token = signing.loads(kwargs['deletion_token'],
                                           max_age=app_settings.A2_DELETION_REQUEST_LIFETIME)
            try:
                user_pk = deletion_token['user_pk']
            except (KeyError, TypeError):
                # signing.loads() accepts any payload signed by this site;
                # one that is not a deletion token must be reported as an
                # invalid request instead of crashing the view.
                raise signing.BadSignature('deletion token without user_pk')
            self.user = get_user_model().objects.get(pk=user_pk)
            # A user account won't be deactivated twice
            if not self.user.is_active:
                raise ValidationError(
                    _('This account had previously been deactivated and will be deleted soon.'))
            logger.info('user %s confirmed the deletion of their own account', self.user)
        except signing.SignatureExpired:
            # Must stay before BadSignature, which is its base class
            error = _('The account deletion request is too old, try again')
        except signing.BadSignature:
            error = _('The account deletion request is invalid, try again')
        except ValueError:
            error = _('The account deletion request was not on this site, try again')
        except ValidationError as e:
            error = e.message
        except get_user_model().DoesNotExist:
            error = _('This account has previously been deleted.')
        else:
            return super(ValidateDeletionView, self).dispatch(request, *args, **kwargs)
        messages.error(request, error)
        return utils.redirect(request, 'auth_homepage')

    def post(self, request, *args, **kwargs):
        '''Delete the account unless 'cancel' was posted.

        Returns a redirect to 'auth_logout' when the deleted account is the
        one of the current session, to 'auth_homepage' otherwise.
        '''
        if 'cancel' not in request.POST:
            utils.send_account_deletion_mail(self.request, self.user)
            models.DeletedUser.objects.delete_user(self.user)
            # Append a random suffix to the email and mark it unverified
            # (presumably so the address can be registered again - confirm)
            self.user.email += '#%d' % random.randint(1, 10000000)
            self.user.email_verified = False
            self.user.save(update_fields=['email', 'email_verified'])
            logger.info(u'deletion of account %s performed', self.user)
            hooks.call_hooks('event', name='delete-account', user=self.user)
            if self.user == request.user:
                # No validation message displayed, as the user will surely
                # notice their own account deletion...
                return utils.redirect(request, 'auth_logout')
            # No real use for cancel_url or next_url here, assuming the link
            # has been received by email. We instead redirect the user to the
            # homepage.
            messages.info(request, _('Deletion performed.'))
        return utils.redirect(request, 'auth_homepage')

    def get_context_data(self, **kwargs):
        '''Expose the user targeted by the token as 'user' in the template context.'''
        ctx = super(ValidateDeletionView, self).get_context_data(**kwargs)
        ctx['user'] = self.user  # Not necessarily the user in request
        return ctx
class RegistrationCompleteView(TemplateView):
    '''Page displayed once a registration mail has been sent.'''
    template_name = 'registration/registration_complete.html'

    def get_context_data(self, **kwargs):
        '''Add the next URL, the sender address and the activation delay to the context.'''
        kwargs.update(
            next_url=utils.select_next_url(self.request, settings.LOGIN_REDIRECT_URL),
            from_email=settings.DEFAULT_FROM_EMAIL,
            # parseaddr() returns (real name, address); keep the bare address
            from_email_address=parseaddr(settings.DEFAULT_FROM_EMAIL)[1],
        )
        parent = super(RegistrationCompleteView, self)
        return parent.get_context_data(
            account_activation_days=settings.ACCOUNT_ACTIVATION_DAYS,
            **kwargs)


registration_complete = RegistrationCompleteView.as_view()
@sensitive_post_parameters()
@login_required
@decorators.setting_enabled('A2_REGISTRATION_CAN_CHANGE_PASSWORD')
def password_change(request, *args, **kwargs):
    '''Wrap Django's password_change view with authentic2 specific behaviour.

    The redirect target is, by priority: a non-empty posted 'next_url', the
    REDIRECT_FIELD_NAME query parameter, the 'post_change_redirect' keyword
    argument, and finally the 'account_management' URL. The request is
    redirected there immediately when the user is not allowed to change
    their password or when 'cancel' was posted. Users without a usable
    password get the SetPasswordForm instead of the PasswordChangeForm.
    '''
    post_change_redirect = kwargs.pop('post_change_redirect', None)
    posted_next_url = request.POST.get('next_url')
    if posted_next_url:
        post_change_redirect = posted_next_url
    elif REDIRECT_FIELD_NAME in request.GET:
        post_change_redirect = request.GET[REDIRECT_FIELD_NAME]
    elif post_change_redirect is None:
        post_change_redirect = reverse('account_management')

    if not utils.user_can_change_password(request=request):
        messages.warning(request, _('Password change is forbidden'))
        return utils.redirect(request, post_change_redirect)
    if 'cancel' in request.POST:
        return utils.redirect(request, post_change_redirect)

    kwargs['post_change_redirect'] = post_change_redirect
    extra_context = kwargs.setdefault('extra_context', {})
    extra_context.update({
        'view': password_change,
        REDIRECT_FIELD_NAME: post_change_redirect,
    })
    if request.user.has_usable_password():
        form_class = passwords_forms.PasswordChangeForm
    else:
        form_class = passwords_forms.SetPasswordForm
    kwargs['password_change_form'] = form_class

    response = dj_password_change(request, *args, **kwargs)
    # Django's view answers with a redirect once the password has been changed
    if isinstance(response, HttpResponseRedirect):
        hooks.call_hooks('event', name='change-password', user=request.user, request=request)
        messages.info(request, _('Password changed'))
    return response


password_change.title = _('Password Change')
password_change.do_not_call_in_templates = True
def notimplemented_view(request):
    '''Placeholder view: always raises NotImplementedError.'''
    raise NotImplementedError()
class SuView(View):
    '''Log in as the user designated by a "su" token.'''

    def get(self, request, token):
        '''Authenticate as the user resolved from `token`, or raise Http404 if there is none.'''
        user = utils.get_su_user(token)
        if user:
            return utils.simulate_authentication(request, user, 'su')
        raise Http404


su = SuView.as_view()