authentic/src/authentic2_auth_fc/views.py

575 lines
23 KiB
Python

# authentic2-auth-fc - authentic2 authentication for FranceConnect
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import logging
import time
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.contrib.auth.views import update_session_auth_hash
from django.core.cache import cache
from django.core.exceptions import PermissionDenied
from django.db import IntegrityError, transaction
from django.forms import Form
from django.http import Http404, HttpResponseRedirect
from django.urls import reverse
from django.utils.http import urlencode
from django.utils.translation import ugettext as _
from django.views.generic import FormView, View
from requests_oauthlib import OAuth2Session
from authentic2 import app_settings as a2_app_settings
from authentic2 import constants, hooks
from authentic2 import models as a2_models
from authentic2 import utils as a2_utils
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.compat.cookies import set_cookie
from authentic2.crypto import check_hmac_url, hash_chain, hmac_url
from authentic2.forms.passwords import SetPasswordForm
from authentic2.utils import views as views_utils
from authentic2.utils.models import safe_get_or_create
from authentic2.utils.service import get_service_from_ref, get_service_from_request, service_ref
from . import app_settings, models
from .utils import (
RequestError,
apply_user_info_mappings,
build_logout_url,
clean_fc_session,
get_json,
post_json,
)
logger = logging.getLogger(__name__)
User = get_user_model()
class UserOutsideDefaultOu(Exception):
pass
class LoginOrLinkView(View):
"""Login with FC, if the FC account is already linked, connect this user,
if a user is logged link the user to this account, otherwise display an
error message.
"""
_next_url = None
service = None
@property
def next_url(self):
return self._next_url or a2_utils.select_next_url(self.request, default=settings.LOGIN_REDIRECT_URL)
@property
def redirect_uri(self):
return self.request.build_absolute_uri(reverse('fc-login-or-link'))
def redirect(self):
return a2_utils.redirect(self.request, self.next_url)
@property
def fc_display_name(self):
'''Human representation of the current FC account'''
display_name = ''
family_name = self.user_info.get('family_name')
given_name = self.user_info.get('given_name')
if given_name:
display_name += given_name
if family_name:
if display_name:
display_name += ' '
display_name += family_name
return display_name
def get(self, request, *args, **kwargs):
code = request.GET.get('code')
state = request.GET.get('state')
if code and state:
response = self.handle_authorization_response(request, code=code, state=state)
response.delete_cookie('fc-state', path=reverse('fc-login-or-link'))
return response
elif 'error' in request.GET:
return self.authorization_error(
request, error=request.GET['error'], error_description=request.GET.get('error_description')
)
else:
return self.make_authorization_request(request)
def handle_authorization_response(self, request, code, state):
# check state signature and parse it
try:
state, self._next_url, self.service = self.decode_state(state)
except ValueError:
return a2_utils.redirect(request, settings.LOGIN_REDIRECT_URL)
# regenerte the chain of hash from the stored nonce_seed
try:
encoded_seed = request.COOKIES.get('fc-state', '')
if not encoded_seed:
raise ValueError
dummy, hash_nonce, hash_state = hash_chain(3, encoded_seed=encoded_seed)
if not state or state != hash_state:
logger.warning('auth_fc: state lost, requesting authorization again')
raise ValueError
except ValueError:
return self.make_authorization_request(request)
# resolve the authorization_code and check the token endpoint response
self.token = self.resolve_authorization_code(code)
if not self.token:
# resolve_authorization_code already logged a warning.
return self.report_fc_is_down(request)
if 'error' in self.token:
logger.warning('auth_fc: token request failed, "%s"', self.token)
messages.warning(
request,
_('Unable to connect to FranceConnect: "%s".')
% (self.token.get('error_description') or self.token['error']),
)
return self.redirect()
# parse the id_token
if not self.token.get('id_token') or not isinstance(self.token['id_token'], str):
logger.warning('auth_fc: token endpoint did not return an id_token')
return self.report_fc_is_down(request)
key = app_settings.client_secret.encode()
self.id_token, error = models.parse_id_token(
self.token['id_token'], client_id=app_settings.client_id, client_secret=key
)
if not self.id_token:
logger.warning('auth_fc: validation of id_token failed: %s', error)
return self.report_fc_is_down(request)
logger.debug('auth_fc: parsed id_token %s', self.id_token)
nonce = self.id_token.get('nonce')
if nonce != hash_nonce:
logger.warning('auth_fc: invalid nonce in id_token')
return self.report_fc_is_down(request)
self.sub = self.id_token.get('sub')
if not self.sub:
logger.warning('auth_fc: no sub in id_token %s', self.id_token)
return self.report_fc_is_down(request)
# get user info using the access token
if not self.token.get('access_token') or not isinstance(self.token['access_token'], str):
logger.warning('auth_fc: token endpoint did not return an access_token')
return self.report_fc_is_down(request)
self.user_info = self.get_user_info()
if self.user_info is None:
return self.report_fc_is_down(request)
logger.debug('auth_fc: user_info %s', self.user_info)
# clear FranceConnect down status
cache.delete('fc_is_down')
if request.user.is_authenticated:
return self.link(request)
else:
return self.login(request)
def encode_state(self, state, next_url, service):
encoded_state = state + ' ' + self.next_url + ' '
if service:
encoded_state += service_ref(service)
encoded_state += ' ' + hmac_url(settings.SECRET_KEY, encoded_state)
return encoded_state
def decode_state(self, state):
payload, signature = state.rsplit(' ', 1)
if not check_hmac_url(settings.SECRET_KEY, payload, signature):
raise ValueError
# service_ref can be made of one or two parts
try:
state, next_url, service_ref = payload.split(' ')
except ValueError:
state, next_url, ou_slug, service_slug = payload.split(' ')
service_ref = ou_slug + ' ' + service_slug
service = get_service_from_ref(service_ref)
return state, next_url, service
def make_authorization_request(self, request):
scope = ' '.join(set(['openid'] + app_settings.scopes))
service = self.service or get_service_from_request(request)
nonce_seed, nonce, state = hash_chain(3)
# encode the target service and next_url in the state
full_state = state + ' ' + self.next_url + ' '
if service:
full_state += service_ref(service)
full_state += ' ' + hmac_url(settings.SECRET_KEY, full_state)
params = {
'client_id': app_settings.client_id,
'scope': scope,
'redirect_uri': self.redirect_uri,
'response_type': 'code',
'state': self.encode_state(state, self.next_url, service),
'nonce': nonce,
'acr_values': 'eidas1',
}
url = '{0}?{1}'.format(app_settings.authorize_url, urlencode(params))
logger.debug('auth_fc: authorization_request redirect to %s', url)
response = HttpResponseRedirect(url)
# prevent unshown messages to block the navigation to FranceConnect
response.display_message = False
# store nonce_seed in a browser cookie to prevent CSRF and check nonce
# in id_token on return by generating the hash chain again
set_cookie(
response,
'fc-state',
value=nonce_seed,
path=reverse('fc-login-or-link'),
httponly=True,
secure=request.is_secure(),
samesite='Lax',
)
return response
def resolve_authorization_code(self, authorization_code):
'''Exchange an authorization_code for an access_token'''
data = {
'code': authorization_code,
'client_id': app_settings.client_id,
'client_secret': app_settings.client_secret,
'redirect_uri': self.redirect_uri,
'grant_type': 'authorization_code',
}
logger.debug('auth_fc: resolve_access_token request params %s', data)
try:
token = post_json(app_settings.token_url, data, expected_statuses=[400])
except RequestError as e:
logger.warning('auth_fc: resolve_authorization_code error %s', e)
return None
else:
logger.debug('auth_fc: token endpoint returned "%s"', token)
return token
def get_user_info(self):
try:
data = get_json(
app_settings.userinfo_url + '?schema=openid',
session=OAuth2Session(app_settings.client_id, token=self.token),
)
except RequestError as e:
logger.warning('auth_fc: get_user_info error %s', e)
return None
logger.debug('auth_fc: get_user_info returned %r', data)
return data
def authorization_error(self, request, error, error_description):
messages.error(request, _('Unable to connect to FranceConnect: "%s".') % (error_description or error))
logger.warning(
'auth_fc: authorization failed with error=%r error_description=%r', error, error_description or ''
)
return self.redirect()
def report_fc_is_down(self, request):
messages.warning(request, _('Unable to connect to FranceConnect.'))
# put FranceConnect status in cache, if it happens for more than 5 minutes, log an error
last_down = cache.get('fc_is_down')
now = time.time()
more_than_5_minutes = last_down and (now - last_down) > 5 * 60
if more_than_5_minutes:
logger.error('auth_fc: FranceConnect is down for more than 5 minutes')
if not last_down or more_than_5_minutes:
cache.set('fc_is_down', now, 10 * 60)
return self.redirect()
def link(self, request):
'''Request an access grant code and associate it to the current user'''
try:
self.fc_account, created = models.FcAccount.objects.get_or_create(
sub=self.sub,
user=request.user,
order=0,
defaults={
'token': json.dumps(self.token),
'user_info': json.dumps(self.user_info),
},
)
# Prevent adding a link with an FC account already linked with another user.
except IntegrityError:
# unique index check failed, find why.
return self.uniqueness_check_failed(request)
if created:
logger.info('auth_fc: link created sub %s', self.sub)
messages.info(
request, _('Your FranceConnect account {} has been linked.').format(self.fc_display_name)
)
hooks.call_hooks('event', name='fc-link', user=request.user, sub=self.sub, request=request)
self.update_user_info(request.user, self.user_info)
return self.redirect()
def login(self, request):
user = a2_utils.authenticate(request, sub=self.sub, user_info=self.user_info, token=self.token)
if not user:
user = self.create_account(request, sub=self.sub, token=self.token, user_info=self.user_info)
if not user:
return self.redirect()
return self.finish_login(request, user, self.user_info)
def finish_login(self, request, user, user_info):
self.update_user_info(user, user_info)
views_utils.check_cookie_works(request)
a2_utils.login(request, user, 'france-connect', service=self.service)
# keep id_token around for logout
request.session['fc_id_token'] = self.id_token
request.session['fc_id_token_raw'] = self.token['id_token']
# set session expiration policy to EXPIRE_AT_BROWSER_CLOSE
request.session.set_expiry(0)
# redirect to account edit page if any required attribute is missing
name_to_label = dict(a2_models.Attribute.objects.filter(required=True).values_list('name', 'label'))
required = list(a2_app_settings.A2_REGISTRATION_REQUIRED_FIELDS) + list(name_to_label)
missing = []
for attr_name in set(required):
value = getattr(user, attr_name, None) or getattr(user.attributes, attr_name, None)
if value in [None, '']:
missing.append(name_to_label[attr_name])
if missing:
messages.warning(
request,
_('The following fields are mandatory for account creation: %s') % ', '.join(missing),
)
return a2_utils.redirect(request, 'profile_edit', params={'next': self.next_url})
return self.redirect()
def create_account(self, request, sub, token, user_info):
email = user_info.get('email')
if email:
# try to create or find an user with this email
try:
user, created = self.get_or_create_user_with_email(email)
except UserOutsideDefaultOu:
user = None
except User.MultipleObjectsReturned:
user = None
if not user:
messages.warning(
request,
_(
'Your FranceConnect email address \'%s\' is already used by another '
'account, so we cannot create an account for you. Please connect '
'with you existing account or create an '
'account with another email address then link it to FranceConnect '
'using your account management page.'
)
% email,
)
return None
if not created and user.fc_accounts.exists():
messages.warning(
request,
_(
'Your FranceConnect email address "%(email)s" is already used by the FranceConnect '
'account of "%(user)s", so we cannot create an account for you. Please create '
'an account with another email address then link it to FranceConnect '
'using your account management page.'
)
% {'email': email, 'user': user.get_full_name()},
)
else: # no email, we cannot disembiguate users, let's create it anyway
user = User.objects.create()
created = True
try:
if created:
user.set_unusable_password()
user.save()
# As we intercept IntegrityError and we can never be sure if we are
# in a transaction or not, we must use one to prevent later SQL
# queries to fail.
with transaction.atomic():
models.FcAccount.objects.create(
user=user,
sub=sub,
order=0,
token=json.dumps(token),
user_info=json.dumps(user_info),
)
except IntegrityError:
# uniqueness check failed, as the user is new, it can only mean that the sub is not unique
# let's try again
if created:
user.delete()
return a2_utils.authenticate(request, sub=sub, token=token, user_info=user_info)
except Exception:
# if anything unexpected happen and user was created, delete it and re-raise
if created:
user.delete()
raise
else:
if created:
logger.info('auth_fc: new account "%s" created with FranceConnect sub "%s"', user, sub)
hooks.call_hooks('event', name='fc-create', user=user, sub=sub)
else:
logger.info('auth_fc: existing account "%s" linked to FranceConnect sub "%s"', user, sub)
hooks.call_hooks('event', name='fc-link', user=user, sub=sub, request=request)
return a2_utils.authenticate(request, sub=sub, user_info=user_info, token=token)
def uniqueness_check_failed(self, request):
# currently logged :
if models.FcAccount.objects.filter(user=request.user, order=0).count():
# cannot link because we are already linked to another FC account
messages.error(request, _('Your account is already linked to a FranceConnect account'))
else:
# cannot link because the FC account is already linked to another account.
messages.error(
request,
_('The FranceConnect account {} is already linked with another account.').format(
self.fc_display_name
),
)
return self.redirect()
def update_user_info(self, user, user_info):
# always handle given_name and family_name
updated = []
if user_info.get('given_name') and user.first_name != user_info['given_name']:
user.first_name = user_info['given_name']
updated.append('given name: "%s"' % user_info['given_name'])
if user_info.get('family_name') and user.last_name != user_info['family_name']:
user.last_name = user_info['family_name']
updated.append('family name: "%s"' % user_info['family_name'])
if updated:
user.save()
logger.debug('auth_fc: updated (%s)', ' - '.join(updated))
apply_user_info_mappings(user, user_info)
return user
def get_or_create_user_with_email(self, email):
ou = get_default_ou()
if a2_app_settings.A2_EMAIL_IS_UNIQUE:
instance, created = safe_get_or_create(User, email=email, defaults={'email': email, 'ou': ou})
if instance.ou != ou:
assert not created # should not be possible
raise UserOutsideDefaultOu
return instance, created
elif ou.email_is_unique:
return safe_get_or_create(User, ou=ou, email=email, defaults={'email': email, 'ou': ou})
else:
return User.objects.create(email=email), True
login_or_link = LoginOrLinkView.as_view()
class UnlinkView(FormView):
template_name = 'authentic2_auth_fc/unlink.html'
def get_success_url(self):
url = reverse('account_management')
if app_settings.logout_when_unlink:
# logout URL can be None if not session exists with FC
url = build_logout_url(self.request, next_url=url) or url
return url
def get_form_class(self):
form_class = Form
if self.must_set_password():
form_class = SetPasswordForm
return form_class
def get_form_kwargs(self, **kwargs):
kwargs = super().get_form_kwargs(**kwargs)
if self.must_set_password():
kwargs['user'] = self.request.user
return kwargs
def must_set_password(self):
for event in self.request.session.get(constants.AUTHENTICATION_EVENTS_SESSION_KEY, []):
if event['how'].startswith('password'):
return False
return self.request.user.can_change_password()
def dispatch(self, request, *args, **kwargs):
if not request.user.is_authenticated:
raise PermissionDenied()
# We prevent unlinking if the user has no usable password and can't change it
# because we assume that the password is the unique other mean of authentication
# and unlinking would make the account unreachable.
if self.must_set_password() and not a2_app_settings.A2_REGISTRATION_CAN_CHANGE_PASSWORD:
# Prevent access to the view.
raise Http404
return super().dispatch(request, *args, **kwargs)
def form_valid(self, form):
if self.must_set_password():
form.save()
update_session_auth_hash(self.request, self.request.user)
logger.info('auth_fc: user %s has set a password', self.request.user)
links = models.FcAccount.objects.filter(user=self.request.user)
for link in links:
logger.info('auth_fc: user %s unlinked from %s', self.request.user, link)
hooks.call_hooks('event', name='fc-unlink', user=self.request.user)
messages.info(self.request, _('The link with the FranceConnect account has been deleted.'))
links.delete()
response = super().form_valid(form)
if app_settings.logout_when_unlink:
response.display_message = False
clean_fc_session(self.request.session)
return response
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
if self.must_set_password():
context['no_password'] = True
return context
def post(self, request, *args, **kwargs):
if 'cancel' in request.POST:
return a2_utils.redirect(request, 'account_management')
return super().post(request, *args, **kwargs)
unlink = UnlinkView.as_view()
class LogoutReturnView(View):
def get(self, request, *args, **kwargs):
state = request.GET.get('state')
clean_fc_session(request.session)
states = request.session.pop('fc_states', None)
next_url = None
if states and state in states:
next_url = states[state].get('next')
if not next_url:
next_url = reverse('auth_logout')
return HttpResponseRedirect(next_url)
logout = LogoutReturnView.as_view()