auth_fc: completely move account creation into the view (#52929)

- removed unused popup mode
- removed unused cache customization
- removed unused app_settings
- removed obsolete FranceConnect data provider support
- added tests cases to augment coverage on error cases
- removed storage of fc_user_info in session
- removed old attribute mapping
- only save id_token in session on login
This commit is contained in:
Benjamin Dauvergne 2021-04-11 17:19:39 +02:00
parent a208a481cf
commit 87677f6d7e
11 changed files with 670 additions and 618 deletions

View File

@ -59,20 +59,6 @@ class AppSettings(object):
def logout_when_unlink(self):
return self._setting('LOGOUT_WHEN_UNLINK', True)
@property
def logout_at_unlink_return_url(self):
return self._setting('LOGOUT_AT_UNLINK_RETURN_URL', '/accounts/')
@property
def enable_registration_form_prefill(self):
return self._setting('ENABLE_REGISTRATION_FORM_PREFILL', True)
@property
def attributes_mapping(self):
return self._setting(
'ATTRIBUTES_MAPPING', {'family_name': 'last_name', 'given_name': 'first_name', 'email': 'email'}
)
@property
def user_info_mappings(self):
return self._setting(
@ -90,10 +76,6 @@ class AppSettings(object):
},
)
@property
def next_field_name(self):
return self._setting('NEXT_FIELD_NAME', 'fc_next')
@property
def client_id(self):
return self._setting('CLIENT_ID', '')
@ -110,26 +92,10 @@ class AppSettings(object):
def client_credentials(self):
return self._setting('CLIENT_CREDENTIALS', ())
@property
def show_button_quick_account_creation(self):
return self._setting('SHOW_BUTTON_QUICK_ACCOUNT_CREATION', True)
@property
def auto_register(self):
return self._setting('AUTO_REGISTER', True)
@property
def fd_list(self):
return self._setting('FD_LIST', {})
@property
def scopes(self):
return self._setting('SCOPES', ['profile', 'email'])
@property
def popup(self):
return self._setting('POPUP', False)
app_settings = AppSettings('A2_FC_')
app_settings.__name__ = __name__

View File

@ -32,13 +32,6 @@ class Plugin(object):
return [url]
return []
def registration_form_prefill(self, request):
from . import utils
if app_settings.enable_registration_form_prefill:
return [utils.get_mapped_attributes(request)]
return []
class AppConfig(django.apps.AppConfig):
name = 'authentic2_auth_fc'

View File

