1435 lines
58 KiB
Python
1435 lines
58 KiB
Python
# authentic2 - versatile identity manager
|
|
# Copyright (C) 2010-2019 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import collections
|
|
import logging
|
|
import re
|
|
from email.utils import parseaddr
|
|
|
|
from django import shortcuts
|
|
from django.conf import settings
|
|
from django.contrib import messages
|
|
from django.contrib.auth import REDIRECT_FIELD_NAME, get_user_model
|
|
from django.contrib.auth import logout as auth_logout
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.contrib.auth.views import PasswordChangeView as DjPasswordChangeView
|
|
from django.core import signing
|
|
from django.core.exceptions import ValidationError
|
|
from django.db.models.fields import FieldDoesNotExist
|
|
from django.db.models.query import Q
|
|
from django.forms import CharField
|
|
from django.http import (
|
|
Http404,
|
|
HttpResponse,
|
|
HttpResponseBadRequest,
|
|
HttpResponseForbidden,
|
|
HttpResponseRedirect,
|
|
)
|
|
from django.shortcuts import get_object_or_404, render
|
|
from django.template import loader
|
|
from django.template.loader import render_to_string
|
|
from django.urls import reverse
|
|
from django.utils import timezone
|
|
from django.utils.translation import ugettext as _
|
|
from django.views.decorators.cache import never_cache
|
|
from django.views.decorators.csrf import csrf_exempt, ensure_csrf_cookie
|
|
from django.views.generic import TemplateView
|
|
from django.views.generic.base import View
|
|
from django.views.generic.edit import CreateView, FormView, UpdateView
|
|
from ratelimit.utils import is_ratelimited
|
|
|
|
from authentic2.custom_user.models import iter_attributes
|
|
|
|
from . import app_settings, attribute_kinds, cbv, constants, decorators, hooks, models, utils, validators
|
|
from .a2_rbac.models import OrganizationalUnit as OU
|
|
from .a2_rbac.utils import get_default_ou
|
|
from .forms import passwords as passwords_forms
|
|
from .forms import profile as profile_forms
|
|
from .forms import registration as registration_forms
|
|
from .utils import switch_user
|
|
from .utils.evaluate import HTTPHeaders
|
|
from .utils.service import get_service_from_request, get_service_from_token, set_service_ref
|
|
|
|
User = get_user_model()
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class EditProfile(cbv.HookMixin, cbv.TemplateNamesMixin, UpdateView):
|
|
model = User
|
|
template_names = ['profiles/edit_profile.html', 'authentic2/accounts_edit.html']
|
|
title = _('Edit account data')
|
|
|
|
def get_template_names(self):
|
|
template_names = []
|
|
if 'scope' in self.kwargs:
|
|
template_names.append('authentic2/accounts_edit_%s.html' % self.kwargs['scope'])
|
|
template_names.extend(self.template_names)
|
|
return template_names
|
|
|
|
@classmethod
|
|
def can_edit_profile(cls):
|
|
fields, labels = cls.get_fields()
|
|
return bool(fields) and app_settings.A2_PROFILE_CAN_EDIT_PROFILE
|
|
|
|
@classmethod
|
|
def get_fields(cls, scopes=None):
|
|
editable_profile_fields = []
|
|
for field in app_settings.A2_PROFILE_FIELDS:
|
|
if isinstance(field, (list, tuple)):
|
|
field_name = field[0]
|
|
else:
|
|
field_name = field
|
|
try:
|
|
attribute = models.Attribute.objects.get(name=field_name)
|
|
except models.Attribute.DoesNotExist:
|
|
editable_profile_fields.append(field)
|
|
else:
|
|
if attribute.user_editable:
|
|
editable_profile_fields.append(field)
|
|
attributes = models.Attribute.objects.filter(user_editable=True)
|
|
if scopes:
|
|
scopes = set(scopes)
|
|
default_fields = [
|
|
attribute.name for attribute in attributes if scopes & set(attribute.scopes.split())
|
|
]
|
|
else:
|
|
default_fields = list(attributes.values_list('name', flat=True))
|
|
fields, labels = utils.get_fields_and_labels(editable_profile_fields, default_fields)
|
|
if scopes:
|
|
# restrict fields to those in the scopes
|
|
fields = [field for field in fields if field in default_fields]
|
|
return fields, labels
|
|
|
|
def get_form_class(self):
|
|
if 'scope' in self.kwargs:
|
|
scopes = [self.kwargs['scope']]
|
|
else:
|
|
scopes = self.request.GET.get('scope', '').split()
|
|
fields, labels = self.get_fields(scopes=scopes)
|
|
# Email must be edited through the change email view, as it needs validation
|
|
fields = [field for field in fields if field != 'email']
|
|
return profile_forms.modelform_factory(
|
|
User, fields=fields, labels=labels, form=profile_forms.EditProfileForm
|
|
)
|
|
|
|
def get_object(self):
|
|
return self.request.user
|
|
|
|
def get_form_kwargs(self, **kwargs):
|
|
kwargs = super(EditProfile, self).get_form_kwargs(**kwargs)
|
|
kwargs['next_url'] = utils.select_next_url(self.request, reverse('account_management'))
|
|
return kwargs
|
|
|
|
def get_success_url(self):
|
|
return utils.select_next_url(
|
|
self.request, default=reverse('account_management'), field_name='next_url', include_post=True
|
|
)
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
if 'cancel' in request.POST:
|
|
return utils.redirect(request, self.get_success_url())
|
|
return super(EditProfile, self).post(request, *args, **kwargs)
|
|
|
|
def form_valid(self, form):
|
|
response = super(EditProfile, self).form_valid(form)
|
|
hooks.call_hooks('event', name='edit-profile', user=self.request.user, form=form)
|
|
self.request.journal.record('user.profile.edit', form=form)
|
|
return response
|
|
|
|
|
|
edit_profile = decorators.setting_enabled('A2_PROFILE_CAN_EDIT_PROFILE')(
|
|
login_required(EditProfile.as_view())
|
|
)
|
|
|
|
|
|
class EmailChangeView(cbv.TemplateNamesMixin, FormView):
|
|
template_names = ['profiles/email_change.html', 'authentic2/change_email.html']
|
|
title = _('Email Change')
|
|
success_url = '..'
|
|
|
|
def get_form_class(self):
|
|
if self.request.user.has_usable_password():
|
|
return profile_forms.EmailChangeForm
|
|
return profile_forms.EmailChangeFormNoPassword
|
|
|
|
def get_form_kwargs(self):
|
|
kwargs = super(EmailChangeView, self).get_form_kwargs()
|
|
kwargs['user'] = self.request.user
|
|
return kwargs
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
if 'cancel' in request.POST:
|
|
return utils.redirect(request, 'account_management')
|
|
return super(EmailChangeView, self).post(request, *args, **kwargs)
|
|
|
|
def form_valid(self, form):
|
|
email = form.cleaned_data['email']
|
|
utils.send_email_change_email(self.request.user, email, request=self.request)
|
|
hooks.call_hooks('event', name='change-email', user=self.request.user, email=email)
|
|
messages.info(
|
|
self.request,
|
|
_(
|
|
'Your request for changing your email '
|
|
'is received. An email of validation '
|
|
'was sent to you. Please click on the '
|
|
'link contained inside.'
|
|
),
|
|
)
|
|
logger.info('email change request')
|
|
return super(EmailChangeView, self).form_valid(form)
|
|
|
|
|
|
email_change = decorators.setting_enabled('A2_PROFILE_CAN_CHANGE_EMAIL')(
|
|
login_required(EmailChangeView.as_view())
|
|
)
|
|
|
|
|
|
class EmailChangeVerifyView(TemplateView):
|
|
def get(self, request, *args, **kwargs):
|
|
if 'token' in request.GET:
|
|
try:
|
|
token = signing.loads(
|
|
request.GET['token'], max_age=app_settings.A2_EMAIL_CHANGE_TOKEN_LIFETIME
|
|
)
|
|
user_pk = token['user_pk']
|
|
email = token['email']
|
|
user = User.objects.get(pk=user_pk)
|
|
non_unique = False
|
|
if app_settings.A2_EMAIL_IS_UNIQUE:
|
|
non_unique = User.objects.filter(email=email).exclude(pk=user_pk).exists()
|
|
elif user.ou and user.ou.email_is_unique:
|
|
non_unique = User.objects.filter(email=email, ou=user.ou).exclude(pk=user_pk).exists()
|
|
if non_unique:
|
|
raise ValidationError(_('This email is already used by another account.'))
|
|
old_email = user.email
|
|
user.email = email
|
|
user.email_verified = True
|
|
user.save()
|
|
messages.info(
|
|
request, _('your request for changing your email for {0} is successful').format(email)
|
|
)
|
|
logger.info('user %s changed its email from %s to %s', user, old_email, email)
|
|
hooks.call_hooks('event', name='change-email-confirm', user=user, email=email)
|
|
except signing.SignatureExpired:
|
|
messages.error(request, _('your request for changing your email is too old, try again'))
|
|
except signing.BadSignature:
|
|
messages.error(request, _('your request for changing your email is invalid, try again'))
|
|
except ValueError:
|
|
messages.error(
|
|
request, _('your request for changing your email was not on this site, try again')
|
|
)
|
|
except User.DoesNotExist:
|
|
messages.error(
|
|
request, _('your request for changing your email is for an unknown user, try again')
|
|
)
|
|
except ValidationError as e:
|
|
messages.error(request, e.message)
|
|
else:
|
|
return shortcuts.redirect('account_management')
|
|
return shortcuts.redirect('email-change')
|
|
|
|
|
|
email_change_verify = EmailChangeVerifyView.as_view()
|
|
|
|
|
|
@csrf_exempt
|
|
@ensure_csrf_cookie
|
|
@never_cache
|
|
def login(request, template_name='authentic2/login.html', redirect_field_name=REDIRECT_FIELD_NAME):
|
|
"""Displays the login form and handles the login action."""
|
|
|
|
# redirect user to homepage if already connected, if setting
|
|
# A2_LOGIN_REDIRECT_AUTHENTICATED_USERS_TO_HOMEPAGE is True
|
|
if request.user.is_authenticated and app_settings.A2_LOGIN_REDIRECT_AUTHENTICATED_USERS_TO_HOMEPAGE:
|
|
return utils.redirect(request, 'auth_homepage')
|
|
|
|
redirect_to = request.GET.get(redirect_field_name)
|
|
|
|
service = get_service_from_request(request)
|
|
|
|
if not redirect_to or ' ' in redirect_to:
|
|
redirect_to = settings.LOGIN_REDIRECT_URL
|
|
# Heavier security check -- redirects to http://example.com should
|
|
# not be allowed, but things like /view/?param=http://example.com
|
|
# should be allowed. This regex checks if there is a '//' *before* a
|
|
# question mark.
|
|
elif '//' in redirect_to and re.match(r'[^\?]*//', redirect_to):
|
|
redirect_to = settings.LOGIN_REDIRECT_URL
|
|
nonce = request.GET.get(constants.NONCE_FIELD_NAME)
|
|
|
|
authenticators = utils.get_backends('AUTH_FRONTENDS')
|
|
|
|
blocks = []
|
|
|
|
registration_url = utils.get_registration_url(request, service=service)
|
|
|
|
context = {
|
|
'cancel': app_settings.A2_LOGIN_DISPLAY_A_CANCEL_BUTTON and nonce is not None,
|
|
'can_reset_password': app_settings.A2_USER_CAN_RESET_PASSWORD is not False,
|
|
'registration_authorized': getattr(settings, 'REGISTRATION_OPEN', True),
|
|
'registration_url': registration_url,
|
|
}
|
|
|
|
# Cancel button
|
|
if request.method == "POST" and constants.CANCEL_FIELD_NAME in request.POST:
|
|
return utils.continue_to_next_url(request, params={'cancel': 1})
|
|
|
|
# Create blocks
|
|
for authenticator in authenticators:
|
|
# Legacy API
|
|
if not hasattr(authenticator, 'login'):
|
|
fid = authenticator.id
|
|
name = authenticator.name
|
|
form_class = authenticator.form()
|
|
submit_name = 'submit-%s' % fid
|
|
block = {'id': fid, 'name': name, 'authenticator': authenticator}
|
|
if request.method == 'POST' and submit_name in request.POST:
|
|
form = form_class(data=request.POST)
|
|
if form.is_valid():
|
|
return authenticator.post(request, form, nonce, redirect_to)
|
|
block['form'] = form
|
|
else:
|
|
block['form'] = form_class()
|
|
blocks.append(block)
|
|
else: # New frontends API
|
|
auth_blocks = []
|
|
parameters = {'request': request, 'context': context}
|
|
remote_addr = request.META.get('REMOTE_ADDR')
|
|
login_hint = set(request.session.get('login-hint', []))
|
|
show_ctx = dict(remote_addr=remote_addr, login_hint=login_hint, headers=HTTPHeaders(request))
|
|
if service:
|
|
show_ctx['service_ou_slug'] = service.ou and service.ou.slug
|
|
show_ctx['service_slug'] = service.slug
|
|
show_ctx['service'] = service
|
|
# check if the authenticator has multiple instances
|
|
if hasattr(authenticator, 'instances'):
|
|
for instance_id, instance in authenticator.instances(**parameters):
|
|
parameters['instance'] = instance
|
|
parameters['instance_id'] = instance_id
|
|
if not authenticator.shown(instance_id=instance_id, ctx=show_ctx):
|
|
continue
|
|
block = utils.get_authenticator_method(authenticator, 'login', parameters)
|
|
# update block id in order to separate instances
|
|
block['id'] = '%s_%s' % (block['id'], instance_id)
|
|
auth_blocks.append(block)
|
|
else:
|
|
if authenticator.shown(ctx=show_ctx):
|
|
auth_blocks.append(utils.get_authenticator_method(authenticator, 'login', parameters))
|
|
# If a login frontend method returns an HttpResponse with a status code != 200
|
|
# this response is returned.
|
|
for block in auth_blocks:
|
|
if block:
|
|
if block['status_code'] != 200:
|
|
return block['response']
|
|
blocks.append(block)
|
|
|
|
# run the only available authenticator if able to autorun
|
|
if len(blocks) == 1:
|
|
block = blocks[0]
|
|
authenticator = block['authenticator']
|
|
if hasattr(authenticator, 'autorun'):
|
|
return authenticator.autorun(request, block['id'])
|
|
|
|
# Old frontends API
|
|
for block in blocks:
|
|
fid = block['id']
|
|
if 'form' not in block:
|
|
continue
|
|
authenticator = block['authenticator']
|
|
context.update(
|
|
{'submit_name': 'submit-%s' % fid, redirect_field_name: redirect_to, 'form': block['form']}
|
|
)
|
|
if hasattr(authenticator, 'get_context'):
|
|
context.update(authenticator.get_context())
|
|
sub_template_name = authenticator.template()
|
|
block['content'] = render_to_string(sub_template_name, context, request=request)
|
|
|
|
# legacy context variable
|
|
rendered_forms = [(block['name'], block['content']) for block in blocks]
|
|
context.update(
|
|
{
|
|
'methods': rendered_forms,
|
|
# new definition
|
|
'blocks': collections.OrderedDict((block['id'], block) for block in blocks),
|
|
redirect_field_name: redirect_to,
|
|
}
|
|
)
|
|
return render(request, template_name, context)
|
|
|
|
|
|
def service_list(request):
|
|
'''Compute the service list to show on user homepage'''
|
|
return utils.accumulate_from_backends(request, 'service_list')
|
|
|
|
|
|
class Homepage(cbv.TemplateNamesMixin, TemplateView):
|
|
template_names = ['idp/homepage.html', 'authentic2/homepage.html']
|
|
|
|
def dispatch(self, request, *args, **kwargs):
|
|
if app_settings.A2_HOMEPAGE_URL:
|
|
return utils.redirect(request, app_settings.A2_HOMEPAGE_URL)
|
|
return login_required(super(Homepage, self).dispatch)(request, *args, **kwargs)
|
|
|
|
def get_context_data(self, **kwargs):
|
|
ctx = super(Homepage, self).get_context_data(**kwargs)
|
|
ctx['account_management'] = 'account_management'
|
|
ctx['authorized_services'] = service_list(self.request)
|
|
return ctx
|
|
|
|
|
|
homepage = Homepage.as_view()
|
|
|
|
|
|
class ProfileView(cbv.TemplateNamesMixin, TemplateView):
|
|
template_names = ['idp/account_management.html', 'authentic2/accounts.html']
|
|
title = _('Your account')
|
|
|
|
def dispatch(self, request, *args, **kwargs):
|
|
if app_settings.A2_ACCOUNTS_URL:
|
|
return utils.redirect(request, app_settings.A2_ACCOUNTS_URL)
|
|
return super(ProfileView, self).dispatch(request, *args, **kwargs)
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super(ProfileView, self).get_context_data(**kwargs)
|
|
frontends = utils.get_backends('AUTH_FRONTENDS')
|
|
|
|
request = self.request
|
|
|
|
if request.method == "POST":
|
|
for frontend in frontends:
|
|
if 'submit-%s' % frontend.id in request.POST:
|
|
form = frontend.form()(data=request.POST)
|
|
if form.is_valid():
|
|
return frontend.post(request, form, None, '/profile')
|
|
# User attributes management
|
|
profile = []
|
|
field_names = app_settings.A2_PROFILE_FIELDS
|
|
if not field_names:
|
|
field_names = list(app_settings.A2_REGISTRATION_FIELDS)
|
|
for field_name in getattr(request.user, 'USER_PROFILE', []):
|
|
if field_name not in field_names:
|
|
field_names.append(field_name)
|
|
qs = models.Attribute.objects.filter(Q(user_editable=True) | Q(user_visible=True))
|
|
qs = qs.values_list('name', flat=True)
|
|
for field_name in qs:
|
|
if field_name not in field_names:
|
|
field_names.append(field_name)
|
|
attributes = []
|
|
for field_name in field_names:
|
|
title = None
|
|
if isinstance(field_name, (list, tuple)):
|
|
if len(field_name) > 1:
|
|
title = field_name[1]
|
|
field_name = field_name[0]
|
|
|
|
try:
|
|
attribute = models.Attribute.objects.get(name=field_name)
|
|
except models.Attribute.DoesNotExist:
|
|
attribute = None
|
|
|
|
if attribute:
|
|
if not attribute.user_visible:
|
|
continue
|
|
html_value = attribute.get_kind().get('html_value', lambda a, b: b)
|
|
qs = models.AttributeValue.objects.with_owner(request.user)
|
|
qs = qs.filter(attribute=attribute)
|
|
qs = qs.select_related()
|
|
value = [at_value.to_python() for at_value in qs]
|
|
value = filter(None, value)
|
|
value = [html_value(attribute, at_value) for at_value in value]
|
|
if not title:
|
|
title = str(attribute)
|
|
else:
|
|
# fallback to model attributes
|
|
try:
|
|
field = request.user._meta.get_field(field_name)
|
|
except FieldDoesNotExist:
|
|
continue
|
|
if not title:
|
|
title = field.verbose_name
|
|
value = getattr(self.request.user, field_name, None)
|
|
attribute = models.Attribute(name=field_name, label=title)
|
|
|
|
raw_value = None
|
|
if value:
|
|
if callable(value):
|
|
value = value()
|
|
if not isinstance(value, (list, tuple)):
|
|
value = (value,)
|
|
raw_value = value
|
|
value = [str(v) for v in value]
|
|
if value or app_settings.A2_PROFILE_DISPLAY_EMPTY_FIELDS:
|
|
profile.append((title, value))
|
|
attributes.append({'attribute': attribute, 'values': raw_value})
|
|
|
|
# Credentials management
|
|
parameters = {'request': request, 'context': context}
|
|
profiles = [utils.get_authenticator_method(frontend, 'profile', parameters) for frontend in frontends]
|
|
# Old frontends data structure for templates
|
|
blocks = [block['content'] for block in profiles if block]
|
|
# New frontends data structure for templates
|
|
blocks_by_id = collections.OrderedDict((block['id'], block) for block in profiles if block)
|
|
|
|
idp_backends = utils.get_backends()
|
|
# Get actions for federation management
|
|
federation_management = []
|
|
if app_settings.A2_PROFILE_CAN_MANAGE_FEDERATION:
|
|
for idp_backend in idp_backends:
|
|
if hasattr(idp_backend, 'federation_management'):
|
|
federation_management.extend(idp_backend.federation_management(request))
|
|
context.update(
|
|
{
|
|
'frontends_block': blocks,
|
|
'frontends_block_by_id': blocks_by_id,
|
|
'profile': profile,
|
|
'attributes': attributes,
|
|
'allow_account_deletion': app_settings.A2_REGISTRATION_CAN_DELETE_ACCOUNT,
|
|
'allow_profile_edit': EditProfile.can_edit_profile(),
|
|
'allow_email_change': app_settings.A2_PROFILE_CAN_CHANGE_EMAIL,
|
|
'allow_authorization_management': False,
|
|
# TODO: deprecated should be removed when publik-base-theme is updated
|
|
'allow_password_change': utils.user_can_change_password(request=request),
|
|
'federation_management': federation_management,
|
|
}
|
|
)
|
|
|
|
if (
|
|
'authentic2_idp_oidc' in settings.INSTALLED_APPS
|
|
and app_settings.A2_PROFILE_CAN_MANAGE_SERVICE_AUTHORIZATIONS
|
|
):
|
|
from authentic2_idp_oidc.models import OIDCClient
|
|
|
|
context['allow_authorization_management'] = OIDCClient.objects.filter(
|
|
authorization_mode=OIDCClient.AUTHORIZATION_MODE_BY_SERVICE
|
|
).exists()
|
|
|
|
hooks.call_hooks('modify_context_data', self, context)
|
|
return context
|
|
|
|
|
|
profile = login_required(ProfileView.as_view())
|
|
|
|
|
|
def logout_list(request):
|
|
'''Return logout links from idp backends'''
|
|
return utils.accumulate_from_backends(request, 'logout_list')
|
|
|
|
|
|
def redirect_logout_list(request):
|
|
'''Return redirect logout links from idp backends'''
|
|
return utils.accumulate_from_backends(request, 'redirect_logout_list')
|
|
|
|
|
|
def logout(request, next_url=None, do_local=True, check_referer=True):
|
|
"""Logout first check if a logout request is authorized, i.e.
|
|
that logout was done using a POST with CSRF token or with a GET
|
|
from the same site.
|
|
|
|
Logout endpoints of IdP module must re-user the view by setting
|
|
check_referer and do_local to False.
|
|
"""
|
|
next_url = next_url or utils.select_next_url(request, settings.LOGIN_REDIRECT_URL)
|
|
|
|
ctx = {}
|
|
ctx['next_url'] = next_url
|
|
ctx['redir_timeout'] = 60
|
|
local_logout_done = False
|
|
|
|
if request.user.is_authenticated:
|
|
if check_referer and not utils.check_referer(request):
|
|
return render(request, 'authentic2/logout_confirm.html', ctx)
|
|
do_local = do_local and 'local' in request.GET
|
|
if not do_local:
|
|
fragments = logout_list(request)
|
|
if fragments:
|
|
# Full logout with iframes
|
|
next_url = utils.make_url(
|
|
'auth_logout', params={'local': 'ok'}, next_url=next_url, sign_next_url=True
|
|
)
|
|
ctx['next_url'] = next_url
|
|
ctx['logout_list'] = fragments
|
|
ctx['message'] = _('Logging out from all your services')
|
|
return render(request, 'authentic2/logout.html', ctx)
|
|
# Get redirection targets for full logout with redirections
|
|
# (needed before local logout)
|
|
targets = redirect_logout_list(request)
|
|
logger.debug('Accumulated redirections : {}'.format(targets))
|
|
# Local logout
|
|
request.journal.record('user.logout')
|
|
auth_logout(request)
|
|
logger.info('Logged out')
|
|
local_logout_done = True
|
|
# Last redirection will be the current next_url
|
|
targets.append(next_url)
|
|
# Put redirection targets in session (after local logout)
|
|
request.session['logout_redirections'] = targets
|
|
logger.debug('All planned redirections : {}'.format(targets))
|
|
# Full logout by redirections if any
|
|
targets = request.session.pop('logout_redirections', None)
|
|
if targets:
|
|
# Full logout with redirections
|
|
logger.debug('Redirections queue: {}'.format(targets))
|
|
next_url = targets.pop(0)
|
|
request.session['logout_redirections'] = targets
|
|
logger.debug('Next redirection : {}'.format(next_url))
|
|
response = shortcuts.redirect(next_url)
|
|
if local_logout_done:
|
|
response.set_cookie('a2_just_logged_out', 1, max_age=60)
|
|
return response
|
|
|
|
|
|
def login_password_profile(request, *args, **kwargs):
|
|
context = kwargs.pop('context', {})
|
|
can_change_password = utils.user_can_change_password(request=request)
|
|
has_usable_password = request.user.has_usable_password()
|
|
context.update(
|
|
{
|
|
'can_change_password': can_change_password,
|
|
'has_usable_password': has_usable_password,
|
|
}
|
|
)
|
|
return render_to_string(
|
|
['auth/login_password_profile.html', 'authentic2/login_password_profile.html'],
|
|
context,
|
|
request=request,
|
|
)
|
|
|
|
|
|
class LoggedInView(View):
|
|
'''JSONP web service to detect if an user is logged'''
|
|
|
|
http_method_names = [u'get']
|
|
|
|
def check_referrer(self):
|
|
'''Check if the given referer is authorized'''
|
|
referer = self.request.META.get('HTTP_REFERER', '')
|
|
for valid_referer in app_settings.VALID_REFERERS:
|
|
if referer.startswith(valid_referer):
|
|
return True
|
|
return False
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
if not self.check_referrer():
|
|
return HttpResponseForbidden()
|
|
callback = request.GET.get('callback')
|
|
content = u'{0}({1})'.format(callback, int(request.user.is_authenticated))
|
|
return HttpResponse(content, content_type='application/json')
|
|
|
|
|
|
logged_in = never_cache(LoggedInView.as_view())
|
|
|
|
|
|
def csrf_failure_view(request, reason=""):
|
|
messages.warning(request, _('The page is out of date, it was reloaded for you'))
|
|
return HttpResponseRedirect(request.get_full_path())
|
|
|
|
|
|
class PasswordResetView(FormView):
|
|
'''Ask for an email and send a password reset link by mail'''
|
|
|
|
form_class = passwords_forms.PasswordResetForm
|
|
title = _('Password Reset')
|
|
|
|
def get_success_url(self):
|
|
return reverse('password_reset_instructions')
|
|
|
|
def get_template_names(self):
|
|
return [
|
|
'authentic2/password_reset_form.html',
|
|
'registration/password_reset_form.html',
|
|
]
|
|
|
|
def get_form_kwargs(self, **kwargs):
|
|
kwargs = super(PasswordResetView, self).get_form_kwargs(**kwargs)
|
|
initial = kwargs.setdefault('initial', {})
|
|
initial['next_url'] = utils.select_next_url(self.request, '')
|
|
return kwargs
|
|
|
|
def get_context_data(self, **kwargs):
|
|
ctx = super(PasswordResetView, self).get_context_data(**kwargs)
|
|
if app_settings.A2_USER_CAN_RESET_PASSWORD is False:
|
|
raise Http404('Password reset is not allowed.')
|
|
ctx['title'] = _('Password reset')
|
|
return ctx
|
|
|
|
def form_valid(self, form):
|
|
email = form.cleaned_data.get('email') or form.cleaned_data.get('email_or_username')
|
|
|
|
# if an email has already been sent, warn once before allowing resend
|
|
token = models.Token.objects.filter(
|
|
kind='pw-reset', content__email=email, expires__gt=timezone.now()
|
|
).exists()
|
|
resend_key = 'pw-reset-allow-resend'
|
|
if app_settings.A2_TOKEN_EXISTS_WARNING and token and not self.request.session.get(resend_key):
|
|
self.request.session[resend_key] = True
|
|
form.add_error(
|
|
'email',
|
|
_(
|
|
'An email has already been sent to %s. Click "Validate" again if '
|
|
'you really want it to be sent again.'
|
|
)
|
|
% email,
|
|
)
|
|
return self.form_invalid(form)
|
|
self.request.session[resend_key] = False
|
|
|
|
if is_ratelimited(
|
|
self.request,
|
|
key='post:email',
|
|
group='pw-reset-email',
|
|
rate=app_settings.A2_EMAILS_ADDRESS_RATELIMIT,
|
|
increment=True,
|
|
):
|
|
self.request.journal.record('user.password.reset.failure', email=email)
|
|
form.add_error(
|
|
'email',
|
|
_(
|
|
'Multiple emails have already been sent to this address. Further attempts are '
|
|
'blocked, please check your spam folder or try again later.'
|
|
),
|
|
)
|
|
return self.form_invalid(form)
|
|
if is_ratelimited(
|
|
self.request,
|
|
key='ip',
|
|
group='pw-reset-email',
|
|
rate=app_settings.A2_EMAILS_IP_RATELIMIT,
|
|
increment=True,
|
|
):
|
|
self.request.journal.record('user.password.reset.failure', email=email)
|
|
form.add_error(
|
|
'email',
|
|
_(
|
|
'Multiple password reset attempts have already been made from this IP address. No '
|
|
'further email will be sent, please check your spam folder or try again later.'
|
|
),
|
|
)
|
|
return self.form_invalid(form)
|
|
|
|
form.save()
|
|
self.request.session['reset_email'] = email
|
|
return super(PasswordResetView, self).form_valid(form)
|
|
|
|
|
|
password_reset = PasswordResetView.as_view()
|
|
|
|
|
|
class PasswordResetInstructionsView(TemplateView):
|
|
template_name = 'registration/password_reset_instructions.html'
|
|
|
|
def get_context_data(self, **kwargs):
|
|
ctx = super(PasswordResetInstructionsView, self).get_context_data(**kwargs)
|
|
ctx['from_email_address'] = parseaddr(settings.DEFAULT_FROM_EMAIL)[1]
|
|
return ctx
|
|
|
|
|
|
password_reset_instructions = PasswordResetInstructionsView.as_view()
|
|
|
|
|
|
class PasswordResetConfirmView(cbv.RedirectToNextURLViewMixin, FormView):
|
|
"""Validate password reset link, show a set password form and login
|
|
the user.
|
|
"""
|
|
|
|
form_class = passwords_forms.SetPasswordForm
|
|
title = _('Password Reset')
|
|
|
|
def get_template_names(self):
|
|
return [
|
|
'registration/password_reset_confirm.html',
|
|
'authentic2/password_reset_confirm.html',
|
|
]
|
|
|
|
def dispatch(self, request, *args, **kwargs):
|
|
token = kwargs['token'].replace(' ', '')
|
|
try:
|
|
self.token = models.Token.use('pw-reset', token, delete=False)
|
|
except models.Token.DoesNotExist:
|
|
messages.warning(request, _('Password reset token is unknown or expired'))
|
|
return utils.redirect(request, self.get_success_url())
|
|
except (TypeError, ValueError):
|
|
messages.warning(request, _('Password reset token is invalid'))
|
|
return utils.redirect(request, self.get_success_url())
|
|
|
|
uid = self.token.content['user']
|
|
try:
|
|
# use authenticate to eventually get an LDAPUser
|
|
self.user = utils.authenticate(request, user=User._default_manager.get(pk=uid))
|
|
except (TypeError, ValueError, OverflowError, User.DoesNotExist):
|
|
messages.warning(request, _('User not found'))
|
|
return utils.redirect(request, self.get_success_url())
|
|
|
|
can_reset_password = utils.get_user_flag(
|
|
user=self.user, name='can_reset_password', default=self.user.has_usable_password()
|
|
)
|
|
if not can_reset_password:
|
|
messages.warning(
|
|
request, _('It\'s not possible to reset your password. Please contact an administrator.')
|
|
)
|
|
return utils.redirect(request, self.get_success_url())
|
|
return super(PasswordResetConfirmView, self).dispatch(request, *args, **kwargs)
|
|
|
|
def get_context_data(self, **kwargs):
|
|
ctx = super(PasswordResetConfirmView, self).get_context_data(**kwargs)
|
|
# compatibility with existing templates !
|
|
ctx['title'] = _('Enter new password')
|
|
ctx['validlink'] = True
|
|
return ctx
|
|
|
|
def get_form_kwargs(self):
|
|
kwargs = super(PasswordResetConfirmView, self).get_form_kwargs()
|
|
kwargs['user'] = self.user
|
|
return kwargs
|
|
|
|
def form_valid(self, form):
|
|
# Changing password by mail validate the email
|
|
form.user.email_verified = True
|
|
form.save()
|
|
hooks.call_hooks('event', name='password-reset-confirm', user=form.user, token=self.token, form=form)
|
|
logger.info(u'password reset for user %s with token %r', self.user, self.token.uuid)
|
|
self.token.delete()
|
|
return self.finish()
|
|
|
|
def finish(self):
|
|
response = utils.simulate_authentication(self.request, self.user, 'email')
|
|
self.request.journal.record('user.password.reset')
|
|
return response
|
|
|
|
|
|
password_reset_confirm = PasswordResetConfirmView.as_view()
|
|
|
|
|
|
class BaseRegistrationView(FormView):
|
|
form_class = registration_forms.RegistrationForm
|
|
template_name = 'registration/registration_form.html'
|
|
title = _('Registration')
|
|
|
|
def dispatch(self, request, *args, **kwargs):
|
|
if not getattr(settings, 'REGISTRATION_OPEN', True):
|
|
raise Http404('Registration is not open.')
|
|
|
|
self.token = {}
|
|
self.ou = get_default_ou()
|
|
# load pre-filled values
|
|
if request.GET.get('token'):
|
|
try:
|
|
self.token = signing.loads(
|
|
request.GET.get('token'), max_age=settings.ACCOUNT_ACTIVATION_DAYS * 3600 * 24
|
|
)
|
|
except (TypeError, ValueError, signing.BadSignature) as e:
|
|
logger.warning(u'registration_view: invalid token: %s', e)
|
|
return HttpResponseBadRequest('invalid token', content_type='text/plain')
|
|
if 'ou' in self.token:
|
|
self.ou = OU.objects.get(pk=self.token['ou'])
|
|
self.next_url = self.token.pop(REDIRECT_FIELD_NAME, utils.select_next_url(request, None))
|
|
return super(BaseRegistrationView, self).dispatch(request, *args, **kwargs)
|
|
|
|
def form_valid(self, form):
|
|
if form.is_robot():
|
|
return utils.redirect(
|
|
self.request,
|
|
'registration_complete',
|
|
params={
|
|
REDIRECT_FIELD_NAME: self.next_url,
|
|
'robot': 'on',
|
|
},
|
|
)
|
|
email = form.cleaned_data.pop('email')
|
|
|
|
# if an email has already been sent, warn once before allowing resend
|
|
token = models.Token.objects.filter(
|
|
kind='registration', content__email=email, expires__gt=timezone.now()
|
|
).exists()
|
|
resend_key = 'registration-allow-resend'
|
|
if app_settings.A2_TOKEN_EXISTS_WARNING and token and not self.request.session.get(resend_key):
|
|
self.request.session[resend_key] = True
|
|
form.add_error(
|
|
'email',
|
|
_(
|
|
'An email has already been sent to %s. Click "Validate" again if '
|
|
'you really want it to be sent again.'
|
|
)
|
|
% email,
|
|
)
|
|
return self.form_invalid(form)
|
|
self.request.session[resend_key] = False
|
|
|
|
if is_ratelimited(
|
|
self.request,
|
|
key='post:email',
|
|
group='registration-email',
|
|
rate=app_settings.A2_EMAILS_ADDRESS_RATELIMIT,
|
|
increment=True,
|
|
):
|
|
form.add_error(
|
|
'email',
|
|
_(
|
|
'Multiple emails have already been sent to this address. Further attempts are '
|
|
'blocked, please check your spam folder or try again later.'
|
|
),
|
|
)
|
|
return self.form_invalid(form)
|
|
if is_ratelimited(
|
|
self.request,
|
|
key='ip',
|
|
group='registration-email',
|
|
rate=app_settings.A2_EMAILS_IP_RATELIMIT,
|
|
increment=True,
|
|
):
|
|
form.add_error(
|
|
'email',
|
|
_(
|
|
'Multiple registration attempts have already been made from this IP address. No '
|
|
'further email will be sent, please check your spam folder or try again later.'
|
|
),
|
|
)
|
|
return self.form_invalid(form)
|
|
|
|
for field in form.cleaned_data:
|
|
self.token[field] = form.cleaned_data[field]
|
|
|
|
# propagate service to the registration completion view
|
|
service = get_service_from_request(self.request)
|
|
if service:
|
|
set_service_ref(self.token, service)
|
|
|
|
self.token.pop(REDIRECT_FIELD_NAME, None)
|
|
self.token.pop('email', None)
|
|
|
|
utils.send_registration_mail(self.request, email, next_url=self.next_url, ou=self.ou, **self.token)
|
|
self.request.session['registered_email'] = email
|
|
return utils.redirect(
|
|
self.request, 'registration_complete', params={REDIRECT_FIELD_NAME: self.next_url}
|
|
)
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super(BaseRegistrationView, self).get_context_data(**kwargs)
|
|
parameters = {'request': self.request, 'context': context}
|
|
blocks = [
|
|
utils.get_authenticator_method(authenticator, 'registration', parameters)
|
|
for authenticator in utils.get_backends('AUTH_FRONTENDS')
|
|
]
|
|
context['frontends'] = collections.OrderedDict((block['id'], block) for block in blocks if block)
|
|
return context
|
|
|
|
|
|
class RegistrationView(cbv.ValidateCSRFMixin, BaseRegistrationView):
|
|
pass
|
|
|
|
|
|
class RegistrationCompletionView(CreateView):
|
|
model = get_user_model()
|
|
success_url = 'auth_homepage'
|
|
|
|
def get_template_names(self):
|
|
if self.users and 'create' not in self.request.GET:
|
|
return ['registration/registration_completion_choose.html']
|
|
else:
|
|
return ['registration/registration_completion_form.html']
|
|
|
|
def get_success_url(self):
|
|
try:
|
|
redirect_url, next_field = app_settings.A2_REGISTRATION_REDIRECT
|
|
except Exception:
|
|
redirect_url = app_settings.A2_REGISTRATION_REDIRECT
|
|
next_field = REDIRECT_FIELD_NAME
|
|
|
|
if self.token and self.token.get(REDIRECT_FIELD_NAME):
|
|
url = self.token[REDIRECT_FIELD_NAME]
|
|
if redirect_url:
|
|
url = utils.make_url(redirect_url, params={next_field: url})
|
|
else:
|
|
if redirect_url:
|
|
url = redirect_url
|
|
else:
|
|
url = utils.make_url(self.success_url)
|
|
return url
|
|
|
|
def dispatch(self, request, *args, **kwargs):
|
|
registration_token = kwargs['registration_token'].replace(' ', '')
|
|
try:
|
|
token = models.Token.use('registration', registration_token, delete=False)
|
|
except models.Token.DoesNotExist:
|
|
messages.warning(request, _('Your activation key is unknown or expired'))
|
|
return utils.redirect(request, 'registration_register')
|
|
except (TypeError, ValueError):
|
|
messages.warning(request, _('Activation failed'))
|
|
return utils.redirect(request, 'registration_register')
|
|
self.token_obj = token
|
|
self.token = token.content
|
|
|
|
# allow access to token content from external authentication backends
|
|
request.token = self.token
|
|
|
|
self.authentication_method = self.token.get('authentication_method', 'email')
|
|
self.email = self.token['email']
|
|
if 'ou' in self.token:
|
|
self.ou = OU.objects.get(pk=self.token['ou'])
|
|
else:
|
|
self.ou = get_default_ou()
|
|
self.users = User.objects.filter(email__iexact=self.email).order_by('date_joined')
|
|
if self.ou:
|
|
self.users = self.users.filter(ou=self.ou)
|
|
self.email_is_unique = app_settings.A2_EMAIL_IS_UNIQUE or app_settings.A2_REGISTRATION_EMAIL_IS_UNIQUE
|
|
if self.ou:
|
|
self.email_is_unique |= self.ou.email_is_unique
|
|
self.init_fields_labels_and_help_texts()
|
|
# if registration is done during an SSO add the service to the registration event
|
|
self.service = get_service_from_token(self.token)
|
|
return super(RegistrationCompletionView, self).dispatch(request, *args, **kwargs)
|
|
|
|
def init_fields_labels_and_help_texts(self):
|
|
attributes = models.Attribute.objects.filter(asked_on_registration=True)
|
|
default_fields = attributes.values_list('name', flat=True)
|
|
required_fields = models.Attribute.objects.filter(required=True).values_list('name', flat=True)
|
|
fields, labels = utils.get_fields_and_labels(
|
|
app_settings.A2_REGISTRATION_FIELDS,
|
|
default_fields,
|
|
app_settings.A2_REGISTRATION_REQUIRED_FIELDS,
|
|
app_settings.A2_REQUIRED_FIELDS,
|
|
models.Attribute.objects.filter(required=True).values_list('name', flat=True),
|
|
)
|
|
help_texts = {}
|
|
if app_settings.A2_REGISTRATION_FORM_USERNAME_LABEL:
|
|
labels['username'] = app_settings.A2_REGISTRATION_FORM_USERNAME_LABEL
|
|
if app_settings.A2_REGISTRATION_FORM_USERNAME_HELP_TEXT:
|
|
help_texts['username'] = app_settings.A2_REGISTRATION_FORM_USERNAME_HELP_TEXT
|
|
required = list(app_settings.A2_REGISTRATION_REQUIRED_FIELDS) + list(required_fields)
|
|
if 'email' in fields:
|
|
fields.remove('email')
|
|
for field in self.token.get('skip_fields') or []:
|
|
if field in fields:
|
|
fields.remove(field)
|
|
self.fields = fields
|
|
self.labels = labels
|
|
self.required = required
|
|
self.help_texts = help_texts
|
|
|
|
def get_form_class(self):
|
|
if not self.token.get('valid_email', True):
|
|
self.fields.append('email')
|
|
self.required.append('email')
|
|
form_class = registration_forms.RegistrationCompletionForm
|
|
if self.token.get('no_password', False):
|
|
form_class = registration_forms.RegistrationCompletionFormNoPassword
|
|
form_class = profile_forms.modelform_factory(
|
|
self.model,
|
|
form=form_class,
|
|
fields=self.fields,
|
|
labels=self.labels,
|
|
required=self.required,
|
|
help_texts=self.help_texts,
|
|
)
|
|
if 'username' in self.fields and app_settings.A2_REGISTRATION_FORM_USERNAME_REGEX:
|
|
# Keep existing field label and help_text
|
|
old_field = form_class.base_fields['username']
|
|
field = CharField(
|
|
max_length=256,
|
|
label=old_field.label,
|
|
help_text=old_field.help_text,
|
|
validators=[validators.UsernameValidator()],
|
|
)
|
|
form_class = type('RegistrationForm', (form_class,), {'username': field})
|
|
return form_class
|
|
|
|
def get_form_kwargs(self, **kwargs):
|
|
'''Initialize mail from token'''
|
|
kwargs = super(RegistrationCompletionView, self).get_form_kwargs(**kwargs)
|
|
if 'ou' in self.token:
|
|
ou = get_object_or_404(OU, id=self.token['ou'])
|
|
else:
|
|
ou = get_default_ou()
|
|
|
|
attributes = {'email': self.email, 'ou': ou}
|
|
for key in self.token:
|
|
if key in app_settings.A2_PRE_REGISTRATION_FIELDS:
|
|
attributes[key] = self.token[key]
|
|
logger.debug(u'attributes %s', attributes)
|
|
|
|
prefilling_list = utils.accumulate_from_backends(self.request, 'registration_form_prefill')
|
|
logger.debug(u'prefilling_list %s', prefilling_list)
|
|
# Build a single meaningful prefilling with sets of values
|
|
prefilling = {}
|
|
for p in prefilling_list:
|
|
for name, values in p.items():
|
|
if name in self.fields:
|
|
prefilling.setdefault(name, set()).update(values)
|
|
logger.debug(u'prefilling %s', prefilling)
|
|
|
|
for name, values in prefilling.items():
|
|
attributes[name] = ' '.join(values)
|
|
logger.debug(u'attributes with prefilling %s', attributes)
|
|
|
|
if self.token.get('user_id'):
|
|
kwargs['instance'] = User.objects.get(id=self.token.get('user_id'))
|
|
else:
|
|
init_kwargs = {}
|
|
for key in ('email', 'first_name', 'last_name', 'ou'):
|
|
if key in attributes:
|
|
init_kwargs[key] = attributes[key]
|
|
kwargs['instance'] = get_user_model()(**init_kwargs)
|
|
|
|
return kwargs
|
|
|
|
def get_form(self, form_class=None):
|
|
form = super(RegistrationCompletionView, self).get_form(form_class=form_class)
|
|
hooks.call_hooks('front_modify_form', self, form)
|
|
return form
|
|
|
|
def get_context_data(self, **kwargs):
|
|
ctx = super(RegistrationCompletionView, self).get_context_data(**kwargs)
|
|
ctx['token'] = self.token
|
|
ctx['users'] = self.users
|
|
ctx['email'] = self.email
|
|
ctx['email_is_unique'] = self.email_is_unique
|
|
ctx['create'] = 'create' in self.request.GET
|
|
return ctx
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
if len(self.users) == 1 and self.email_is_unique:
|
|
# Found one user, EMAIL is unique, log her in
|
|
utils.simulate_authentication(
|
|
request, self.users[0], method=self.authentication_method, service=self.service
|
|
)
|
|
return utils.redirect(request, self.get_success_url())
|
|
confirm_data = self.token.get('confirm_data', False)
|
|
|
|
if confirm_data == 'required':
|
|
fields_to_confirm = self.required
|
|
else:
|
|
fields_to_confirm = self.fields
|
|
if all(field in self.token for field in fields_to_confirm) and (
|
|
not confirm_data or confirm_data == 'required'
|
|
):
|
|
# We already have every fields
|
|
form_kwargs = self.get_form_kwargs()
|
|
form_class = self.get_form_class()
|
|
data = self.token
|
|
if 'password' in data:
|
|
data['password1'] = data['password']
|
|
data['password2'] = data['password']
|
|
del data['password']
|
|
form_kwargs['data'] = data
|
|
form = form_class(**form_kwargs)
|
|
if form.is_valid():
|
|
user = form.save()
|
|
return self.registration_success(request, user, form)
|
|
self.get_form = lambda *args, **kwargs: form
|
|
return super(RegistrationCompletionView, self).get(request, *args, **kwargs)
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
if self.users and self.email_is_unique:
|
|
# email is unique, users already exist, creating a new one is forbidden !
|
|
return utils.redirect(
|
|
request, request.resolver_match.view_name, args=self.args, kwargs=self.kwargs
|
|
)
|
|
if 'uid' in request.POST:
|
|
uid = request.POST['uid']
|
|
for user in self.users:
|
|
if str(user.id) == uid:
|
|
utils.simulate_authentication(
|
|
request, user, method=self.authentication_method, service=self.service
|
|
)
|
|
return utils.redirect(request, self.get_success_url())
|
|
return super(RegistrationCompletionView, self).post(request, *args, **kwargs)
|
|
|
|
def form_valid(self, form):
|
|
|
|
# remove verified fields from form, this allows an authentication
|
|
# method to provide verified data fields and to present it to the user,
|
|
# while preventing the user to modify them.
|
|
for av in models.AttributeValue.objects.with_owner(form.instance):
|
|
if av.verified and av.attribute.name in form.fields:
|
|
del form.fields[av.attribute.name]
|
|
|
|
if (
|
|
'email' in self.request.POST
|
|
and ('email' not in self.token or self.request.POST['email'] != self.token['email'])
|
|
and not self.token.get('skip_email_check')
|
|
):
|
|
# If an email is submitted it must be validated or be the same as in the token
|
|
data = form.cleaned_data.copy()
|
|
# handle complex attributes
|
|
for attribute in iter_attributes():
|
|
if attribute.name not in data:
|
|
continue
|
|
kind = attribute.get_kind()
|
|
if kind['serialize'] == attribute_kinds.identity:
|
|
continue
|
|
data[attribute.name] = kind['serialize'](data[attribute.name])
|
|
|
|
data['no_password'] = self.token.get('no_password', False)
|
|
utils.send_registration_mail(self.request, ou=self.ou, next_url=self.get_success_url(), **data)
|
|
self.token_obj.delete()
|
|
self.request.session['registered_email'] = form.cleaned_data['email']
|
|
return utils.redirect(self.request, 'registration_complete')
|
|
super(RegistrationCompletionView, self).form_valid(form)
|
|
return self.registration_success(self.request, form.instance, form)
|
|
|
|
def registration_success(self, request, user, form):
|
|
request.journal.record('user.registration', user=user, session=None, how=self.authentication_method)
|
|
hooks.call_hooks(
|
|
'event',
|
|
name='registration',
|
|
user=user,
|
|
form=form,
|
|
view=self,
|
|
authentication_method=self.authentication_method,
|
|
token=self.token,
|
|
service=self.service and self.service.slug,
|
|
)
|
|
self.token_obj.delete()
|
|
utils.simulate_authentication(request, user, method=self.authentication_method, service=self.service)
|
|
message_template = loader.get_template('authentic2/registration_success_message.html')
|
|
messages.info(self.request, message_template.render(request=request))
|
|
self.send_registration_success_email(user)
|
|
return utils.redirect(request, self.get_success_url())
|
|
|
|
def send_registration_success_email(self, user):
|
|
if not user.email:
|
|
return
|
|
|
|
template_names = ['authentic2/registration_success']
|
|
login_url = self.request.build_absolute_uri(settings.LOGIN_URL)
|
|
utils.send_templated_mail(
|
|
user,
|
|
template_names=template_names,
|
|
context={
|
|
'user': user,
|
|
'email': user.email,
|
|
'site': self.request.get_host(),
|
|
'login_url': login_url,
|
|
},
|
|
request=self.request,
|
|
)
|
|
|
|
|
|
registration_completion = RegistrationCompletionView.as_view()
|
|
|
|
|
|
class DeleteView(TemplateView):
|
|
template_name = 'authentic2/accounts_delete_request.html'
|
|
title = _('Request account deletion')
|
|
|
|
def dispatch(self, request, *args, **kwargs):
|
|
if not app_settings.A2_REGISTRATION_CAN_DELETE_ACCOUNT:
|
|
return utils.redirect(request, '..')
|
|
return super(DeleteView, self).dispatch(request, *args, **kwargs)
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
if 'cancel' in request.POST:
|
|
return utils.redirect(request, 'account_management')
|
|
utils.send_account_deletion_code(self.request, self.request.user)
|
|
messages.info(request, _("An account deletion validation email has been sent to your email address."))
|
|
return utils.redirect(request, 'account_management')
|
|
|
|
def get_context_data(self, **kwargs):
|
|
ctx = super(DeleteView, self).get_context_data(**kwargs)
|
|
ctx['email'] = self.request.user.email
|
|
return ctx
|
|
|
|
|
|
class ValidateDeletionView(TemplateView):
|
|
template_name = 'authentic2/accounts_delete_validation.html'
|
|
title = _('Confirm account deletion')
|
|
user = None
|
|
|
|
def dispatch(self, request, *args, **kwargs):
|
|
try:
|
|
deletion_token = signing.loads(
|
|
kwargs['deletion_token'], max_age=app_settings.A2_DELETION_REQUEST_LIFETIME
|
|
)
|
|
user_pk = deletion_token['user_pk']
|
|
self.user = get_user_model().objects.get(pk=user_pk)
|
|
# A user account wont be deactived twice
|
|
if not self.user.is_active:
|
|
raise ValidationError(_('This account is inactive, it cannot be deleted.'))
|
|
logger.info('user %s confirmed the deletion of their own account', self.user)
|
|
except signing.SignatureExpired:
|
|
error = _('The account deletion request is too old, try again')
|
|
except signing.BadSignature:
|
|
error = _('The account deletion request is invalid, try again')
|
|
except ValueError:
|
|
error = _('The account deletion request was not on this site, try again')
|
|
except ValidationError as e:
|
|
error = e.message
|
|
except get_user_model().DoesNotExist:
|
|
error = _('This account has previously been deleted.')
|
|
else:
|
|
return super(ValidateDeletionView, self).dispatch(request, *args, **kwargs)
|
|
messages.error(request, error)
|
|
return utils.redirect(request, 'auth_homepage')
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
if 'cancel' not in request.POST:
|
|
utils.send_account_deletion_mail(self.request, self.user)
|
|
logger.info(u'deletion of account %s performed', self.user)
|
|
hooks.call_hooks('event', name='delete-account', user=self.user)
|
|
request.journal.record('user.deletion', user=self.user)
|
|
is_deleted_user_logged = self.user == request.user
|
|
self.user.delete()
|
|
messages.info(request, _('Deletion performed.'))
|
|
# No real use for cancel_url or next_url here, assuming the link
|
|
# has been received by email. We instead redirect the user to the
|
|
# homepage.
|
|
if is_deleted_user_logged:
|
|
return logout(request, check_referer=False)
|
|
return utils.redirect(request, 'auth_homepage')
|
|
|
|
def get_context_data(self, **kwargs):
|
|
ctx = super(ValidateDeletionView, self).get_context_data(**kwargs)
|
|
ctx['user'] = self.user # Not necessarily the user in request
|
|
return ctx
|
|
|
|
|
|
class RegistrationCompleteView(TemplateView):
|
|
template_name = 'registration/registration_complete.html'
|
|
|
|
def get_context_data(self, **kwargs):
|
|
kwargs['next_url'] = utils.select_next_url(self.request, settings.LOGIN_REDIRECT_URL)
|
|
kwargs['from_email'] = settings.DEFAULT_FROM_EMAIL
|
|
kwargs['from_email_address'] = parseaddr(settings.DEFAULT_FROM_EMAIL)[1]
|
|
return super(RegistrationCompleteView, self).get_context_data(
|
|
account_activation_days=settings.ACCOUNT_ACTIVATION_DAYS, **kwargs
|
|
)
|
|
|
|
|
|
registration_complete = RegistrationCompleteView.as_view()
|
|
|
|
|
|
class PasswordChangeView(DjPasswordChangeView):
|
|
title = _('Password Change')
|
|
do_not_call_in_templates = True
|
|
|
|
def dispatch(self, request, *args, **kwargs):
|
|
if 'next_url' in request.POST and request.POST['next_url']:
|
|
self.post_change_redirect = request.POST['next_url']
|
|
elif REDIRECT_FIELD_NAME in request.GET:
|
|
self.post_change_redirect = request.GET[REDIRECT_FIELD_NAME]
|
|
else:
|
|
self.post_change_redirect = reverse('account_management')
|
|
if not utils.user_can_change_password(request=request):
|
|
messages.warning(request, _('Password change is forbidden'))
|
|
return utils.redirect(request, self.post_change_redirect)
|
|
return super(PasswordChangeView, self).dispatch(request, *args, **kwargs)
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
if 'cancel' in request.POST:
|
|
return utils.redirect(request, self.post_change_redirect)
|
|
return super(PasswordChangeView, self).post(request, *args, **kwargs)
|
|
|
|
def form_valid(self, form):
|
|
hooks.call_hooks('event', name='change-password', user=self.request.user, request=self.request)
|
|
messages.info(self.request, _('Password changed'))
|
|
models.PasswordReset.objects.filter(user=self.request.user).delete()
|
|
response = super(PasswordChangeView, self).form_valid(form)
|
|
self.request.journal.record('user.password.change', session=self.request.session)
|
|
return response
|
|
|
|
def get_form_class(self):
|
|
if self.request.user.has_usable_password():
|
|
return passwords_forms.PasswordChangeForm
|
|
else:
|
|
return passwords_forms.SetPasswordForm
|
|
|
|
def get_context_data(self, **kwargs):
|
|
ctx = super(PasswordChangeView, self).get_context_data(**kwargs)
|
|
ctx[REDIRECT_FIELD_NAME] = self.post_change_redirect
|
|
return ctx
|
|
|
|
|
|
password_change = decorators.setting_enabled('A2_REGISTRATION_CAN_CHANGE_PASSWORD')(
|
|
PasswordChangeView.as_view()
|
|
)
|
|
|
|
|
|
class SuView(View):
|
|
def get(self, request, uuid):
|
|
user = switch_user.resolve_token(uuid)
|
|
if not user:
|
|
raise Http404
|
|
# LDAP ad-hoc behaviour
|
|
if user.userexternalid_set.exists():
|
|
user = utils.authenticate(request, user=user)
|
|
return utils.simulate_authentication(request, user, 'su')
|
|
|
|
|
|
su = SuView.as_view()
|
|
|
|
|
|
class AuthorizedOauthServicesView(TemplateView):
|
|
template_name = 'authentic2/accounts_authorized_oauth_services.html'
|
|
title = _('Consent Management')
|
|
|
|
def get_context_data(self, **kwargs):
|
|
from authentic2_idp_oidc.models import OIDCAuthorization
|
|
|
|
context = super(AuthorizedOauthServicesView, self).get_context_data(**kwargs)
|
|
context['authorized_oauth_services'] = OIDCAuthorization.objects.filter(user=self.request.user)
|
|
return context
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
from authentic2_idp_oidc.models import OIDCAuthorization
|
|
|
|
qs = OIDCAuthorization.objects.filter(user=request.user)
|
|
auth_id = request.POST.get('auth_id')
|
|
if auth_id:
|
|
qs = qs.filter(id=auth_id)
|
|
qs.delete()
|
|
return HttpResponseRedirect(reverse('authorized-oauth-services'))
|
|
|
|
|
|
authorized_oauth_services = decorators.setting_enabled('A2_PROFILE_CAN_MANAGE_SERVICE_AUTHORIZATIONS')(
|
|
AuthorizedOauthServicesView.as_view()
|
|
)
|
|
|
|
|
|
def old_view_redirect(request, to, message=None):
|
|
'''Redirect old URL to new URL, eventually showing a message.'''
|
|
if message:
|
|
messages.info(request, message)
|
|
return utils.redirect(request, to=to)
|
|
|
|
|
|
class DisplayMessageAndContinueView(TemplateView):
|
|
template_name = 'authentic2/display_message_and_continue.html'
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
self.url = utils.select_next_url(self.request, reverse('account_management'))
|
|
self.only_info = True
|
|
|
|
storage = messages.get_messages(request)
|
|
if not len(storage):
|
|
return utils.redirect(request, self.url, resolve=False)
|
|
|
|
for message in storage:
|
|
if message.level != messages.INFO:
|
|
# If there are warning or error messages, the intermediate page must not redirect
|
|
# automatically but should ask for an user confirmation
|
|
self.only_info = False
|
|
storage.used = False
|
|
return super().get(request, *args, **kwargs)
|
|
|
|
def get_context_data(self, **kwargs):
|
|
ctx = super().get_context_data(**kwargs)
|
|
ctx['url'] = self.url
|
|
ctx['only_info'] = self.only_info
|
|
return ctx
|
|
|
|
|
|
display_message_and_continue = DisplayMessageAndContinueView.as_view()
|