authentic/src/authentic2_auth_fc/views.py

610 lines
25 KiB
Python

# authentic2-auth-fc - authentic2 authentication for FranceConnect
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import logging
import urllib.parse
import uuid
import requests
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import REDIRECT_FIELD_NAME, get_user_model
from django.core import signing
from django.core.cache import InvalidCacheBackendError, caches
from django.core.exceptions import PermissionDenied
from django.db import IntegrityError
from django.forms import Form
from django.http import Http404, HttpResponseRedirect
from django.shortcuts import render, resolve_url
from django.urls import reverse
from django.utils.http import is_safe_url, urlencode
from django.utils.translation import ugettext as _
from django.views.generic import FormView, View
from requests_oauthlib import OAuth2Session
try:
from django.contrib.auth.views import update_session_auth_hash
except ImportError:
update_session_auth_hash = None
from authentic2 import app_settings as a2_app_settings
from authentic2 import constants, hooks
from authentic2 import models as a2_models
from authentic2 import utils as a2_utils
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.forms.passwords import SetPasswordForm
from authentic2.utils import views as views_utils
from authentic2.utils.service import get_service_from_request, set_service_ref
from . import app_settings, models, utils
class LoggerMixin(object):
def __init__(self, *args, **kwargs):
self.logger = logging.getLogger(__name__)
super(LoggerMixin, *args, **kwargs)
try:
cache = caches['fc']
except InvalidCacheBackendError:
cache = caches['default']
CACHE_TIMEOUT = 60
def ask_authorization(request, scopes, logger):
'''Compute an authorize URL for obtaining the given scope'''
if not isinstance(scopes, (list, tuple)):
scopes = [scopes]
redirect_uri = request.build_absolute_uri()
state = str(uuid.uuid4())
states = request.session.setdefault('fc_states', {})
states[state] = {
'redirect_uri': redirect_uri,
}
request.session.modified = True
params = {
'client_id': app_settings.client_id,
'scope': ' '.join(scopes),
'redirect_uri': redirect_uri,
'response_type': 'code',
'state': state,
'nonce': state,
'acr_values': 'eidas1',
}
logger.debug('query string %s', params)
url = '{0}?{1}'.format(app_settings.authorize_url, urlencode(params))
logger.debug('redirect to %s', url)
response = HttpResponseRedirect(url)
response.display_message = False
return response
def resolve_access_token(authorization_code, redirect_uri, logger):
'''Exchange an authorization_code for an access_token'''
data = {
'code': authorization_code,
'client_id': app_settings.client_id,
'client_secret': app_settings.client_secret,
'redirect_uri': redirect_uri,
'grant_type': 'authorization_code',
}
logger.debug('access token request %s', data)
try:
session = utils.requests_retry_session()
response = session.post(
app_settings.token_url,
data=data,
verify=app_settings.verify_certificate,
allow_redirects=False,
timeout=3,
)
if response.status_code != 200:
try:
data = response.json()
logger.error(u'oauth2 error on access token retrieval: %r', data)
except ValueError:
data = {}
logger.error(u'oauth2 error on access token retrieval: %r', response.content[:1024])
return
except requests.exceptions.RequestException as e:
logger.error(u'unable to retrieve access token {}'.format(e))
else:
try:
response = response.json()
logger.debug('token resolved : %s', response)
return response
except ValueError:
logger.error(
'no JSON object can be decoded from the data received from %s: %r',
app_settings.token_url,
response.content[:1024],
)
def access_token_from_request(request, logger):
"""Resolve an access token given a request returning from the authorization
endpoint.
"""
code = request.GET.get('code')
state = request.GET.get('state')
if not code:
return
if not state:
return
states = request.session.get('fc_states', {})
if state not in states:
return
# there should not be many FC SSO in flight
redirect_uri = states[state]['redirect_uri']
return resolve_access_token(code, redirect_uri, logger)
ACCESS_GRANT_CODE = 'accessgrantcode'
def clean_fc_session(session):
session.pop('fc_id_token', None)
session.pop('fc_id_token_raw', None)
session.pop('fc_user_info', None)
session.pop('fc_data', None)
class FcOAuthSessionViewMixin(LoggerMixin):
'''Add the OAuth2 dance to a view'''
redirect_field_name = REDIRECT_FIELD_NAME
in_popup = False
token = None
user_info = None
def get_in_popup(self):
return self.in_popup
def redirect_to(self, request):
if request.method == 'POST':
redirect_to = request.POST.get(
self.redirect_field_name, request.GET.get(self.redirect_field_name, '')
)
else:
redirect_to = request.GET.get(self.redirect_field_name, '')
safe = is_safe_url(url=redirect_to, allowed_hosts=request.get_host())
if not safe:
redirect_to = resolve_url(settings.LOGIN_REDIRECT_URL)
return redirect_to
def close_popup_redirect(self, request, next_url, *args, **kwargs):
"""Show a page to close the current popup and reload the parent window
with the return url.
"""
return render(request, 'authentic2_auth_fc/close-popup-redirect.html', {'redirect_to': next_url})
def simple_redirect(self, request, next_url, *args, **kwargs):
return a2_utils.redirect(request, next_url, *args, resolve=False, **kwargs)
def redirect(self, request, *args, **kwargs):
next_url = kwargs.pop('next_url', None)
if next_url is None:
next_url = self.redirect_to(request, *args, **kwargs)
if self.get_in_popup():
return self.close_popup_redirect(request, next_url, *args, **kwargs)
else:
return self.simple_redirect(request, next_url, *args, **kwargs)
def redirect_and_come_back(self, request, next_url, *args, **kwargs):
old_next_url = self.redirect_to(request)
here = a2_utils.make_url(request.path, params={REDIRECT_FIELD_NAME: old_next_url})
here = a2_utils.make_url(here, **kwargs)
there = a2_utils.make_url(next_url, params={REDIRECT_FIELD_NAME: here})
return self.redirect(request, next_url=there, *args, **kwargs)
def get_scopes(self):
return list(set(['openid'] + app_settings.scopes))
def get_ressource(self, url, verify):
try:
data = self.oauth_session().get(url, verify=verify, allow_redirects=False, timeout=3)
data.raise_for_status()
except requests.exceptions.RequestException as e:
self.logger.error('unable to retrieve ressource from %s due to %s', url, e)
else:
try:
data = data.json()
self.logger.debug('ressource resolved: %s', data)
return data
except ValueError:
self.logger.error(
'no JSON object can be decoded from the data received from %s: %r', url, data.content
)
def get_user_info(self):
return self.get_ressource(
app_settings.userinfo_url + '?schema=openid', app_settings.verify_certificate
)
def get_data(self, scopes=[]):
data = dict()
if not app_settings.fd_list:
return data
for scope in scopes:
for fd in app_settings.fd_list[scope]:
url = fd['url']
if fd['query_dic']:
url += '?' + urlencode(fd['query_dic'])
d = self.get_ressource(url, app_settings.verify_certificate)
if d:
data.setdefault(scope, []).append((fd['name'], d))
return data
def authorization_error(self, request, *args, **kwargs):
error = request.GET.get('error')
error_description = request.GET.get('error_description')
msg = _('Unable to connect to FranceConnect: "%s".') % error
if error_description:
msg += _('("%s")') % error_description
messages.error(request, msg)
self.logger.warning(
'auth_fc: authorization failed with error=%r error_description=%r', error, error_description or ''
)
return self.redirect(request)
def dispatch(self, request, *args, **kwargs):
'''Interpret the OAuth authorization dance'''
if 'code' in request.GET:
self.token = access_token_from_request(request, self.logger)
# Token request may not be completly processed and result in no
# token
if not self.token:
messages.warning(request, _('Unable to connect to FranceConnect.'))
return self.redirect(request)
# The token request may fail, 'error' is then required.
# A bad client secret results in error equals to invalid_request
# for FC and invalid_client for oidc_provider.
if 'error' in self.token:
msg = 'token request failed : {}'.format(self.token)
self.logger.warning(msg)
messages.warning(
request, _('Unable to connect to FranceConnect: "%s".') % self.token['error']
)
return self.redirect(request)
key = app_settings.client_secret
# duck-type unicode/Py3 strings
if hasattr(key, 'isdecimal'):
key = key.encode('utf-8')
self.id_token, error = models.parse_id_token(
self.token['id_token'], client_id=app_settings.client_id, client_secret=key
)
if not self.id_token:
self.logger.error(u'validation of id_token failed: %s', error)
messages.warning(request, _('Unable to connect to FranceConnect.'))
return self.redirect(request)
nonce = self.id_token.get('nonce')
states = request.session.get('fc_states', {})
if not nonce or nonce not in states:
self.logger.error(
u'invalid nonce in id_token %s, known ones %s', nonce, u', '.join(states.keys())
)
messages.warning(request, _('Unable to connect to FranceConnect.'))
return self.redirect(request)
self.logger.debug('fc id_token %s', self.id_token)
for key in self.id_token:
setattr(self, key, self.id_token[key])
self.oauth_session = lambda: utils.requests_retry_session(
session=OAuth2Session(app_settings.client_id, token=self.token)
)
self.user_info = self.get_user_info()
if not self.user_info:
self.logger.error('userinfo resolution failed: %s', self.token)
messages.warning(request, _('Unable to connect to FranceConnect.'))
return self.redirect(request)
self.logger.debug('fc user_info %s', self.user_info)
self.request.session['fc_id_token'] = self.id_token
self.request.session['fc_id_token_raw'] = self.token['id_token']
self.request.session['fc_user_info'] = self.user_info
if 'fd_scopes' in request.GET:
scopes = request.GET.get('fd_scopes')
scopes = scopes.split()
self.data = self.get_data(scopes)
self.logger.debug('fc data %s', self.data)
fc_data = self.request.session.setdefault('fc_data', {})
for scope in self.data:
fc_data.setdefault(scope, []).extend(self.data[scope])
self.logger.debug('fc data in session %s', self.request.session['fc_data'])
return super(FcOAuthSessionViewMixin, self).dispatch(request, *args, **kwargs)
elif 'error' in request.GET:
return self.authorization_error(request, *args, **kwargs)
else:
scopes = self.get_scopes()
if 'fd_scopes' in request.GET:
scopes = list(set(scopes) | set(request.GET['fd_scopes'].split()))
return ask_authorization(request, scopes, self.logger)
@property
def fc_display_name(self):
'''Human representation of the current FC account'''
display_name = ''
user_info = self.user_info or {}
family_name = user_info.get('family_name')
given_name = user_info.get('given_name')
if given_name:
display_name += given_name
if family_name:
if display_name:
display_name += ' '
display_name += family_name
return display_name
class PopupViewMixin(object):
def get_in_popup(self):
return 'popup' in self.request.GET
class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View):
"""Login with FC, if the FC account is already linked, connect this user,
if a user is logged link the user to this account, otherwise display an
error message.
"""
def update_user_info(self):
self.fc_account.token = json.dumps(self.token)
self.fc_account.user_info = json.dumps(self.user_info)
self.fc_account.save(update_fields=['token', 'user_info'])
utils.apply_user_info_mappings(self.fc_account.user, self.user_info)
self.logger.debug('updating user_info %s', self.fc_account.user_info)
def uniqueness_check_failed(self, request):
if request.user.is_authenticated:
# currently logged :
if models.FcAccount.objects.filter(user=request.user, order=0).count():
# cannot link because we are already linked to another FC account
messages.error(request, _('Your account is already linked to a FranceConnect account'))
else:
# cannot link because the FC account is already linked to another account.
messages.error(
request,
_('The FranceConnect account {} is already' ' linked with another account.').format(
self.fc_display_name
),
)
else:
# not logged, cannot login because the user is disabled (user.is_active is False)
messages.error(request, _('Your account is disabled.'))
return self.redirect(request)
def get(self, request, *args, **kwargs):
'''Request an access grant code and associate it to the current user'''
self.service = get_service_from_request(request)
if request.user.is_authenticated:
# Prevent to add a link with an FC account already linked with another user.
try:
self.fc_account, created = models.FcAccount.objects.get_or_create(
sub=self.sub, user=request.user, order=0, defaults={'token': json.dumps(self.token)}
)
except IntegrityError:
# unique index check failed, find why.
return self.uniqueness_check_failed(request)
if created:
self.logger.info('fc link created sub %s', self.sub)
messages.info(
request, _('Your FranceConnect account {} has been linked.').format(self.fc_display_name)
)
hooks.call_hooks('event', name='fc-link', user=request.user, sub=self.sub, request=request)
else:
messages.info(request, _('Your local account has been updated.'))
self.update_user_info()
return self.redirect(request)
default_ou = get_default_ou()
email_is_unique = a2_app_settings.A2_EMAIL_IS_UNIQUE or default_ou.email_is_unique
email_present_and_unique = self.user_info.get('email') and email_is_unique
user_filter = {}
if email_present_and_unique:
user_filter = {'email__iexact': self.user_info['email']}
if not a2_app_settings.A2_EMAIL_IS_UNIQUE and default_ou.email_is_unique:
user_filter['ou'] = default_ou
user = a2_utils.authenticate(
request, sub=self.sub, user_info=self.user_info, token=self.token, user_filter=user_filter
)
# ignore user if sub is not matching and let the code below handle it
if not user.fc_accounts.filter(sub=self.sub).exists():
user = None
if user:
self.fc_account = user.fc_accounts.get(order=0)
if not user and email_present_and_unique:
email = self.user_info['email']
User = get_user_model()
qs = User.objects.filter(**user_filter)
if qs.exists():
# there should not be multiple accounts with the same mail
if len(qs) > 1:
self.logger.error(u'multiple accounts with the same mail %s, %s', email, list(qs))
# ok we have one account
elif len(qs) == 1:
user = qs[0]
# but does he have already a link to an FC account ?
if not user.fc_accounts.exists():
try:
self.fc_account, created = models.FcAccount.objects.get_or_create(
defaults={'token': json.dumps(self.token)}, sub=self.sub, user=user, order=0
)
except IntegrityError:
# unique index check failed, find why.
return self.uniqueness_check_failed(request)
if created:
self.logger.info(u'fc link created sub %s user %s', self.sub, user)
hooks.call_hooks(
'event', name='fc-link', user=user, sub=self.sub, request=request
)
user = a2_utils.authenticate(
request=request,
sub=self.sub,
user_info=self.user_info,
token=self.token,
user_filter=user_filter,
)
else:
messages.warning(
request,
_(
'Your FranceConnect email address \'%s\' is already used by another '
'account, so we cannot create an account for you. Please create an '
'account with another email address then link it to FranceConnect '
'using your account management page.'
)
% email,
)
return self.redirect(request)
if user:
views_utils.check_cookie_works(request)
a2_utils.login(request, user, 'france-connect', service=self.service)
# set session expiration policy to EXPIRE_AT_BROWSER_CLOSE
request.session.set_expiry(0)
self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user)
self.fc_account.token = json.dumps(self.token)
self.fc_account.save(update_fields=['token'])
self.update_user_info()
self.logger.info('logged in using fc sub %s', self.sub)
# redirect to account edit page if any required attribute is missing
data = utils.get_mapped_attributes_flat(request)
required_attributes = a2_models.Attribute.objects.filter(required=True).values_list(
'name', flat=True
)
required = list(a2_app_settings.A2_REGISTRATION_REQUIRED_FIELDS) + list(required_attributes)
missing = [attr for attr in set(required) - set(data) if not getattr(user.attributes, attr)]
if missing:
messages.warning(
request,
_('The following fields are mandatory for account creation: %s') % ', '.join(missing),
)
return a2_utils.redirect(request, 'profile_edit', keep_params=True)
return self.redirect(request)
else:
params = {}
if self.service:
set_service_ref(params, self.service)
messages.info(
request, _('If you already have an account, please log in, else ' 'create your account.')
)
login_params = params.copy()
if not app_settings.show_button_quick_account_creation:
login_params['nofc'] = 1
login_url = a2_utils.make_url(settings.LOGIN_URL, params=login_params)
return self.redirect_and_come_back(request, login_url, params=params)
class UnlinkView(LoggerMixin, FormView):
template_name = 'authentic2_auth_fc/unlink.html'
def get_success_url(self):
url = reverse('account_management')
if app_settings.logout_when_unlink:
# logout URL can be None if not session exists with FC
url = utils.build_logout_url(self.request, next_url=url) or url
clean_fc_session(self.request.session)
return url
def get_form_class(self):
form_class = Form
if self.must_set_password():
form_class = SetPasswordForm
return form_class
def get_form_kwargs(self, **kwargs):
kwargs = super(UnlinkView, self).get_form_kwargs(**kwargs)
if self.must_set_password():
kwargs['user'] = self.request.user
return kwargs
def must_set_password(self):
for event in self.request.session.get(constants.AUTHENTICATION_EVENTS_SESSION_KEY, []):
if event['how'].startswith('password'):
return False
return self.request.user.can_change_password()
def dispatch(self, request, *args, **kwargs):
if not request.user.is_authenticated:
raise PermissionDenied()
# We prevent unlinking if the user has no usable password and can't change it
# because we assume that the password is the unique other mean of authentication
# and unlinking would make the account unreachable.
if self.must_set_password() and not a2_app_settings.A2_REGISTRATION_CAN_CHANGE_PASSWORD:
# Prevent access to the view.
raise Http404
return super(UnlinkView, self).dispatch(request, *args, **kwargs)
def form_valid(self, form):
if self.must_set_password():
form.save()
update_session_auth_hash(self.request, self.request.user)
self.logger.info(u'user %s has set a password', self.request.user)
links = models.FcAccount.objects.filter(user=self.request.user)
for link in links:
self.logger.info(u'user %s unlinked from %s', self.request.user, link)
hooks.call_hooks('event', name='fc-unlink', user=self.request.user)
messages.info(self.request, _('The link with the FranceConnect account has been deleted.'))
links.delete()
response = super(UnlinkView, self).form_valid(form)
if app_settings.logout_when_unlink:
response.display_message = False
return response
def get_context_data(self, **kwargs):
context = super(UnlinkView, self).get_context_data(**kwargs)
if self.must_set_password():
context['no_password'] = True
return context
def post(self, request, *args, **kwargs):
if 'cancel' in request.POST:
return a2_utils.redirect(request, 'account_management')
return super(UnlinkView, self).post(request, *args, **kwargs)
login_or_link = LoginOrLinkView.as_view()
unlink = UnlinkView.as_view()
class LogoutReturnView(View):
def get(self, request, *args, **kwargs):
state = request.GET.get('state')
clean_fc_session(request.session)
states = request.session.pop('fc_states', None)
next_url = None
if states and state in states:
next_url = states[state].get('next')
if not next_url:
next_url = reverse('auth_logout')
return HttpResponseRedirect(next_url)
logout = LogoutReturnView.as_view()