@ -47,19 +47,13 @@ class FcAuthenticator(BaseAuthenticator):
return
fc_user_info = request.session.get('fc_user_info')
context = kwargs.pop('context', {}).copy()
params = {}
if app_settings.popup:
params['popup'] = ''
context.update(
{
'popup': app_settings.popup,
'about_url': app_settings.about_url,
'fc_user_info': fc_user_info,
}
)
context['login_url'] = a2_utils.make_url(
'fc-login-or-link', keep_params=True, params=params, request=request
)
context['login_url'] = a2_utils.make_url('fc-login-or-link', keep_params=True, request=request)
context['block-extra-css-class'] = 'fc-login'
template = 'authentic2_auth_fc/login.html'
return TemplateResponse(request, template, context)
@ -74,14 +68,11 @@ class FcAuthenticator(BaseAuthenticator):
params = {
'next': account_path,
}
if app_settings.popup:
params['popup'] = ''
link_url = a2_utils.make_url('fc-login-or-link', params=params)
context = kwargs.pop('context', {}).copy()
context.update(
{
'popup': app_settings.popup,
'unlink': unlink,
'about_url': app_settings.about_url,
'link_url': link_url,

View File

@ -20,19 +20,15 @@ import logging
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import ModelBackend
from django.core.exceptions import MultipleObjectsReturned, PermissionDenied
from django.db import IntegrityError
from authentic2 import hooks
from authentic2.a2_rbac.utils import get_default_ou
from . import models, utils
from . import models
logger = logging.getLogger(__name__)
User = get_user_model()
class FcBackend(ModelBackend):
def authenticate(self, request=None, sub=None, **kwargs):
user_info = kwargs.get('user_info')
def authenticate(self, request, sub, token, user_info):
user = None
try:
try:
@ -40,55 +36,13 @@ class FcBackend(ModelBackend):
except MultipleObjectsReturned:
account = models.FcAccount.objects.select_related().get(sub=sub, order=0)
except models.FcAccount.DoesNotExist:
logger.debug(u'user with the sub %s does not exist.', sub)
else:
user = account.user
logger.debug(u'found user %s with sub %s', user, sub)
if not user.is_active:
logger.info(u'user %s login refused, it is inactive', user)
raise PermissionDenied
if user_info:
User = get_user_model()
user_qs = User.objects.filter(**kwargs.get('user_filter', {}))
if user_qs.count() > 1:
return
if user_qs.exists():
user = user_qs.get()
return None
if not user:
user = User(ou=get_default_ou())
user.set_unusable_password()
user.save()
try:
models.FcAccount.objects.create(
user=user, sub=sub, order=0, token=json.dumps(kwargs['token'])
)
except IntegrityError:
# uniqueness check failed, as the user is new, it can only means that the sub is not unique
# let's try again
user.delete()
return self.authenticate(sub, **kwargs)
else:
logger.debug(
u'user creation enabled with fc_account (sub : %s - token : %s)',
sub,
json.dumps(kwargs['token']),
)
hooks.call_hooks('event', name='fc-create', user=user, sub=sub)
if not account.user.is_active:
logger.info('auth_fc: login refused for user %s, it is inactive', user)
raise PermissionDenied
# always handle given_name and family_name
updated = []
if user_info.get('given_name') and user.first_name != user_info['given_name']:
user.first_name = user_info['given_name']
updated.append('given name: "%s"' % user_info['given_name'])
if user_info.get('family_name') and user.last_name != user_info['family_name']:
user.last_name = user_info['family_name']
updated.append('family name: "%s"' % user_info['family_name'])
if updated:
user.save()
logger.debug('updated (%s)', ' - '.join(updated))
utils.apply_user_info_mappings(user, user_info)
return user
return account.user
def get_saml2_authn_context(self):
import lasso

View File

@ -1,31 +0,0 @@
/* Open FranceConnect in popup */
(function(undef) {
function PopupCenter(url, title, w, h) {
// Fixes dual-screen position Most browsers Firefox
var dualScreenLeft = window.screenLeft != undefined ? window.screenLeft : window.screenX;
var dualScreenTop = window.screenTop != undefined ? window.screenTop : window.screenY;
var width = window.innerWidth ? window.innerWidth : document.documentElement.clientWidth ? document.documentElement.clientWidth : screen.width;
var height = window.innerHeight ? window.innerHeight : document.documentElement.clientHeight ? document.documentElement.clientHeight : screen.height;
var left = ((width / 2) - (w / 2)) + dualScreenLeft;
var top = ((height / 2) - (h / 2)) + dualScreenTop;
var newWindow = window.open(url, title, 'noopener,noreferrer,location=0,status=0,menubar=0,toolbar=0,scrollbars=yes, width=' + w + ', height=' + h + ', top=' + top + ', left=' + left);
newWindow.opener = null;
// Puts focus on the newWindow
if (window.focus) {
newWindow.focus();
}
}
var tags = document.getElementsByClassName('js-fc-popup');
for (var i = 0; i < tags.length; i++) {
var tag = tags[i];
tag.onclick = function (ev) {
PopupCenter(this.href, 'Authentification FranceConnect', 700, 500);
return false;
};
}
})();

View File

@ -18,7 +18,7 @@
<div id="fc-button-wrapper">
<div id="fc-button">
<div>{% trans "Link with a FranceConnect account" %}</div>
<a href="{{ link_url }}" class="button linking-button connexion{% if popup %} js-fc-popup{% endif %}">
<a href="{{ link_url }}" class="button linking-button connexion">
<span class="sr-only">{% trans "Link with a FranceConnect account" %}</span>
</a>
</div>
@ -29,4 +29,3 @@
</div>
<p><a href="{{ about_url }}" target="_blank" rel="noopener">{% trans "What is FranceConnect?" %}</a></p>
</div>
{% if popup %}<script src="{% static 'authentic2_auth_fc/js/fc.js' %}" type="text/javascript"></script>{% endif %}

View File

@ -6,11 +6,10 @@
<div id="fc-button-wrapper">
<div id="fc-button">
<a href="{{ login_url }}"
class="button connexion{% if popup %} js-fc-popup{% endif %}">
class="button connexion">
<span class="sr-only">{% trans 'Log in with FranceConnect' %}</span>
</a>
</div>
</div>
{% include "authentic2_auth_fc/explanation.html" %}
{% if popup %}<script src="{% static 'authentic2_auth_fc/js/fc.js' %}" type="text/javascript"></script>{% endif %}
{% endblock %}

View File

@ -57,24 +57,6 @@ def build_logout_url(request, next_url=None):
return None
def get_mapped_attributes(request):
values = {}
if 'fc_user_info' in request.session:
for fc_name, local_name in app_settings.attributes_mapping.items():
if fc_name in request.session['fc_user_info']:
values[local_name] = [request.session['fc_user_info'][fc_name]]
return values
def get_mapped_attributes_flat(request):
values = {}
if 'fc_user_info' in request.session:
for fc_name, local_name in app_settings.attributes_mapping.items():
if fc_name in request.session['fc_user_info']:
values[local_name] = request.session['fc_user_info'][fc_name]
return values
def get_ref(ref, user_info):
if not hasattr(user_info, 'items'):
return None
@ -202,3 +184,61 @@ def requests_retry_session(
# set proxies
session.proxies.update(getattr(settings, 'REQUESTS_PROXIES', {}))
return session
class RequestError(Exception):
def __init__(self, message, **details):
super().__init__(message)
self.details = details
def __str__(self):
s = super().__str__()
if self.details:
s += ' ('
s += ' '.join('%s=%r' % (key, self.details[key]) for key in self.details)
s += ')'
return s
def request_json(method, url, data=None, session=None, expected_statuses=None):
session = requests_retry_session(session=session)
try:
response = getattr(session, method)(
url,
data=data,
verify=app_settings.verify_certificate,
allow_redirects=False,
timeout=3,
)
response.raise_for_status()
except requests.exceptions.HTTPError:
try:
content = response.json()
except ValueError:
content = response.text[:256]
if expected_statuses and response.status_code in expected_statuses:
return content
raise RequestError('status code is not 200', status_code=response.status_code, content=content)
except requests.exceptions.RequestException as e:
raise RequestError('HTTP request failed', exception=e)
try:
content = response.json()
except ValueError:
raise RequestError('content is not JSON', content=response.content[:1024])
if not isinstance(content, dict):
raise RequestError('content is not a dict', content=content)
return content
def post_json(url, data, expected_statuses=None):
return request_json('post', url, data=data, expected_statuses=expected_statuses)
def get_json(url, session, expected_statuses=None):
return request_json('get', url, session=session, expected_statuses=expected_statuses)
def clean_fc_session(session):
session.pop('fc_id_token', None)
session.pop('fc_id_token_raw', None)

View File

@ -16,335 +16,79 @@
import json
import logging
import urllib.parse
import uuid
import time
import requests
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import REDIRECT_FIELD_NAME, get_user_model
from django.core import signing
from django.core.cache import InvalidCacheBackendError, caches
from django.contrib.auth import get_user_model
from django.contrib.auth.views import update_session_auth_hash
from django.core.cache import cache
from django.core.exceptions import PermissionDenied
from django.db import IntegrityError
from django.forms import Form
from django.http import Http404, HttpResponseRedirect
from django.shortcuts import render, resolve_url
from django.urls import reverse
from django.utils.http import is_safe_url, urlencode
from django.utils.http import urlencode
from django.utils.translation import ugettext as _
from django.views.generic import FormView, View
from requests_oauthlib import OAuth2Session
try:
from django.contrib.auth.views import update_session_auth_hash
except ImportError:
update_session_auth_hash = None
from authentic2 import app_settings as a2_app_settings
from authentic2 import constants, hooks
from authentic2 import models as a2_models
from authentic2 import utils as a2_utils
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.compat.cookies import set_cookie
from authentic2.crypto import check_hmac_url, hash_chain, hmac_url
from authentic2.forms.passwords import SetPasswordForm
from authentic2.utils import views as views_utils
from authentic2.utils.service import get_service_from_request, set_service_ref
from authentic2.utils.models import safe_get_or_create
from authentic2.utils.service import get_service_from_ref, get_service_from_request, service_ref
from . import app_settings, models, utils
from . import app_settings, models
from .utils import (
RequestError,
apply_user_info_mappings,
build_logout_url,
clean_fc_session,
get_json,
post_json,
)
logger = logging.getLogger(__name__)
User = get_user_model()
class LoggerMixin(object):
def __init__(self, *args, **kwargs):
self.logger = logging.getLogger(__name__)
super(LoggerMixin, *args, **kwargs)
class UserOutsideDefaultOu(Exception):
pass
try:
cache = caches['fc']
except InvalidCacheBackendError:
cache = caches['default']
CACHE_TIMEOUT = 60
def ask_authorization(request, scopes, logger):
'''Compute an authorize URL for obtaining the given scope'''
if not isinstance(scopes, (list, tuple)):
scopes = [scopes]
redirect_uri = request.build_absolute_uri()
state = str(uuid.uuid4())
states = request.session.setdefault('fc_states', {})
states[state] = {
'redirect_uri': redirect_uri,
}
request.session.modified = True
params = {
'client_id': app_settings.client_id,
'scope': ' '.join(scopes),
'redirect_uri': redirect_uri,
'response_type': 'code',
'state': state,
'nonce': state,
'acr_values': 'eidas1',
}
logger.debug('query string %s', params)
url = '{0}?{1}'.format(app_settings.authorize_url, urlencode(params))
logger.debug('redirect to %s', url)
response = HttpResponseRedirect(url)
response.display_message = False
return response
def resolve_access_token(authorization_code, redirect_uri, logger):
'''Exchange an authorization_code for an access_token'''
data = {
'code': authorization_code,
'client_id': app_settings.client_id,
'client_secret': app_settings.client_secret,
'redirect_uri': redirect_uri,
'grant_type': 'authorization_code',
}
logger.debug('access token request %s', data)
try:
session = utils.requests_retry_session()
response = session.post(
app_settings.token_url,
data=data,
verify=app_settings.verify_certificate,
allow_redirects=False,
timeout=3,
)
if response.status_code != 200:
try:
data = response.json()
logger.error(u'oauth2 error on access token retrieval: %r', data)
except ValueError:
data = {}
logger.error(u'oauth2 error on access token retrieval: %r', response.content[:1024])
return
except requests.exceptions.RequestException as e:
logger.error(u'unable to retrieve access token {}'.format(e))
else:
try:
response = response.json()
logger.debug('token resolved : %s', response)
return response
except ValueError:
logger.error(
'no JSON object can be decoded from the data received from %s: %r',
app_settings.token_url,
response.content[:1024],
)
def access_token_from_request(request, logger):
"""Resolve an access token given a request returning from the authorization
endpoint.
class LoginOrLinkView(View):
"""Login with FC, if the FC account is already linked, connect this user,
if a user is logged link the user to this account, otherwise display an
error message.
"""
code = request.GET.get('code')
state = request.GET.get('state')
if not code:
return
if not state:
return
states = request.session.get('fc_states', {})
if state not in states:
return
# there should not be many FC SSO in flight
redirect_uri = states[state]['redirect_uri']
return resolve_access_token(code, redirect_uri, logger)
_next_url = None
service = None
ACCESS_GRANT_CODE = 'accessgrantcode'
@property
def next_url(self):
return self._next_url or a2_utils.select_next_url(self.request, default=settings.LOGIN_REDIRECT_URL)
@property
def redirect_uri(self):
return self.request.build_absolute_uri(reverse('fc-login-or-link'))
def clean_fc_session(session):
session.pop('fc_id_token', None)
session.pop('fc_id_token_raw', None)
session.pop('fc_user_info', None)
session.pop('fc_data', None)
class FcOAuthSessionViewMixin(LoggerMixin):
'''Add the OAuth2 dance to a view'''
redirect_field_name = REDIRECT_FIELD_NAME
in_popup = False
token = None
user_info = None
def get_in_popup(self):
return self.in_popup
def redirect_to(self, request):
if request.method == 'POST':
redirect_to = request.POST.get(
self.redirect_field_name, request.GET.get(self.redirect_field_name, '')
)
else:
redirect_to = request.GET.get(self.redirect_field_name, '')
safe = is_safe_url(url=redirect_to, allowed_hosts=request.get_host())
if not safe:
redirect_to = resolve_url(settings.LOGIN_REDIRECT_URL)
return redirect_to
def close_popup_redirect(self, request, next_url, *args, **kwargs):
"""Show a page to close the current popup and reload the parent window
with the return url.
"""
return render(request, 'authentic2_auth_fc/close-popup-redirect.html', {'redirect_to': next_url})
def simple_redirect(self, request, next_url, *args, **kwargs):
return a2_utils.redirect(request, next_url, *args, resolve=False, **kwargs)
def redirect(self, request, *args, **kwargs):
next_url = kwargs.pop('next_url', None)
if next_url is None:
next_url = self.redirect_to(request, *args, **kwargs)
if self.get_in_popup():
return self.close_popup_redirect(request, next_url, *args, **kwargs)
else:
return self.simple_redirect(request, next_url, *args, **kwargs)
def redirect_and_come_back(self, request, next_url, *args, **kwargs):
old_next_url = self.redirect_to(request)
here = a2_utils.make_url(request.path, params={REDIRECT_FIELD_NAME: old_next_url})
here = a2_utils.make_url(here, **kwargs)
there = a2_utils.make_url(next_url, params={REDIRECT_FIELD_NAME: here})
return self.redirect(request, next_url=there, *args, **kwargs)
def get_scopes(self):
return list(set(['openid'] + app_settings.scopes))
def get_ressource(self, url, verify):
try:
data = self.oauth_session().get(url, verify=verify, allow_redirects=False, timeout=3)
data.raise_for_status()
except requests.exceptions.RequestException as e:
self.logger.error('unable to retrieve ressource from %s due to %s', url, e)
else:
try:
data = data.json()
self.logger.debug('ressource resolved: %s', data)
return data
except ValueError:
self.logger.error(
'no JSON object can be decoded from the data received from %s: %r', url, data.content
)
def get_user_info(self):
return self.get_ressource(
app_settings.userinfo_url + '?schema=openid', app_settings.verify_certificate
)
def get_data(self, scopes=[]):
data = dict()
if not app_settings.fd_list:
return data
for scope in scopes:
for fd in app_settings.fd_list[scope]:
url = fd['url']
if fd['query_dic']:
url += '?' + urlencode(fd['query_dic'])
d = self.get_ressource(url, app_settings.verify_certificate)
if d:
data.setdefault(scope, []).append((fd['name'], d))
return data
def authorization_error(self, request, *args, **kwargs):
error = request.GET.get('error')
error_description = request.GET.get('error_description')
msg = _('Unable to connect to FranceConnect: "%s".') % error
if error_description:
msg += _('("%s")') % error_description
messages.error(request, msg)
self.logger.warning(
'auth_fc: authorization failed with error=%r error_description=%r', error, error_description or ''
)
return self.redirect(request)
def dispatch(self, request, *args, **kwargs):
'''Interpret the OAuth authorization dance'''
if 'code' in request.GET:
self.token = access_token_from_request(request, self.logger)
# Token request may not be completly processed and result in no
# token
if not self.token:
messages.warning(request, _('Unable to connect to FranceConnect.'))
return self.redirect(request)
# The token request may fail, 'error' is then required.
# A bad client secret results in error equals to invalid_request
# for FC and invalid_client for oidc_provider.
if 'error' in self.token:
msg = 'token request failed : {}'.format(self.token)
self.logger.warning(msg)
messages.warning(
request, _('Unable to connect to FranceConnect: "%s".') % self.token['error']
)
return self.redirect(request)
key = app_settings.client_secret
# duck-type unicode/Py3 strings
if hasattr(key, 'isdecimal'):
key = key.encode('utf-8')
self.id_token, error = models.parse_id_token(
self.token['id_token'], client_id=app_settings.client_id, client_secret=key
)
if not self.id_token:
self.logger.error(u'validation of id_token failed: %s', error)
messages.warning(request, _('Unable to connect to FranceConnect.'))
return self.redirect(request)
nonce = self.id_token.get('nonce')
states = request.session.get('fc_states', {})
if not nonce or nonce not in states:
self.logger.error(
u'invalid nonce in id_token %s, known ones %s', nonce, u', '.join(states.keys())
)
messages.warning(request, _('Unable to connect to FranceConnect.'))
return self.redirect(request)
self.logger.debug('fc id_token %s', self.id_token)
for key in self.id_token:
setattr(self, key, self.id_token[key])
self.oauth_session = lambda: utils.requests_retry_session(
session=OAuth2Session(app_settings.client_id, token=self.token)
)
self.user_info = self.get_user_info()
if not self.user_info:
self.logger.error('userinfo resolution failed: %s', self.token)
messages.warning(request, _('Unable to connect to FranceConnect.'))
return self.redirect(request)
self.logger.debug('fc user_info %s', self.user_info)
self.request.session['fc_id_token'] = self.id_token
self.request.session['fc_id_token_raw'] = self.token['id_token']
self.request.session['fc_user_info'] = self.user_info
if 'fd_scopes' in request.GET:
scopes = request.GET.get('fd_scopes')
scopes = scopes.split()
self.data = self.get_data(scopes)
self.logger.debug('fc data %s', self.data)
fc_data = self.request.session.setdefault('fc_data', {})
for scope in self.data:
fc_data.setdefault(scope, []).extend(self.data[scope])
self.logger.debug('fc data in session %s', self.request.session['fc_data'])
return super(FcOAuthSessionViewMixin, self).dispatch(request, *args, **kwargs)
elif 'error' in request.GET:
return self.authorization_error(request, *args, **kwargs)
else:
scopes = self.get_scopes()
if 'fd_scopes' in request.GET:
scopes = list(set(scopes) | set(request.GET['fd_scopes'].split()))
return ask_authorization(request, scopes, self.logger)
def redirect(self):
return a2_utils.redirect(self.request, self.next_url)
@property
def fc_display_name(self):
'''Human representation of the current FC account'''
display_name = ''
user_info = self.user_info or {}
family_name = user_info.get('family_name')
given_name = user_info.get('given_name')
family_name = self.user_info.get('family_name')
given_name = self.user_info.get('given_name')
if given_name:
display_name += given_name
if family_name:
@ -353,49 +97,218 @@ class FcOAuthSessionViewMixin(LoggerMixin):
display_name += family_name
return display_name
def get(self, request, *args, **kwargs):
code = request.GET.get('code')
state = request.GET.get('state')
class PopupViewMixin(object):
def get_in_popup(self):
return 'popup' in self.request.GET
class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View):
"""Login with FC, if the FC account is already linked, connect this user,
if a user is logged link the user to this account, otherwise display an
error message.
"""
def update_user_info(self):
self.fc_account.token = json.dumps(self.token)
self.fc_account.user_info = json.dumps(self.user_info)
self.fc_account.save(update_fields=['token', 'user_info'])
utils.apply_user_info_mappings(self.fc_account.user, self.user_info)
self.logger.debug('updating user_info %s', self.fc_account.user_info)
def uniqueness_check_failed(self, request):
if request.user.is_authenticated:
# currently logged :
if models.FcAccount.objects.filter(user=request.user, order=0).count():
# cannot link because we are already linked to another FC account
messages.error(request, _('Your account is already linked to a FranceConnect account'))
else:
# cannot link because the FC account is already linked to another account.
messages.error(
request,
_('The FranceConnect account {} is already' ' linked with another account.').format(
self.fc_display_name
),
)
if code and state:
response = self.handle_authorization_response(request, code=code, state=state)
response.delete_cookie('fc-state', path=reverse('fc-login-or-link'))
return response
elif 'error' in request.GET:
return self.authorization_error(
request, error=request.GET['error'], error_description=request.GET.get('error_description')
)
else:
# not logged, cannot login because the user is disabled (user.is_active is False)
messages.error(request, _('Your account is disabled.'))
return self.redirect(request)
return self.make_authorization_request(request)
def handle_authorization_response(self, request, code, state):
# check state signature and parse it
try:
state, self._next_url, self.service = self.decode_state(state)
except ValueError:
return a2_utils.redirect(request, settings.LOGIN_REDIRECT_URL)
# regenerte the chain of hash from the stored nonce_seed
try:
encoded_seed = request.COOKIES.get('fc-state', '')
if not encoded_seed:
raise ValueError
dummy, hash_nonce, hash_state = hash_chain(3, encoded_seed=encoded_seed)
if not state or state != hash_state:
logger.warning('auth_fc: state lost, requesting authorization again')
raise ValueError
except ValueError:
return self.make_authorization_request(request)
# resolve the authorization_code and check the token endpoint response
self.token = self.resolve_authorization_code(code)
if not self.token:
# resolve_authorization_code already logged a warning.
return self.report_fc_is_down(request)
if 'error' in self.token:
logger.warning('auth_fc: token request failed, "%s"', self.token)
messages.warning(
request,
_('Unable to connect to FranceConnect: "%s".')
% (self.token.get('error_description') or self.token['error']),
)
return self.redirect()
# parse the id_token
if not self.token.get('id_token') or not isinstance(self.token['id_token'], str):
logger.warning('auth_fc: token endpoint did not return an id_token')
return self.report_fc_is_down(request)
key = app_settings.client_secret.encode()
self.id_token, error = models.parse_id_token(
self.token['id_token'], client_id=app_settings.client_id, client_secret=key
)
if not self.id_token:
logger.warning('auth_fc: validation of id_token failed: %s', error)
return self.report_fc_is_down(request)
logger.debug('auth_fc: parsed id_token %s', self.id_token)
nonce = self.id_token.get('nonce')
if nonce != hash_nonce:
logger.warning('auth_fc: invalid nonce in id_token')
return self.report_fc_is_down(request)
self.sub = self.id_token.get('sub')
if not self.sub:
logger.warning('auth_fc: no sub in id_token %s', self.id_token)
return self.report_fc_is_down(request)
# get user info using the access token
if not self.token.get('access_token') or not isinstance(self.token['access_token'], str):
logger.warning('auth_fc: token endpoint did not return an access_token')
return self.report_fc_is_down(request)
self.user_info = self.get_user_info()
if self.user_info is None:
return self.report_fc_is_down(request)
logger.debug('auth_fc: user_info %s', self.user_info)
# clear FranceConnect down status
cache.delete('fc_is_down')
if request.user.is_authenticated:
return self.link(request)
else:
return self.login(request)
def encode_state(self, state, next_url, service):
encoded_state = state + ' ' + self.next_url + ' '
if service:
encoded_state += service_ref(service)
encoded_state += ' ' + hmac_url(settings.SECRET_KEY, encoded_state)
return encoded_state
def decode_state(self, state):
payload, signature = state.rsplit(' ', 1)
if not check_hmac_url(settings.SECRET_KEY, payload, signature):
raise ValueError
# service_ref can be made of one or two parts
try:
state, next_url, service_ref = payload.split(' ')
except ValueError:
state, next_url, ou_slug, service_slug = payload.split(' ')
service_ref = ou_slug + ' ' + service_slug
service = get_service_from_ref(service_ref)
return state, next_url, service
def make_authorization_request(self, request):
scope = ' '.join(set(['openid'] + app_settings.scopes))
service = self.service or get_service_from_request(request)
nonce_seed, nonce, state = hash_chain(3)
# encode the target service and next_url in the state
full_state = state + ' ' + self.next_url + ' '
if service:
full_state += service_ref(service)
full_state += ' ' + hmac_url(settings.SECRET_KEY, full_state)
params = {
'client_id': app_settings.client_id,
'scope': scope,
'redirect_uri': self.redirect_uri,
'response_type': 'code',
'state': self.encode_state(state, self.next_url, service),
'nonce': nonce,
'acr_values': 'eidas1',
}
url = '{0}?{1}'.format(app_settings.authorize_url, urlencode(params))
logger.debug('auth_fc: authorization_request redirect to %s', url)
response = HttpResponseRedirect(url)
# prevent unshown messages to block the navigation to FranceConnect
response.display_message = False
# store nonce_seed in a browser cookie to prevent CSRF and check nonce
# in id_token on return by generating the hash chain again
set_cookie(
response,
'fc-state',
value=nonce_seed,
path=reverse('fc-login-or-link'),
httponly=True,
secure=request.is_secure(),
samesite='Lax',
)
return response
def resolve_authorization_code(self, authorization_code):
'''Exchange an authorization_code for an access_token'''
data = {
'code': authorization_code,
'client_id': app_settings.client_id,
'client_secret': app_settings.client_secret,
'redirect_uri': self.redirect_uri,
'grant_type': 'authorization_code',
}
logger.debug('auth_fc: resolve_access_token request params %s', data)
try:
token = post_json(app_settings.token_url, data, expected_statuses=[400])
except RequestError as e:
logger.warning('auth_fc: resolve_authorization_code error %s', e)
return None
else:
logger.debug('auth_fc: token endpoint returned "%s"', token)
return token
def get_user_info(self):
try:
data = get_json(
app_settings.userinfo_url + '?schema=openid',
session=OAuth2Session(app_settings.client_id, token=self.token),
)
except RequestError as e:
logger.warning('auth_fc: get_user_info error %s', e)
return None
logger.debug('auth_fc: get_user_info returned %r', data)
return data
def authorization_error(self, request, error, error_description):
messages.error(request, _('Unable to connect to FranceConnect: "%s".') % (error_description or error))
logger.warning(
'auth_fc: authorization failed with error=%r error_description=%r', error, error_description or ''
)
return self.redirect()
def report_fc_is_down(self, request):
messages.warning(request, _('Unable to connect to FranceConnect.'))
# put FranceConnect status in cache, if it happens for more than 5 minutes, log an error
last_down = cache.get('fc_is_down')
now = time.time()
more_than_5_minutes = last_down and (now - last_down) > 5 * 60
if more_than_5_minutes:
logger.error('auth_fc: FranceConnect is down for more than 5 minutes')
if not last_down or more_than_5_minutes:
cache.set('fc_is_down', now, 10 * 60)
return self.redirect()
def link(self, request):
'''Request an access grant code and associate it to the current user'''
try:
self.fc_account, created = models.FcAccount.objects.get_or_create(
sub=self.sub, user=request.user, order=0, defaults={'token': json.dumps(self.token)}
sub=self.sub,
user=request.user,
order=0,
defaults={
'token': json.dumps(self.token),
'user_info': json.dumps(self.user_info),
},
)
# Prevent adding a link with an FC account already linked with another user.
except IntegrityError:
@ -403,139 +316,170 @@ class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View):
return self.uniqueness_check_failed(request)
if created:
self.logger.info('fc link created sub %s', self.sub)
logger.info('auth_fc: link created sub %s', self.sub)
messages.info(
request, _('Your FranceConnect account {} has been linked.').format(self.fc_display_name)
)
hooks.call_hooks('event', name='fc-link', user=request.user, sub=self.sub, request=request)
else:
messages.info(request, _('Your local account has been updated.'))
self.update_user_info()
return self.redirect(request)
def get(self, request, *args, **kwargs):
if request.user.is_authenticated:
return self.link(request)
else:
return self.login(request)
self.update_user_info(request.user, self.user_info)
return self.redirect()
def login(self, request):
self.service = get_service_from_request(request)
default_ou = get_default_ou()
email_is_unique = a2_app_settings.A2_EMAIL_IS_UNIQUE or default_ou.email_is_unique
email_present_and_unique = self.user_info.get('email') and email_is_unique
user_filter = {}
if email_present_and_unique:
user_filter = {'email__iexact': self.user_info['email']}
if not a2_app_settings.A2_EMAIL_IS_UNIQUE and default_ou.email_is_unique:
user_filter['ou'] = default_ou
user = a2_utils.authenticate(request, sub=self.sub, user_info=self.user_info, token=self.token)
user = a2_utils.authenticate(
request, sub=self.sub, user_info=self.user_info, token=self.token, user_filter=user_filter
)
if not user:
user = self.create_account(request, sub=self.sub, token=self.token, user_info=self.user_info)
# ignore user if sub is not matching and let the code below handle it
if not user.fc_accounts.filter(sub=self.sub).exists():
user = None
if not user:
return self.redirect()
if user:
self.fc_account = user.fc_accounts.get(order=0)
if not user and email_present_and_unique:
email = self.user_info['email']
User = get_user_model()
qs = User.objects.filter(**user_filter)
return self.finish_login(request, user, self.user_info)
if qs.exists():
# there should not be multiple accounts with the same mail
if len(qs) > 1:
self.logger.error(u'multiple accounts with the same mail %s, %s', email, list(qs))
# ok we have one account
elif len(qs) == 1:
user = qs[0]
# but does he have already a link to an FC account ?
if not user.fc_accounts.exists():
try:
self.fc_account, created = models.FcAccount.objects.get_or_create(
defaults={'token': json.dumps(self.token)}, sub=self.sub, user=user, order=0
)
except IntegrityError:
# unique index check failed, find why.
return self.uniqueness_check_failed(request)
def finish_login(self, request, user, user_info):
self.update_user_info(user, user_info)
views_utils.check_cookie_works(request)
a2_utils.login(request, user, 'france-connect', service=self.service)
if created:
self.logger.info(u'fc link created sub %s user %s', self.sub, user)
hooks.call_hooks(
'event', name='fc-link', user=user, sub=self.sub, request=request
)
user = a2_utils.authenticate(
request=request,
sub=self.sub,
user_info=self.user_info,
token=self.token,
user_filter=user_filter,
)
else:
messages.warning(
request,
_(
'Your FranceConnect email address \'%s\' is already used by another '
'account, so we cannot create an account for you. Please create an '
'account with another email address then link it to FranceConnect '
'using your account management page.'
)
% email,
)
return self.redirect(request)
if user:
views_utils.check_cookie_works(request)
a2_utils.login(request, user, 'france-connect', service=self.service)
# set session expiration policy to EXPIRE_AT_BROWSER_CLOSE
request.session.set_expiry(0)
self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user)
self.fc_account.token = json.dumps(self.token)
self.fc_account.save(update_fields=['token'])
self.update_user_info()
self.logger.info('logged in using fc sub %s', self.sub)
# keep id_token around for logout
request.session['fc_id_token'] = self.id_token
request.session['fc_id_token_raw'] = self.token['id_token']
# redirect to account edit page if any required attribute is missing
data = utils.get_mapped_attributes_flat(request)
required_attributes = a2_models.Attribute.objects.filter(required=True).values_list(
'name', flat=True
# set session expiration policy to EXPIRE_AT_BROWSER_CLOSE
request.session.set_expiry(0)
# redirect to account edit page if any required attribute is missing
name_to_label = dict(a2_models.Attribute.objects.filter(required=True).values_list('name', 'label'))
required = list(a2_app_settings.A2_REGISTRATION_REQUIRED_FIELDS) + list(name_to_label)
missing = []
for attr_name in set(required):
value = getattr(user, attr_name, None) or getattr(user.attributes, attr_name, None)
if value in [None, '']:
missing.append(name_to_label[attr_name])
if missing:
messages.warning(
request,
_('The following fields are mandatory for account creation: %s') % ', '.join(missing),
)
required = list(a2_app_settings.A2_REGISTRATION_REQUIRED_FIELDS) + list(required_attributes)
missing = [attr for attr in set(required) - set(data) if not getattr(user.attributes, attr)]
if missing:
return a2_utils.redirect(request, 'profile_edit', params={'next': self.next_url})
return self.redirect()
def create_account(self, request, sub, token, user_info):
email = user_info.get('email')
if email:
# try to create or find an user with this email
try:
user, created = self.get_or_create_user_with_email(email)
except UserOutsideDefaultOu:
user = None
except User.MultipleObjectsReturned:
user = None
if not user:
messages.warning(
request,
_('The following fields are mandatory for account creation: %s') % ', '.join(missing),
_(
'Your FranceConnect email address \'%s\' is already used by another '
'account, so we cannot create an account for you. Please connect '
'with you existing account or create an '
'account with another email address then link it to FranceConnect '
'using your account management page.'
)
% email,
)
return a2_utils.redirect(request, 'profile_edit', keep_params=True)
return self.redirect(request)
else:
params = {}
if self.service:
set_service_ref(params, self.service)
messages.info(
request, _('If you already have an account, please log in, else ' 'create your account.')
return None
else: # no email, we cannot disembiguate users, let's create it anyway
user = User.objects.create()
created = True
try:
if created:
user.set_unusable_password()
user.save()
models.FcAccount.objects.create(
user=user,
sub=sub,
order=0,
token=json.dumps(token),
user_info=json.dumps(user_info),
)
except IntegrityError:
# uniqueness check failed, as the user is new, it can only mean that the sub is not unique
# let's try again
if created:
user.delete()
return self.authenticate(request, sub=sub, token=token, user_info=user_info)
except Exception:
# if anything unexpected happen and user was created, delete it and re-raise
if created:
user.delete()
raise
else:
if created:
logger.info('auth_fc: new account "%s" created with FranceConnect sub "%s"', user, sub)
hooks.call_hooks('event', name='fc-create', user=user, sub=sub)
else:
logger.info('auth_fc: existing account "%s" linked to FranceConnect sub "%s"', user, sub)
hooks.call_hooks('event', name='fc-link', user=user, sub=sub, request=request)
login_params = params.copy()
if not app_settings.show_button_quick_account_creation:
login_params['nofc'] = 1
return a2_utils.authenticate(request, sub=sub, user_info=user_info, token=token)
login_url = a2_utils.make_url(settings.LOGIN_URL, params=login_params)
return self.redirect_and_come_back(request, login_url, params=params)
def uniqueness_check_failed(self, request):
# currently logged :
if models.FcAccount.objects.filter(user=request.user, order=0).count():
# cannot link because we are already linked to another FC account
messages.error(request, _('Your account is already linked to a FranceConnect account'))
else:
# cannot link because the FC account is already linked to another account.
messages.error(
request,
_('The FranceConnect account {} is already linked with another account.').format(
self.fc_display_name
),
)
return self.redirect()
def update_user_info(self, user, user_info):
# always handle given_name and family_name
updated = []
if user_info.get('given_name') and user.first_name != user_info['given_name']:
user.first_name = user_info['given_name']
updated.append('given name: "%s"' % user_info['given_name'])
if user_info.get('family_name') and user.last_name != user_info['family_name']:
user.last_name = user_info['family_name']
updated.append('family name: "%s"' % user_info['family_name'])
if updated:
user.save()
logger.debug('auth_fc: updated (%s)', ' - '.join(updated))
apply_user_info_mappings(user, user_info)
return user
def get_or_create_user_with_email(self, email):
ou = get_default_ou()
if a2_app_settings.A2_EMAIL_IS_UNIQUE:
instance, created = safe_get_or_create(User, email=email, defaults={'email': email, 'ou': ou})
if instance.ou != ou:
assert not created # should not be possible
raise UserOutsideDefaultOu
return instance, created
elif ou.email_is_unique:
return safe_get_or_create(User, ou=ou, email=email, defaults={'email': email, 'ou': ou})
else:
return User.objects.create(email=email), True
class UnlinkView(LoggerMixin, FormView):
login_or_link = LoginOrLinkView.as_view()
class UnlinkView(FormView):
template_name = 'authentic2_auth_fc/unlink.html'
def get_success_url(self):
url = reverse('account_management')
if app_settings.logout_when_unlink:
# logout URL can be None if not session exists with FC
url = utils.build_logout_url(self.request, next_url=url) or url
clean_fc_session(self.request.session)
url = build_logout_url(self.request, next_url=url) or url
return url
def get_form_class(self):
@ -545,7 +489,7 @@ class UnlinkView(LoggerMixin, FormView):
return form_class
def get_form_kwargs(self, **kwargs):
kwargs = super(UnlinkView, self).get_form_kwargs(**kwargs)
kwargs = super().get_form_kwargs(**kwargs)
if self.must_set_password():
kwargs['user'] = self.request.user
return kwargs
@ -565,26 +509,27 @@ class UnlinkView(LoggerMixin, FormView):
if self.must_set_password() and not a2_app_settings.A2_REGISTRATION_CAN_CHANGE_PASSWORD:
# Prevent access to the view.
raise Http404
return super(UnlinkView, self).dispatch(request, *args, **kwargs)
return super().dispatch(request, *args, **kwargs)
def form_valid(self, form):
if self.must_set_password():
form.save()
update_session_auth_hash(self.request, self.request.user)
self.logger.info(u'user %s has set a password', self.request.user)
logger.info('auth_fc: user %s has set a password', self.request.user)
links = models.FcAccount.objects.filter(user=self.request.user)
for link in links:
self.logger.info(u'user %s unlinked from %s', self.request.user, link)
logger.info('auth_fc: user %s unlinked from %s', self.request.user, link)
hooks.call_hooks('event', name='fc-unlink', user=self.request.user)
messages.info(self.request, _('The link with the FranceConnect account has been deleted.'))
links.delete()
response = super(UnlinkView, self).form_valid(form)
response = super().form_valid(form)
if app_settings.logout_when_unlink:
response.display_message = False
clean_fc_session(self.request.session)
return response
def get_context_data(self, **kwargs):
context = super(UnlinkView, self).get_context_data(**kwargs)
context = super().get_context_data(**kwargs)
if self.must_set_password():
context['no_password'] = True
return context
@ -592,10 +537,9 @@ class UnlinkView(LoggerMixin, FormView):
def post(self, request, *args, **kwargs):
if 'cancel' in request.POST:
return a2_utils.redirect(request, 'account_management')
return super(UnlinkView, self).post(request, *args, **kwargs)
return super().post(request, *args, **kwargs)
login_or_link = LoginOrLinkView.as_view()
unlink = UnlinkView.as_view()

View File

@ -29,6 +29,7 @@ from django.utils.http import urlencode
from django.utils.timezone import now
from jwcrypto import jwk, jwt
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.models import Service
from authentic2.utils import make_url
@ -40,6 +41,8 @@ CLIENT_SECRET = 'yyy'
class FranceConnectMock:
exp = None
token_endpoint_response = None
user_info_endpoint_response = None
def __init__(self):
self.sub = '1234'
@ -76,7 +79,7 @@ class FranceConnectMock:
@property
def callback_url(self):
return 'http://testserver' + reverse('fc-login-or-link') + '?' + urlencode(self.callback_params)
return 'http://testserver' + reverse('fc-login-or-link')
def login_with_fc_fixed_params(self, app):
if app.session:
@ -97,6 +100,9 @@ class FranceConnectMock:
return self.handle_authorization(app, response.location, status=302).follow()
def access_token_response(self, url, request):
if self.token_endpoint_response:
return self.token_endpoint_response
formdata = QueryDict(request.body)
assert set(formdata.keys()) == {'code', 'client_id', 'client_secret', 'redirect_uri', 'grant_type'}
assert formdata['code'] == self.code
@ -127,6 +133,9 @@ class FranceConnectMock:
return t.serialize()
def user_info_response(self, url, request):
if self.user_info_endpoint_response:
return self.user_info_endpoint_response
assert request.headers['Authorization'] == 'Bearer %s' % self.access_token
user_info = self.user_info.copy()
user_info['sub'] = self.sub
@ -151,12 +160,12 @@ class FranceConnectMock:
@pytest.fixture
def franceconnect(settings, service):
def franceconnect(settings, db):
settings.A2_FC_ENABLE = True
settings.A2_FC_CLIENT_ID = CLIENT_ID
settings.A2_FC_CLIENT_SECRET = CLIENT_SECRET
Service.objects.create(name='portail', slug='portail')
Service.objects.create(name='portail', slug='portail', ou=get_default_ou())
mock_object = FranceConnectMock()
with mock_object():
yield mock_object

View File

@ -16,17 +16,24 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import json
import re
import urllib.parse
import mock
import pytest
import requests
from django.contrib.auth import get_user_model
from django.core.exceptions import PermissionDenied
from django.urls import reverse
from django.utils.timezone import now
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.apps.journal.models import Event
from authentic2.custom_user.models import DeletedUser
from authentic2.models import Attribute
from authentic2.models import Attribute, Service
from authentic2_auth_fc import models
from authentic2_auth_fc.backends import FcBackend
from authentic2_auth_fc.utils import requests_retry_session
from ..utils import get_link_from_mail, login
@ -38,10 +45,19 @@ def path(url):
return urllib.parse.urlparse(url).path
def test_login_redirect(app, franceconnect):
def test_fc_url_on_login(app, franceconnect):
url = reverse('fc-login-or-link')
response = app.get(url, status=302)
assert response['Location'].startswith('https://fcp.integ01')
assert response.location.startswith('https://fcp.integ01')
assert 'fc-state' in app.cookies
def test_retry_authorization_if_state_is_lost(settings, app, franceconnect, hooks):
response = app.get('/fc/callback/?next=/idp/&service=default%20portail', status=302)
# clear fc-state cookie
app.cookiejar.clear()
response = franceconnect.handle_authorization(app, response.location, status=302)
assert response.location.startswith('https://fcp.integ01')
def test_login_with_condition(settings, app, franceconnect):
@ -61,7 +77,7 @@ def test_login_autorun(settings, app, franceconnect):
# hide password block
settings.AUTH_FRONTENDS_KWARGS = {'password': {'show_condition': 'remote_addr==\'0.0.0.0\''}}
response = app.get('/login/')
assert response['Location'] == reverse('fc-login-or-link')
assert response.location == reverse('fc-login-or-link')
def test_create(settings, app, franceconnect, hooks):
@ -71,13 +87,17 @@ def test_create(settings, app, franceconnect, hooks):
response = response.click(href='callback')
assert User.objects.count() == 0
assert Event.objects.which_references(Service.objects.get()).count() == 0
response = franceconnect.handle_authorization(app, response.location, status=302)
assert 'fc-state' not in app.cookies
assert User.objects.count() == 1
# check login for service=portail was registered
assert Event.objects.which_references(Service.objects.get()).count() == 1
user = User.objects.get()
assert user.verified_attributes.first_name == 'Ÿuñe'
assert user.verified_attributes.last_name == 'Frédérique'
assert path(response['Location']) == '/idp/'
assert path(response.location) == '/idp/'
assert hooks.event[1]['kwargs']['name'] == 'login'
assert hooks.event[1]['kwargs']['service'] == 'portail'
# we must be connected
@ -100,14 +120,13 @@ def test_create(settings, app, franceconnect, hooks):
response = response.form.submit(name='unlink')
assert models.FcAccount.objects.count() == 0
response = franceconnect.handle_logout(app, response.location)
assert path(response['Location']) == '/accounts/'
assert path(response.location) == '/accounts/'
response = response.follow()
assert 'The link with the FranceConnect account has been deleted' in response
def test_create_expired(settings, app, franceconnect, hooks):
# test direct creation failure on an expired id_token
settings.A2_FC_CREATE = True
franceconnect.exp = now() - datetime.timedelta(seconds=30)
response = app.get('/login/?service=portail&next=/idp/')
@ -120,7 +139,7 @@ def test_create_expired(settings, app, franceconnect, hooks):
def test_login_email_is_unique(settings, app, franceconnect, caplog):
settings.A2_EMAIL_IS_UNIQUE = True
user = User(email='john.doe@example.com', first_name='John', last_name='Doe')
user = User(email='john.doe@example.com', first_name='John', last_name='Doe', ou=get_default_ou())
user.set_password('toto')
user.save()
franceconnect.user_info['email'] = user.email
@ -131,6 +150,19 @@ def test_login_email_is_unique(settings, app, franceconnect, caplog):
assert app.session['_auth_user_id'] == str(user.pk)
def test_link_after_login_with_password(app, franceconnect, simple_user):
assert models.FcAccount.objects.count() == 0
response = login(app, simple_user, path='/accounts/')
response = response.click(href='/fc/callback/')
franceconnect.callback_params = {'next': '/accounts/'}
response = franceconnect.handle_authorization(app, response.location, status=302)
assert models.FcAccount.objects.count() == 1
response = response.follow()
assert response.pyquery('.fc').text() == 'Linked FranceConnect accounts\nŸuñe Frédérique Delete link'
def test_unlink_after_login_with_password(app, franceconnect, simple_user):
models.FcAccount.objects.create(user=simple_user, user_info='{}')
@ -240,7 +272,7 @@ def test_login_with_missing_required_attributes(settings, app, franceconnect):
assert path(response.location) == '/accounts/edit/'
assert User.objects.count() == 1
assert models.FcAccount.objects.count() == 1
assert 'The following fields are mandatory for account creation: title' in app.cookies['messages']
assert 'The following fields are mandatory for account creation: Title' in app.cookies['messages']
def test_can_change_password(settings, app, franceconnect):
@ -280,7 +312,7 @@ def test_can_change_password(settings, app, franceconnect):
def test_invalid_next_url(app, franceconnect):
assert app.get('/fc/callback/?code=coin&next=JJJ72QQQ').location == 'JJJ72QQQ'
assert app.get('/fc/callback/?code=coin&state=JJJ72QQQ').location == '/'
def test_manager_user_sidebar(app, superuser, simple_user):
@ -296,7 +328,6 @@ def test_manager_user_sidebar(app, superuser, simple_user):
def test_user_info_incomplete(settings, app, franceconnect):
settings.A2_FC_CREATE = True
franceconnect.user_info = {}
franceconnect.login_with_fc_fixed_params(app)
@ -308,7 +339,6 @@ def test_user_info_incomplete(settings, app, franceconnect):
def test_user_info_incomplete_already_linked(settings, app, franceconnect, simple_user):
settings.A2_FC_CREATE = True
user = User.objects.create()
models.FcAccount.objects.create(user=user, sub=franceconnect.sub)
franceconnect.user_info = {}
@ -347,3 +377,161 @@ def test_create_missing_email(settings, app, franceconnect, hooks):
assert User.objects.count() == 1
response = app.get('/accounts/', status=200)
def test_multiple_accounts_with_same_email(settings, app, franceconnect):
ou = get_default_ou()
ou.email_is_unique = True
ou.save()
User.objects.create(email=franceconnect.user_info['email'], ou=ou)
User.objects.create(email=franceconnect.user_info['email'], ou=ou)
response = franceconnect.login_with_fc(app, path='/accounts/')
response = response.follow()
assert 'is already used by another' in response
def test_sub_with_order_0_is_used(app, db, rf):
usera = User.objects.create(username='a')
userb = User.objects.create(username='b')
models.FcAccount.objects.create(user=usera, sub='1234', order=1)
models.FcAccount.objects.create(user=userb, sub='1234', order=0)
assert FcBackend().authenticate(rf.get('/'), sub='1234', token={}, user_info={}) == userb
def test_inactive_raise_permission_denied(app, db, rf):
usera = User.objects.create(is_active=False, username='a')
models.FcAccount.objects.create(user=usera, sub='1234')
with pytest.raises(PermissionDenied):
FcBackend().authenticate(rf.get('/'), sub='1234', token={}, user_info={})
def test_order_1_is_returned(app, db, rf):
usera = User.objects.create(username='a')
models.FcAccount.objects.create(user=usera, sub='1234', order=1)
assert FcBackend().authenticate(rf.get('/'), sub='1234', token={}, user_info={}) == usera
def test_resolve_authorization_code_http_400(app, franceconnect, caplog):
franceconnect.token_endpoint_response = {
'status_code': 400,
'content': json.dumps({'error': 'invalid_request'}),
}
response = franceconnect.login_with_fc(app, path='/accounts/').follow()
assert re.match(r'WARNING.*token request failed.*invalid_request', caplog.text)
assert 'invalid_request' in response
def test_resolve_authorization_code_http_400_error_description(app, franceconnect, caplog):
franceconnect.token_endpoint_response = {
'status_code': 400,
'content': json.dumps({'error': 'invalid_request', 'error_description': 'Requête invalide'}),
}
response = franceconnect.login_with_fc(app, path='/accounts/').follow()
assert re.match(r'WARNING.*token request failed.*invalid_request', caplog.text)
assert 'invalid_request' not in response
assert 'Requête invalide' in response
def test_resolve_authorization_code_not_json(app, franceconnect, caplog):
franceconnect.token_endpoint_response = 'not json'
franceconnect.login_with_fc(app, path='/accounts/').follow()
assert re.match(r'WARNING.*resolve_authorization_code.*not JSON.*not json', caplog.text)
def test_get_user_info_http_400(app, franceconnect, caplog):
franceconnect.user_info_endpoint_response = {
'status_code': 400,
'content': json.dumps({'error': 'invalid_request'}),
}
franceconnect.login_with_fc(app, path='/accounts/').follow()
assert re.match(r'WARNING.*get_user_info.*is not 200.*status_code=400.*invalid_request', caplog.text)
def test_get_user_info_http_400_text_content(app, franceconnect, caplog):
franceconnect.user_info_endpoint_response = {
'status_code': 400,
'content': 'coin',
}
franceconnect.login_with_fc(app, path='/accounts/').follow()
assert re.match(r'WARNING.*get_user_info.*is not 200.*status_code=400.*coin', caplog.text)
def test_get_user_info_not_json(app, franceconnect, caplog):
franceconnect.user_info_endpoint_response = {
'status_code': 200,
'content': 'coin',
}
franceconnect.login_with_fc(app, path='/accounts/').follow()
assert re.match(r'WARNING.*get_user_info.*not JSON.*coin', caplog.text)
def test_fc_is_down(app, franceconnect, freezer, caplog):
franceconnect.token_endpoint_response = {'status_code': 500, 'content': 'Internal server error'}
# first error -> warning
response = franceconnect.login_with_fc(app, path='/accounts/').follow()
assert len(caplog.records) == 1
assert caplog.records[-1].levelname == 'WARNING'
assert 'Unable to connect to FranceConnect' in response
# second error, four minutes later -> warning
freezer.move_to(datetime.timedelta(seconds=+240))
response = franceconnect.login_with_fc(app, path='/accounts/').follow()
assert len(caplog.records) == 2
assert caplog.records[-1].levelname == 'WARNING'
assert 'Unable to connect to FranceConnect' in response
# after 5 minutes an error is logged
freezer.move_to(datetime.timedelta(seconds=+240))
response = franceconnect.login_with_fc(app, path='/accounts/').follow()
assert len(caplog.records) == 4
assert caplog.records[-1].levelname == 'ERROR'
assert 'Unable to connect to FranceConnect' in response
# but only every 5 minutes
freezer.move_to(datetime.timedelta(seconds=+60))
response = franceconnect.login_with_fc(app, path='/accounts/').follow()
assert len(caplog.records) == 5
assert caplog.records[-1].levelname == 'WARNING'
assert 'Unable to connect to FranceConnect' in response
# a success clear the down flag
franceconnect.token_endpoint_response = None
response = franceconnect.login_with_fc(app, path='/accounts/')
assert app.session['_auth_user_id']
app.session.flush()
assert len(caplog.records) == 7
# such that 5 minutes later only a warning is emitted
freezer.move_to(datetime.timedelta(seconds=310))
franceconnect.token_endpoint_response = {'status_code': 500, 'content': 'Internal server error'}
response = franceconnect.login_with_fc(app, path='/accounts/').follow()
assert len(caplog.records) == 8
assert caplog.records[-1].levelname == 'WARNING'
assert 'Unable to connect to FranceConnect' in response
def test_authorization_error(app, franceconnect):
error = 'unauthorized'
error_description = 'Vous n\'êtes pas autorisé à vous connecter.'
response = app.get(
'/fc/callback/', params={'error': error, 'error_description': error_description, 'next': '/accounts/'}
).maybe_follow()
messages = response.pyquery('.messages').text()
assert error not in messages
assert error_description in messages
response = app.get('/fc/callback/', params={'error': error, 'next': '/accounts/'}).maybe_follow()
messages = response.pyquery('.messages').text()
assert error in messages
assert error_description not in messages