diff --git a/src/authentic2_auth_fc/app_settings.py b/src/authentic2_auth_fc/app_settings.py index 1dfd60db8..8a04e967a 100644 --- a/src/authentic2_auth_fc/app_settings.py +++ b/src/authentic2_auth_fc/app_settings.py @@ -59,20 +59,6 @@ class AppSettings(object): def logout_when_unlink(self): return self._setting('LOGOUT_WHEN_UNLINK', True) - @property - def logout_at_unlink_return_url(self): - return self._setting('LOGOUT_AT_UNLINK_RETURN_URL', '/accounts/') - - @property - def enable_registration_form_prefill(self): - return self._setting('ENABLE_REGISTRATION_FORM_PREFILL', True) - - @property - def attributes_mapping(self): - return self._setting( - 'ATTRIBUTES_MAPPING', {'family_name': 'last_name', 'given_name': 'first_name', 'email': 'email'} - ) - @property def user_info_mappings(self): return self._setting( @@ -90,10 +76,6 @@ class AppSettings(object): }, ) - @property - def next_field_name(self): - return self._setting('NEXT_FIELD_NAME', 'fc_next') - @property def client_id(self): return self._setting('CLIENT_ID', '') @@ -110,26 +92,10 @@ class AppSettings(object): def client_credentials(self): return self._setting('CLIENT_CREDENTIALS', ()) - @property - def show_button_quick_account_creation(self): - return self._setting('SHOW_BUTTON_QUICK_ACCOUNT_CREATION', True) - - @property - def auto_register(self): - return self._setting('AUTO_REGISTER', True) - - @property - def fd_list(self): - return self._setting('FD_LIST', {}) - @property def scopes(self): return self._setting('SCOPES', ['profile', 'email']) - @property - def popup(self): - return self._setting('POPUP', False) - app_settings = AppSettings('A2_FC_') app_settings.__name__ = __name__ diff --git a/src/authentic2_auth_fc/apps.py b/src/authentic2_auth_fc/apps.py index e36df5ffa..3248b515e 100644 --- a/src/authentic2_auth_fc/apps.py +++ b/src/authentic2_auth_fc/apps.py @@ -32,13 +32,6 @@ class Plugin(object): return [url] return [] - def registration_form_prefill(self, request): - from . import utils - - if app_settings.enable_registration_form_prefill: - return [utils.get_mapped_attributes(request)] - return [] - class AppConfig(django.apps.AppConfig): name = 'authentic2_auth_fc' diff --git a/src/authentic2_auth_fc/authenticators.py b/src/authentic2_auth_fc/authenticators.py index decafa5d0..b28ade7bf 100644 --- a/src/authentic2_auth_fc/authenticators.py +++ b/src/authentic2_auth_fc/authenticators.py @@ -47,19 +47,13 @@ class FcAuthenticator(BaseAuthenticator): return fc_user_info = request.session.get('fc_user_info') context = kwargs.pop('context', {}).copy() - params = {} - if app_settings.popup: - params['popup'] = '' context.update( { - 'popup': app_settings.popup, 'about_url': app_settings.about_url, 'fc_user_info': fc_user_info, } ) - context['login_url'] = a2_utils.make_url( - 'fc-login-or-link', keep_params=True, params=params, request=request - ) + context['login_url'] = a2_utils.make_url('fc-login-or-link', keep_params=True, request=request) context['block-extra-css-class'] = 'fc-login' template = 'authentic2_auth_fc/login.html' return TemplateResponse(request, template, context) @@ -74,14 +68,11 @@ class FcAuthenticator(BaseAuthenticator): params = { 'next': account_path, } - if app_settings.popup: - params['popup'] = '' link_url = a2_utils.make_url('fc-login-or-link', params=params) context = kwargs.pop('context', {}).copy() context.update( { - 'popup': app_settings.popup, 'unlink': unlink, 'about_url': app_settings.about_url, 'link_url': link_url, diff --git a/src/authentic2_auth_fc/backends.py b/src/authentic2_auth_fc/backends.py index 72685a8be..d5bc74063 100644 --- a/src/authentic2_auth_fc/backends.py +++ b/src/authentic2_auth_fc/backends.py @@ -20,19 +20,15 @@ import logging from django.contrib.auth import get_user_model from django.contrib.auth.backends import ModelBackend from django.core.exceptions import MultipleObjectsReturned, PermissionDenied -from django.db import IntegrityError -from authentic2 import hooks -from authentic2.a2_rbac.utils import get_default_ou - -from . import models, utils +from . import models logger = logging.getLogger(__name__) +User = get_user_model() class FcBackend(ModelBackend): - def authenticate(self, request=None, sub=None, **kwargs): - user_info = kwargs.get('user_info') + def authenticate(self, request, sub, token, user_info): user = None try: try: @@ -40,55 +36,13 @@ class FcBackend(ModelBackend): except MultipleObjectsReturned: account = models.FcAccount.objects.select_related().get(sub=sub, order=0) except models.FcAccount.DoesNotExist: - logger.debug(u'user with the sub %s does not exist.', sub) - else: - user = account.user - logger.debug(u'found user %s with sub %s', user, sub) - if not user.is_active: - logger.info(u'user %s login refused, it is inactive', user) - raise PermissionDenied - if user_info: - User = get_user_model() - user_qs = User.objects.filter(**kwargs.get('user_filter', {})) - if user_qs.count() > 1: - return - if user_qs.exists(): - user = user_qs.get() + return None - if not user: - user = User(ou=get_default_ou()) - user.set_unusable_password() - user.save() - try: - models.FcAccount.objects.create( - user=user, sub=sub, order=0, token=json.dumps(kwargs['token']) - ) - except IntegrityError: - # uniqueness check failed, as the user is new, it can only means that the sub is not unique - # let's try again - user.delete() - return self.authenticate(sub, **kwargs) - else: - logger.debug( - u'user creation enabled with fc_account (sub : %s - token : %s)', - sub, - json.dumps(kwargs['token']), - ) - hooks.call_hooks('event', name='fc-create', user=user, sub=sub) + if not account.user.is_active: + logger.info('auth_fc: login refused for user %s, it is inactive', user) + raise PermissionDenied - # always handle given_name and family_name - updated = [] - if user_info.get('given_name') and user.first_name != user_info['given_name']: - user.first_name = user_info['given_name'] - updated.append('given name: "%s"' % user_info['given_name']) - if user_info.get('family_name') and user.last_name != user_info['family_name']: - user.last_name = user_info['family_name'] - updated.append('family name: "%s"' % user_info['family_name']) - if updated: - user.save() - logger.debug('updated (%s)', ' - '.join(updated)) - utils.apply_user_info_mappings(user, user_info) - return user + return account.user def get_saml2_authn_context(self): import lasso diff --git a/src/authentic2_auth_fc/static/authentic2_auth_fc/js/fc.js b/src/authentic2_auth_fc/static/authentic2_auth_fc/js/fc.js deleted file mode 100644 index 198c27ac7..000000000 --- a/src/authentic2_auth_fc/static/authentic2_auth_fc/js/fc.js +++ /dev/null @@ -1,31 +0,0 @@ -/* Open FranceConnect in popup */ - - -(function(undef) { - function PopupCenter(url, title, w, h) { - // Fixes dual-screen position Most browsers Firefox - var dualScreenLeft = window.screenLeft != undefined ? window.screenLeft : window.screenX; - var dualScreenTop = window.screenTop != undefined ? window.screenTop : window.screenY; - - var width = window.innerWidth ? window.innerWidth : document.documentElement.clientWidth ? document.documentElement.clientWidth : screen.width; - var height = window.innerHeight ? window.innerHeight : document.documentElement.clientHeight ? document.documentElement.clientHeight : screen.height; - - var left = ((width / 2) - (w / 2)) + dualScreenLeft; - var top = ((height / 2) - (h / 2)) + dualScreenTop; - var newWindow = window.open(url, title, 'noopener,noreferrer,location=0,status=0,menubar=0,toolbar=0,scrollbars=yes, width=' + w + ', height=' + h + ', top=' + top + ', left=' + left); - newWindow.opener = null; - - // Puts focus on the newWindow - if (window.focus) { - newWindow.focus(); - } - } - var tags = document.getElementsByClassName('js-fc-popup'); - for (var i = 0; i < tags.length; i++) { - var tag = tags[i]; - tag.onclick = function (ev) { - PopupCenter(this.href, 'Authentification FranceConnect', 700, 500); - return false; - }; - } -})(); diff --git a/src/authentic2_auth_fc/templates/authentic2_auth_fc/linking.html b/src/authentic2_auth_fc/templates/authentic2_auth_fc/linking.html index 8bae41d84..b34a2c45a 100644 --- a/src/authentic2_auth_fc/templates/authentic2_auth_fc/linking.html +++ b/src/authentic2_auth_fc/templates/authentic2_auth_fc/linking.html @@ -18,7 +18,7 @@
{% trans "Link with a FranceConnect account" %}
- + {% trans "Link with a FranceConnect account" %}
@@ -29,4 +29,3 @@

{% trans "What is FranceConnect?" %}

-{% if popup %}{% endif %} diff --git a/src/authentic2_auth_fc/templates/authentic2_auth_fc/login.html b/src/authentic2_auth_fc/templates/authentic2_auth_fc/login.html index 20315b54a..32eb68520 100644 --- a/src/authentic2_auth_fc/templates/authentic2_auth_fc/login.html +++ b/src/authentic2_auth_fc/templates/authentic2_auth_fc/login.html @@ -6,11 +6,10 @@
+ class="button connexion"> {% trans 'Log in with FranceConnect' %}
{% include "authentic2_auth_fc/explanation.html" %} -{% if popup %}{% endif %} {% endblock %} diff --git a/src/authentic2_auth_fc/utils.py b/src/authentic2_auth_fc/utils.py index b9602d48f..f5bc10bc4 100644 --- a/src/authentic2_auth_fc/utils.py +++ b/src/authentic2_auth_fc/utils.py @@ -57,24 +57,6 @@ def build_logout_url(request, next_url=None): return None -def get_mapped_attributes(request): - values = {} - if 'fc_user_info' in request.session: - for fc_name, local_name in app_settings.attributes_mapping.items(): - if fc_name in request.session['fc_user_info']: - values[local_name] = [request.session['fc_user_info'][fc_name]] - return values - - -def get_mapped_attributes_flat(request): - values = {} - if 'fc_user_info' in request.session: - for fc_name, local_name in app_settings.attributes_mapping.items(): - if fc_name in request.session['fc_user_info']: - values[local_name] = request.session['fc_user_info'][fc_name] - return values - - def get_ref(ref, user_info): if not hasattr(user_info, 'items'): return None @@ -202,3 +184,61 @@ def requests_retry_session( # set proxies session.proxies.update(getattr(settings, 'REQUESTS_PROXIES', {})) return session + + +class RequestError(Exception): + def __init__(self, message, **details): + super().__init__(message) + self.details = details + + def __str__(self): + s = super().__str__() + if self.details: + s += ' (' + s += ' '.join('%s=%r' % (key, self.details[key]) for key in self.details) + s += ')' + return s + + +def request_json(method, url, data=None, session=None, expected_statuses=None): + session = requests_retry_session(session=session) + try: + response = getattr(session, method)( + url, + data=data, + verify=app_settings.verify_certificate, + allow_redirects=False, + timeout=3, + ) + response.raise_for_status() + except requests.exceptions.HTTPError: + try: + content = response.json() + except ValueError: + content = response.text[:256] + if expected_statuses and response.status_code in expected_statuses: + return content + raise RequestError('status code is not 200', status_code=response.status_code, content=content) + except requests.exceptions.RequestException as e: + raise RequestError('HTTP request failed', exception=e) + try: + content = response.json() + except ValueError: + raise RequestError('content is not JSON', content=response.content[:1024]) + + if not isinstance(content, dict): + raise RequestError('content is not a dict', content=content) + return content + + +def post_json(url, data, expected_statuses=None): + return request_json('post', url, data=data, expected_statuses=expected_statuses) + + +def get_json(url, session, expected_statuses=None): + return request_json('get', url, session=session, expected_statuses=expected_statuses) + + +def clean_fc_session(session): + session.pop('fc_id_token', None) + session.pop('fc_id_token_raw', None) diff --git a/src/authentic2_auth_fc/views.py b/src/authentic2_auth_fc/views.py index f5a527ee6..8c7d36dbb 100644 --- a/src/authentic2_auth_fc/views.py +++ b/src/authentic2_auth_fc/views.py @@ -16,335 +16,79 @@ import json import logging -import urllib.parse -import uuid +import time -import requests from django.conf import settings from django.contrib import messages -from django.contrib.auth import REDIRECT_FIELD_NAME, get_user_model -from django.core import signing -from django.core.cache import InvalidCacheBackendError, caches +from django.contrib.auth import get_user_model +from django.contrib.auth.views import update_session_auth_hash +from django.core.cache import cache from django.core.exceptions import PermissionDenied from django.db import IntegrityError from django.forms import Form from django.http import Http404, HttpResponseRedirect -from django.shortcuts import render, resolve_url from django.urls import reverse -from django.utils.http import is_safe_url, urlencode +from django.utils.http import urlencode from django.utils.translation import ugettext as _ from django.views.generic import FormView, View from requests_oauthlib import OAuth2Session -try: - from django.contrib.auth.views import update_session_auth_hash -except ImportError: - update_session_auth_hash = None - from authentic2 import app_settings as a2_app_settings from authentic2 import constants, hooks from authentic2 import models as a2_models from authentic2 import utils as a2_utils from authentic2.a2_rbac.utils import get_default_ou +from authentic2.compat.cookies import set_cookie +from authentic2.crypto import check_hmac_url, hash_chain, hmac_url from authentic2.forms.passwords import SetPasswordForm from authentic2.utils import views as views_utils -from authentic2.utils.service import get_service_from_request, set_service_ref +from authentic2.utils.models import safe_get_or_create +from authentic2.utils.service import get_service_from_ref, get_service_from_request, service_ref -from . import app_settings, models, utils +from . import app_settings, models +from .utils import ( + RequestError, + apply_user_info_mappings, + build_logout_url, + clean_fc_session, + get_json, + post_json, +) + +logger = logging.getLogger(__name__) +User = get_user_model() -class LoggerMixin(object): - def __init__(self, *args, **kwargs): - self.logger = logging.getLogger(__name__) - super(LoggerMixin, *args, **kwargs) +class UserOutsideDefaultOu(Exception): + pass -try: - cache = caches['fc'] -except InvalidCacheBackendError: - cache = caches['default'] - - -CACHE_TIMEOUT = 60 - - -def ask_authorization(request, scopes, logger): - '''Compute an authorize URL for obtaining the given scope''' - if not isinstance(scopes, (list, tuple)): - scopes = [scopes] - redirect_uri = request.build_absolute_uri() - state = str(uuid.uuid4()) - states = request.session.setdefault('fc_states', {}) - states[state] = { - 'redirect_uri': redirect_uri, - } - request.session.modified = True - params = { - 'client_id': app_settings.client_id, - 'scope': ' '.join(scopes), - 'redirect_uri': redirect_uri, - 'response_type': 'code', - 'state': state, - 'nonce': state, - 'acr_values': 'eidas1', - } - logger.debug('query string %s', params) - url = '{0}?{1}'.format(app_settings.authorize_url, urlencode(params)) - logger.debug('redirect to %s', url) - response = HttpResponseRedirect(url) - response.display_message = False - return response - - -def resolve_access_token(authorization_code, redirect_uri, logger): - '''Exchange an authorization_code for an access_token''' - data = { - 'code': authorization_code, - 'client_id': app_settings.client_id, - 'client_secret': app_settings.client_secret, - 'redirect_uri': redirect_uri, - 'grant_type': 'authorization_code', - } - logger.debug('access token request %s', data) - try: - session = utils.requests_retry_session() - response = session.post( - app_settings.token_url, - data=data, - verify=app_settings.verify_certificate, - allow_redirects=False, - timeout=3, - ) - if response.status_code != 200: - try: - data = response.json() - logger.error(u'oauth2 error on access token retrieval: %r', data) - except ValueError: - data = {} - logger.error(u'oauth2 error on access token retrieval: %r', response.content[:1024]) - return - except requests.exceptions.RequestException as e: - logger.error(u'unable to retrieve access token {}'.format(e)) - else: - try: - response = response.json() - logger.debug('token resolved : %s', response) - return response - except ValueError: - logger.error( - 'no JSON object can be decoded from the data received from %s: %r', - app_settings.token_url, - response.content[:1024], - ) - - -def access_token_from_request(request, logger): - """Resolve an access token given a request returning from the authorization - endpoint. +class LoginOrLinkView(View): + """Login with FC, if the FC account is already linked, connect this user, + if a user is logged link the user to this account, otherwise display an + error message. """ - code = request.GET.get('code') - state = request.GET.get('state') - if not code: - return - if not state: - return - states = request.session.get('fc_states', {}) - if state not in states: - return - # there should not be many FC SSO in flight - redirect_uri = states[state]['redirect_uri'] - return resolve_access_token(code, redirect_uri, logger) + _next_url = None + service = None -ACCESS_GRANT_CODE = 'accessgrantcode' + @property + def next_url(self): + return self._next_url or a2_utils.select_next_url(self.request, default=settings.LOGIN_REDIRECT_URL) + @property + def redirect_uri(self): + return self.request.build_absolute_uri(reverse('fc-login-or-link')) -def clean_fc_session(session): - session.pop('fc_id_token', None) - session.pop('fc_id_token_raw', None) - session.pop('fc_user_info', None) - session.pop('fc_data', None) - - -class FcOAuthSessionViewMixin(LoggerMixin): - '''Add the OAuth2 dance to a view''' - - redirect_field_name = REDIRECT_FIELD_NAME - in_popup = False - token = None - user_info = None - - def get_in_popup(self): - return self.in_popup - - def redirect_to(self, request): - if request.method == 'POST': - redirect_to = request.POST.get( - self.redirect_field_name, request.GET.get(self.redirect_field_name, '') - ) - else: - redirect_to = request.GET.get(self.redirect_field_name, '') - - safe = is_safe_url(url=redirect_to, allowed_hosts=request.get_host()) - if not safe: - redirect_to = resolve_url(settings.LOGIN_REDIRECT_URL) - return redirect_to - - def close_popup_redirect(self, request, next_url, *args, **kwargs): - """Show a page to close the current popup and reload the parent window - with the return url. - """ - return render(request, 'authentic2_auth_fc/close-popup-redirect.html', {'redirect_to': next_url}) - - def simple_redirect(self, request, next_url, *args, **kwargs): - return a2_utils.redirect(request, next_url, *args, resolve=False, **kwargs) - - def redirect(self, request, *args, **kwargs): - next_url = kwargs.pop('next_url', None) - if next_url is None: - next_url = self.redirect_to(request, *args, **kwargs) - if self.get_in_popup(): - return self.close_popup_redirect(request, next_url, *args, **kwargs) - else: - return self.simple_redirect(request, next_url, *args, **kwargs) - - def redirect_and_come_back(self, request, next_url, *args, **kwargs): - old_next_url = self.redirect_to(request) - here = a2_utils.make_url(request.path, params={REDIRECT_FIELD_NAME: old_next_url}) - here = a2_utils.make_url(here, **kwargs) - there = a2_utils.make_url(next_url, params={REDIRECT_FIELD_NAME: here}) - return self.redirect(request, next_url=there, *args, **kwargs) - - def get_scopes(self): - return list(set(['openid'] + app_settings.scopes)) - - def get_ressource(self, url, verify): - try: - data = self.oauth_session().get(url, verify=verify, allow_redirects=False, timeout=3) - data.raise_for_status() - except requests.exceptions.RequestException as e: - self.logger.error('unable to retrieve ressource from %s due to %s', url, e) - else: - try: - data = data.json() - self.logger.debug('ressource resolved: %s', data) - return data - except ValueError: - self.logger.error( - 'no JSON object can be decoded from the data received from %s: %r', url, data.content - ) - - def get_user_info(self): - return self.get_ressource( - app_settings.userinfo_url + '?schema=openid', app_settings.verify_certificate - ) - - def get_data(self, scopes=[]): - data = dict() - if not app_settings.fd_list: - return data - for scope in scopes: - for fd in app_settings.fd_list[scope]: - url = fd['url'] - if fd['query_dic']: - url += '?' + urlencode(fd['query_dic']) - d = self.get_ressource(url, app_settings.verify_certificate) - if d: - data.setdefault(scope, []).append((fd['name'], d)) - return data - - def authorization_error(self, request, *args, **kwargs): - error = request.GET.get('error') - error_description = request.GET.get('error_description') - msg = _('Unable to connect to FranceConnect: "%s".') % error - if error_description: - msg += _('("%s")') % error_description - messages.error(request, msg) - self.logger.warning( - 'auth_fc: authorization failed with error=%r error_description=%r', error, error_description or '' - ) - return self.redirect(request) - - def dispatch(self, request, *args, **kwargs): - '''Interpret the OAuth authorization dance''' - if 'code' in request.GET: - self.token = access_token_from_request(request, self.logger) - - # Token request may not be completly processed and result in no - # token - if not self.token: - messages.warning(request, _('Unable to connect to FranceConnect.')) - return self.redirect(request) - - # The token request may fail, 'error' is then required. - # A bad client secret results in error equals to invalid_request - # for FC and invalid_client for oidc_provider. - if 'error' in self.token: - msg = 'token request failed : {}'.format(self.token) - self.logger.warning(msg) - messages.warning( - request, _('Unable to connect to FranceConnect: "%s".') % self.token['error'] - ) - return self.redirect(request) - key = app_settings.client_secret - # duck-type unicode/Py3 strings - if hasattr(key, 'isdecimal'): - key = key.encode('utf-8') - self.id_token, error = models.parse_id_token( - self.token['id_token'], client_id=app_settings.client_id, client_secret=key - ) - if not self.id_token: - self.logger.error(u'validation of id_token failed: %s', error) - messages.warning(request, _('Unable to connect to FranceConnect.')) - return self.redirect(request) - nonce = self.id_token.get('nonce') - states = request.session.get('fc_states', {}) - if not nonce or nonce not in states: - self.logger.error( - u'invalid nonce in id_token %s, known ones %s', nonce, u', '.join(states.keys()) - ) - messages.warning(request, _('Unable to connect to FranceConnect.')) - return self.redirect(request) - self.logger.debug('fc id_token %s', self.id_token) - for key in self.id_token: - setattr(self, key, self.id_token[key]) - self.oauth_session = lambda: utils.requests_retry_session( - session=OAuth2Session(app_settings.client_id, token=self.token) - ) - self.user_info = self.get_user_info() - if not self.user_info: - self.logger.error('userinfo resolution failed: %s', self.token) - messages.warning(request, _('Unable to connect to FranceConnect.')) - return self.redirect(request) - self.logger.debug('fc user_info %s', self.user_info) - self.request.session['fc_id_token'] = self.id_token - self.request.session['fc_id_token_raw'] = self.token['id_token'] - self.request.session['fc_user_info'] = self.user_info - if 'fd_scopes' in request.GET: - scopes = request.GET.get('fd_scopes') - scopes = scopes.split() - self.data = self.get_data(scopes) - self.logger.debug('fc data %s', self.data) - fc_data = self.request.session.setdefault('fc_data', {}) - for scope in self.data: - fc_data.setdefault(scope, []).extend(self.data[scope]) - self.logger.debug('fc data in session %s', self.request.session['fc_data']) - return super(FcOAuthSessionViewMixin, self).dispatch(request, *args, **kwargs) - elif 'error' in request.GET: - return self.authorization_error(request, *args, **kwargs) - else: - scopes = self.get_scopes() - if 'fd_scopes' in request.GET: - scopes = list(set(scopes) | set(request.GET['fd_scopes'].split())) - return ask_authorization(request, scopes, self.logger) + def redirect(self): + return a2_utils.redirect(self.request, self.next_url) @property def fc_display_name(self): '''Human representation of the current FC account''' display_name = '' - user_info = self.user_info or {} - family_name = user_info.get('family_name') - given_name = user_info.get('given_name') + family_name = self.user_info.get('family_name') + given_name = self.user_info.get('given_name') if given_name: display_name += given_name if family_name: @@ -353,49 +97,218 @@ class FcOAuthSessionViewMixin(LoggerMixin): display_name += family_name return display_name + def get(self, request, *args, **kwargs): + code = request.GET.get('code') + state = request.GET.get('state') -class PopupViewMixin(object): - def get_in_popup(self): - return 'popup' in self.request.GET - - -class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View): - """Login with FC, if the FC account is already linked, connect this user, - if a user is logged link the user to this account, otherwise display an - error message. - """ - - def update_user_info(self): - self.fc_account.token = json.dumps(self.token) - self.fc_account.user_info = json.dumps(self.user_info) - self.fc_account.save(update_fields=['token', 'user_info']) - utils.apply_user_info_mappings(self.fc_account.user, self.user_info) - self.logger.debug('updating user_info %s', self.fc_account.user_info) - - def uniqueness_check_failed(self, request): - if request.user.is_authenticated: - # currently logged : - if models.FcAccount.objects.filter(user=request.user, order=0).count(): - # cannot link because we are already linked to another FC account - messages.error(request, _('Your account is already linked to a FranceConnect account')) - else: - # cannot link because the FC account is already linked to another account. - messages.error( - request, - _('The FranceConnect account {} is already' ' linked with another account.').format( - self.fc_display_name - ), - ) + if code and state: + response = self.handle_authorization_response(request, code=code, state=state) + response.delete_cookie('fc-state', path=reverse('fc-login-or-link')) + return response + elif 'error' in request.GET: + return self.authorization_error( + request, error=request.GET['error'], error_description=request.GET.get('error_description') + ) else: - # not logged, cannot login because the user is disabled (user.is_active is False) - messages.error(request, _('Your account is disabled.')) - return self.redirect(request) + return self.make_authorization_request(request) + + def handle_authorization_response(self, request, code, state): + # check state signature and parse it + try: + state, self._next_url, self.service = self.decode_state(state) + except ValueError: + return a2_utils.redirect(request, settings.LOGIN_REDIRECT_URL) + + # regenerte the chain of hash from the stored nonce_seed + try: + encoded_seed = request.COOKIES.get('fc-state', '') + if not encoded_seed: + raise ValueError + dummy, hash_nonce, hash_state = hash_chain(3, encoded_seed=encoded_seed) + if not state or state != hash_state: + logger.warning('auth_fc: state lost, requesting authorization again') + raise ValueError + except ValueError: + return self.make_authorization_request(request) + + # resolve the authorization_code and check the token endpoint response + self.token = self.resolve_authorization_code(code) + if not self.token: + # resolve_authorization_code already logged a warning. + return self.report_fc_is_down(request) + if 'error' in self.token: + logger.warning('auth_fc: token request failed, "%s"', self.token) + messages.warning( + request, + _('Unable to connect to FranceConnect: "%s".') + % (self.token.get('error_description') or self.token['error']), + ) + return self.redirect() + + # parse the id_token + if not self.token.get('id_token') or not isinstance(self.token['id_token'], str): + logger.warning('auth_fc: token endpoint did not return an id_token') + return self.report_fc_is_down(request) + + key = app_settings.client_secret.encode() + self.id_token, error = models.parse_id_token( + self.token['id_token'], client_id=app_settings.client_id, client_secret=key + ) + if not self.id_token: + logger.warning('auth_fc: validation of id_token failed: %s', error) + return self.report_fc_is_down(request) + logger.debug('auth_fc: parsed id_token %s', self.id_token) + + nonce = self.id_token.get('nonce') + if nonce != hash_nonce: + logger.warning('auth_fc: invalid nonce in id_token') + return self.report_fc_is_down(request) + + self.sub = self.id_token.get('sub') + if not self.sub: + logger.warning('auth_fc: no sub in id_token %s', self.id_token) + return self.report_fc_is_down(request) + + # get user info using the access token + if not self.token.get('access_token') or not isinstance(self.token['access_token'], str): + logger.warning('auth_fc: token endpoint did not return an access_token') + return self.report_fc_is_down(request) + + self.user_info = self.get_user_info() + if self.user_info is None: + return self.report_fc_is_down(request) + logger.debug('auth_fc: user_info %s', self.user_info) + + # clear FranceConnect down status + cache.delete('fc_is_down') + + if request.user.is_authenticated: + return self.link(request) + else: + return self.login(request) + + def encode_state(self, state, next_url, service): + encoded_state = state + ' ' + self.next_url + ' ' + if service: + encoded_state += service_ref(service) + encoded_state += ' ' + hmac_url(settings.SECRET_KEY, encoded_state) + return encoded_state + + def decode_state(self, state): + payload, signature = state.rsplit(' ', 1) + if not check_hmac_url(settings.SECRET_KEY, payload, signature): + raise ValueError + # service_ref can be made of one or two parts + try: + state, next_url, service_ref = payload.split(' ') + except ValueError: + state, next_url, ou_slug, service_slug = payload.split(' ') + service_ref = ou_slug + ' ' + service_slug + service = get_service_from_ref(service_ref) + return state, next_url, service + + def make_authorization_request(self, request): + scope = ' '.join(set(['openid'] + app_settings.scopes)) + service = self.service or get_service_from_request(request) + + nonce_seed, nonce, state = hash_chain(3) + + # encode the target service and next_url in the state + full_state = state + ' ' + self.next_url + ' ' + if service: + full_state += service_ref(service) + full_state += ' ' + hmac_url(settings.SECRET_KEY, full_state) + params = { + 'client_id': app_settings.client_id, + 'scope': scope, + 'redirect_uri': self.redirect_uri, + 'response_type': 'code', + 'state': self.encode_state(state, self.next_url, service), + 'nonce': nonce, + 'acr_values': 'eidas1', + } + url = '{0}?{1}'.format(app_settings.authorize_url, urlencode(params)) + logger.debug('auth_fc: authorization_request redirect to %s', url) + + response = HttpResponseRedirect(url) + # prevent unshown messages to block the navigation to FranceConnect + response.display_message = False + + # store nonce_seed in a browser cookie to prevent CSRF and check nonce + # in id_token on return by generating the hash chain again + set_cookie( + response, + 'fc-state', + value=nonce_seed, + path=reverse('fc-login-or-link'), + httponly=True, + secure=request.is_secure(), + samesite='Lax', + ) + return response + + def resolve_authorization_code(self, authorization_code): + '''Exchange an authorization_code for an access_token''' + data = { + 'code': authorization_code, + 'client_id': app_settings.client_id, + 'client_secret': app_settings.client_secret, + 'redirect_uri': self.redirect_uri, + 'grant_type': 'authorization_code', + } + logger.debug('auth_fc: resolve_access_token request params %s', data) + + try: + token = post_json(app_settings.token_url, data, expected_statuses=[400]) + except RequestError as e: + logger.warning('auth_fc: resolve_authorization_code error %s', e) + return None + else: + logger.debug('auth_fc: token endpoint returned "%s"', token) + return token + + def get_user_info(self): + try: + data = get_json( + app_settings.userinfo_url + '?schema=openid', + session=OAuth2Session(app_settings.client_id, token=self.token), + ) + except RequestError as e: + logger.warning('auth_fc: get_user_info error %s', e) + return None + logger.debug('auth_fc: get_user_info returned %r', data) + return data + + def authorization_error(self, request, error, error_description): + messages.error(request, _('Unable to connect to FranceConnect: "%s".') % (error_description or error)) + logger.warning( + 'auth_fc: authorization failed with error=%r error_description=%r', error, error_description or '' + ) + return self.redirect() + + def report_fc_is_down(self, request): + messages.warning(request, _('Unable to connect to FranceConnect.')) + # put FranceConnect status in cache, if it happens for more than 5 minutes, log an error + last_down = cache.get('fc_is_down') + now = time.time() + more_than_5_minutes = last_down and (now - last_down) > 5 * 60 + if more_than_5_minutes: + logger.error('auth_fc: FranceConnect is down for more than 5 minutes') + if not last_down or more_than_5_minutes: + cache.set('fc_is_down', now, 10 * 60) + return self.redirect() def link(self, request): '''Request an access grant code and associate it to the current user''' try: self.fc_account, created = models.FcAccount.objects.get_or_create( - sub=self.sub, user=request.user, order=0, defaults={'token': json.dumps(self.token)} + sub=self.sub, + user=request.user, + order=0, + defaults={ + 'token': json.dumps(self.token), + 'user_info': json.dumps(self.user_info), + }, ) # Prevent adding a link with an FC account already linked with another user. except IntegrityError: @@ -403,139 +316,170 @@ class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View): return self.uniqueness_check_failed(request) if created: - self.logger.info('fc link created sub %s', self.sub) + logger.info('auth_fc: link created sub %s', self.sub) messages.info( request, _('Your FranceConnect account {} has been linked.').format(self.fc_display_name) ) hooks.call_hooks('event', name='fc-link', user=request.user, sub=self.sub, request=request) - else: - messages.info(request, _('Your local account has been updated.')) - self.update_user_info() - return self.redirect(request) - - def get(self, request, *args, **kwargs): - if request.user.is_authenticated: - return self.link(request) - else: - return self.login(request) + self.update_user_info(request.user, self.user_info) + return self.redirect() def login(self, request): - self.service = get_service_from_request(request) - default_ou = get_default_ou() - email_is_unique = a2_app_settings.A2_EMAIL_IS_UNIQUE or default_ou.email_is_unique - email_present_and_unique = self.user_info.get('email') and email_is_unique - user_filter = {} - if email_present_and_unique: - user_filter = {'email__iexact': self.user_info['email']} - if not a2_app_settings.A2_EMAIL_IS_UNIQUE and default_ou.email_is_unique: - user_filter['ou'] = default_ou + user = a2_utils.authenticate(request, sub=self.sub, user_info=self.user_info, token=self.token) - user = a2_utils.authenticate( - request, sub=self.sub, user_info=self.user_info, token=self.token, user_filter=user_filter - ) + if not user: + user = self.create_account(request, sub=self.sub, token=self.token, user_info=self.user_info) - # ignore user if sub is not matching and let the code below handle it - if not user.fc_accounts.filter(sub=self.sub).exists(): - user = None + if not user: + return self.redirect() - if user: - self.fc_account = user.fc_accounts.get(order=0) - if not user and email_present_and_unique: - email = self.user_info['email'] - User = get_user_model() - qs = User.objects.filter(**user_filter) + return self.finish_login(request, user, self.user_info) - if qs.exists(): - # there should not be multiple accounts with the same mail - if len(qs) > 1: - self.logger.error(u'multiple accounts with the same mail %s, %s', email, list(qs)) - # ok we have one account - elif len(qs) == 1: - user = qs[0] - # but does he have already a link to an FC account ? - if not user.fc_accounts.exists(): - try: - self.fc_account, created = models.FcAccount.objects.get_or_create( - defaults={'token': json.dumps(self.token)}, sub=self.sub, user=user, order=0 - ) - except IntegrityError: - # unique index check failed, find why. - return self.uniqueness_check_failed(request) + def finish_login(self, request, user, user_info): + self.update_user_info(user, user_info) + views_utils.check_cookie_works(request) + a2_utils.login(request, user, 'france-connect', service=self.service) - if created: - self.logger.info(u'fc link created sub %s user %s', self.sub, user) - hooks.call_hooks( - 'event', name='fc-link', user=user, sub=self.sub, request=request - ) - user = a2_utils.authenticate( - request=request, - sub=self.sub, - user_info=self.user_info, - token=self.token, - user_filter=user_filter, - ) - else: - messages.warning( - request, - _( - 'Your FranceConnect email address \'%s\' is already used by another ' - 'account, so we cannot create an account for you. Please create an ' - 'account with another email address then link it to FranceConnect ' - 'using your account management page.' - ) - % email, - ) - return self.redirect(request) - if user: - views_utils.check_cookie_works(request) - a2_utils.login(request, user, 'france-connect', service=self.service) - # set session expiration policy to EXPIRE_AT_BROWSER_CLOSE - request.session.set_expiry(0) - self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user) - self.fc_account.token = json.dumps(self.token) - self.fc_account.save(update_fields=['token']) - self.update_user_info() - self.logger.info('logged in using fc sub %s', self.sub) + # keep id_token around for logout + request.session['fc_id_token'] = self.id_token + request.session['fc_id_token_raw'] = self.token['id_token'] - # redirect to account edit page if any required attribute is missing - data = utils.get_mapped_attributes_flat(request) - required_attributes = a2_models.Attribute.objects.filter(required=True).values_list( - 'name', flat=True + # set session expiration policy to EXPIRE_AT_BROWSER_CLOSE + request.session.set_expiry(0) + + # redirect to account edit page if any required attribute is missing + name_to_label = dict(a2_models.Attribute.objects.filter(required=True).values_list('name', 'label')) + required = list(a2_app_settings.A2_REGISTRATION_REQUIRED_FIELDS) + list(name_to_label) + missing = [] + for attr_name in set(required): + value = getattr(user, attr_name, None) or getattr(user.attributes, attr_name, None) + if value in [None, '']: + missing.append(name_to_label[attr_name]) + if missing: + messages.warning( + request, + _('The following fields are mandatory for account creation: %s') % ', '.join(missing), ) - required = list(a2_app_settings.A2_REGISTRATION_REQUIRED_FIELDS) + list(required_attributes) - missing = [attr for attr in set(required) - set(data) if not getattr(user.attributes, attr)] - if missing: + return a2_utils.redirect(request, 'profile_edit', params={'next': self.next_url}) + return self.redirect() + + def create_account(self, request, sub, token, user_info): + email = user_info.get('email') + + if email: + # try to create or find an user with this email + try: + user, created = self.get_or_create_user_with_email(email) + except UserOutsideDefaultOu: + user = None + except User.MultipleObjectsReturned: + user = None + if not user: messages.warning( request, - _('The following fields are mandatory for account creation: %s') % ', '.join(missing), + _( + 'Your FranceConnect email address \'%s\' is already used by another ' + 'account, so we cannot create an account for you. Please connect ' + 'with you existing account or create an ' + 'account with another email address then link it to FranceConnect ' + 'using your account management page.' + ) + % email, ) - return a2_utils.redirect(request, 'profile_edit', keep_params=True) - return self.redirect(request) - else: - params = {} - if self.service: - set_service_ref(params, self.service) - messages.info( - request, _('If you already have an account, please log in, else ' 'create your account.') + return None + else: # no email, we cannot disembiguate users, let's create it anyway + user = User.objects.create() + created = True + + try: + if created: + user.set_unusable_password() + user.save() + + models.FcAccount.objects.create( + user=user, + sub=sub, + order=0, + token=json.dumps(token), + user_info=json.dumps(user_info), ) + except IntegrityError: + # uniqueness check failed, as the user is new, it can only mean that the sub is not unique + # let's try again + if created: + user.delete() + return self.authenticate(request, sub=sub, token=token, user_info=user_info) + except Exception: + # if anything unexpected happen and user was created, delete it and re-raise + if created: + user.delete() + raise + else: + if created: + logger.info('auth_fc: new account "%s" created with FranceConnect sub "%s"', user, sub) + hooks.call_hooks('event', name='fc-create', user=user, sub=sub) + else: + logger.info('auth_fc: existing account "%s" linked to FranceConnect sub "%s"', user, sub) + hooks.call_hooks('event', name='fc-link', user=user, sub=sub, request=request) - login_params = params.copy() - if not app_settings.show_button_quick_account_creation: - login_params['nofc'] = 1 + return a2_utils.authenticate(request, sub=sub, user_info=user_info, token=token) - login_url = a2_utils.make_url(settings.LOGIN_URL, params=login_params) - return self.redirect_and_come_back(request, login_url, params=params) + def uniqueness_check_failed(self, request): + # currently logged : + if models.FcAccount.objects.filter(user=request.user, order=0).count(): + # cannot link because we are already linked to another FC account + messages.error(request, _('Your account is already linked to a FranceConnect account')) + else: + # cannot link because the FC account is already linked to another account. + messages.error( + request, + _('The FranceConnect account {} is already linked with another account.').format( + self.fc_display_name + ), + ) + return self.redirect() + + def update_user_info(self, user, user_info): + # always handle given_name and family_name + updated = [] + if user_info.get('given_name') and user.first_name != user_info['given_name']: + user.first_name = user_info['given_name'] + updated.append('given name: "%s"' % user_info['given_name']) + if user_info.get('family_name') and user.last_name != user_info['family_name']: + user.last_name = user_info['family_name'] + updated.append('family name: "%s"' % user_info['family_name']) + if updated: + user.save() + logger.debug('auth_fc: updated (%s)', ' - '.join(updated)) + apply_user_info_mappings(user, user_info) + return user + + def get_or_create_user_with_email(self, email): + ou = get_default_ou() + + if a2_app_settings.A2_EMAIL_IS_UNIQUE: + instance, created = safe_get_or_create(User, email=email, defaults={'email': email, 'ou': ou}) + if instance.ou != ou: + assert not created # should not be possible + raise UserOutsideDefaultOu + return instance, created + elif ou.email_is_unique: + return safe_get_or_create(User, ou=ou, email=email, defaults={'email': email, 'ou': ou}) + else: + return User.objects.create(email=email), True -class UnlinkView(LoggerMixin, FormView): +login_or_link = LoginOrLinkView.as_view() + + +class UnlinkView(FormView): template_name = 'authentic2_auth_fc/unlink.html' def get_success_url(self): url = reverse('account_management') if app_settings.logout_when_unlink: # logout URL can be None if not session exists with FC - url = utils.build_logout_url(self.request, next_url=url) or url - clean_fc_session(self.request.session) + url = build_logout_url(self.request, next_url=url) or url return url def get_form_class(self): @@ -545,7 +489,7 @@ class UnlinkView(LoggerMixin, FormView): return form_class def get_form_kwargs(self, **kwargs): - kwargs = super(UnlinkView, self).get_form_kwargs(**kwargs) + kwargs = super().get_form_kwargs(**kwargs) if self.must_set_password(): kwargs['user'] = self.request.user return kwargs @@ -565,26 +509,27 @@ class UnlinkView(LoggerMixin, FormView): if self.must_set_password() and not a2_app_settings.A2_REGISTRATION_CAN_CHANGE_PASSWORD: # Prevent access to the view. raise Http404 - return super(UnlinkView, self).dispatch(request, *args, **kwargs) + return super().dispatch(request, *args, **kwargs) def form_valid(self, form): if self.must_set_password(): form.save() update_session_auth_hash(self.request, self.request.user) - self.logger.info(u'user %s has set a password', self.request.user) + logger.info('auth_fc: user %s has set a password', self.request.user) links = models.FcAccount.objects.filter(user=self.request.user) for link in links: - self.logger.info(u'user %s unlinked from %s', self.request.user, link) + logger.info('auth_fc: user %s unlinked from %s', self.request.user, link) hooks.call_hooks('event', name='fc-unlink', user=self.request.user) messages.info(self.request, _('The link with the FranceConnect account has been deleted.')) links.delete() - response = super(UnlinkView, self).form_valid(form) + response = super().form_valid(form) if app_settings.logout_when_unlink: response.display_message = False + clean_fc_session(self.request.session) return response def get_context_data(self, **kwargs): - context = super(UnlinkView, self).get_context_data(**kwargs) + context = super().get_context_data(**kwargs) if self.must_set_password(): context['no_password'] = True return context @@ -592,10 +537,9 @@ class UnlinkView(LoggerMixin, FormView): def post(self, request, *args, **kwargs): if 'cancel' in request.POST: return a2_utils.redirect(request, 'account_management') - return super(UnlinkView, self).post(request, *args, **kwargs) + return super().post(request, *args, **kwargs) -login_or_link = LoginOrLinkView.as_view() unlink = UnlinkView.as_view() diff --git a/tests/auth_fc/conftest.py b/tests/auth_fc/conftest.py index 3676bdba4..a2d395973 100644 --- a/tests/auth_fc/conftest.py +++ b/tests/auth_fc/conftest.py @@ -29,6 +29,7 @@ from django.utils.http import urlencode from django.utils.timezone import now from jwcrypto import jwk, jwt +from authentic2.a2_rbac.utils import get_default_ou from authentic2.models import Service from authentic2.utils import make_url @@ -40,6 +41,8 @@ CLIENT_SECRET = 'yyy' class FranceConnectMock: exp = None + token_endpoint_response = None + user_info_endpoint_response = None def __init__(self): self.sub = '1234' @@ -76,7 +79,7 @@ class FranceConnectMock: @property def callback_url(self): - return 'http://testserver' + reverse('fc-login-or-link') + '?' + urlencode(self.callback_params) + return 'http://testserver' + reverse('fc-login-or-link') def login_with_fc_fixed_params(self, app): if app.session: @@ -97,6 +100,9 @@ class FranceConnectMock: return self.handle_authorization(app, response.location, status=302).follow() def access_token_response(self, url, request): + if self.token_endpoint_response: + return self.token_endpoint_response + formdata = QueryDict(request.body) assert set(formdata.keys()) == {'code', 'client_id', 'client_secret', 'redirect_uri', 'grant_type'} assert formdata['code'] == self.code @@ -127,6 +133,9 @@ class FranceConnectMock: return t.serialize() def user_info_response(self, url, request): + if self.user_info_endpoint_response: + return self.user_info_endpoint_response + assert request.headers['Authorization'] == 'Bearer %s' % self.access_token user_info = self.user_info.copy() user_info['sub'] = self.sub @@ -151,12 +160,12 @@ class FranceConnectMock: @pytest.fixture -def franceconnect(settings, service): +def franceconnect(settings, db): settings.A2_FC_ENABLE = True settings.A2_FC_CLIENT_ID = CLIENT_ID settings.A2_FC_CLIENT_SECRET = CLIENT_SECRET - Service.objects.create(name='portail', slug='portail') + Service.objects.create(name='portail', slug='portail', ou=get_default_ou()) mock_object = FranceConnectMock() with mock_object(): yield mock_object diff --git a/tests/auth_fc/test_auth_fc.py b/tests/auth_fc/test_auth_fc.py index c1c7d7403..624a0706f 100644 --- a/tests/auth_fc/test_auth_fc.py +++ b/tests/auth_fc/test_auth_fc.py @@ -16,17 +16,24 @@ # along with this program. If not, see . import datetime +import json +import re import urllib.parse import mock +import pytest import requests from django.contrib.auth import get_user_model +from django.core.exceptions import PermissionDenied from django.urls import reverse from django.utils.timezone import now +from authentic2.a2_rbac.utils import get_default_ou +from authentic2.apps.journal.models import Event from authentic2.custom_user.models import DeletedUser -from authentic2.models import Attribute +from authentic2.models import Attribute, Service from authentic2_auth_fc import models +from authentic2_auth_fc.backends import FcBackend from authentic2_auth_fc.utils import requests_retry_session from ..utils import get_link_from_mail, login @@ -38,10 +45,19 @@ def path(url): return urllib.parse.urlparse(url).path -def test_login_redirect(app, franceconnect): +def test_fc_url_on_login(app, franceconnect): url = reverse('fc-login-or-link') response = app.get(url, status=302) - assert response['Location'].startswith('https://fcp.integ01') + assert response.location.startswith('https://fcp.integ01') + assert 'fc-state' in app.cookies + + +def test_retry_authorization_if_state_is_lost(settings, app, franceconnect, hooks): + response = app.get('/fc/callback/?next=/idp/&service=default%20portail', status=302) + # clear fc-state cookie + app.cookiejar.clear() + response = franceconnect.handle_authorization(app, response.location, status=302) + assert response.location.startswith('https://fcp.integ01') def test_login_with_condition(settings, app, franceconnect): @@ -61,7 +77,7 @@ def test_login_autorun(settings, app, franceconnect): # hide password block settings.AUTH_FRONTENDS_KWARGS = {'password': {'show_condition': 'remote_addr==\'0.0.0.0\''}} response = app.get('/login/') - assert response['Location'] == reverse('fc-login-or-link') + assert response.location == reverse('fc-login-or-link') def test_create(settings, app, franceconnect, hooks): @@ -71,13 +87,17 @@ def test_create(settings, app, franceconnect, hooks): response = response.click(href='callback') assert User.objects.count() == 0 + assert Event.objects.which_references(Service.objects.get()).count() == 0 response = franceconnect.handle_authorization(app, response.location, status=302) + assert 'fc-state' not in app.cookies assert User.objects.count() == 1 + # check login for service=portail was registered + assert Event.objects.which_references(Service.objects.get()).count() == 1 user = User.objects.get() assert user.verified_attributes.first_name == 'Ÿuñe' assert user.verified_attributes.last_name == 'Frédérique' - assert path(response['Location']) == '/idp/' + assert path(response.location) == '/idp/' assert hooks.event[1]['kwargs']['name'] == 'login' assert hooks.event[1]['kwargs']['service'] == 'portail' # we must be connected @@ -100,14 +120,13 @@ def test_create(settings, app, franceconnect, hooks): response = response.form.submit(name='unlink') assert models.FcAccount.objects.count() == 0 response = franceconnect.handle_logout(app, response.location) - assert path(response['Location']) == '/accounts/' + assert path(response.location) == '/accounts/' response = response.follow() assert 'The link with the FranceConnect account has been deleted' in response def test_create_expired(settings, app, franceconnect, hooks): # test direct creation failure on an expired id_token - settings.A2_FC_CREATE = True franceconnect.exp = now() - datetime.timedelta(seconds=30) response = app.get('/login/?service=portail&next=/idp/') @@ -120,7 +139,7 @@ def test_create_expired(settings, app, franceconnect, hooks): def test_login_email_is_unique(settings, app, franceconnect, caplog): settings.A2_EMAIL_IS_UNIQUE = True - user = User(email='john.doe@example.com', first_name='John', last_name='Doe') + user = User(email='john.doe@example.com', first_name='John', last_name='Doe', ou=get_default_ou()) user.set_password('toto') user.save() franceconnect.user_info['email'] = user.email @@ -131,6 +150,19 @@ def test_login_email_is_unique(settings, app, franceconnect, caplog): assert app.session['_auth_user_id'] == str(user.pk) +def test_link_after_login_with_password(app, franceconnect, simple_user): + assert models.FcAccount.objects.count() == 0 + + response = login(app, simple_user, path='/accounts/') + response = response.click(href='/fc/callback/') + + franceconnect.callback_params = {'next': '/accounts/'} + response = franceconnect.handle_authorization(app, response.location, status=302) + assert models.FcAccount.objects.count() == 1 + response = response.follow() + assert response.pyquery('.fc').text() == 'Linked FranceConnect accounts\nŸuñe Frédérique Delete link' + + def test_unlink_after_login_with_password(app, franceconnect, simple_user): models.FcAccount.objects.create(user=simple_user, user_info='{}') @@ -240,7 +272,7 @@ def test_login_with_missing_required_attributes(settings, app, franceconnect): assert path(response.location) == '/accounts/edit/' assert User.objects.count() == 1 assert models.FcAccount.objects.count() == 1 - assert 'The following fields are mandatory for account creation: title' in app.cookies['messages'] + assert 'The following fields are mandatory for account creation: Title' in app.cookies['messages'] def test_can_change_password(settings, app, franceconnect): @@ -280,7 +312,7 @@ def test_can_change_password(settings, app, franceconnect): def test_invalid_next_url(app, franceconnect): - assert app.get('/fc/callback/?code=coin&next=JJJ72QQQ').location == 'JJJ72QQQ' + assert app.get('/fc/callback/?code=coin&state=JJJ72QQQ').location == '/' def test_manager_user_sidebar(app, superuser, simple_user): @@ -296,7 +328,6 @@ def test_manager_user_sidebar(app, superuser, simple_user): def test_user_info_incomplete(settings, app, franceconnect): - settings.A2_FC_CREATE = True franceconnect.user_info = {} franceconnect.login_with_fc_fixed_params(app) @@ -308,7 +339,6 @@ def test_user_info_incomplete(settings, app, franceconnect): def test_user_info_incomplete_already_linked(settings, app, franceconnect, simple_user): - settings.A2_FC_CREATE = True user = User.objects.create() models.FcAccount.objects.create(user=user, sub=franceconnect.sub) franceconnect.user_info = {} @@ -347,3 +377,161 @@ def test_create_missing_email(settings, app, franceconnect, hooks): assert User.objects.count() == 1 response = app.get('/accounts/', status=200) + + +def test_multiple_accounts_with_same_email(settings, app, franceconnect): + ou = get_default_ou() + ou.email_is_unique = True + ou.save() + + User.objects.create(email=franceconnect.user_info['email'], ou=ou) + User.objects.create(email=franceconnect.user_info['email'], ou=ou) + + response = franceconnect.login_with_fc(app, path='/accounts/') + response = response.follow() + + assert 'is already used by another' in response + + +def test_sub_with_order_0_is_used(app, db, rf): + usera = User.objects.create(username='a') + userb = User.objects.create(username='b') + models.FcAccount.objects.create(user=usera, sub='1234', order=1) + models.FcAccount.objects.create(user=userb, sub='1234', order=0) + + assert FcBackend().authenticate(rf.get('/'), sub='1234', token={}, user_info={}) == userb + + +def test_inactive_raise_permission_denied(app, db, rf): + usera = User.objects.create(is_active=False, username='a') + models.FcAccount.objects.create(user=usera, sub='1234') + + with pytest.raises(PermissionDenied): + FcBackend().authenticate(rf.get('/'), sub='1234', token={}, user_info={}) + + +def test_order_1_is_returned(app, db, rf): + usera = User.objects.create(username='a') + models.FcAccount.objects.create(user=usera, sub='1234', order=1) + + assert FcBackend().authenticate(rf.get('/'), sub='1234', token={}, user_info={}) == usera + + +def test_resolve_authorization_code_http_400(app, franceconnect, caplog): + franceconnect.token_endpoint_response = { + 'status_code': 400, + 'content': json.dumps({'error': 'invalid_request'}), + } + + response = franceconnect.login_with_fc(app, path='/accounts/').follow() + assert re.match(r'WARNING.*token request failed.*invalid_request', caplog.text) + assert 'invalid_request' in response + + +def test_resolve_authorization_code_http_400_error_description(app, franceconnect, caplog): + franceconnect.token_endpoint_response = { + 'status_code': 400, + 'content': json.dumps({'error': 'invalid_request', 'error_description': 'Requête invalide'}), + } + + response = franceconnect.login_with_fc(app, path='/accounts/').follow() + assert re.match(r'WARNING.*token request failed.*invalid_request', caplog.text) + assert 'invalid_request' not in response + assert 'Requête invalide' in response + + +def test_resolve_authorization_code_not_json(app, franceconnect, caplog): + franceconnect.token_endpoint_response = 'not json' + franceconnect.login_with_fc(app, path='/accounts/').follow() + assert re.match(r'WARNING.*resolve_authorization_code.*not JSON.*not json', caplog.text) + + +def test_get_user_info_http_400(app, franceconnect, caplog): + franceconnect.user_info_endpoint_response = { + 'status_code': 400, + 'content': json.dumps({'error': 'invalid_request'}), + } + + franceconnect.login_with_fc(app, path='/accounts/').follow() + assert re.match(r'WARNING.*get_user_info.*is not 200.*status_code=400.*invalid_request', caplog.text) + + +def test_get_user_info_http_400_text_content(app, franceconnect, caplog): + franceconnect.user_info_endpoint_response = { + 'status_code': 400, + 'content': 'coin', + } + franceconnect.login_with_fc(app, path='/accounts/').follow() + assert re.match(r'WARNING.*get_user_info.*is not 200.*status_code=400.*coin', caplog.text) + + +def test_get_user_info_not_json(app, franceconnect, caplog): + franceconnect.user_info_endpoint_response = { + 'status_code': 200, + 'content': 'coin', + } + franceconnect.login_with_fc(app, path='/accounts/').follow() + assert re.match(r'WARNING.*get_user_info.*not JSON.*coin', caplog.text) + + +def test_fc_is_down(app, franceconnect, freezer, caplog): + franceconnect.token_endpoint_response = {'status_code': 500, 'content': 'Internal server error'} + + # first error -> warning + response = franceconnect.login_with_fc(app, path='/accounts/').follow() + assert len(caplog.records) == 1 + assert caplog.records[-1].levelname == 'WARNING' + assert 'Unable to connect to FranceConnect' in response + + # second error, four minutes later -> warning + freezer.move_to(datetime.timedelta(seconds=+240)) + response = franceconnect.login_with_fc(app, path='/accounts/').follow() + assert len(caplog.records) == 2 + assert caplog.records[-1].levelname == 'WARNING' + assert 'Unable to connect to FranceConnect' in response + + # after 5 minutes an error is logged + freezer.move_to(datetime.timedelta(seconds=+240)) + response = franceconnect.login_with_fc(app, path='/accounts/').follow() + assert len(caplog.records) == 4 + assert caplog.records[-1].levelname == 'ERROR' + assert 'Unable to connect to FranceConnect' in response + + # but only every 5 minutes + freezer.move_to(datetime.timedelta(seconds=+60)) + response = franceconnect.login_with_fc(app, path='/accounts/').follow() + assert len(caplog.records) == 5 + assert caplog.records[-1].levelname == 'WARNING' + assert 'Unable to connect to FranceConnect' in response + + # a success clear the down flag + franceconnect.token_endpoint_response = None + response = franceconnect.login_with_fc(app, path='/accounts/') + assert app.session['_auth_user_id'] + app.session.flush() + assert len(caplog.records) == 7 + + # such that 5 minutes later only a warning is emitted + freezer.move_to(datetime.timedelta(seconds=310)) + franceconnect.token_endpoint_response = {'status_code': 500, 'content': 'Internal server error'} + response = franceconnect.login_with_fc(app, path='/accounts/').follow() + assert len(caplog.records) == 8 + assert caplog.records[-1].levelname == 'WARNING' + assert 'Unable to connect to FranceConnect' in response + + +def test_authorization_error(app, franceconnect): + error = 'unauthorized' + error_description = 'Vous n\'êtes pas autorisé à vous connecter.' + + response = app.get( + '/fc/callback/', params={'error': error, 'error_description': error_description, 'next': '/accounts/'} + ).maybe_follow() + messages = response.pyquery('.messages').text() + assert error not in messages + assert error_description in messages + + response = app.get('/fc/callback/', params={'error': error, 'next': '/accounts/'}).maybe_follow() + messages = response.pyquery('.messages').text() + assert error in messages + assert error_description not in messages