598 lines
26 KiB
Python
598 lines
26 KiB
Python
# authentic2-auth-fc - authentic2 authentication for FranceConnect
|
|
# Copyright (C) 2019 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import uuid
|
|
import logging
|
|
import json
|
|
import urlparse
|
|
import requests
|
|
|
|
from requests_oauthlib import OAuth2Session
|
|
|
|
|
|
import django
|
|
from django.views.generic import View, FormView
|
|
from django.http import HttpResponseRedirect, Http404
|
|
from django.contrib.auth import REDIRECT_FIELD_NAME, get_user_model
|
|
from django.contrib import messages
|
|
from django.shortcuts import resolve_url, render
|
|
from django.utils.translation import ugettext as _
|
|
from django.utils.http import is_safe_url, urlencode
|
|
from django.conf import settings
|
|
from django.core import signing
|
|
from django.core.cache import InvalidCacheBackendError, caches
|
|
from django.core.exceptions import PermissionDenied
|
|
from django.core.urlresolvers import reverse
|
|
from django.forms import Form
|
|
try:
|
|
from django.contrib.auth.views import update_session_auth_hash
|
|
except ImportError:
|
|
update_session_auth_hash = None
|
|
|
|
from authentic2 import app_settings as a2_app_settings
|
|
from authentic2 import utils as a2_utils, hooks, constants
|
|
from authentic2.a2_rbac.utils import get_default_ou
|
|
from authentic2.forms.passwords import SetPasswordForm
|
|
|
|
from . import app_settings, models, utils
|
|
|
|
|
|
class LoggerMixin(object):
    '''Mixin giving each instance a ``logger`` attribute bound to this module.

    It cooperates with the other classes of the MRO: positional and keyword
    arguments are forwarded untouched to the next ``__init__``.
    '''

    def __init__(self, *args, **kwargs):
        self.logger = logging.getLogger(__name__)
        # ``super(LoggerMixin, *args, **kwargs)`` only built a super object
        # (and raised TypeError as soon as a keyword argument was given); the
        # next __init__ of the MRO must be called explicitly.
        super(LoggerMixin, self).__init__(*args, **kwargs)
|
|
|
|
# Use the cache registered under the 'fc' alias when there is one; the
# InvalidCacheBackendError handler falls back to the 'default' cache.
try:
    cache = caches['fc']
except InvalidCacheBackendError:
    cache = caches['default']


# NOTE(review): presumably a duration in seconds for entries put in `cache`;
# no use of it is visible in this part of the file -- confirm.
CACHE_TIMEOUT = 60
|
|
|
|
|
|
def ask_authorization(request, scopes, logger):
    '''Return a redirect to the authorize endpoint asking for `scopes`.

    `scopes` is a list/tuple of scopes or a single scope. A random state is
    generated, recorded in ``request.session['fc_states']`` together with the
    current absolute URL (used as redirect_uri), and sent both as ``state``
    and ``nonce``.
    '''
    scope_list = scopes if isinstance(scopes, (list, tuple)) else [scopes]
    callback_url = request.build_absolute_uri()
    state = str(uuid.uuid4())

    # remember the in-flight request so that the callback can be matched
    pending = request.session.setdefault('fc_states', {})
    pending[state] = {'redirect_uri': callback_url}
    request.session.modified = True

    params = {
        'client_id': app_settings.client_id,
        'scope': ' '.join(scope_list),
        'redirect_uri': callback_url,
        'response_type': 'code',
        'state': state,
        'nonce': state,
        'acr_values': 'eidas1',
    }
    logger.debug('query string %s', params)

    url = '{0}?{1}'.format(app_settings.authorize_url, urlencode(params))
    logger.debug('redirect to %s', url)

    response = HttpResponseRedirect(url)
    response.display_message = False
    return response
