authentic/src/authentic2_auth_fc/views.py

614 lines
26 KiB
Python

# authentic2-auth-fc - authentic2 authentication for FranceConnect
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import uuid
import logging
import json
import requests
from requests_oauthlib import OAuth2Session
from django.db import IntegrityError
from django.views.generic import View, FormView
from django.http import HttpResponseRedirect, Http404
from django.contrib.auth import REDIRECT_FIELD_NAME, get_user_model
from django.contrib import messages
from django.shortcuts import resolve_url, render
from django.urls import reverse
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.translation import ugettext as _
from django.utils.http import is_safe_url, urlencode
from django.conf import settings
from django.core import signing
from django.core.cache import InvalidCacheBackendError, caches
from django.core.exceptions import PermissionDenied
from django.forms import Form
try:
from django.contrib.auth.views import update_session_auth_hash
except ImportError:
update_session_auth_hash = None
from authentic2 import app_settings as a2_app_settings
from authentic2 import utils as a2_utils, hooks, constants
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.forms.passwords import SetPasswordForm
from authentic2.utils import views as views_utils
from authentic2.utils.service import get_service_from_request, set_service_ref
from . import app_settings, models, utils
class LoggerMixin(object):
def __init__(self, *args, **kwargs):
self.logger = logging.getLogger(__name__)
super(LoggerMixin, *args, **kwargs)
try:
cache = caches['fc']
except InvalidCacheBackendError:
cache = caches['default']
CACHE_TIMEOUT = 60
def ask_authorization(request, scopes, logger):
'''Compute an authorize URL for obtaining the given scope'''
if not isinstance(scopes, (list, tuple)):
scopes = [scopes]
redirect_uri = request.build_absolute_uri()
state = str(uuid.uuid4())
states = request.session.setdefault('fc_states', {})
states[state] = {
'redirect_uri': redirect_uri,
}
request.session.modified = True
params = {
'client_id': app_settings.client_id,
'scope': ' '.join(scopes),
'redirect_uri': redirect_uri,
'response_type': 'code',
'state': state,
'nonce': state,
'acr_values': 'eidas1',
}
logger.debug('query string %s', params)
url = '{0}?{1}'.format(app_settings.authorize_url, urlencode(params))
logger.debug('redirect to %s', url)
response = HttpResponseRedirect(url)
response.display_message = False
return response
def resolve_access_token(authorization_code, redirect_uri, logger):
'''Exchange an authorization_code for an access_token'''
data = {
'code': authorization_code,
'client_id': app_settings.client_id,
'client_secret': app_settings.client_secret,
'redirect_uri': redirect_uri,
'grant_type': 'authorization_code',
}
logger.debug('access token request %s', data)
try:
session = utils.requests_retry_session()
response = session.post(
app_settings.token_url, data=data,
verify=app_settings.verify_certificate,
allow_redirects=False, timeout=3)
if response.status_code != 200:
try:
data = response.json()
logger.error(u'oauth2 error on access token retrieval: %r', data)
except ValueError:
data = {}
logger.error(u'oauth2 error on access token retrieval: %r', response.content[:1024])
return
except requests.exceptions.RequestException as e:
logger.error(u'unable to retrieve access token {}'.format(e))
else:
try:
response = response.json()
logger.debug('token resolved : %s', response)
return response
except ValueError:
logger.error(
'no JSON object can be decoded from the data received from %s: %r',
app_settings.token_url, response.content[:1024])
def access_token_from_request(request, logger):
'''Resolve an access token given a request returning from the authorization
endpoint.
'''
code = request.GET.get('code')
state = request.GET.get('state')
if not code:
return
if not state:
return
states = request.session.get('fc_states', {})
if state not in states:
return
# there should not be many FC SSO in flight
redirect_uri = states[state]['redirect_uri']
return resolve_access_token(code, redirect_uri, logger)
ACCESS_GRANT_CODE = 'accessgrantcode'
def clean_fc_session(session):
session.pop('fc_id_token', None)
session.pop('fc_id_token_raw', None)
session.pop('fc_user_info', None)
session.pop('fc_data', None)
class FcOAuthSessionViewMixin(LoggerMixin):
'''Add the OAuth2 dance to a view'''
redirect_field_name = REDIRECT_FIELD_NAME
in_popup = False
token = None
user_info = None
def get_in_popup(self):
return self.in_popup
def redirect_to(self, request):
if request.method == 'POST':
redirect_to = request.POST.get(self.redirect_field_name,
request.GET.get(self.redirect_field_name, ''))
else:
redirect_to = request.GET.get(self.redirect_field_name, '')
safe = is_safe_url(url=redirect_to, allowed_hosts=request.get_host())
if not safe:
redirect_to = resolve_url(settings.LOGIN_REDIRECT_URL)
return redirect_to
def close_popup_redirect(self, request, next_url, *args, **kwargs):
'''Show a page to close the current popup and reload the parent window
with the return url.
'''
return render(request, 'authentic2_auth_fc/close-popup-redirect.html',
{'redirect_to': next_url})
def simple_redirect(self, request, next_url, *args, **kwargs):
return a2_utils.redirect(request, next_url, *args, resolve=False, **kwargs)
def redirect(self, request, *args, **kwargs):
next_url = kwargs.pop('next_url', None)
if next_url is None:
next_url = self.redirect_to(request, *args, **kwargs)
if self.get_in_popup():
return self.close_popup_redirect(request, next_url, *args,
**kwargs)
else:
return self.simple_redirect(request, next_url, *args, **kwargs)
def redirect_and_come_back(self, request, next_url, *args, **kwargs):
old_next_url = self.redirect_to(request)
here = a2_utils.make_url(request.path, params={REDIRECT_FIELD_NAME: old_next_url})
here = a2_utils.make_url(here, **kwargs)
there = a2_utils.make_url(next_url, params={REDIRECT_FIELD_NAME: here})
return self.redirect(request, next_url=there, *args, **kwargs)
def get_scopes(self):
return list(set(['openid'] + app_settings.scopes))
def get_ressource(self, url, verify):
try:
data = self.oauth_session().get(url, verify=verify, allow_redirects=False, timeout=3)
data.raise_for_status()
except requests.exceptions.RequestException as e:
self.logger.error('unable to retrieve ressource from %s due to %s', url, e)
else:
try:
data = data.json()
self.logger.debug('ressource resolved: %s', data)
return data
except ValueError:
self.logger.error(
'no JSON object can be decoded from the data received from %s: %r',
url, data.content)
def get_user_info(self):
return self.get_ressource(app_settings.userinfo_url + '?schema=openid',
app_settings.verify_certificate)
def get_data(self, scopes=[]):
data = dict()
if not app_settings.fd_list:
return data
for scope in scopes:
for fd in app_settings.fd_list[scope]:
url = fd['url']
if fd['query_dic']:
url += '?' + urlencode(fd['query_dic'])
d = self.get_ressource(url, app_settings.verify_certificate)
if d:
data.setdefault(scope, []).append((fd['name'], d))
return data
def authorization_error(self, request, *args, **kwargs):
error = request.GET.get('error')
error_description = request.GET.get('error_description')
msg = _('Unable to connect to FranceConnect: "%s".') % error
if error_description:
msg += _('("%s")') % error_description
messages.error(request, msg)
self.logger.warning('auth_fc: authorization failed with error=%r error_description=%r',
error, error_description or '')
return self.redirect(request)
def dispatch(self, request, *args, **kwargs):
'''Interpret the OAuth authorization dance'''
if 'code' in request.GET:
self.token = access_token_from_request(request, self.logger)
# Token request may not be completly processed and result in no
# token
if not self.token:
messages.warning(request, _('Unable to connect to FranceConnect.'))
return self.redirect(request)
# The token request may fail, 'error' is then required.
# A bad client secret results in error equals to invalid_request
# for FC and invalid_client for oidc_provider.
if 'error' in self.token:
msg = 'token request failed : {}'.format(self.token)
self.logger.warning(msg)
messages.warning(request, _('Unable to connect to FranceConnect: "%s".') % self.token['error'])
return self.redirect(request)
key = app_settings.client_secret
# duck-type unicode/Py3 strings
if hasattr(key, 'isdecimal'):
key = key.encode('utf-8')
self.id_token, error = models.parse_id_token(
self.token['id_token'], client_id=app_settings.client_id, client_secret=key)
if not self.id_token:
self.logger.error(u'validation of id_token failed: %s', error)
messages.warning(request, _('Unable to connect to FranceConnect.'))
return self.redirect(request)
nonce = self.id_token.get('nonce')
states = request.session.get('fc_states', {})
if not nonce or nonce not in states:
self.logger.error(u'invalid nonce in id_token %s, known ones %s', nonce,
u', '.join(states.keys()))
messages.warning(request, _('Unable to connect to FranceConnect.'))
return self.redirect(request)
self.logger.debug('fc id_token %s', self.id_token)
for key in self.id_token:
setattr(self, key, self.id_token[key])
self.oauth_session = lambda: utils.requests_retry_session(
session=OAuth2Session(
app_settings.client_id, token=self.token))
self.user_info = self.get_user_info()
if not self.user_info:
self.logger.error('userinfo resolution failed: %s', self.token)
messages.warning(request, _('Unable to connect to FranceConnect.'))
return self.redirect(request)
self.logger.debug('fc user_info %s', self.user_info)
self.request.session['fc_id_token'] = self.id_token
self.request.session['fc_id_token_raw'] = self.token['id_token']
self.request.session['fc_user_info'] = self.user_info
if 'fd_scopes' in request.GET:
scopes = request.GET.get('fd_scopes')
scopes = scopes.split()
self.data = self.get_data(scopes)
self.logger.debug('fc data %s', self.data)
fc_data = self.request.session.setdefault('fc_data', {})
for scope in self.data:
fc_data.setdefault(scope, []).extend(self.data[scope])
self.logger.debug('fc data in session %s', self.request.session['fc_data'])
return super(FcOAuthSessionViewMixin, self).dispatch(request,
*args,
**kwargs)
elif 'error' in request.GET:
return self.authorization_error(request, *args, **kwargs)
else:
scopes = self.get_scopes()
if 'fd_scopes' in request.GET:
scopes = list(set(scopes) | set(request.GET['fd_scopes'].split()))
return ask_authorization(request, scopes, self.logger)
@property
def fc_display_name(self):
'''Human representation of the current FC account'''
display_name = ''
user_info = self.user_info or {}
family_name = user_info.get('family_name')
given_name = user_info.get('given_name')
if given_name:
display_name += given_name
if family_name:
if display_name:
display_name += ' '
display_name += family_name
return display_name
class PopupViewMixin(object):
def get_in_popup(self):
return 'popup' in self.request.GET
class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View):
'''Login with FC, if the FC account is already linked, connect this user,
if a user is logged link the user to this account, otherwise display an
error message.
'''
def update_user_info(self):
self.fc_account.token = json.dumps(self.token)
self.fc_account.user_info = json.dumps(self.user_info)
self.fc_account.save(update_fields=['token', 'user_info'])
utils.apply_user_info_mappings(self.fc_account.user, self.user_info)
self.logger.debug('updating user_info %s', self.fc_account.user_info)
def uniqueness_check_failed(self, request):
if request.user.is_authenticated():
# currently logged :
if models.FcAccount.objects.filter(user=request.user, order=0).count():
# cannot link because we are already linked to another FC account
messages.error(request,
_('Your account is already linked to a FranceConnect account'))
else:
# cannot link because the FC account is already linked to another account.
messages.error(request,
_('The FranceConnect account {} is already'
' linked with another account.').format(self.fc_display_name))
else:
# not logged, cannot login because the user is disabled (user.is_active is False)
messages.error(request, _('Your account is disabled.'))
return self.redirect(request)
def get(self, request, *args, **kwargs):
registration = True if 'registration' in request.GET else False
'''Request an access grant code and associate it to the current user'''
self.service = get_service_from_request(request)
if request.user.is_authenticated:
# Prevent to add a link with an FC account already linked with another user.
try:
self.fc_account, created = models.FcAccount.objects.get_or_create(
sub=self.sub, user=request.user, order=0,
defaults={'token': json.dumps(self.token)})
except IntegrityError:
# unique index check failed, find why.
return self.uniqueness_check_failed(request)
if created:
self.logger.info('fc link created sub %s', self.sub)
messages.info(request,
_('Your FranceConnect account {} has been linked.').format(self.fc_display_name))
hooks.call_hooks('event', name='fc-link', user=request.user, sub=self.sub, request=request)
else:
messages.info(request, _('Your local account has been updated.'))
self.update_user_info()
return self.redirect(request)
default_ou = get_default_ou()
email_is_unique = a2_app_settings.A2_EMAIL_IS_UNIQUE or default_ou.email_is_unique
user = a2_utils.authenticate(
request,
sub=self.sub,
user_info=self.user_info,
token=self.token)
if user:
self.fc_account = user.fc_accounts.get(order=0)
if not user and self.user_info.get('email') and email_is_unique:
email = self.user_info['email']
User = get_user_model()
qs = User.objects.filter(email__iexact=email)
if not a2_app_settings.A2_EMAIL_IS_UNIQUE and default_ou.email_is_unique:
qs = qs.filter(ou=default_ou)
if qs.exists():
# there should not be multiple accounts with the same mail
if len(qs) > 1:
self.logger.error(u'multiple accounts with the same mail %s, %s', email,
list(qs))
# ok we have one account
elif len(qs) == 1:
user = qs[0]
# but does he have already a link to an FC account ?
if not user.fc_accounts.exists():
try:
self.fc_account, created = models.FcAccount.objects.get_or_create(
defaults={'token': json.dumps(self.token)},
sub=self.sub, user=user, order=0)
except IntegrityError:
# unique index check failed, find why.
return self.uniqueness_check_failed(request)
if created:
self.logger.info(u'fc link created sub %s user %s', self.sub, user)
hooks.call_hooks('event', name='fc-link', user=user, sub=self.sub,
request=request)
user = a2_utils.authenticate(
request=request,
sub=self.sub,
user_info=self.user_info,
token=self.token)
else:
messages.warning(
request,
_('Your FranceConnect email address \'%s\' is already used by another '
'account, so we cannot create an account for you. Please create an '
'account with another email address then link it to FranceConnect '
'using your account management page.') % email)
return self.redirect(request)
if user:
views_utils.check_cookie_works(request)
a2_utils.login(request, user, 'france-connect', service=self.service)
# set session expiration policy to EXPIRE_AT_BROWSER_CLOSE
request.session.set_expiry(0)
self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user)
self.fc_account.token = json.dumps(self.token)
self.fc_account.save(update_fields=['token'])
self.update_user_info()
self.logger.info('logged in using fc sub %s', self.sub)
return self.redirect(request)
else:
params = {}
if self.service:
set_service_ref(params, self.service)
if registration:
return self.redirect_and_come_back(request,
a2_utils.make_url('fc-registration',
params=params),
params=params)
else:
messages.info(request, _('If you already have an account, please log in, else '
'create your account.'))
login_params = params.copy()
if not app_settings.show_button_quick_account_creation:
login_params['nofc'] = 1
login_url = a2_utils.make_url(settings.LOGIN_URL, params=login_params)
return self.redirect_and_come_back(request, login_url, params=params)
class RegistrationView(PopupViewMixin, LoggerMixin, View):
def get(self, request, *args, **kwargs):
data = utils.get_mapped_attributes_flat(request)
data['no_password'] = True
if app_settings.auto_register:
data['confirm_data'] = 'required'
else:
data['confirm_data'] = True
redirect_to = request.GET.get(REDIRECT_FIELD_NAME, '')
if not is_safe_url(url=redirect_to, allowed_hosts=request.get_host()):
redirect_to = resolve_url(settings.LOGIN_REDIRECT_URL)
# Prevent errors when redirect_to does not contain fc-login-or-link view
parsed_redirect_to = urlparse.urlparse(redirect_to)
if parsed_redirect_to.path == reverse('fc-login-or-link'):
redirect_to = urlparse.parse_qs(parsed_redirect_to.query) \
.get(REDIRECT_FIELD_NAME, [a2_utils.make_url('auth_homepage')])[0]
params = {
REDIRECT_FIELD_NAME: redirect_to,
}
service = get_service_from_request(request)
if service:
set_service_ref(params, service)
if self.get_in_popup():
params['popup'] = ''
redirect_to = a2_utils.make_url('fc-login-or-link', params=params)
if 'email' not in data:
data[REDIRECT_FIELD_NAME] = redirect_to
messages.warning(request,
_("FranceConnect didn't provide your email address, please do."))
return HttpResponseRedirect("{}?token={}".format(reverse('registration_register'),
signing.dumps(data)))
data['valid_email'] = False
data['franceconnect'] = True
data['authentication_method'] = 'france-connect'
if service:
set_service_ref(data, service)
activation_url = a2_utils.build_activation_url(request,
next_url=redirect_to,
**data)
return HttpResponseRedirect(activation_url)
class UnlinkView(LoggerMixin, FormView):
template_name = 'authentic2_auth_fc/unlink.html'
def get_success_url(self):
url = reverse('account_management')
if app_settings.logout_when_unlink:
# logout URL can be None if not session exists with FC
url = utils.build_logout_url(self.request, next_url=url) or url
clean_fc_session(self.request.session)
return url
def get_form_class(self):
form_class = Form
if self.must_set_password():
form_class = SetPasswordForm
return form_class
def get_form_kwargs(self, **kwargs):
kwargs = super(UnlinkView, self).get_form_kwargs(**kwargs)
if self.must_set_password():
kwargs['user'] = self.request.user
return kwargs
def must_set_password(self):
for event in self.request.session.get(constants.AUTHENTICATION_EVENTS_SESSION_KEY, []):
if event['how'].startswith('password'):
return False
return self.request.user.can_change_password()
def dispatch(self, request, *args, **kwargs):
if not request.user.is_authenticated:
raise PermissionDenied()
# We prevent unlinking if the user has no usable password and can't change it
# because we assume that the password is the unique other mean of authentication
# and unlinking would make the account unreachable.
if self.must_set_password() and not a2_app_settings.A2_REGISTRATION_CAN_CHANGE_PASSWORD:
# Prevent access to the view.
raise Http404
return super(UnlinkView, self).dispatch(request, *args, **kwargs)
def form_valid(self, form):
if self.must_set_password():
form.save()
update_session_auth_hash(self.request, self.request.user)
self.logger.info(u'user %s has set a password', self.request.user)
links = models.FcAccount.objects.filter(user=self.request.user)
for link in links:
self.logger.info(u'user %s unlinked from %s', self.request.user, link)
hooks.call_hooks('event', name='fc-unlink', user=self.request.user)
messages.info(self.request, _('The link with the FranceConnect account has been deleted.'))
links.delete()
return super(UnlinkView, self).form_valid(form)
def get_context_data(self, **kwargs):
context = super(UnlinkView, self).get_context_data(**kwargs)
if self.must_set_password():
context['no_password'] = True
return context
def post(self, request, *args, **kwargs):
if 'cancel' in request.POST:
return a2_utils.redirect(request, 'account_management')
return super(UnlinkView, self).post(request, *args, **kwargs)
login_or_link = LoginOrLinkView.as_view()
registration = RegistrationView.as_view()
unlink = UnlinkView.as_view()
class LogoutReturnView(View):
def get(self, request, *args, **kwargs):
state = request.GET.get('state')
clean_fc_session(request.session)
states = request.session.pop('fc_states', None)
next_url = None
if states and state in states:
next_url = states[state].get('next')
if not next_url:
next_url = reverse('auth_logout')
return HttpResponseRedirect(next_url)
logout = LogoutReturnView.as_view()