This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
authentic2-auth-fc/src/authentic2_auth_fc/views.py

486 lines
20 KiB
Python

import uuid
import requests
import logging
import json
from requests_oauthlib import OAuth2Session
from django.views.generic import View, FormView
from django.views.generic.detail import SingleObjectMixin
from django.http import HttpResponseRedirect, Http404
from django.contrib.auth import authenticate, REDIRECT_FIELD_NAME
from django.contrib.auth.decorators import user_passes_test
from django.contrib import messages
from django.shortcuts import resolve_url, render
from django.utils.translation import ugettext as _
from django.utils.http import is_safe_url, urlencode
from django.conf import settings
from django.core import signing
from django.core.cache import InvalidCacheBackendError, caches
from django.core.exceptions import PermissionDenied
from django.core.urlresolvers import reverse
from django.forms import Form
from authentic2 import app_settings as a2_app_settings
from authentic2 import utils as a2_utils
from . import app_settings, models, utils
SET_PASSWORD_FORM_CLASS = a2_utils.import_module_or_class(
a2_app_settings.A2_REGISTRATION_SET_PASSWORD_FORM_CLASS)
def user_has_fcaccount(user):
'''Return True if user a link to FC'''
try:
return user.fcaccount is not None
except models.FcAccount.DoesNotExist:
return False
fcaccount_required = user_passes_test(user_has_fcaccount, '/')
class LoggerMixin(object):
def __init__(self, *args, **kwargs):
self.logger = logging.getLogger(__name__)
super(LoggerMixin, *args, **kwargs)
try:
cache = caches['fc']
except InvalidCacheBackendError:
cache = caches['default']
CACHE_TIMEOUT = 60
def ask_authorization(request, scopes, logger):
'''Compute an authorize URL for obtaining the given scope'''
if not isinstance(scopes, (list, tuple)):
scopes = [scopes]
redirect_uri = request.build_absolute_uri()
state = unicode(uuid.uuid4())
states = request.session.setdefault('fc-states', {})
states[state] = {
'redirect_uri': redirect_uri,
}
request.session.modified = True
params = {
'client_id': app_settings.client_id,
'scope': ' '.join(scopes),
'redirect_uri': redirect_uri,
'response_type': 'code',
'state': state,
'nonce': state,
}
logger.debug('query string %s', params)
url = '{0}?{1}'.format(app_settings.authorize_url, urlencode(params))
logger.debug('redirect to %s', url)
return HttpResponseRedirect(url)
def resolve_access_token(authorization_code, redirect_uri, logger):
'''Exchange an authorization_code for an access_token'''
data = {
'code': authorization_code,
'client_id': app_settings.client_id,
'client_secret': app_settings.client_secret,
'redirect_uri': redirect_uri,
'grant_type': 'authorization_code',
}
logger.debug('data %s', data)
try:
response = requests.post(
app_settings.token_url, data=data,
verify=app_settings.verify_certificate,
allow_redirects=False, timeout=3)
if response.status_code == 400:
try:
data = response.json()
except ValueError:
data = {}
logger.warning(u'oauth2 error on access token retrieval '
u'(error=%s, error_description=%s, error_uri=%s)',
data['error'], data['error_description'], data['error_uri'])
return
response.raise_for_status()
except requests.exceptions.RequestException as e:
logger.error(u'unable to retrieve access token {}'.format(e))
else:
try:
response = response.json()
logger.debug('token resolved : {}'.format(response))
return response
except ValueError:
logger.warning(
"no JSON object can be decoded from the data received from {} : '{}'".format(
app_settings.token_url, response.content))
def access_token_from_request(request, logger):
'''Resolve an access token given a request returning from the authorization
endpoint.
'''
code = request.GET.get('code')
state = request.GET.get('state')
if not code:
return
if not state:
return
states = request.session.get('fc-states', {})
if state not in states:
return
# there should not be many FC SSO in flight
redirect_uri = states[state]['redirect_uri']
return resolve_access_token(code, redirect_uri, logger)
ACCESS_GRANT_CODE = 'accessgrantcode'
class FcOAuthSessionViewMixin(LoggerMixin):
'''Add the OAuth2 dance to a view'''
scopes = ['openid', 'profile', 'birth', 'email']
redirect_field_name = REDIRECT_FIELD_NAME
in_popup = False
token = None
def get_in_popup(self):
return self.in_popup
def redirect_to(self, request, *args, **kwargs):
redirect_to = request.REQUEST.get(self.redirect_field_name, '')
if not is_safe_url(url=redirect_to, host=request.get_host()):
redirect_to = resolve_url(settings.LOGIN_REDIRECT_URL)
return redirect_to
def close_popup_redirect(self, request, next_url, *args, **kwargs):
'''Show a page to close the current popup and reload the parent window
with the return url.
'''
return render(request, 'authentic2_auth_fc/close-popup-redirect.html',
{'redirect_to': next_url})
def simple_redirect(self, request, next_url, *args, **kwargs):
return HttpResponseRedirect(next_url)
def redirect(self, request, *args, **kwargs):
next_url = kwargs.pop('next_url', None)
if next_url is None:
next_url = self.redirect_to(request, *args, **kwargs)
if self.get_in_popup():
return self.close_popup_redirect(request, next_url, *args,
**kwargs)
else:
return self.simple_redirect(request, next_url, *args, **kwargs)
def redirect_and_come_back(self, request, next_url, *args, **kwargs):
old_next_url = self.redirect_to(request, *args, **kwargs)
here = '{0}?{1}'.format(
request.path, urlencode({REDIRECT_FIELD_NAME: old_next_url}))
there = '{0}{2}{1}'.format(
next_url, urlencode({REDIRECT_FIELD_NAME: here}),
'&' if '?' in next_url else '?')
return self.redirect(request, next_url=there, *args, **kwargs)
def get_scopes(self):
return self.scopes
def get_ressource(self, url, verify):
try:
data = self.oauth_session().get(url, verify=verify, allow_redirects=False, timeout=3)
data.raise_for_status()
except requests.exceptions.RequestException as e:
self.logger.error(u'unable to retrieve ressource from {} due to {}'.format(url, e))
else:
try:
data = data.json()
self.logger.debug('ressource resolved : {}'.format(data))
return data
except ValueError:
self.logger.warning(
"no JSON object can be decoded from the data received from {} : '{}'".format(
url, data.content))
def get_user_info(self):
return self.get_ressource(app_settings.userinfo_url + '?schema=openid',
app_settings.verify_certificate)
def get_data(self, scopes=[]):
data = dict()
if not app_settings.fd_list:
return data
for scope in scopes:
for fd in app_settings.fd_list[scope]:
url = fd['url']
if fd['query_dic']:
url += '?' + urlencode(fd['query_dic'])
d = self.get_ressource(url, app_settings.verify_certificate)
if d:
data.setdefault(scope, []).append((fd['name'], d))
return data
def authorization_error(self, request, *args, **kwargs):
if request.REQUEST['error'] == 'access_denied':
messages.warning(request, _('You refused the connection.'))
self.logger.debug('authorization_error %r', request.GET)
return self.redirect(request)
def dispatch(self, request, *args, **kwargs):
'''Interpret the OAuth authorization dance'''
if 'code' in request.GET:
self.token = access_token_from_request(request, self.logger)
# Token request may not be completly processed and result in no
# token
if not self.token:
messages.warning(request, _('Unable to connect to FranceConnect.'))
return self.redirect(request)
# The token request may fail, 'error' is then required.
# A bad client secret results in error equals to invalid_request
# for FC and invalid_client for oidc_provider.
if 'error' in self.token:
msg = 'token request failed : {}'.format(self.token)
self.logger.warning(msg)
messages.warning(request, _('Unable to connect to FranceConnect.'))
return self.redirect(request)
key = app_settings.client_secret
if isinstance(key, unicode):
key = key.encode('utf-8')
self.id_token, error = models.parse_id_token(
self.token['id_token'], client_id=app_settings.client_id, client_secret=key)
if not self.id_token:
self.logger.warning(u'validation of id_token failed: %s', error)
messages.warning(request, _('Unable to connect to FranceConnect.'))
return self.redirect(request)
nonce = self.id_token.get('nonce')
states = request.session.get('fc-states', {})
if not nonce or nonce not in states:
self.logger.warning(u'invalid nonce in id_token %s, known ones %s', nonce,
u', '.join(states.keys()))
messages.warning(request, _('Unable to connect to FranceConnect.'))
return self.redirect(request)
self.logger.debug('fc id_token %s', self.id_token)
for key in self.id_token:
setattr(self, key, self.id_token[key])
self.oauth_session = lambda: OAuth2Session(
app_settings.client_id, token=self.token)
self.user_info = self.get_user_info()
if not self.user_info:
msg = 'userinfo resolution failed : {}'.format(self.token)
self.logger.warning(msg)
messages.warning(request, _('Unable to connect to FranceConnect.'))
return self.redirect(request)
self.logger.debug('fc user_info %s', self.user_info)
self.request.session['fc-id_token'] = self.id_token
self.request.session['fc-id_token_raw'] = self.token['id_token']
self.request.session['fc-user_info'] = self.user_info
if 'fd_scopes' in request.GET:
scopes = request.GET.get('fd_scopes')
scopes = scopes.split()
self.data = self.get_data(scopes)
self.logger.debug('fc data %s', self.data)
fc_data = self.request.session.setdefault('fc-data', {})
for scope in self.data:
fc_data.setdefault(scope, []).extend(self.data[scope])
self.logger.debug('fc data in session %s', self.request.session['fc-data'])
return super(FcOAuthSessionViewMixin, self).dispatch(request,
*args,
**kwargs)
elif 'error' in request.GET:
return self.authorization_error(request, *args, **kwargs)
else:
if 'fd_scopes' in request.GET:
scopes = request.GET.get('fd_scopes')
scopes = scopes.split()
self.scopes.extend(scopes)
return ask_authorization(request, self.get_scopes(), self.logger)
class PopupViewMixin(object):
def get_in_popup(self):
return 'popup' in self.request.REQUEST
class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View):
'''Login with FC, if the FC account is already linked, connect this user,
if a user is logged link the user to this account, otherwise display an
error message.
'''
def update_user_info(self):
self.fc_account.user_info = json.dumps(self.user_info)
self.fc_account.save()
utils.apply_user_info_mappings(self.fc_account.user, self.user_info)
self.logger.debug('updating user_info %s', self.fc_account.user_info)
def get(self, request, *args, **kwargs):
registration = True if 'registration' in request.GET else False
'''Request an access grant code and associate it to the current user'''
if request.user.is_authenticated():
# Prevent to add a link with an FC account already linked with another user.
try:
fc_account = models.FcAccount.objects.get(sub=self.sub, user__is_active=True)
if fc_account.user is not request.user:
msg = 'Attempt to link FC account {} already linked with user {}'
self.logger.info(msg.format(self.sub, fc_account.user))
messages.info(request,
_('The FranceConnect account {} is already'
' linked with another account.').format(fc_account))
return self.redirect(request)
except models.FcAccount.DoesNotExist:
pass
# Old link are deleted
json_token = json.dumps(self.token)
self.fc_account, created = models.FcAccount.objects.get_or_create(
defaults={'token': json_token},
sub=self.sub, user=request.user)
if created:
self.logger.info('fc link created sub %s', self.sub)
self.update_user_info()
data = utils.get_mapped_attributes_flat(request)
if 'email' in data:
messages.info(request,
_('Your FranceConnect account {} with '
'email {} has been linked.').format(self.fc_account,
data['email']))
else:
messages.info(request, _('Your FranceConnect account {} '
'has been linked.').format(self.fc_account))
else:
self.fc_account.token = json_token
self.fc_account.save()
self.update_user_info()
messages.info(request, _('Your local account has been updated.'))
return self.redirect(request)
user = authenticate(sub=self.sub, user_info=self.user_info,
token=self.token)
if user:
a2_utils.login(request, user, 'france-connect')
self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user)
self.update_user_info()
self.logger.info('logged in using fc sub %s', self.sub)
return self.redirect(request)
else:
if registration:
return self.redirect_and_come_back(request, reverse('fc-registration'))
else:
messages.info(request, _('If you already have an account, please log in, else '
'create your account.'))
if app_settings.show_button_quick_account_creation:
return self.redirect_and_come_back(request, settings.LOGIN_URL)
else:
return self.redirect_and_come_back(request,
'{0}?nofc=1'.format(settings.LOGIN_URL))
class RegistrationView(LoggerMixin, View):
def get(self, request, *args, **kwargs):
data = utils.get_mapped_attributes_flat(request)
data['no_password'] = True
if app_settings.auto_register:
data['confirm_data'] = 'required'
else:
data['confirm_data'] = True
redirect_to = request.REQUEST.get(REDIRECT_FIELD_NAME, '')
if not is_safe_url(url=redirect_to, host=request.get_host()):
redirect_to = resolve_url(settings.LOGIN_REDIRECT_URL)
if not 'email' in data:
data[REDIRECT_FIELD_NAME] = redirect_to
messages.info(request, _("FranceConnect didn't provide your email address, please do."))
return HttpResponseRedirect("{}?token={}".format(reverse('registration_register'),
signing.dumps(data)))
data['valid_email'] = False
activation_url = \
a2_utils.build_activation_url(request,
next_url=redirect_to,
**data)
return HttpResponseRedirect(activation_url)
class UnlinkView(LoggerMixin, SingleObjectMixin, FormView):
model = models.FcAccount
template_name = 'authentic2_auth_fc/unlink.html'
def get_success_url(self):
if app_settings.logout_when_unlink:
return reverse('auth_logout')
return reverse('account_management')
def get_form_class(self):
form_class = Form
if not self.fc_account.user.has_usable_password():
form_class = SET_PASSWORD_FORM_CLASS
return form_class
def get_form_kwargs(self, **kwargs):
kwargs = super(UnlinkView, self).get_form_kwargs(**kwargs)
if not self.fc_account.user.has_usable_password():
kwargs['user'] = self.fc_account.user
return kwargs
def dispatch(self, request, *args, **kwargs):
# We prevent unlinking if the user has no usable password and can't change it
# because we assume that the password is the unique other mean of authentication
# and unlinking would make the account unreachable.
if not request.user.has_usable_password() and not \
a2_app_settings.A2_REGISTRATION_CAN_CHANGE_PASSWORD:
# Prevent access to the view.
raise Http404
self.fc_account = self.object = self.get_object()
self.check_access(self.fc_account)
return super(UnlinkView, self).dispatch(request, *args, **kwargs)
def check_access(self, fc_account):
if self.request.user != fc_account.user:
raise PermissionDenied
def form_valid(self, form):
if not self.fc_account.user.has_usable_password():
form.save()
self.logger.info(u'user %s has set a password', self.fc_account.user)
self.fc_account.user.backend = 'authentic2.backends.models_backend.ModelBackend'
msg_tpl = _('The link with the FranceConnect account {fc_account} has been deleted.')
msg = msg_tpl.format(fc_account=self.fc_account)
self.logger.info(u'user %s unlinked from %s', self.fc_account.user, self.fc_account)
self.fc_account.delete()
messages.info(self.request, msg)
return super(UnlinkView, self).form_valid(form)
def get_context_data(self, **kwargs):
context = super(UnlinkView, self).get_context_data(**kwargs)
context['fc_account'] = self.fc_account
if not self.fc_account.user.has_usable_password():
context['no_password'] = True
return context
def post(self, request, *args, **kwargs):
if 'cancel' in request.POST:
return a2_utils.redirect(request, 'account_management')
return super(UnlinkView, self).post(request, *args, **kwargs)
login_or_link = LoginOrLinkView.as_view()
registration = RegistrationView.as_view()
unlink = UnlinkView.as_view()
class LogoutReturnView(View):
def get(self, request, *args, **kwargs):
request.session.pop('fc-id_token', None)
request.session.pop('fc-id_token_raw', None)
request.session.pop('fc-user_info', None)
request.session.pop('fc-data', None)
request.session.pop('fc-states', None)
return HttpResponseRedirect(reverse('auth_logout'))
logout = LogoutReturnView.as_view()