|
|
|
|
|
|
def resolve_access_token(authorization_code, redirect_uri, logger):
    '''Exchange an authorization_code for an access_token.

    POST the code to ``app_settings.token_url`` and return the decoded JSON
    response. ``None`` is returned, after logging a warning, when the request
    fails, when the status code is not 200 or when the body is not JSON.
    '''
    data = {
        'code': authorization_code,
        'client_id': app_settings.client_id,
        'client_secret': app_settings.client_secret,
        'redirect_uri': redirect_uri,
        'grant_type': 'authorization_code',
    }
    # never write the client secret to the logs, even at debug level
    logger.debug('access token request %s', dict(data, client_secret='***'))

    try:
        session = utils.requests_retry_session()
        response = session.post(
            app_settings.token_url, data=data,
            verify=app_settings.verify_certificate,
            allow_redirects=False, timeout=3)
    except requests.exceptions.RequestException as e:
        logger.warning(u'unable to retrieve access token %s', e)
        return None

    if response.status_code != 200:
        # log the decoded error when the body is JSON, the raw body otherwise
        try:
            error = response.json()
        except ValueError:
            error = response.content
        logger.warning(u'oauth2 error on access token retrieval: %r', error)
        return None

    try:
        token = response.json()
    except ValueError:
        logger.warning(
            "no JSON object can be decoded from the data received from %s : '%s'",
            app_settings.token_url, response.content)
        return None
    logger.debug('token resolved : %s', token)
    return token
|
|
|
|
|
|
def access_token_from_request(request, logger):
    '''Resolve an access token for a request coming back from the
       authorization endpoint.

       Return None when the ``code`` or ``state`` query parameter is missing
       or empty, or when the state is not one recorded in
       ``request.session['fc_states']``.
    '''
    query = request.GET
    code, state = query.get('code'), query.get('state')
    if not (code and state):
        return None

    # there should not be many FC SSO in flight
    pending = request.session.get('fc_states', {})
    try:
        entry = pending[state]
    except KeyError:
        return None
    return resolve_access_token(code, entry['redirect_uri'], logger)
|
|
|
|
|
|
# NOTE(review): not referenced by any code visible in this file; check for
# external users before changing or removing it.
ACCESS_GRANT_CODE = 'accessgrantcode'
|
|
|
|
|
|
def clean_fc_session(session):
    '''Remove the FranceConnect entries (id_token, raw id_token, user_info and
       data) from `session`; missing keys are ignored.
    '''
    for key in ('fc_id_token', 'fc_id_token_raw', 'fc_user_info', 'fc_data'):
        session.pop(key, None)
