auth_fc: completely move account creation into the view (#52929)
- removed unused popup mode - removed unused cache customization - removed unused app_settings - removed obsolete FranceConnect data provider support - added tests cases to augment coverage on error cases - removed storage of fc_user_info in session - removed old attribute mapping - only save id_token in session on login
This commit is contained in:
parent
a208a481cf
commit
87677f6d7e
|
@ -59,20 +59,6 @@ class AppSettings(object):
|
|||
def logout_when_unlink(self):
|
||||
return self._setting('LOGOUT_WHEN_UNLINK', True)
|
||||
|
||||
@property
|
||||
def logout_at_unlink_return_url(self):
|
||||
return self._setting('LOGOUT_AT_UNLINK_RETURN_URL', '/accounts/')
|
||||
|
||||
@property
|
||||
def enable_registration_form_prefill(self):
|
||||
return self._setting('ENABLE_REGISTRATION_FORM_PREFILL', True)
|
||||
|
||||
@property
|
||||
def attributes_mapping(self):
|
||||
return self._setting(
|
||||
'ATTRIBUTES_MAPPING', {'family_name': 'last_name', 'given_name': 'first_name', 'email': 'email'}
|
||||
)
|
||||
|
||||
@property
|
||||
def user_info_mappings(self):
|
||||
return self._setting(
|
||||
|
@ -90,10 +76,6 @@ class AppSettings(object):
|
|||
},
|
||||
)
|
||||
|
||||
@property
|
||||
def next_field_name(self):
|
||||
return self._setting('NEXT_FIELD_NAME', 'fc_next')
|
||||
|
||||
@property
|
||||
def client_id(self):
|
||||
return self._setting('CLIENT_ID', '')
|
||||
|
@ -110,26 +92,10 @@ class AppSettings(object):
|
|||
def client_credentials(self):
|
||||
return self._setting('CLIENT_CREDENTIALS', ())
|
||||
|
||||
@property
|
||||
def show_button_quick_account_creation(self):
|
||||
return self._setting('SHOW_BUTTON_QUICK_ACCOUNT_CREATION', True)
|
||||
|
||||
@property
|
||||
def auto_register(self):
|
||||
return self._setting('AUTO_REGISTER', True)
|
||||
|
||||
@property
|
||||
def fd_list(self):
|
||||
return self._setting('FD_LIST', {})
|
||||
|
||||
@property
|
||||
def scopes(self):
|
||||
return self._setting('SCOPES', ['profile', 'email'])
|
||||
|
||||
@property
|
||||
def popup(self):
|
||||
return self._setting('POPUP', False)
|
||||
|
||||
|
||||
app_settings = AppSettings('A2_FC_')
|
||||
app_settings.__name__ = __name__
|
||||
|
|
|
@ -32,13 +32,6 @@ class Plugin(object):
|
|||
return [url]
|
||||
return []
|
||||
|
||||
def registration_form_prefill(self, request):
|
||||
from . import utils
|
||||
|
||||
if app_settings.enable_registration_form_prefill:
|
||||
return [utils.get_mapped_attributes(request)]
|
||||
return []
|
||||
|
||||
|
||||
class AppConfig(django.apps.AppConfig):
|
||||
name = 'authentic2_auth_fc'
|
||||
|
|
|
@ -47,19 +47,13 @@ class FcAuthenticator(BaseAuthenticator):
|
|||
return
|
||||
fc_user_info = request.session.get('fc_user_info')
|
||||
context = kwargs.pop('context', {}).copy()
|
||||
params = {}
|
||||
if app_settings.popup:
|
||||
params['popup'] = ''
|
||||
context.update(
|
||||
{
|
||||
'popup': app_settings.popup,
|
||||
'about_url': app_settings.about_url,
|
||||
'fc_user_info': fc_user_info,
|
||||
}
|
||||
)
|
||||
context['login_url'] = a2_utils.make_url(
|
||||
'fc-login-or-link', keep_params=True, params=params, request=request
|
||||
)
|
||||
context['login_url'] = a2_utils.make_url('fc-login-or-link', keep_params=True, request=request)
|
||||
context['block-extra-css-class'] = 'fc-login'
|
||||
template = 'authentic2_auth_fc/login.html'
|
||||
return TemplateResponse(request, template, context)
|
||||
|
@ -74,14 +68,11 @@ class FcAuthenticator(BaseAuthenticator):
|
|||
params = {
|
||||
'next': account_path,
|
||||
}
|
||||
if app_settings.popup:
|
||||
params['popup'] = ''
|
||||
link_url = a2_utils.make_url('fc-login-or-link', params=params)
|
||||
|
||||
context = kwargs.pop('context', {}).copy()
|
||||
context.update(
|
||||
{
|
||||
'popup': app_settings.popup,
|
||||
'unlink': unlink,
|
||||
'about_url': app_settings.about_url,
|
||||
'link_url': link_url,
|
||||
|
|
|
@ -20,19 +20,15 @@ import logging
|
|||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.backends import ModelBackend
|
||||
from django.core.exceptions import MultipleObjectsReturned, PermissionDenied
|
||||
from django.db import IntegrityError
|
||||
|
||||
from authentic2 import hooks
|
||||
from authentic2.a2_rbac.utils import get_default_ou
|
||||
|
||||
from . import models, utils
|
||||
from . import models
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
User = get_user_model()
|
||||
|
||||
|
||||
class FcBackend(ModelBackend):
|
||||
def authenticate(self, request=None, sub=None, **kwargs):
|
||||
user_info = kwargs.get('user_info')
|
||||
def authenticate(self, request, sub, token, user_info):
|
||||
user = None
|
||||
try:
|
||||
try:
|
||||
|
@ -40,55 +36,13 @@ class FcBackend(ModelBackend):
|
|||
except MultipleObjectsReturned:
|
||||
account = models.FcAccount.objects.select_related().get(sub=sub, order=0)
|
||||
except models.FcAccount.DoesNotExist:
|
||||
logger.debug(u'user with the sub %s does not exist.', sub)
|
||||
else:
|
||||
user = account.user
|
||||
logger.debug(u'found user %s with sub %s', user, sub)
|
||||
if not user.is_active:
|
||||
logger.info(u'user %s login refused, it is inactive', user)
|
||||
raise PermissionDenied
|
||||
if user_info:
|
||||
User = get_user_model()
|
||||
user_qs = User.objects.filter(**kwargs.get('user_filter', {}))
|
||||
if user_qs.count() > 1:
|
||||
return
|
||||
if user_qs.exists():
|
||||
user = user_qs.get()
|
||||
return None
|
||||
|
||||
if not user:
|
||||
user = User(ou=get_default_ou())
|
||||
user.set_unusable_password()
|
||||
user.save()
|
||||
try:
|
||||
models.FcAccount.objects.create(
|
||||
user=user, sub=sub, order=0, token=json.dumps(kwargs['token'])
|
||||
)
|
||||
except IntegrityError:
|
||||
# uniqueness check failed, as the user is new, it can only means that the sub is not unique
|
||||
# let's try again
|
||||
user.delete()
|
||||
return self.authenticate(sub, **kwargs)
|
||||
else:
|
||||
logger.debug(
|
||||
u'user creation enabled with fc_account (sub : %s - token : %s)',
|
||||
sub,
|
||||
json.dumps(kwargs['token']),
|
||||
)
|
||||
hooks.call_hooks('event', name='fc-create', user=user, sub=sub)
|
||||
if not account.user.is_active:
|
||||
logger.info('auth_fc: login refused for user %s, it is inactive', user)
|
||||
raise PermissionDenied
|
||||
|
||||
# always handle given_name and family_name
|
||||
updated = []
|
||||
if user_info.get('given_name') and user.first_name != user_info['given_name']:
|
||||
user.first_name = user_info['given_name']
|
||||
updated.append('given name: "%s"' % user_info['given_name'])
|
||||
if user_info.get('family_name') and user.last_name != user_info['family_name']:
|
||||
user.last_name = user_info['family_name']
|
||||
updated.append('family name: "%s"' % user_info['family_name'])
|
||||
if updated:
|
||||
user.save()
|
||||
logger.debug('updated (%s)', ' - '.join(updated))
|
||||
utils.apply_user_info_mappings(user, user_info)
|
||||
return user
|
||||
return account.user
|
||||
|
||||
def get_saml2_authn_context(self):
|
||||
import lasso
|
||||
|
|
|
@ -1,31 +0,0 @@
|
|||
/* Open FranceConnect in popup */
|
||||
|
||||
|
||||
(function(undef) {
|
||||
function PopupCenter(url, title, w, h) {
|
||||
// Fixes dual-screen position Most browsers Firefox
|
||||
var dualScreenLeft = window.screenLeft != undefined ? window.screenLeft : window.screenX;
|
||||
var dualScreenTop = window.screenTop != undefined ? window.screenTop : window.screenY;
|
||||
|
||||
var width = window.innerWidth ? window.innerWidth : document.documentElement.clientWidth ? document.documentElement.clientWidth : screen.width;
|
||||
var height = window.innerHeight ? window.innerHeight : document.documentElement.clientHeight ? document.documentElement.clientHeight : screen.height;
|
||||
|
||||
var left = ((width / 2) - (w / 2)) + dualScreenLeft;
|
||||
var top = ((height / 2) - (h / 2)) + dualScreenTop;
|
||||
var newWindow = window.open(url, title, 'noopener,noreferrer,location=0,status=0,menubar=0,toolbar=0,scrollbars=yes, width=' + w + ', height=' + h + ', top=' + top + ', left=' + left);
|
||||
newWindow.opener = null;
|
||||
|
||||
// Puts focus on the newWindow
|
||||
if (window.focus) {
|
||||
newWindow.focus();
|
||||
}
|
||||
}
|
||||
var tags = document.getElementsByClassName('js-fc-popup');
|
||||
for (var i = 0; i < tags.length; i++) {
|
||||
var tag = tags[i];
|
||||
tag.onclick = function (ev) {
|
||||
PopupCenter(this.href, 'Authentification FranceConnect', 700, 500);
|
||||
return false;
|
||||
};
|
||||
}
|
||||
})();
|
|
@ -18,7 +18,7 @@
|
|||
<div id="fc-button-wrapper">
|
||||
<div id="fc-button">
|
||||
<div>{% trans "Link with a FranceConnect account" %}</div>
|
||||
<a href="{{ link_url }}" class="button linking-button connexion{% if popup %} js-fc-popup{% endif %}">
|
||||
<a href="{{ link_url }}" class="button linking-button connexion">
|
||||
<span class="sr-only">{% trans "Link with a FranceConnect account" %}</span>
|
||||
</a>
|
||||
</div>
|
||||
|
@ -29,4 +29,3 @@
|
|||
</div>
|
||||
<p><a href="{{ about_url }}" target="_blank" rel="noopener">{% trans "What is FranceConnect?" %}</a></p>
|
||||
</div>
|
||||
{% if popup %}<script src="{% static 'authentic2_auth_fc/js/fc.js' %}" type="text/javascript"></script>{% endif %}
|
||||
|
|
|
@ -6,11 +6,10 @@
|
|||
<div id="fc-button-wrapper">
|
||||
<div id="fc-button">
|
||||
<a href="{{ login_url }}"
|
||||
class="button connexion{% if popup %} js-fc-popup{% endif %}">
|
||||
class="button connexion">
|
||||
<span class="sr-only">{% trans 'Log in with FranceConnect' %}</span>
|
||||
</a>
|
||||
</div>
|
||||
</div>
|
||||
{% include "authentic2_auth_fc/explanation.html" %}
|
||||
{% if popup %}<script src="{% static 'authentic2_auth_fc/js/fc.js' %}" type="text/javascript"></script>{% endif %}
|
||||
{% endblock %}
|
||||
|
|
|
@ -57,24 +57,6 @@ def build_logout_url(request, next_url=None):
|
|||
return None
|
||||
|
||||
|
||||
def get_mapped_attributes(request):
|
||||
values = {}
|
||||
if 'fc_user_info' in request.session:
|
||||
for fc_name, local_name in app_settings.attributes_mapping.items():
|
||||
if fc_name in request.session['fc_user_info']:
|
||||
values[local_name] = [request.session['fc_user_info'][fc_name]]
|
||||
return values
|
||||
|
||||
|
||||
def get_mapped_attributes_flat(request):
|
||||
values = {}
|
||||
if 'fc_user_info' in request.session:
|
||||
for fc_name, local_name in app_settings.attributes_mapping.items():
|
||||
if fc_name in request.session['fc_user_info']:
|
||||
values[local_name] = request.session['fc_user_info'][fc_name]
|
||||
return values
|
||||
|
||||
|
||||
def get_ref(ref, user_info):
|
||||
if not hasattr(user_info, 'items'):
|
||||
return None
|
||||
|
@ -202,3 +184,61 @@ def requests_retry_session(
|
|||
# set proxies
|
||||
session.proxies.update(getattr(settings, 'REQUESTS_PROXIES', {}))
|
||||
return session
|
||||
|
||||
|
||||
class RequestError(Exception):
|
||||
def __init__(self, message, **details):
|
||||
super().__init__(message)
|
||||
self.details = details
|
||||
|
||||
def __str__(self):
|
||||
s = super().__str__()
|
||||
if self.details:
|
||||
s += ' ('
|
||||
s += ' '.join('%s=%r' % (key, self.details[key]) for key in self.details)
|
||||
s += ')'
|
||||
return s
|
||||
|
||||
|
||||
def request_json(method, url, data=None, session=None, expected_statuses=None):
|
||||
session = requests_retry_session(session=session)
|
||||
try:
|
||||
response = getattr(session, method)(
|
||||
url,
|
||||
data=data,
|
||||
verify=app_settings.verify_certificate,
|
||||
allow_redirects=False,
|
||||
timeout=3,
|
||||
)
|
||||
response.raise_for_status()
|
||||
except requests.exceptions.HTTPError:
|
||||
try:
|
||||
content = response.json()
|
||||
except ValueError:
|
||||
content = response.text[:256]
|
||||
if expected_statuses and response.status_code in expected_statuses:
|
||||
return content
|
||||
raise RequestError('status code is not 200', status_code=response.status_code, content=content)
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise RequestError('HTTP request failed', exception=e)
|
||||
try:
|
||||
content = response.json()
|
||||
except ValueError:
|
||||
raise RequestError('content is not JSON', content=response.content[:1024])
|
||||
|
||||
if not isinstance(content, dict):
|
||||
raise RequestError('content is not a dict', content=content)
|
||||
return content
|
||||
|
||||
|
||||
def post_json(url, data, expected_statuses=None):
|
||||
return request_json('post', url, data=data, expected_statuses=expected_statuses)
|
||||
|
||||
|
||||
def get_json(url, session, expected_statuses=None):
|
||||
return request_json('get', url, session=session, expected_statuses=expected_statuses)
|
||||
|
||||
|
||||
def clean_fc_session(session):
|
||||
session.pop('fc_id_token', None)
|
||||
session.pop('fc_id_token_raw', None)
|
||||
|
|
|
@ -16,335 +16,79 @@
|
|||
|
||||
import json
|
||||
import logging
|
||||
import urllib.parse
|
||||
import uuid
|
||||
import time
|
||||
|
||||
import requests
|
||||
from django.conf import settings
|
||||
from django.contrib import messages
|
||||
from django.contrib.auth import REDIRECT_FIELD_NAME, get_user_model
|
||||
from django.core import signing
|
||||
from django.core.cache import InvalidCacheBackendError, caches
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.views import update_session_auth_hash
|
||||
from django.core.cache import cache
|
||||
from django.core.exceptions import PermissionDenied
|
||||
from django.db import IntegrityError
|
||||
from django.forms import Form
|
||||
from django.http import Http404, HttpResponseRedirect
|
||||
from django.shortcuts import render, resolve_url
|
||||
from django.urls import reverse
|
||||
from django.utils.http import is_safe_url, urlencode
|
||||
from django.utils.http import urlencode
|
||||
from django.utils.translation import ugettext as _
|
||||
from django.views.generic import FormView, View
|
||||
from requests_oauthlib import OAuth2Session
|
||||
|
||||
try:
|
||||
from django.contrib.auth.views import update_session_auth_hash
|
||||
except ImportError:
|
||||
update_session_auth_hash = None
|
||||
|
||||
from authentic2 import app_settings as a2_app_settings
|
||||
from authentic2 import constants, hooks
|
||||
from authentic2 import models as a2_models
|
||||
from authentic2 import utils as a2_utils
|
||||
from authentic2.a2_rbac.utils import get_default_ou
|
||||
from authentic2.compat.cookies import set_cookie
|
||||
from authentic2.crypto import check_hmac_url, hash_chain, hmac_url
|
||||
from authentic2.forms.passwords import SetPasswordForm
|
||||
from authentic2.utils import views as views_utils
|
||||
from authentic2.utils.service import get_service_from_request, set_service_ref
|
||||
from authentic2.utils.models import safe_get_or_create
|
||||
from authentic2.utils.service import get_service_from_ref, get_service_from_request, service_ref
|
||||
|
||||
from . import app_settings, models, utils
|
||||
from . import app_settings, models
|
||||
from .utils import (
|
||||
RequestError,
|
||||
apply_user_info_mappings,
|
||||
build_logout_url,
|
||||
clean_fc_session,
|
||||
get_json,
|
||||
post_json,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
User = get_user_model()
|
||||
|
||||
|
||||
class LoggerMixin(object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
super(LoggerMixin, *args, **kwargs)
|
||||
class UserOutsideDefaultOu(Exception):
|
||||
pass
|
||||
|
||||
|
||||
try:
|
||||
cache = caches['fc']
|
||||
except InvalidCacheBackendError:
|
||||
cache = caches['default']
|
||||
|
||||
|
||||
CACHE_TIMEOUT = 60
|
||||
|
||||
|
||||
def ask_authorization(request, scopes, logger):
|
||||
'''Compute an authorize URL for obtaining the given scope'''
|
||||
if not isinstance(scopes, (list, tuple)):
|
||||
scopes = [scopes]
|
||||
redirect_uri = request.build_absolute_uri()
|
||||
state = str(uuid.uuid4())
|
||||
states = request.session.setdefault('fc_states', {})
|
||||
states[state] = {
|
||||
'redirect_uri': redirect_uri,
|
||||
}
|
||||
request.session.modified = True
|
||||
params = {
|
||||
'client_id': app_settings.client_id,
|
||||
'scope': ' '.join(scopes),
|
||||
'redirect_uri': redirect_uri,
|
||||
'response_type': 'code',
|
||||
'state': state,
|
||||
'nonce': state,
|
||||
'acr_values': 'eidas1',
|
||||
}
|
||||
logger.debug('query string %s', params)
|
||||
url = '{0}?{1}'.format(app_settings.authorize_url, urlencode(params))
|
||||
logger.debug('redirect to %s', url)
|
||||
response = HttpResponseRedirect(url)
|
||||
response.display_message = False
|
||||
return response
|
||||
|
||||
|
||||
def resolve_access_token(authorization_code, redirect_uri, logger):
|
||||
'''Exchange an authorization_code for an access_token'''
|
||||
data = {
|
||||
'code': authorization_code,
|
||||
'client_id': app_settings.client_id,
|
||||
'client_secret': app_settings.client_secret,
|
||||
'redirect_uri': redirect_uri,
|
||||
'grant_type': 'authorization_code',
|
||||
}
|
||||
logger.debug('access token request %s', data)
|
||||
try:
|
||||
session = utils.requests_retry_session()
|
||||
response = session.post(
|
||||
app_settings.token_url,
|
||||
data=data,
|
||||
verify=app_settings.verify_certificate,
|
||||
allow_redirects=False,
|
||||
timeout=3,
|
||||
)
|
||||
if response.status_code != 200:
|
||||
try:
|
||||
data = response.json()
|
||||
logger.error(u'oauth2 error on access token retrieval: %r', data)
|
||||
except ValueError:
|
||||
data = {}
|
||||
logger.error(u'oauth2 error on access token retrieval: %r', response.content[:1024])
|
||||
return
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(u'unable to retrieve access token {}'.format(e))
|
||||
else:
|
||||
try:
|
||||
response = response.json()
|
||||
logger.debug('token resolved : %s', response)
|
||||
return response
|
||||
except ValueError:
|
||||
logger.error(
|
||||
'no JSON object can be decoded from the data received from %s: %r',
|
||||
app_settings.token_url,
|
||||
response.content[:1024],
|
||||
)
|
||||
|
||||
|
||||
def access_token_from_request(request, logger):
|
||||
"""Resolve an access token given a request returning from the authorization
|
||||
endpoint.
|
||||
class LoginOrLinkView(View):
|
||||
"""Login with FC, if the FC account is already linked, connect this user,
|
||||
if a user is logged link the user to this account, otherwise display an
|
||||
error message.
|
||||
"""
|
||||
code = request.GET.get('code')
|
||||
state = request.GET.get('state')
|
||||
if not code:
|
||||
return
|
||||
if not state:
|
||||
return
|
||||
states = request.session.get('fc_states', {})
|
||||
if state not in states:
|
||||
return
|
||||
# there should not be many FC SSO in flight
|
||||
redirect_uri = states[state]['redirect_uri']
|
||||
return resolve_access_token(code, redirect_uri, logger)
|
||||
|
||||
_next_url = None
|
||||
service = None
|
||||
|
||||
ACCESS_GRANT_CODE = 'accessgrantcode'
|
||||
@property
|
||||
def next_url(self):
|
||||
return self._next_url or a2_utils.select_next_url(self.request, default=settings.LOGIN_REDIRECT_URL)
|
||||
|
||||
@property
|
||||
def redirect_uri(self):
|
||||
return self.request.build_absolute_uri(reverse('fc-login-or-link'))
|
||||
|
||||
def clean_fc_session(session):
|
||||
session.pop('fc_id_token', None)
|
||||
session.pop('fc_id_token_raw', None)
|
||||
session.pop('fc_user_info', None)
|
||||
session.pop('fc_data', None)
|
||||
|
||||
|
||||
class FcOAuthSessionViewMixin(LoggerMixin):
|
||||
'''Add the OAuth2 dance to a view'''
|
||||
|
||||
redirect_field_name = REDIRECT_FIELD_NAME
|
||||
in_popup = False
|
||||
token = None
|
||||
user_info = None
|
||||
|
||||
def get_in_popup(self):
|
||||
return self.in_popup
|
||||
|
||||
def redirect_to(self, request):
|
||||
if request.method == 'POST':
|
||||
redirect_to = request.POST.get(
|
||||
self.redirect_field_name, request.GET.get(self.redirect_field_name, '')
|
||||
)
|
||||
else:
|
||||
redirect_to = request.GET.get(self.redirect_field_name, '')
|
||||
|
||||
safe = is_safe_url(url=redirect_to, allowed_hosts=request.get_host())
|
||||
if not safe:
|
||||
redirect_to = resolve_url(settings.LOGIN_REDIRECT_URL)
|
||||
return redirect_to
|
||||
|
||||
def close_popup_redirect(self, request, next_url, *args, **kwargs):
|
||||
"""Show a page to close the current popup and reload the parent window
|
||||
with the return url.
|
||||
"""
|
||||
return render(request, 'authentic2_auth_fc/close-popup-redirect.html', {'redirect_to': next_url})
|
||||
|
||||
def simple_redirect(self, request, next_url, *args, **kwargs):
|
||||
return a2_utils.redirect(request, next_url, *args, resolve=False, **kwargs)
|
||||
|
||||
def redirect(self, request, *args, **kwargs):
|
||||
next_url = kwargs.pop('next_url', None)
|
||||
if next_url is None:
|
||||
next_url = self.redirect_to(request, *args, **kwargs)
|
||||
if self.get_in_popup():
|
||||
return self.close_popup_redirect(request, next_url, *args, **kwargs)
|
||||
else:
|
||||
return self.simple_redirect(request, next_url, *args, **kwargs)
|
||||
|
||||
def redirect_and_come_back(self, request, next_url, *args, **kwargs):
|
||||
old_next_url = self.redirect_to(request)
|
||||
here = a2_utils.make_url(request.path, params={REDIRECT_FIELD_NAME: old_next_url})
|
||||
here = a2_utils.make_url(here, **kwargs)
|
||||
there = a2_utils.make_url(next_url, params={REDIRECT_FIELD_NAME: here})
|
||||
return self.redirect(request, next_url=there, *args, **kwargs)
|
||||
|
||||
def get_scopes(self):
|
||||
return list(set(['openid'] + app_settings.scopes))
|
||||
|
||||
def get_ressource(self, url, verify):
|
||||
try:
|
||||
data = self.oauth_session().get(url, verify=verify, allow_redirects=False, timeout=3)
|
||||
data.raise_for_status()
|
||||
except requests.exceptions.RequestException as e:
|
||||
self.logger.error('unable to retrieve ressource from %s due to %s', url, e)
|
||||
else:
|
||||
try:
|
||||
data = data.json()
|
||||
self.logger.debug('ressource resolved: %s', data)
|
||||
return data
|
||||
except ValueError:
|
||||
self.logger.error(
|
||||
'no JSON object can be decoded from the data received from %s: %r', url, data.content
|
||||
)
|
||||
|
||||
def get_user_info(self):
|
||||
return self.get_ressource(
|
||||
app_settings.userinfo_url + '?schema=openid', app_settings.verify_certificate
|
||||
)
|
||||
|
||||
def get_data(self, scopes=[]):
|
||||
data = dict()
|
||||
if not app_settings.fd_list:
|
||||
return data
|
||||
for scope in scopes:
|
||||
for fd in app_settings.fd_list[scope]:
|
||||
url = fd['url']
|
||||
if fd['query_dic']:
|
||||
url += '?' + urlencode(fd['query_dic'])
|
||||
d = self.get_ressource(url, app_settings.verify_certificate)
|
||||
if d:
|
||||
data.setdefault(scope, []).append((fd['name'], d))
|
||||
return data
|
||||
|
||||
def authorization_error(self, request, *args, **kwargs):
|
||||
error = request.GET.get('error')
|
||||
error_description = request.GET.get('error_description')
|
||||
msg = _('Unable to connect to FranceConnect: "%s".') % error
|
||||
if error_description:
|
||||
msg += _('("%s")') % error_description
|
||||
messages.error(request, msg)
|
||||
self.logger.warning(
|
||||
'auth_fc: authorization failed with error=%r error_description=%r', error, error_description or ''
|
||||
)
|
||||
return self.redirect(request)
|
||||
|
||||
def dispatch(self, request, *args, **kwargs):
|
||||
'''Interpret the OAuth authorization dance'''
|
||||
if 'code' in request.GET:
|
||||
self.token = access_token_from_request(request, self.logger)
|
||||
|
||||
# Token request may not be completly processed and result in no
|
||||
# token
|
||||
if not self.token:
|
||||
messages.warning(request, _('Unable to connect to FranceConnect.'))
|
||||
return self.redirect(request)
|
||||
|
||||
# The token request may fail, 'error' is then required.
|
||||
# A bad client secret results in error equals to invalid_request
|
||||
# for FC and invalid_client for oidc_provider.
|
||||
if 'error' in self.token:
|
||||
msg = 'token request failed : {}'.format(self.token)
|
||||
self.logger.warning(msg)
|
||||
messages.warning(
|
||||
request, _('Unable to connect to FranceConnect: "%s".') % self.token['error']
|
||||
)
|
||||
return self.redirect(request)
|
||||
key = app_settings.client_secret
|
||||
# duck-type unicode/Py3 strings
|
||||
if hasattr(key, 'isdecimal'):
|
||||
key = key.encode('utf-8')
|
||||
self.id_token, error = models.parse_id_token(
|
||||
self.token['id_token'], client_id=app_settings.client_id, client_secret=key
|
||||
)
|
||||
if not self.id_token:
|
||||
self.logger.error(u'validation of id_token failed: %s', error)
|
||||
messages.warning(request, _('Unable to connect to FranceConnect.'))
|
||||
return self.redirect(request)
|
||||
nonce = self.id_token.get('nonce')
|
||||
states = request.session.get('fc_states', {})
|
||||
if not nonce or nonce not in states:
|
||||
self.logger.error(
|
||||
u'invalid nonce in id_token %s, known ones %s', nonce, u', '.join(states.keys())
|
||||
)
|
||||
messages.warning(request, _('Unable to connect to FranceConnect.'))
|
||||
return self.redirect(request)
|
||||
self.logger.debug('fc id_token %s', self.id_token)
|
||||
for key in self.id_token:
|
||||
setattr(self, key, self.id_token[key])
|
||||
self.oauth_session = lambda: utils.requests_retry_session(
|
||||
session=OAuth2Session(app_settings.client_id, token=self.token)
|
||||
)
|
||||
self.user_info = self.get_user_info()
|
||||
if not self.user_info:
|
||||
self.logger.error('userinfo resolution failed: %s', self.token)
|
||||
messages.warning(request, _('Unable to connect to FranceConnect.'))
|
||||
return self.redirect(request)
|
||||
self.logger.debug('fc user_info %s', self.user_info)
|
||||
self.request.session['fc_id_token'] = self.id_token
|
||||
self.request.session['fc_id_token_raw'] = self.token['id_token']
|
||||
self.request.session['fc_user_info'] = self.user_info
|
||||
if 'fd_scopes' in request.GET:
|
||||
scopes = request.GET.get('fd_scopes')
|
||||
scopes = scopes.split()
|
||||
self.data = self.get_data(scopes)
|
||||
self.logger.debug('fc data %s', self.data)
|
||||
fc_data = self.request.session.setdefault('fc_data', {})
|
||||
for scope in self.data:
|
||||
fc_data.setdefault(scope, []).extend(self.data[scope])
|
||||
self.logger.debug('fc data in session %s', self.request.session['fc_data'])
|
||||
return super(FcOAuthSessionViewMixin, self).dispatch(request, *args, **kwargs)
|
||||
elif 'error' in request.GET:
|
||||
return self.authorization_error(request, *args, **kwargs)
|
||||
else:
|
||||
scopes = self.get_scopes()
|
||||
if 'fd_scopes' in request.GET:
|
||||
scopes = list(set(scopes) | set(request.GET['fd_scopes'].split()))
|
||||
return ask_authorization(request, scopes, self.logger)
|
||||
def redirect(self):
|
||||
return a2_utils.redirect(self.request, self.next_url)
|
||||
|
||||
@property
|
||||
def fc_display_name(self):
|
||||
'''Human representation of the current FC account'''
|
||||
display_name = ''
|
||||
user_info = self.user_info or {}
|
||||
family_name = user_info.get('family_name')
|
||||
given_name = user_info.get('given_name')
|
||||
family_name = self.user_info.get('family_name')
|
||||
given_name = self.user_info.get('given_name')
|
||||
if given_name:
|
||||
display_name += given_name
|
||||
if family_name:
|
||||
|
@ -353,49 +97,218 @@ class FcOAuthSessionViewMixin(LoggerMixin):
|
|||
display_name += family_name
|
||||
return display_name
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
code = request.GET.get('code')
|
||||
state = request.GET.get('state')
|
||||
|
||||
class PopupViewMixin(object):
|
||||
def get_in_popup(self):
|
||||
return 'popup' in self.request.GET
|
||||
|
||||
|
||||
class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View):
|
||||
"""Login with FC, if the FC account is already linked, connect this user,
|
||||
if a user is logged link the user to this account, otherwise display an
|
||||
error message.
|
||||
"""
|
||||
|
||||
def update_user_info(self):
|
||||
self.fc_account.token = json.dumps(self.token)
|
||||
self.fc_account.user_info = json.dumps(self.user_info)
|
||||
self.fc_account.save(update_fields=['token', 'user_info'])
|
||||
utils.apply_user_info_mappings(self.fc_account.user, self.user_info)
|
||||
self.logger.debug('updating user_info %s', self.fc_account.user_info)
|
||||
|
||||
def uniqueness_check_failed(self, request):
|
||||
if request.user.is_authenticated:
|
||||
# currently logged :
|
||||
if models.FcAccount.objects.filter(user=request.user, order=0).count():
|
||||
# cannot link because we are already linked to another FC account
|
||||
messages.error(request, _('Your account is already linked to a FranceConnect account'))
|
||||
else:
|
||||
# cannot link because the FC account is already linked to another account.
|
||||
messages.error(
|
||||
request,
|
||||
_('The FranceConnect account {} is already' ' linked with another account.').format(
|
||||
self.fc_display_name
|
||||
),
|
||||
)
|
||||
if code and state:
|
||||
response = self.handle_authorization_response(request, code=code, state=state)
|
||||
response.delete_cookie('fc-state', path=reverse('fc-login-or-link'))
|
||||
return response
|
||||
elif 'error' in request.GET:
|
||||
return self.authorization_error(
|
||||
request, error=request.GET['error'], error_description=request.GET.get('error_description')
|
||||
)
|
||||
else:
|
||||
# not logged, cannot login because the user is disabled (user.is_active is False)
|
||||
messages.error(request, _('Your account is disabled.'))
|
||||
return self.redirect(request)
|
||||
return self.make_authorization_request(request)
|
||||
|
||||
def handle_authorization_response(self, request, code, state):
|
||||
# check state signature and parse it
|
||||
try:
|
||||
state, self._next_url, self.service = self.decode_state(state)
|
||||
except ValueError:
|
||||
return a2_utils.redirect(request, settings.LOGIN_REDIRECT_URL)
|
||||
|
||||
# regenerte the chain of hash from the stored nonce_seed
|
||||
try:
|
||||
encoded_seed = request.COOKIES.get('fc-state', '')
|
||||
if not encoded_seed:
|
||||
raise ValueError
|
||||
dummy, hash_nonce, hash_state = hash_chain(3, encoded_seed=encoded_seed)
|
||||
if not state or state != hash_state:
|
||||
logger.warning('auth_fc: state lost, requesting authorization again')
|
||||
raise ValueError
|
||||
except ValueError:
|
||||
return self.make_authorization_request(request)
|
||||
|
||||
# resolve the authorization_code and check the token endpoint response
|
||||
self.token = self.resolve_authorization_code(code)
|
||||
if not self.token:
|
||||
# resolve_authorization_code already logged a warning.
|
||||
return self.report_fc_is_down(request)
|
||||
if 'error' in self.token:
|
||||
logger.warning('auth_fc: token request failed, "%s"', self.token)
|
||||
messages.warning(
|
||||
request,
|
||||
_('Unable to connect to FranceConnect: "%s".')
|
||||
% (self.token.get('error_description') or self.token['error']),
|
||||
)
|
||||
return self.redirect()
|
||||
|
||||
# parse the id_token
|
||||
if not self.token.get('id_token') or not isinstance(self.token['id_token'], str):
|
||||
logger.warning('auth_fc: token endpoint did not return an id_token')
|
||||
return self.report_fc_is_down(request)
|
||||
|
||||
key = app_settings.client_secret.encode()
|
||||
self.id_token, error = models.parse_id_token(
|
||||
self.token['id_token'], client_id=app_settings.client_id, client_secret=key
|
||||
)
|
||||
if not self.id_token:
|
||||
logger.warning('auth_fc: validation of id_token failed: %s', error)
|
||||
return self.report_fc_is_down(request)
|
||||
logger.debug('auth_fc: parsed id_token %s', self.id_token)
|
||||
|
||||
nonce = self.id_token.get('nonce')
|
||||
if nonce != hash_nonce:
|
||||
logger.warning('auth_fc: invalid nonce in id_token')
|
||||
return self.report_fc_is_down(request)
|
||||
|
||||
self.sub = self.id_token.get('sub')
|
||||
if not self.sub:
|
||||
logger.warning('auth_fc: no sub in id_token %s', self.id_token)
|
||||
return self.report_fc_is_down(request)
|
||||
|
||||
# get user info using the access token
|
||||
if not self.token.get('access_token') or not isinstance(self.token['access_token'], str):
|
||||
logger.warning('auth_fc: token endpoint did not return an access_token')
|
||||
return self.report_fc_is_down(request)
|
||||
|
||||
self.user_info = self.get_user_info()
|
||||
if self.user_info is None:
|
||||
return self.report_fc_is_down(request)
|
||||
logger.debug('auth_fc: user_info %s', self.user_info)
|
||||
|
||||
# clear FranceConnect down status
|
||||
cache.delete('fc_is_down')
|
||||
|
||||
if request.user.is_authenticated:
|
||||
return self.link(request)
|
||||
else:
|
||||
return self.login(request)
|
||||
|
||||
def encode_state(self, state, next_url, service):
|
||||
encoded_state = state + ' ' + self.next_url + ' '
|
||||
if service:
|
||||
encoded_state += service_ref(service)
|
||||
encoded_state += ' ' + hmac_url(settings.SECRET_KEY, encoded_state)
|
||||
return encoded_state
|
||||
|
||||
def decode_state(self, state):
|
||||
payload, signature = state.rsplit(' ', 1)
|
||||
if not check_hmac_url(settings.SECRET_KEY, payload, signature):
|
||||
raise ValueError
|
||||
# service_ref can be made of one or two parts
|
||||
try:
|
||||
state, next_url, service_ref = payload.split(' ')
|
||||
except ValueError:
|
||||
state, next_url, ou_slug, service_slug = payload.split(' ')
|
||||
service_ref = ou_slug + ' ' + service_slug
|
||||
service = get_service_from_ref(service_ref)
|
||||
return state, next_url, service
|
||||
|
||||
def make_authorization_request(self, request):
|
||||
scope = ' '.join(set(['openid'] + app_settings.scopes))
|
||||
service = self.service or get_service_from_request(request)
|
||||
|
||||
nonce_seed, nonce, state = hash_chain(3)
|
||||
|
||||
# encode the target service and next_url in the state
|
||||
full_state = state + ' ' + self.next_url + ' '
|
||||
if service:
|
||||
full_state += service_ref(service)
|
||||
full_state += ' ' + hmac_url(settings.SECRET_KEY, full_state)
|
||||
params = {
|
||||
'client_id': app_settings.client_id,
|
||||
'scope': scope,
|
||||
'redirect_uri': self.redirect_uri,
|
||||
'response_type': 'code',
|
||||
'state': self.encode_state(state, self.next_url, service),
|
||||
'nonce': nonce,
|
||||
'acr_values': 'eidas1',
|
||||
}
|
||||
url = '{0}?{1}'.format(app_settings.authorize_url, urlencode(params))
|
||||
logger.debug('auth_fc: authorization_request redirect to %s', url)
|
||||
|
||||
response = HttpResponseRedirect(url)
|
||||
# prevent unshown messages to block the navigation to FranceConnect
|
||||
response.display_message = False
|
||||
|
||||
# store nonce_seed in a browser cookie to prevent CSRF and check nonce
|
||||
# in id_token on return by generating the hash chain again
|
||||
set_cookie(
|
||||
response,
|
||||
'fc-state',
|
||||
value=nonce_seed,
|
||||
path=reverse('fc-login-or-link'),
|
||||
httponly=True,
|
||||
secure=request.is_secure(),
|
||||
samesite='Lax',
|
||||
)
|
||||
return response
|
||||
|
||||
def resolve_authorization_code(self, authorization_code):
|
||||
'''Exchange an authorization_code for an access_token'''
|
||||
data = {
|
||||
'code': authorization_code,
|
||||
'client_id': app_settings.client_id,
|
||||
'client_secret': app_settings.client_secret,
|
||||
'redirect_uri': self.redirect_uri,
|
||||
'grant_type': 'authorization_code',
|
||||
}
|
||||
logger.debug('auth_fc: resolve_access_token request params %s', data)
|
||||
|
||||
try:
|
||||
token = post_json(app_settings.token_url, data, expected_statuses=[400])
|
||||
except RequestError as e:
|
||||
logger.warning('auth_fc: resolve_authorization_code error %s', e)
|
||||
return None
|
||||
else:
|
||||
logger.debug('auth_fc: token endpoint returned "%s"', token)
|
||||
return token
|
||||
|
||||
def get_user_info(self):
|
||||
try:
|
||||
data = get_json(
|
||||
app_settings.userinfo_url + '?schema=openid',
|
||||
session=OAuth2Session(app_settings.client_id, token=self.token),
|
||||
)
|
||||
except RequestError as e:
|
||||
logger.warning('auth_fc: get_user_info error %s', e)
|
||||
return None
|
||||
logger.debug('auth_fc: get_user_info returned %r', data)
|
||||
return data
|
||||
|
||||
def authorization_error(self, request, error, error_description):
|
||||
messages.error(request, _('Unable to connect to FranceConnect: "%s".') % (error_description or error))
|
||||
logger.warning(
|
||||
'auth_fc: authorization failed with error=%r error_description=%r', error, error_description or ''
|
||||
)
|
||||
return self.redirect()
|
||||
|
||||
def report_fc_is_down(self, request):
|
||||
messages.warning(request, _('Unable to connect to FranceConnect.'))
|
||||
# put FranceConnect status in cache, if it happens for more than 5 minutes, log an error
|
||||
last_down = cache.get('fc_is_down')
|
||||
now = time.time()
|
||||
more_than_5_minutes = last_down and (now - last_down) > 5 * 60
|
||||
if more_than_5_minutes:
|
||||
logger.error('auth_fc: FranceConnect is down for more than 5 minutes')
|
||||
if not last_down or more_than_5_minutes:
|
||||
cache.set('fc_is_down', now, 10 * 60)
|
||||
return self.redirect()
|
||||
|
||||
def link(self, request):
|
||||
'''Request an access grant code and associate it to the current user'''
|
||||
try:
|
||||
self.fc_account, created = models.FcAccount.objects.get_or_create(
|
||||
sub=self.sub, user=request.user, order=0, defaults={'token': json.dumps(self.token)}
|
||||
sub=self.sub,
|
||||
user=request.user,
|
||||
order=0,
|
||||
defaults={
|
||||
'token': json.dumps(self.token),
|
||||
'user_info': json.dumps(self.user_info),
|
||||
},
|
||||
)
|
||||
# Prevent adding a link with an FC account already linked with another user.
|
||||
except IntegrityError:
|
||||
|
@ -403,139 +316,170 @@ class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View):
|
|||
return self.uniqueness_check_failed(request)
|
||||
|
||||
if created:
|
||||
self.logger.info('fc link created sub %s', self.sub)
|
||||
logger.info('auth_fc: link created sub %s', self.sub)
|
||||
messages.info(
|
||||
request, _('Your FranceConnect account {} has been linked.').format(self.fc_display_name)
|
||||
)
|
||||
hooks.call_hooks('event', name='fc-link', user=request.user, sub=self.sub, request=request)
|
||||
else:
|
||||
messages.info(request, _('Your local account has been updated.'))
|
||||
self.update_user_info()
|
||||
return self.redirect(request)
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
if request.user.is_authenticated:
|
||||
return self.link(request)
|
||||
else:
|
||||
return self.login(request)
|
||||
self.update_user_info(request.user, self.user_info)
|
||||
return self.redirect()
|
||||
|
||||
def login(self, request):
|
||||
self.service = get_service_from_request(request)
|
||||
default_ou = get_default_ou()
|
||||
email_is_unique = a2_app_settings.A2_EMAIL_IS_UNIQUE or default_ou.email_is_unique
|
||||
email_present_and_unique = self.user_info.get('email') and email_is_unique
|
||||
user_filter = {}
|
||||
if email_present_and_unique:
|
||||
user_filter = {'email__iexact': self.user_info['email']}
|
||||
if not a2_app_settings.A2_EMAIL_IS_UNIQUE and default_ou.email_is_unique:
|
||||
user_filter['ou'] = default_ou
|
||||
user = a2_utils.authenticate(request, sub=self.sub, user_info=self.user_info, token=self.token)
|
||||
|
||||
user = a2_utils.authenticate(
|
||||
request, sub=self.sub, user_info=self.user_info, token=self.token, user_filter=user_filter
|
||||
)
|
||||
if not user:
|
||||
user = self.create_account(request, sub=self.sub, token=self.token, user_info=self.user_info)
|
||||
|
||||
# ignore user if sub is not matching and let the code below handle it
|
||||
if not user.fc_accounts.filter(sub=self.sub).exists():
|
||||
user = None
|
||||
if not user:
|
||||
return self.redirect()
|
||||
|
||||
if user:
|
||||
self.fc_account = user.fc_accounts.get(order=0)
|
||||
if not user and email_present_and_unique:
|
||||
email = self.user_info['email']
|
||||
User = get_user_model()
|
||||
qs = User.objects.filter(**user_filter)
|
||||
return self.finish_login(request, user, self.user_info)
|
||||
|
||||
if qs.exists():
|
||||
# there should not be multiple accounts with the same mail
|
||||
if len(qs) > 1:
|
||||
self.logger.error(u'multiple accounts with the same mail %s, %s', email, list(qs))
|
||||
# ok we have one account
|
||||
elif len(qs) == 1:
|
||||
user = qs[0]
|
||||
# but does he have already a link to an FC account ?
|
||||
if not user.fc_accounts.exists():
|
||||
try:
|
||||
self.fc_account, created = models.FcAccount.objects.get_or_create(
|
||||
defaults={'token': json.dumps(self.token)}, sub=self.sub, user=user, order=0
|
||||
)
|
||||
except IntegrityError:
|
||||
# unique index check failed, find why.
|
||||
return self.uniqueness_check_failed(request)
|
||||
def finish_login(self, request, user, user_info):
|
||||
self.update_user_info(user, user_info)
|
||||
views_utils.check_cookie_works(request)
|
||||
a2_utils.login(request, user, 'france-connect', service=self.service)
|
||||
|
||||
if created:
|
||||
self.logger.info(u'fc link created sub %s user %s', self.sub, user)
|
||||
hooks.call_hooks(
|
||||
'event', name='fc-link', user=user, sub=self.sub, request=request
|
||||
)
|
||||
user = a2_utils.authenticate(
|
||||
request=request,
|
||||
sub=self.sub,
|
||||
user_info=self.user_info,
|
||||
token=self.token,
|
||||
user_filter=user_filter,
|
||||
)
|
||||
else:
|
||||
messages.warning(
|
||||
request,
|
||||
_(
|
||||
'Your FranceConnect email address \'%s\' is already used by another '
|
||||
'account, so we cannot create an account for you. Please create an '
|
||||
'account with another email address then link it to FranceConnect '
|
||||
'using your account management page.'
|
||||
)
|
||||
% email,
|
||||
)
|
||||
return self.redirect(request)
|
||||
if user:
|
||||
views_utils.check_cookie_works(request)
|
||||
a2_utils.login(request, user, 'france-connect', service=self.service)
|
||||
# set session expiration policy to EXPIRE_AT_BROWSER_CLOSE
|
||||
request.session.set_expiry(0)
|
||||
self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user)
|
||||
self.fc_account.token = json.dumps(self.token)
|
||||
self.fc_account.save(update_fields=['token'])
|
||||
self.update_user_info()
|
||||
self.logger.info('logged in using fc sub %s', self.sub)
|
||||
# keep id_token around for logout
|
||||
request.session['fc_id_token'] = self.id_token
|
||||
request.session['fc_id_token_raw'] = self.token['id_token']
|
||||
|
||||
# redirect to account edit page if any required attribute is missing
|
||||
data = utils.get_mapped_attributes_flat(request)
|
||||
required_attributes = a2_models.Attribute.objects.filter(required=True).values_list(
|
||||
'name', flat=True
|
||||
# set session expiration policy to EXPIRE_AT_BROWSER_CLOSE
|
||||
request.session.set_expiry(0)
|
||||
|
||||
# redirect to account edit page if any required attribute is missing
|
||||
name_to_label = dict(a2_models.Attribute.objects.filter(required=True).values_list('name', 'label'))
|
||||
required = list(a2_app_settings.A2_REGISTRATION_REQUIRED_FIELDS) + list(name_to_label)
|
||||
missing = []
|
||||
for attr_name in set(required):
|
||||
value = getattr(user, attr_name, None) or getattr(user.attributes, attr_name, None)
|
||||
if value in [None, '']:
|
||||
missing.append(name_to_label[attr_name])
|
||||
if missing:
|
||||
messages.warning(
|
||||
request,
|
||||
_('The following fields are mandatory for account creation: %s') % ', '.join(missing),
|
||||
)
|
||||
required = list(a2_app_settings.A2_REGISTRATION_REQUIRED_FIELDS) + list(required_attributes)
|
||||
missing = [attr for attr in set(required) - set(data) if not getattr(user.attributes, attr)]
|
||||
if missing:
|
||||
return a2_utils.redirect(request, 'profile_edit', params={'next': self.next_url})
|
||||
return self.redirect()
|
||||
|
||||
def create_account(self, request, sub, token, user_info):
|
||||
email = user_info.get('email')
|
||||
|
||||
if email:
|
||||
# try to create or find an user with this email
|
||||
try:
|
||||
user, created = self.get_or_create_user_with_email(email)
|
||||
except UserOutsideDefaultOu:
|
||||
user = None
|
||||
except User.MultipleObjectsReturned:
|
||||
user = None
|
||||
if not user:
|
||||
messages.warning(
|
||||
request,
|
||||
_('The following fields are mandatory for account creation: %s') % ', '.join(missing),
|
||||
_(
|
||||
'Your FranceConnect email address \'%s\' is already used by another '
|
||||
'account, so we cannot create an account for you. Please connect '
|
||||
'with you existing account or create an '
|
||||
'account with another email address then link it to FranceConnect '
|
||||
'using your account management page.'
|
||||
)
|
||||
% email,
|
||||
)
|
||||
return a2_utils.redirect(request, 'profile_edit', keep_params=True)
|
||||
return self.redirect(request)
|
||||
else:
|
||||
params = {}
|
||||
if self.service:
|
||||
set_service_ref(params, self.service)
|
||||
messages.info(
|
||||
request, _('If you already have an account, please log in, else ' 'create your account.')
|
||||
return None
|
||||
else: # no email, we cannot disembiguate users, let's create it anyway
|
||||
user = User.objects.create()
|
||||
created = True
|
||||
|
||||
try:
|
||||
if created:
|
||||
user.set_unusable_password()
|
||||
user.save()
|
||||
|
||||
models.FcAccount.objects.create(
|
||||
user=user,
|
||||
sub=sub,
|
||||
order=0,
|
||||
token=json.dumps(token),
|
||||
user_info=json.dumps(user_info),
|
||||
)
|
||||
except IntegrityError:
|
||||
# uniqueness check failed, as the user is new, it can only mean that the sub is not unique
|
||||
# let's try again
|
||||
if created:
|
||||
user.delete()
|
||||
return self.authenticate(request, sub=sub, token=token, user_info=user_info)
|
||||
except Exception:
|
||||
# if anything unexpected happen and user was created, delete it and re-raise
|
||||
if created:
|
||||
user.delete()
|
||||
raise
|
||||
else:
|
||||
if created:
|
||||
logger.info('auth_fc: new account "%s" created with FranceConnect sub "%s"', user, sub)
|
||||
hooks.call_hooks('event', name='fc-create', user=user, sub=sub)
|
||||
else:
|
||||
logger.info('auth_fc: existing account "%s" linked to FranceConnect sub "%s"', user, sub)
|
||||
hooks.call_hooks('event', name='fc-link', user=user, sub=sub, request=request)
|
||||
|
||||
login_params = params.copy()
|
||||
if not app_settings.show_button_quick_account_creation:
|
||||
login_params['nofc'] = 1
|
||||
return a2_utils.authenticate(request, sub=sub, user_info=user_info, token=token)
|
||||
|
||||
login_url = a2_utils.make_url(settings.LOGIN_URL, params=login_params)
|
||||
return self.redirect_and_come_back(request, login_url, params=params)
|
||||
def uniqueness_check_failed(self, request):
|
||||
# currently logged :
|
||||
if models.FcAccount.objects.filter(user=request.user, order=0).count():
|
||||
# cannot link because we are already linked to another FC account
|
||||
messages.error(request, _('Your account is already linked to a FranceConnect account'))
|
||||
else:
|
||||
# cannot link because the FC account is already linked to another account.
|
||||
messages.error(
|
||||
request,
|
||||
_('The FranceConnect account {} is already linked with another account.').format(
|
||||
self.fc_display_name
|
||||
),
|
||||
)
|
||||
return self.redirect()
|
||||
|
||||
def update_user_info(self, user, user_info):
|
||||
# always handle given_name and family_name
|
||||
updated = []
|
||||
if user_info.get('given_name') and user.first_name != user_info['given_name']:
|
||||
user.first_name = user_info['given_name']
|
||||
updated.append('given name: "%s"' % user_info['given_name'])
|
||||
if user_info.get('family_name') and user.last_name != user_info['family_name']:
|
||||
user.last_name = user_info['family_name']
|
||||
updated.append('family name: "%s"' % user_info['family_name'])
|
||||
if updated:
|
||||
user.save()
|
||||
logger.debug('auth_fc: updated (%s)', ' - '.join(updated))
|
||||
apply_user_info_mappings(user, user_info)
|
||||
return user
|
||||
|
||||
def get_or_create_user_with_email(self, email):
|
||||
ou = get_default_ou()
|
||||
|
||||
if a2_app_settings.A2_EMAIL_IS_UNIQUE:
|
||||
instance, created = safe_get_or_create(User, email=email, defaults={'email': email, 'ou': ou})
|
||||
if instance.ou != ou:
|
||||
assert not created # should not be possible
|
||||
raise UserOutsideDefaultOu
|
||||
return instance, created
|
||||
elif ou.email_is_unique:
|
||||
return safe_get_or_create(User, ou=ou, email=email, defaults={'email': email, 'ou': ou})
|
||||
else:
|
||||
return User.objects.create(email=email), True
|
||||
|
||||
|
||||
class UnlinkView(LoggerMixin, FormView):
|
||||
login_or_link = LoginOrLinkView.as_view()
|
||||
|
||||
|
||||
class UnlinkView(FormView):
|
||||
template_name = 'authentic2_auth_fc/unlink.html'
|
||||
|
||||
def get_success_url(self):
|
||||
url = reverse('account_management')
|
||||
if app_settings.logout_when_unlink:
|
||||
# logout URL can be None if not session exists with FC
|
||||
url = utils.build_logout_url(self.request, next_url=url) or url
|
||||
clean_fc_session(self.request.session)
|
||||
url = build_logout_url(self.request, next_url=url) or url
|
||||
return url
|
||||
|
||||
def get_form_class(self):
|
||||
|
@ -545,7 +489,7 @@ class UnlinkView(LoggerMixin, FormView):
|
|||
return form_class
|
||||
|
||||
def get_form_kwargs(self, **kwargs):
|
||||
kwargs = super(UnlinkView, self).get_form_kwargs(**kwargs)
|
||||
kwargs = super().get_form_kwargs(**kwargs)
|
||||
if self.must_set_password():
|
||||
kwargs['user'] = self.request.user
|
||||
return kwargs
|
||||
|
@ -565,26 +509,27 @@ class UnlinkView(LoggerMixin, FormView):
|
|||
if self.must_set_password() and not a2_app_settings.A2_REGISTRATION_CAN_CHANGE_PASSWORD:
|
||||
# Prevent access to the view.
|
||||
raise Http404
|
||||
return super(UnlinkView, self).dispatch(request, *args, **kwargs)
|
||||
return super().dispatch(request, *args, **kwargs)
|
||||
|
||||
def form_valid(self, form):
|
||||
if self.must_set_password():
|
||||
form.save()
|
||||
update_session_auth_hash(self.request, self.request.user)
|
||||
self.logger.info(u'user %s has set a password', self.request.user)
|
||||
logger.info('auth_fc: user %s has set a password', self.request.user)
|
||||
links = models.FcAccount.objects.filter(user=self.request.user)
|
||||
for link in links:
|
||||
self.logger.info(u'user %s unlinked from %s', self.request.user, link)
|
||||
logger.info('auth_fc: user %s unlinked from %s', self.request.user, link)
|
||||
hooks.call_hooks('event', name='fc-unlink', user=self.request.user)
|
||||
messages.info(self.request, _('The link with the FranceConnect account has been deleted.'))
|
||||
links.delete()
|
||||
response = super(UnlinkView, self).form_valid(form)
|
||||
response = super().form_valid(form)
|
||||
if app_settings.logout_when_unlink:
|
||||
response.display_message = False
|
||||
clean_fc_session(self.request.session)
|
||||
return response
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
context = super(UnlinkView, self).get_context_data(**kwargs)
|
||||
context = super().get_context_data(**kwargs)
|
||||
if self.must_set_password():
|
||||
context['no_password'] = True
|
||||
return context
|
||||
|
@ -592,10 +537,9 @@ class UnlinkView(LoggerMixin, FormView):
|
|||
def post(self, request, *args, **kwargs):
|
||||
if 'cancel' in request.POST:
|
||||
return a2_utils.redirect(request, 'account_management')
|
||||
return super(UnlinkView, self).post(request, *args, **kwargs)
|
||||
return super().post(request, *args, **kwargs)
|
||||
|
||||
|
||||
login_or_link = LoginOrLinkView.as_view()
|
||||
unlink = UnlinkView.as_view()
|
||||
|
||||
|
||||
|
|
|
@ -29,6 +29,7 @@ from django.utils.http import urlencode
|
|||
from django.utils.timezone import now
|
||||
from jwcrypto import jwk, jwt
|
||||
|
||||
from authentic2.a2_rbac.utils import get_default_ou
|
||||
from authentic2.models import Service
|
||||
from authentic2.utils import make_url
|
||||
|
||||
|
@ -40,6 +41,8 @@ CLIENT_SECRET = 'yyy'
|
|||
|
||||
class FranceConnectMock:
|
||||
exp = None
|
||||
token_endpoint_response = None
|
||||
user_info_endpoint_response = None
|
||||
|
||||
def __init__(self):
|
||||
self.sub = '1234'
|
||||
|
@ -76,7 +79,7 @@ class FranceConnectMock:
|
|||
|
||||
@property
|
||||
def callback_url(self):
|
||||
return 'http://testserver' + reverse('fc-login-or-link') + '?' + urlencode(self.callback_params)
|
||||
return 'http://testserver' + reverse('fc-login-or-link')
|
||||
|
||||
def login_with_fc_fixed_params(self, app):
|
||||
if app.session:
|
||||
|
@ -97,6 +100,9 @@ class FranceConnectMock:
|
|||
return self.handle_authorization(app, response.location, status=302).follow()
|
||||
|
||||
def access_token_response(self, url, request):
|
||||
if self.token_endpoint_response:
|
||||
return self.token_endpoint_response
|
||||
|
||||
formdata = QueryDict(request.body)
|
||||
assert set(formdata.keys()) == {'code', 'client_id', 'client_secret', 'redirect_uri', 'grant_type'}
|
||||
assert formdata['code'] == self.code
|
||||
|
@ -127,6 +133,9 @@ class FranceConnectMock:
|
|||
return t.serialize()
|
||||
|
||||
def user_info_response(self, url, request):
|
||||
if self.user_info_endpoint_response:
|
||||
return self.user_info_endpoint_response
|
||||
|
||||
assert request.headers['Authorization'] == 'Bearer %s' % self.access_token
|
||||
user_info = self.user_info.copy()
|
||||
user_info['sub'] = self.sub
|
||||
|
@ -151,12 +160,12 @@ class FranceConnectMock:
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def franceconnect(settings, service):
|
||||
def franceconnect(settings, db):
|
||||
settings.A2_FC_ENABLE = True
|
||||
settings.A2_FC_CLIENT_ID = CLIENT_ID
|
||||
settings.A2_FC_CLIENT_SECRET = CLIENT_SECRET
|
||||
|
||||
Service.objects.create(name='portail', slug='portail')
|
||||
Service.objects.create(name='portail', slug='portail', ou=get_default_ou())
|
||||
mock_object = FranceConnectMock()
|
||||
with mock_object():
|
||||
yield mock_object
|
||||
|
|
|
@ -16,17 +16,24 @@
|
|||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import datetime
|
||||
import json
|
||||
import re
|
||||
import urllib.parse
|
||||
|
||||
import mock
|
||||
import pytest
|
||||
import requests
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.core.exceptions import PermissionDenied
|
||||
from django.urls import reverse
|
||||
from django.utils.timezone import now
|
||||
|
||||
from authentic2.a2_rbac.utils import get_default_ou
|
||||
from authentic2.apps.journal.models import Event
|
||||
from authentic2.custom_user.models import DeletedUser
|
||||
from authentic2.models import Attribute
|
||||
from authentic2.models import Attribute, Service
|
||||
from authentic2_auth_fc import models
|
||||
from authentic2_auth_fc.backends import FcBackend
|
||||
from authentic2_auth_fc.utils import requests_retry_session
|
||||
|
||||
from ..utils import get_link_from_mail, login
|
||||
|
@ -38,10 +45,19 @@ def path(url):
|
|||
return urllib.parse.urlparse(url).path
|
||||
|
||||
|
||||
def test_login_redirect(app, franceconnect):
|
||||
def test_fc_url_on_login(app, franceconnect):
|
||||
url = reverse('fc-login-or-link')
|
||||
response = app.get(url, status=302)
|
||||
assert response['Location'].startswith('https://fcp.integ01')
|
||||
assert response.location.startswith('https://fcp.integ01')
|
||||
assert 'fc-state' in app.cookies
|
||||
|
||||
|
||||
def test_retry_authorization_if_state_is_lost(settings, app, franceconnect, hooks):
|
||||
response = app.get('/fc/callback/?next=/idp/&service=default%20portail', status=302)
|
||||
# clear fc-state cookie
|
||||
app.cookiejar.clear()
|
||||
response = franceconnect.handle_authorization(app, response.location, status=302)
|
||||
assert response.location.startswith('https://fcp.integ01')
|
||||
|
||||
|
||||
def test_login_with_condition(settings, app, franceconnect):
|
||||
|
@ -61,7 +77,7 @@ def test_login_autorun(settings, app, franceconnect):
|
|||
# hide password block
|
||||
settings.AUTH_FRONTENDS_KWARGS = {'password': {'show_condition': 'remote_addr==\'0.0.0.0\''}}
|
||||
response = app.get('/login/')
|
||||
assert response['Location'] == reverse('fc-login-or-link')
|
||||
assert response.location == reverse('fc-login-or-link')
|
||||
|
||||
|
||||
def test_create(settings, app, franceconnect, hooks):
|
||||
|
@ -71,13 +87,17 @@ def test_create(settings, app, franceconnect, hooks):
|
|||
response = response.click(href='callback')
|
||||
|
||||
assert User.objects.count() == 0
|
||||
assert Event.objects.which_references(Service.objects.get()).count() == 0
|
||||
response = franceconnect.handle_authorization(app, response.location, status=302)
|
||||
assert 'fc-state' not in app.cookies
|
||||
assert User.objects.count() == 1
|
||||
# check login for service=portail was registered
|
||||
assert Event.objects.which_references(Service.objects.get()).count() == 1
|
||||
|
||||
user = User.objects.get()
|
||||
assert user.verified_attributes.first_name == 'Ÿuñe'
|
||||
assert user.verified_attributes.last_name == 'Frédérique'
|
||||
assert path(response['Location']) == '/idp/'
|
||||
assert path(response.location) == '/idp/'
|
||||
assert hooks.event[1]['kwargs']['name'] == 'login'
|
||||
assert hooks.event[1]['kwargs']['service'] == 'portail'
|
||||
# we must be connected
|
||||
|
@ -100,14 +120,13 @@ def test_create(settings, app, franceconnect, hooks):
|
|||
response = response.form.submit(name='unlink')
|
||||
assert models.FcAccount.objects.count() == 0
|
||||
response = franceconnect.handle_logout(app, response.location)
|
||||
assert path(response['Location']) == '/accounts/'
|
||||
assert path(response.location) == '/accounts/'
|
||||
response = response.follow()
|
||||
assert 'The link with the FranceConnect account has been deleted' in response
|
||||
|
||||
|
||||
def test_create_expired(settings, app, franceconnect, hooks):
|
||||
# test direct creation failure on an expired id_token
|
||||
settings.A2_FC_CREATE = True
|
||||
franceconnect.exp = now() - datetime.timedelta(seconds=30)
|
||||
|
||||
response = app.get('/login/?service=portail&next=/idp/')
|
||||
|
@ -120,7 +139,7 @@ def test_create_expired(settings, app, franceconnect, hooks):
|
|||
|
||||
def test_login_email_is_unique(settings, app, franceconnect, caplog):
|
||||
settings.A2_EMAIL_IS_UNIQUE = True
|
||||
user = User(email='john.doe@example.com', first_name='John', last_name='Doe')
|
||||
user = User(email='john.doe@example.com', first_name='John', last_name='Doe', ou=get_default_ou())
|
||||
user.set_password('toto')
|
||||
user.save()
|
||||
franceconnect.user_info['email'] = user.email
|
||||
|
@ -131,6 +150,19 @@ def test_login_email_is_unique(settings, app, franceconnect, caplog):
|
|||
assert app.session['_auth_user_id'] == str(user.pk)
|
||||
|
||||
|
||||
def test_link_after_login_with_password(app, franceconnect, simple_user):
|
||||
assert models.FcAccount.objects.count() == 0
|
||||
|
||||
response = login(app, simple_user, path='/accounts/')
|
||||
response = response.click(href='/fc/callback/')
|
||||
|
||||
franceconnect.callback_params = {'next': '/accounts/'}
|
||||
response = franceconnect.handle_authorization(app, response.location, status=302)
|
||||
assert models.FcAccount.objects.count() == 1
|
||||
response = response.follow()
|
||||
assert response.pyquery('.fc').text() == 'Linked FranceConnect accounts\nŸuñe Frédérique Delete link'
|
||||
|
||||
|
||||
def test_unlink_after_login_with_password(app, franceconnect, simple_user):
|
||||
models.FcAccount.objects.create(user=simple_user, user_info='{}')
|
||||
|
||||
|
@ -240,7 +272,7 @@ def test_login_with_missing_required_attributes(settings, app, franceconnect):
|
|||
assert path(response.location) == '/accounts/edit/'
|
||||
assert User.objects.count() == 1
|
||||
assert models.FcAccount.objects.count() == 1
|
||||
assert 'The following fields are mandatory for account creation: title' in app.cookies['messages']
|
||||
assert 'The following fields are mandatory for account creation: Title' in app.cookies['messages']
|
||||
|
||||
|
||||
def test_can_change_password(settings, app, franceconnect):
|
||||
|
@ -280,7 +312,7 @@ def test_can_change_password(settings, app, franceconnect):
|
|||
|
||||
|
||||
def test_invalid_next_url(app, franceconnect):
|
||||
assert app.get('/fc/callback/?code=coin&next=JJJ72QQQ').location == 'JJJ72QQQ'
|
||||
assert app.get('/fc/callback/?code=coin&state=JJJ72QQQ').location == '/'
|
||||
|
||||
|
||||
def test_manager_user_sidebar(app, superuser, simple_user):
|
||||
|
@ -296,7 +328,6 @@ def test_manager_user_sidebar(app, superuser, simple_user):
|
|||
|
||||
|
||||
def test_user_info_incomplete(settings, app, franceconnect):
|
||||
settings.A2_FC_CREATE = True
|
||||
franceconnect.user_info = {}
|
||||
franceconnect.login_with_fc_fixed_params(app)
|
||||
|
||||
|
@ -308,7 +339,6 @@ def test_user_info_incomplete(settings, app, franceconnect):
|
|||
|
||||
|
||||
def test_user_info_incomplete_already_linked(settings, app, franceconnect, simple_user):
|
||||
settings.A2_FC_CREATE = True
|
||||
user = User.objects.create()
|
||||
models.FcAccount.objects.create(user=user, sub=franceconnect.sub)
|
||||
franceconnect.user_info = {}
|
||||
|
@ -347,3 +377,161 @@ def test_create_missing_email(settings, app, franceconnect, hooks):
|
|||
assert User.objects.count() == 1
|
||||
|
||||
response = app.get('/accounts/', status=200)
|
||||
|
||||
|
||||
def test_multiple_accounts_with_same_email(settings, app, franceconnect):
|
||||
ou = get_default_ou()
|
||||
ou.email_is_unique = True
|
||||
ou.save()
|
||||
|
||||
User.objects.create(email=franceconnect.user_info['email'], ou=ou)
|
||||
User.objects.create(email=franceconnect.user_info['email'], ou=ou)
|
||||
|
||||
response = franceconnect.login_with_fc(app, path='/accounts/')
|
||||
response = response.follow()
|
||||
|
||||
assert 'is already used by another' in response
|
||||
|
||||
|
||||
def test_sub_with_order_0_is_used(app, db, rf):
|
||||
usera = User.objects.create(username='a')
|
||||
userb = User.objects.create(username='b')
|
||||
models.FcAccount.objects.create(user=usera, sub='1234', order=1)
|
||||
models.FcAccount.objects.create(user=userb, sub='1234', order=0)
|
||||
|
||||
assert FcBackend().authenticate(rf.get('/'), sub='1234', token={}, user_info={}) == userb
|
||||
|
||||
|
||||
def test_inactive_raise_permission_denied(app, db, rf):
|
||||
usera = User.objects.create(is_active=False, username='a')
|
||||
models.FcAccount.objects.create(user=usera, sub='1234')
|
||||
|
||||
with pytest.raises(PermissionDenied):
|
||||
FcBackend().authenticate(rf.get('/'), sub='1234', token={}, user_info={})
|
||||
|
||||
|
||||
def test_order_1_is_returned(app, db, rf):
|
||||
usera = User.objects.create(username='a')
|
||||
models.FcAccount.objects.create(user=usera, sub='1234', order=1)
|
||||
|
||||
assert FcBackend().authenticate(rf.get('/'), sub='1234', token={}, user_info={}) == usera
|
||||
|
||||
|
||||
def test_resolve_authorization_code_http_400(app, franceconnect, caplog):
|
||||
franceconnect.token_endpoint_response = {
|
||||
'status_code': 400,
|
||||
'content': json.dumps({'error': 'invalid_request'}),
|
||||
}
|
||||
|
||||
response = franceconnect.login_with_fc(app, path='/accounts/').follow()
|
||||
assert re.match(r'WARNING.*token request failed.*invalid_request', caplog.text)
|
||||
assert 'invalid_request' in response
|
||||
|
||||
|
||||
def test_resolve_authorization_code_http_400_error_description(app, franceconnect, caplog):
|
||||
franceconnect.token_endpoint_response = {
|
||||
'status_code': 400,
|
||||
'content': json.dumps({'error': 'invalid_request', 'error_description': 'Requête invalide'}),
|
||||
}
|
||||
|
||||
response = franceconnect.login_with_fc(app, path='/accounts/').follow()
|
||||
assert re.match(r'WARNING.*token request failed.*invalid_request', caplog.text)
|
||||
assert 'invalid_request' not in response
|
||||
assert 'Requête invalide' in response
|
||||
|
||||
|
||||
def test_resolve_authorization_code_not_json(app, franceconnect, caplog):
|
||||
franceconnect.token_endpoint_response = 'not json'
|
||||
franceconnect.login_with_fc(app, path='/accounts/').follow()
|
||||
assert re.match(r'WARNING.*resolve_authorization_code.*not JSON.*not json', caplog.text)
|
||||
|
||||
|
||||
def test_get_user_info_http_400(app, franceconnect, caplog):
|
||||
franceconnect.user_info_endpoint_response = {
|
||||
'status_code': 400,
|
||||
'content': json.dumps({'error': 'invalid_request'}),
|
||||
}
|
||||
|
||||
franceconnect.login_with_fc(app, path='/accounts/').follow()
|
||||
assert re.match(r'WARNING.*get_user_info.*is not 200.*status_code=400.*invalid_request', caplog.text)
|
||||
|
||||
|
||||
def test_get_user_info_http_400_text_content(app, franceconnect, caplog):
|
||||
franceconnect.user_info_endpoint_response = {
|
||||
'status_code': 400,
|
||||
'content': 'coin',
|
||||
}
|
||||
franceconnect.login_with_fc(app, path='/accounts/').follow()
|
||||
assert re.match(r'WARNING.*get_user_info.*is not 200.*status_code=400.*coin', caplog.text)
|
||||
|
||||
|
||||
def test_get_user_info_not_json(app, franceconnect, caplog):
|
||||
franceconnect.user_info_endpoint_response = {
|
||||
'status_code': 200,
|
||||
'content': 'coin',
|
||||
}
|
||||
franceconnect.login_with_fc(app, path='/accounts/').follow()
|
||||
assert re.match(r'WARNING.*get_user_info.*not JSON.*coin', caplog.text)
|
||||
|
||||
|
||||
def test_fc_is_down(app, franceconnect, freezer, caplog):
|
||||
franceconnect.token_endpoint_response = {'status_code': 500, 'content': 'Internal server error'}
|
||||
|
||||
# first error -> warning
|
||||
response = franceconnect.login_with_fc(app, path='/accounts/').follow()
|
||||
assert len(caplog.records) == 1
|
||||
assert caplog.records[-1].levelname == 'WARNING'
|
||||
assert 'Unable to connect to FranceConnect' in response
|
||||
|
||||
# second error, four minutes later -> warning
|
||||
freezer.move_to(datetime.timedelta(seconds=+240))
|
||||
response = franceconnect.login_with_fc(app, path='/accounts/').follow()
|
||||
assert len(caplog.records) == 2
|
||||
assert caplog.records[-1].levelname == 'WARNING'
|
||||
assert 'Unable to connect to FranceConnect' in response
|
||||
|
||||
# after 5 minutes an error is logged
|
||||
freezer.move_to(datetime.timedelta(seconds=+240))
|
||||
response = franceconnect.login_with_fc(app, path='/accounts/').follow()
|
||||
assert len(caplog.records) == 4
|
||||
assert caplog.records[-1].levelname == 'ERROR'
|
||||
assert 'Unable to connect to FranceConnect' in response
|
||||
|
||||
# but only every 5 minutes
|
||||
freezer.move_to(datetime.timedelta(seconds=+60))
|
||||
response = franceconnect.login_with_fc(app, path='/accounts/').follow()
|
||||
assert len(caplog.records) == 5
|
||||
assert caplog.records[-1].levelname == 'WARNING'
|
||||
assert 'Unable to connect to FranceConnect' in response
|
||||
|
||||
# a success clear the down flag
|
||||
franceconnect.token_endpoint_response = None
|
||||
response = franceconnect.login_with_fc(app, path='/accounts/')
|
||||
assert app.session['_auth_user_id']
|
||||
app.session.flush()
|
||||
assert len(caplog.records) == 7
|
||||
|
||||
# such that 5 minutes later only a warning is emitted
|
||||
freezer.move_to(datetime.timedelta(seconds=310))
|
||||
franceconnect.token_endpoint_response = {'status_code': 500, 'content': 'Internal server error'}
|
||||
response = franceconnect.login_with_fc(app, path='/accounts/').follow()
|
||||
assert len(caplog.records) == 8
|
||||
assert caplog.records[-1].levelname == 'WARNING'
|
||||
assert 'Unable to connect to FranceConnect' in response
|
||||
|
||||
|
||||
def test_authorization_error(app, franceconnect):
|
||||
error = 'unauthorized'
|
||||
error_description = 'Vous n\'êtes pas autorisé à vous connecter.'
|
||||
|
||||
response = app.get(
|
||||
'/fc/callback/', params={'error': error, 'error_description': error_description, 'next': '/accounts/'}
|
||||
).maybe_follow()
|
||||
messages = response.pyquery('.messages').text()
|
||||
assert error not in messages
|
||||
assert error_description in messages
|
||||
|
||||
response = app.get('/fc/callback/', params={'error': error, 'next': '/accounts/'}).maybe_follow()
|
||||
messages = response.pyquery('.messages').text()
|
||||
assert error in messages
|
||||
assert error_description not in messages
|
||||
|
|
Loading…
Reference in New Issue