|
|
|
|
|
|
class FcOAuthSessionViewMixin(LoggerMixin):
    '''Add the OAuth2 dance to a view.

    ``dispatch()`` looks at the query string:

    - with a ``code`` parameter, the code is exchanged for a token, the
      id_token is validated, the user_info (and optionally data providers)
      are fetched and stored in the session, then the wrapped view runs;
    - with an ``error`` parameter, the error is reported to the user;
    - otherwise the user is redirected to the authorize endpoint.
    '''
    redirect_field_name = REDIRECT_FIELD_NAME
    in_popup = False
    token = None

    def get_in_popup(self):
        '''Tell whether the view is displayed inside a popup.'''
        return self.in_popup

    def redirect_to(self, request):
        '''Return the URL to go back to, read from the redirect field.

        For POST requests the POST data is looked up first, then the query
        string. A URL which is not safe for the current host is replaced by
        ``settings.LOGIN_REDIRECT_URL``.
        '''
        if request.method == 'POST':
            redirect_to = request.POST.get(self.redirect_field_name,
                                           request.GET.get(self.redirect_field_name, ''))
        else:
            redirect_to = request.GET.get(self.redirect_field_name, '')

        if django.VERSION < (1, 11):
            safe = is_safe_url(url=redirect_to, host=request.get_host())
        else:
            safe = is_safe_url(url=redirect_to, allowed_hosts=[request.get_host()])
        if not safe:
            redirect_to = resolve_url(settings.LOGIN_REDIRECT_URL)
        return redirect_to

    def close_popup_redirect(self, request, next_url, *args, **kwargs):
        '''Show a page to close the current popup and reload the parent window
           with the return url.
        '''
        return render(request, 'authentic2_auth_fc/close-popup-redirect.html',
                      {'redirect_to': next_url})

    def simple_redirect(self, request, next_url, *args, **kwargs):
        '''Redirect to `next_url` through ``a2_utils.redirect``.'''
        return a2_utils.redirect(request, next_url, *args, **kwargs)

    def redirect(self, request, *args, **kwargs):
        '''Redirect to the ``next_url`` keyword argument, or to the URL given
           by ``redirect_to()`` when it is absent, closing the popup if needed.
        '''
        next_url = kwargs.pop('next_url', None)
        if next_url is None:
            next_url = self.redirect_to(request, *args, **kwargs)
        if self.get_in_popup():
            return self.close_popup_redirect(request, next_url, *args,
                                             **kwargs)
        else:
            return self.simple_redirect(request, next_url, *args, **kwargs)

    def redirect_and_come_back(self, request, next_url, *args, **kwargs):
        '''Redirect to `next_url`, passing it the current path as return URL;
           the current path itself keeps the original return URL.
        '''
        old_next_url = self.redirect_to(request)
        here = a2_utils.make_url(request.path, params={REDIRECT_FIELD_NAME: old_next_url})
        here = a2_utils.make_url(here, **kwargs)
        there = a2_utils.make_url(next_url, params={REDIRECT_FIELD_NAME: here})
        return self.redirect(request, next_url=there, *args, **kwargs)

    def get_scopes(self):
        '''Return the configured scopes plus ``openid``, without duplicates.'''
        return list(set(['openid'] + app_settings.scopes))

    def get_ressource(self, url, verify):
        '''GET `url` with the OAuth2 session and return the decoded JSON body.

        None is returned, after logging a warning, on request failure, HTTP
        error status or non-JSON content.
        '''
        try:
            response = self.oauth_session().get(url, verify=verify, allow_redirects=False,
                                                timeout=3)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            self.logger.warning(u'unable to retrieve ressource from %s due to %s', url, e)
            return None
        try:
            data = response.json()
        except ValueError:
            self.logger.warning(
                "no JSON object can be decoded from the data received from %s : '%s'",
                url, response.content)
            return None
        self.logger.debug('ressource resolved : %s', data)
        return data

    def get_user_info(self):
        '''Fetch the user_info from ``app_settings.userinfo_url``.'''
        return self.get_ressource(app_settings.userinfo_url + '?schema=openid',
                                  app_settings.verify_certificate)

    def get_data(self, scopes=()):
        '''Query the data providers configured in ``app_settings.fd_list``.

        Return a dict mapping each scope to a list of ``(name, data)`` pairs,
        one per provider which returned something.
        '''
        data = dict()
        if not app_settings.fd_list:
            return data
        for scope in scopes:
            try:
                providers = app_settings.fd_list[scope]
            except KeyError:
                # scopes come from the query string: an unknown one must not
                # crash the view
                self.logger.warning(u'no data provider configured for scope %r', scope)
                continue
            for fd in providers:
                url = fd['url']
                if fd['query_dic']:
                    url += '?' + urlencode(fd['query_dic'])
                d = self.get_ressource(url, app_settings.verify_certificate)
                if d:
                    data.setdefault(scope, []).append((fd['name'], d))
        return data

    def authorization_error(self, request, *args, **kwargs):
        '''Report the ``error`` / ``error_description`` query parameters to
           the user and to the logs, then redirect.
        '''
        error = request.GET.get('error')
        error_description = request.GET.get('error_description')
        msg = _('Unable to connect to FranceConnect: "%s".') % error
        if error_description:
            msg += _('("%s")') % error_description
        messages.error(request, msg)
        self.logger.warning('auth_fc: authorization failed with error=%r error_description=%r',
                            error, error_description or '')
        return self.redirect(request)

    def dispatch(self, request, *args, **kwargs):
        '''Interpret the OAuth authorization dance'''
        if 'code' in request.GET:
            self.token = access_token_from_request(request, self.logger)

            # Token request may not be completely processed and result in no
            # token
            if not self.token:
                messages.warning(request, _('Unable to connect to FranceConnect.'))
                return self.redirect(request)

            # The token request may fail, 'error' is then required.
            # A bad client secret results in error equals to invalid_request
            # for FC and invalid_client for oidc_provider.
            if 'error' in self.token:
                msg = 'token request failed : {}'.format(self.token)
                self.logger.warning(msg)
                messages.warning(request,
                                 _('Unable to connect to FranceConnect: "%s".') % self.token['error'])
                return self.redirect(request)

            # a response without id_token cannot be validated, fail like any
            # other error instead of raising KeyError
            raw_id_token = self.token.get('id_token')
            if not raw_id_token:
                self.logger.warning(u'token response does not contain an id_token')
                messages.warning(request, _('Unable to connect to FranceConnect.'))
                return self.redirect(request)

            key = app_settings.client_secret
            # duck-type unicode/Py3 strings
            if hasattr(key, 'isdecimal'):
                key = key.encode('utf-8')
            self.id_token, error = models.parse_id_token(
                raw_id_token, client_id=app_settings.client_id, client_secret=key)
            if not self.id_token:
                self.logger.warning(u'validation of id_token failed: %s', error)
                messages.warning(request, _('Unable to connect to FranceConnect.'))
                return self.redirect(request)

            # the nonce sent to the authorize endpoint is the state
            nonce = self.id_token.get('nonce')
            states = request.session.get('fc_states', {})
            if not nonce or nonce not in states:
                self.logger.warning(u'invalid nonce in id_token %s, known ones %s', nonce,
                                    u', '.join(states.keys()))
                messages.warning(request, _('Unable to connect to FranceConnect.'))
                return self.redirect(request)
            self.logger.debug('fc id_token %s', self.id_token)

            # expose every claim of the id_token as an attribute (e.g. self.sub)
            for claim in self.id_token:
                setattr(self, claim, self.id_token[claim])
            self.oauth_session = lambda: utils.requests_retry_session(
                session=OAuth2Session(
                    app_settings.client_id, token=self.token))

            self.user_info = self.get_user_info()
            if not self.user_info:
                msg = 'userinfo resolution failed : {}'.format(self.token)
                self.logger.warning(msg)
                messages.warning(request, _('Unable to connect to FranceConnect.'))
                return self.redirect(request)
            self.logger.debug('fc user_info %s', self.user_info)

            self.request.session['fc_id_token'] = self.id_token
            self.request.session['fc_id_token_raw'] = raw_id_token
            self.request.session['fc_user_info'] = self.user_info
            if 'fd_scopes' in request.GET:
                scopes = request.GET.get('fd_scopes')
                scopes = scopes.split()
                self.data = self.get_data(scopes)
                self.logger.debug('fc data %s', self.data)
                fc_data = self.request.session.setdefault('fc_data', {})
                for scope in self.data:
                    fc_data.setdefault(scope, []).extend(self.data[scope])
                self.logger.debug('fc data in session %s', self.request.session['fc_data'])
            return super(FcOAuthSessionViewMixin, self).dispatch(request,
                                                                 *args,
                                                                 **kwargs)
        elif 'error' in request.GET:
            return self.authorization_error(request, *args, **kwargs)
        else:
            scopes = self.get_scopes()
            if 'fd_scopes' in request.GET:
                scopes = list(set(scopes) | set(request.GET['fd_scopes'].split()))
            return ask_authorization(request, scopes, self.logger)
|
|
|
|
|
|
class PopupViewMixin(object):
    '''Mixin detecting popup mode from the query string.'''

    def get_in_popup(self):
        '''Return True when the request has a ``popup`` query parameter,
           whatever its value.
        '''
        query = self.request.GET
        return 'popup' in query
|
|
|
|
|
|
class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View):
    '''Login with FC, if the FC account is already linked, connect this user,
       if a user is logged link the user to this account, otherwise display an
       error message.
    '''

    def update_user_info(self):
        '''Save the FC user_info on the link and apply the attribute mappings
           to the linked user.
        '''
        self.fc_account.user_info = json.dumps(self.user_info)
        self.fc_account.save()
        utils.apply_user_info_mappings(self.fc_account.user, self.user_info)
        self.logger.debug('updating user_info %s', self.fc_account.user_info)

    def get(self, request, *args, **kwargs):
        '''Request an access grant code and associate it to the current user'''
        registration = 'registration' in request.GET
        self.service_slug = request.GET.get(constants.SERVICE_FIELD_NAME)
        if request.user.is_authenticated():
            # Prevent to add a link with an FC account already linked with another user.
            try:
                fc_account = models.FcAccount.objects.get(sub=self.sub, user__is_active=True)
                # compare by equality: request.user is a lazy wrapper, so an
                # identity test against a model instance is always true
                if fc_account.user != request.user:
                    msg = 'Attempt to link FC account {} already linked with user {}'
                    self.logger.info(msg.format(self.sub, fc_account.user))
                    messages.error(request,
                                   _('The FranceConnect account {} is already'
                                     ' linked with another account.').format(fc_account))
                    return self.redirect(request)
            except models.FcAccount.DoesNotExist:
                pass

            # Prevent to add a link to an user which is already linked to an FC account
            if request.user.fc_accounts.exists():
                self.logger.warning(u'cannot link to sub %s, account is already linked to an '
                                    u'FC account', self.sub)
                messages.error(request,
                               _('Your account is already linked to a FranceConnect account'))
                return self.redirect(request)

            json_token = json.dumps(self.token)
            self.fc_account, created = models.FcAccount.objects.get_or_create(
                defaults={'token': json_token},
                sub=self.sub, user=request.user)
            if created:
                self.logger.info('fc link created sub %s', self.sub)
                self.update_user_info()
                data = utils.get_mapped_attributes_flat(request)
                if 'email' in data:
                    messages.info(request,
                                  _('Your FranceConnect account {} with '
                                    'email {} has been linked.').format(self.fc_account,
                                                                        data['email']))
                else:
                    messages.info(request, _('Your FranceConnect account {} '
                                             'has been linked.').format(self.fc_account))
                hooks.call_hooks('event', name='fc-link', user=request.user, sub=self.sub,
                                 request=request)
            else:
                self.fc_account.token = json_token
                self.fc_account.save()
                self.update_user_info()
                messages.info(request, _('Your local account has been updated.'))
            return self.redirect(request)

        default_ou = get_default_ou()
        email_is_unique = a2_app_settings.A2_EMAIL_IS_UNIQUE or default_ou.email_is_unique
        user = a2_utils.authenticate(
            request,
            sub=self.sub,
            user_info=self.user_info,
            token=self.token)
        # no linked user: when emails are unique, try to link the local
        # account having the email given by FranceConnect
        if not user and self.user_info.get('email') and email_is_unique:
            email = self.user_info['email']
            User = get_user_model()
            qs = User.objects.filter(email__iexact=email)
            if not a2_app_settings.A2_EMAIL_IS_UNIQUE and default_ou.email_is_unique:
                qs = qs.filter(ou=default_ou)

            if qs.exists():
                # there should not be multiple accounts with the same mail
                if len(qs) > 1:
                    self.logger.error(u'multiple accounts with the same mail %s, %s', email,
                                      list(qs))
                # ok we have one account
                elif len(qs) == 1:
                    user = qs[0]
                    # but does he have already a link to an FC account ?
                    if not user.fc_accounts.exists():
                        fc_account, created = models.FcAccount.objects.get_or_create(
                            defaults={'token': json.dumps(self.token)},
                            sub=self.sub, user=user)
                        if created:
                            self.logger.info(u'fc link created sub %s user %s', self.sub, user)
                            hooks.call_hooks('event', name='fc-link', user=user, sub=self.sub,
                                             request=request)
                        user = a2_utils.authenticate(
                            request=request,
                            sub=self.sub,
                            user_info=self.user_info,
                            token=self.token)
                    else:
                        messages.warning(
                            request,
                            _('Your FranceConnect email address \'%s\' is already used by another '
                              'account, so we cannot create an account for you. Please create an '
                              'account with another email address then link it to FranceConnect '
                              'using your account management page.') % email)
                        return self.redirect(request)
        if user:
            a2_utils.login(request, user, 'france-connect', service_slug=self.service_slug)
            self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user)
            self.fc_account.token = json.dumps(self.token)
            self.fc_account.save(update_fields=['token'])
            self.update_user_info()
            self.logger.info('logged in using fc sub %s', self.sub)
            return self.redirect(request)
        else:
            params = {}
            if self.service_slug:
                params[constants.SERVICE_FIELD_NAME] = self.service_slug
            if registration:
                return self.redirect_and_come_back(request,
                                                   a2_utils.make_url('fc-registration',
                                                                     params=params),
                                                   params=params)
            else:
                messages.info(request, _('If you already have an account, please log in, else '
                                         'create your account.'))

            login_params = params.copy()
            if not app_settings.show_button_quick_account_creation:
                login_params['nofc'] = 1

            login_url = a2_utils.make_url(settings.LOGIN_URL, params=login_params)
            return self.redirect_and_come_back(request, login_url, params=params)
|
|
|
|
|
|
class RegistrationView(PopupViewMixin, LoggerMixin, View):
    '''Start the registration of a new account from the mapped FranceConnect
       attributes.
    '''

    def get(self, request, *args, **kwargs):
        '''Redirect to the activation URL built from the mapped attributes,
           or to the registration form when no email is available.
        '''
        data = utils.get_mapped_attributes_flat(request)
        data['no_password'] = True
        if app_settings.auto_register:
            data['confirm_data'] = 'required'
        else:
            data['confirm_data'] = True

        redirect_to = request.GET.get(REDIRECT_FIELD_NAME, '')
        # same version switch as FcOAuthSessionViewMixin.redirect_to(): the
        # ``host`` keyword of is_safe_url() is superseded by ``allowed_hosts``
        # starting with Django 1.11
        if django.VERSION < (1, 11):
            safe = is_safe_url(url=redirect_to, host=request.get_host())
        else:
            safe = is_safe_url(url=redirect_to, allowed_hosts=[request.get_host()])
        if not safe:
            redirect_to = resolve_url(settings.LOGIN_REDIRECT_URL)

        # Prevent errors when redirect_to does not contain fc-login-or-link view
        parsed_redirect_to = urlparse.urlparse(redirect_to)
        if parsed_redirect_to.path == reverse('fc-login-or-link'):
            redirect_to = urlparse.parse_qs(parsed_redirect_to.query) \
                .get(REDIRECT_FIELD_NAME, [a2_utils.make_url('auth_homepage')])[0]
        params = {
            REDIRECT_FIELD_NAME: redirect_to,
        }
        if constants.SERVICE_FIELD_NAME in request.GET:
            params[constants.SERVICE_FIELD_NAME] = request.GET[constants.SERVICE_FIELD_NAME]
        if self.get_in_popup():
            params['popup'] = ''
        redirect_to = a2_utils.make_url('fc-login-or-link', params=params)

        if 'email' not in data:
            # without an email the user goes through the registration form,
            # the collected data travels in a signed token
            data[REDIRECT_FIELD_NAME] = redirect_to
            messages.warning(request,
                             _("FranceConnect didn't provide your email address, please do."))
            return HttpResponseRedirect("{}?token={}".format(reverse('registration_register'),
                                                             signing.dumps(data)))

        data['valid_email'] = False
        data['franceconnect'] = True
        data['authentication_method'] = 'france-connect'
        if constants.SERVICE_FIELD_NAME in request.GET:
            data[constants.SERVICE_FIELD_NAME] = request.GET[constants.SERVICE_FIELD_NAME]
        activation_url = a2_utils.build_activation_url(request,
                                                       next_url=redirect_to,
                                                       **data)
        return HttpResponseRedirect(activation_url)
|
|
|
|
|
|
class UnlinkView(LoggerMixin, FormView):
    '''Delete the links between the current user and FranceConnect accounts,
       asking for a new password first when ``must_set_password()`` says so.
    '''
    template_name = 'authentic2_auth_fc/unlink.html'

    def get_success_url(self):
        '''Return the account management URL, wrapped in the FC logout URL if
           ``logout_when_unlink`` is set; the FC session data is cleaned.
        '''
        url = reverse('account_management')
        if app_settings.logout_when_unlink:
            # logout URL can be None if not session exists with FC
            url = utils.build_logout_url(self.request, next_url=url) or url
        clean_fc_session(self.request.session)
        return url

    def get_form_class(self):
        '''Use SetPasswordForm when a password must be set, else an empty form.'''
        form_class = Form
        if self.must_set_password():
            form_class = SetPasswordForm
        return form_class

    def get_form_kwargs(self, **kwargs):
        '''Add the current user to the form arguments when a password must be set.'''
        kwargs = super(UnlinkView, self).get_form_kwargs(**kwargs)
        if self.must_set_password():
            kwargs['user'] = self.request.user
        return kwargs

    def must_set_password(self):
        '''Return False if the session holds a password authentication event,
           otherwise whether the user can change their password.
        '''
        for event in self.request.session.get(constants.AUTHENTICATION_EVENTS_SESSION_KEY, []):
            if event['how'].startswith('password'):
                return False
        return self.request.user.can_change_password()

    def dispatch(self, request, *args, **kwargs):
        '''Refuse anonymous users (PermissionDenied) and hide the view (404)
           when a password is needed but cannot be changed.
        '''
        if not request.user.is_authenticated():
            raise PermissionDenied()
        # We prevent unlinking if the user has no usable password and can't change it
        # because we assume that the password is the unique other mean of authentication
        # and unlinking would make the account unreachable.
        if self.must_set_password() and not a2_app_settings.A2_REGISTRATION_CAN_CHANGE_PASSWORD:
            # Prevent access to the view.
            raise Http404
        return super(UnlinkView, self).dispatch(request, *args, **kwargs)

    def form_valid(self, form):
        '''Save the new password if any, then delete the FC links of the user.'''
        if self.must_set_password():
            form.save()
            # the module-level import falls back to None when
            # update_session_auth_hash is unavailable; do not call None
            if update_session_auth_hash is not None:
                update_session_auth_hash(self.request, self.request.user)
            self.logger.info(u'user %s has set a password', self.request.user)
        links = models.FcAccount.objects.filter(user=self.request.user)
        for link in links:
            self.logger.info(u'user %s unlinked from %s', self.request.user, link)
        hooks.call_hooks('event', name='fc-unlink', user=self.request.user)
        messages.info(self.request, _('The link with the FranceConnect account has been deleted.'))
        links.delete()
        return super(UnlinkView, self).form_valid(form)

    def get_context_data(self, **kwargs):
        '''Add ``no_password`` to the template context when a password must be set.'''
        context = super(UnlinkView, self).get_context_data(**kwargs)
        if self.must_set_password():
            context['no_password'] = True
        return context

    def post(self, request, *args, **kwargs):
        '''Go back to account management on cancel, else process the form.'''
        if 'cancel' in request.POST:
            return a2_utils.redirect(request, 'account_management')
        return super(UnlinkView, self).post(request, *args, **kwargs)
|
|
|
|
|
|
# View callables built from the class-based views above
# (presumably the names referenced by the URL configuration -- confirm).
login_or_link = LoginOrLinkView.as_view()
registration = RegistrationView.as_view()
unlink = UnlinkView.as_view()
|
|
|
|
|
|
class LogoutReturnView(View):
    '''Landing view for the return from a logout.'''

    def get(self, request, *args, **kwargs):
        '''Clean the FC data and the pending states from the session, then
           redirect to the ``next`` URL recorded for the received state, or
           to the local logout view when there is none.
        '''
        state = request.GET.get('state')
        clean_fc_session(request.session)
        states = request.session.pop('fc_states', None)

        target = states[state].get('next') if states and state in states else None
        return HttpResponseRedirect(target or reverse('auth_logout'))
|
|
|
|
# View callable for LogoutReturnView.
logout = LogoutReturnView.as_view()
|