summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBenjamin Dauvergne <bdauvergne@entrouvert.com>2019-06-11 10:02:31 (GMT)
committerBenjamin Dauvergne <bdauvergne@entrouvert.com>2019-07-29 22:30:30 (GMT)
commit6aeece1d25ac7f968d6e4be89710c7914d073be1 (patch)
treee666cc14533af4f2406b50115da23fc33c4da2ce
parenteb9f62b53557fb4a2c49725024b2992b8007a64a (diff)
downloadauthentic-6aeece1d25ac7f968d6e4be89710c7914d073be1.zip
authentic-6aeece1d25ac7f968d6e4be89710c7914d073be1.tar.gz
authentic-6aeece1d25ac7f968d6e4be89710c7914d073be1.tar.bz2
auth_fc: make request, token and user_info mandatory in backend (#33991)
-rw-r--r--src/authentic2_auth_fc/app_settings.py9
-rw-r--r--src/authentic2_auth_fc/backends.py151
-rw-r--r--src/authentic2_auth_fc/models.py4
-rw-r--r--src/authentic2_auth_fc/views.py187
-rw-r--r--tests/auth_fc/test_auth_fc.py95
5 files changed, 310 insertions, 136 deletions
diff --git a/src/authentic2_auth_fc/app_settings.py b/src/authentic2_auth_fc/app_settings.py
index a7b5cf0..220feb9 100644
--- a/src/authentic2_auth_fc/app_settings.py
+++ b/src/authentic2_auth_fc/app_settings.py
@@ -134,6 +134,15 @@ class AppSettings(object):
def popup(self):
return self._setting('POPUP', False)
+ @property
+ def link_by_email(self):
+ return self._setting('LINK_BY_EMAIL', True)
+
+ @property
+ def link_on_login(self):
+ return self._setting('LINK_ON_LOGIN', True)
+
+
app_settings = AppSettings('A2_FC_')
app_settings.__name__ = __name__
sys.modules[__name__] = app_settings
diff --git a/src/authentic2_auth_fc/backends.py b/src/authentic2_auth_fc/backends.py
index f873555..86b94ab 100644
--- a/src/authentic2_auth_fc/backends.py
+++ b/src/authentic2_auth_fc/backends.py
@@ -16,66 +16,139 @@
import json
import logging
+import mock
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import ModelBackend
-from django.core.exceptions import PermissionDenied, MultipleObjectsReturned
-from django.db import IntegrityError
+from django.core.exceptions import MultipleObjectsReturned
+from django.utils import functional
from authentic2.a2_rbac.utils import get_default_ou
-from authentic2 import hooks
+from authentic2 import hooks, app_settings as a2_app_settings
from . import models, app_settings, utils
logger = logging.getLogger(__name__)
+User = get_user_model()
class FcBackend(ModelBackend):
- def authenticate(self, sub, **kwargs):
- user_info = kwargs.get('user_info')
- user = None
+ @functional.cached_property
+ def user_ou(self):
+ return get_default_ou()
+
+ @functional.cached_property
+ def email_uniqueness_qs(self):
+ if a2_app_settings.A2_EMAIL_IS_UNIQUE:
+ return User.objects.all()
+ if self.user_ou.email_is_unique:
+ return User.objects.filter(ou=self.user_ou)
+ return None
+
+ def authenticate(self, request, sub, user_info, token):
+ account = self._lookup_user(sub=sub, user_info=user_info, token=token)
+
+ if not account and app_settings.link_by_email:
+ account = self._link_by_email(request, sub=sub, user_info=user_info, token=token)
+
+ if not account and app_settings.create:
+ # How knowing the mail of the future user ? Apply attribute mapping on a mock object !
+ mocked_user = mock.Mock()
+ utils.apply_user_info_mappings(mocked_user, user_info)
+ email = mocked_user.email if not isinstance(mocked_user.email, mock.Mock) else None
+ account = self._create_user(request, sub=sub, user_info=user_info, token=token, email=email)
+
+ if not account:
+ return None
+
+ utils.apply_user_info_mappings(account.user, user_info)
+
+ account.user.fc_account = account
+ return account.user
+
+ def _lookup_user(self, sub, user_info, token):
try:
try:
account = models.FcAccount.objects.select_related().get(sub=sub)
except MultipleObjectsReturned:
account = models.FcAccount.objects.select_related().get(sub=sub, order=0)
except models.FcAccount.DoesNotExist:
- logger.debug(u'user with the sub %s does not exist.', sub)
+ logger.debug(u'auth_fc: no user with sub %s.', sub)
+ return None
else:
- user = account.user
- logger.debug(u'found user %s with sub %s', user, sub)
- if not user.is_active:
- logger.info(u'user %s login refused, it is inactive', user)
- raise PermissionDenied
- if user_info:
- if not user and app_settings.create:
- User = get_user_model()
- user = User.objects.create(ou=get_default_ou())
- try:
- models.FcAccount.objects.create(
- user=user,
- sub=sub,
- order=0,
- token=json.dumps(kwargs['token']))
- except IntegrityError:
- # uniqueness check failed, as the user is new, it can only means that the sub is not unique
- # let's try again
- user.delete()
- return self.authenticate(sub, **kwargs)
- else:
- logger.debug(u'user creation enabled with fc_account (sub : %s - token : %s)',
- sub, json.dumps(kwargs['token']))
- hooks.call_hooks('event', name='fc-create', user=user, sub=sub)
-
- if not user:
+ if not account.user.is_active:
+ logger.info(u'auth_fc: refused authenticating user %s with sub %s, it is inactive', account.user, sub)
+ return None
+ if account.get_user_info() != user_info or account.get_token() != token:
+ account.user_info = json.dumps(user_info)
+ account.token = json.dumps(token)
+ account.save(update_fields=['token', 'user_info'])
+ logger.debug(u'auth_fc: authenticating existing user %s with sub %s', account.user, sub)
+ return account
+
+ def _link_by_email(self, request, sub, user_info, token):
+ if not user_info.get('email'):
+ logger.debug('auth_fc: no email, no link by email')
+ return None
+
+ email = user_info['email']
+ qs = User.objects.filter(email__iexact=email)
+
+ if not a2_app_settings.A2_EMAIL_IS_UNIQUE:
+ qs = qs.filter(ou=self.user_ou)
+
+ count = len(qs)
+ if count == 0:
+ logger.debug('auth_fc: no account found with email %s', email)
+ return None
+ if count > 1:
+ logger.debug('auth_fc: too many accounts with email %s', email)
+ return None
+ user = qs[0]
+
+ account, created = models.FcAccount.objects.get_or_create(
+ user=user, defaults={'user_info': json.dumps(user_info), 'token': json.dumps(token), 'sub': sub})
+
+ if not created:
+ # concurrent creation of the same link, retry normal lookup
+ if account.sub == sub:
+ return account
+ else:
+ logger.debug('auth_fc: account with email %s is already linked to sub %s', email, account.sub)
return None
- logger.debug(u'updated (given_name : %s - family_name : %s)', user_info['given_name'],
- user_info['family_name'])
- user.first_name = user_info['given_name']
- user.last_name = user_info['family_name']
- utils.apply_user_info_mappings(user, user_info)
- return user
+ logger.info('auth_fc: account with email %s linked by email with sub %s', email, sub)
+ hooks.call_hooks('event', name='fc-link', user=user, sub=sub, request=request, fc_account=account)
+ return account
+
+ def _create_user(self, request, sub, user_info, token, email):
+ # we cannot create an user with the same email, stop.
+ if self.email_uniqueness_qs is not None and self.email_uniqueness_qs.filter(email__iexact=email).count() > 0:
+ return None
+ user = User.objects.create(ou=self.user_ou, email=email or '')
+ # we still should not... but we do not have a unique index on "email"
+ if self.email_uniqueness_qs is not None and self.email_uniqueness_qs.filter(email__iexact=email).count() > 1:
+ user.delete()
+ return None
+
+ utils.apply_user_info_mappings(user, user_info)
+ account, created = models.FcAccount.objects.get_or_create(
+ sub=sub,
+ defaults={
+ 'user': user,
+ 'user_info': json.dumps(user_info),
+ 'token': json.dumps(token),
+ })
+
+ # link with the same sub already exists, retry
+ if not created:
+ logger.warning('auth_fc: sub %s is already linked to an user, cannot create', sub)
+ return None
+
+ logger.info(u'auth_fc: automatically created new user %s for sub %s', user, sub)
+
+ hooks.call_hooks('event', name='fc-create', user=user, sub=sub, request=request, fc_account=account)
+ return account
def get_saml2_authn_context(self):
import lasso
diff --git a/src/authentic2_auth_fc/models.py b/src/authentic2_auth_fc/models.py
index e34a8b0..2ce0990 100644
--- a/src/authentic2_auth_fc/models.py
+++ b/src/authentic2_auth_fc/models.py
@@ -98,10 +98,10 @@ class FcAccount(models.Model):
return parse_id_token(self.get_token()['id_token'])
def get_token(self):
- return json.loads(self.token)
+ return self.token and json.loads(self.token)
def get_user_info(self):
- return json.loads(self.user_info)
+ return self.user_info and json.loads(self.user_info)
def __unicode__(self):
user_info = self.get_user_info()
diff --git a/src/authentic2_auth_fc/views.py b/src/authentic2_auth_fc/views.py
index f154a72..577ea4c 100644
--- a/src/authentic2_auth_fc/views.py
+++ b/src/authentic2_auth_fc/views.py
@@ -50,6 +50,8 @@ from authentic2.forms.passwords import SetPasswordForm
from . import app_settings, models, utils
+User = get_user_model()
+
class LoggerMixin(object):
def __init__(self, *args, **kwargs):
@@ -353,12 +355,32 @@ class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View):
error message.
'''
- def update_user_info(self):
- self.fc_account.token = json.dumps(self.token)
- self.fc_account.user_info = json.dumps(self.user_info)
- self.fc_account.save(update_fields=['token', 'user_info'])
- utils.apply_user_info_mappings(self.fc_account.user, self.user_info)
- self.logger.debug('updating user_info %s', self.fc_account.user_info)
+ def update_user_info(self, fc_account):
+ if fc_account.get_token() != self.token or fc_account.get_user_info() != self.user_info:
+ fc_account.user_info = json.dumps(self.user_info)
+ fc_account.token = json.dumps(self.token)
+ fc_account.save(update_fields=['token', 'user_info'])
+ self.logger.debug('applying user info mapping %s to user %s', self.user_info, fc_account.user)
+ utils.apply_user_info_mappings(fc_account.user, self.user_info)
+
+ def link_existing_user(self, request):
+ try:
+ fc_account, created = models.FcAccount.objects.get_or_create(
+ sub=self.sub, user=request.user, order=0,
+ defaults={'token': json.dumps(self.token)})
+ except IntegrityError:
+ # unique index check failed, find why.
+ return self.uniqueness_check_failed(request)
+
+ if created:
+ self.logger.info('fc link created sub %s', self.sub)
+ messages.info(request,
+ _('Your FranceConnect account {} has been linked.').format(self.fc_display_name))
+ hooks.call_hooks('event', name='fc-link', user=request.user, sub=self.sub, request=request)
+ else:
+ messages.info(request, _('Your local account has been updated.'))
+ self.update_user_info(fc_account)
+ return self.redirect(request)
def uniqueness_check_failed(self, request):
if models.FcAccount.objects.filter(user=request.user, order=0).count():
@@ -371,101 +393,82 @@ class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View):
return self.redirect(request)
def get(self, request, *args, **kwargs):
- registration = True if 'registration' in request.GET else False
'''Request an access grant code and associate it to the current user'''
+
self.service_slug = request.GET.get(constants.SERVICE_FIELD_NAME)
+
if request.user.is_authenticated():
- try:
- self.fc_account, created = models.FcAccount.objects.get_or_create(
- sub=self.sub, user=request.user, order=0,
- defaults={'token': json.dumps(self.token)})
- except IntegrityError:
- # unique index check failed, find why.
- return self.uniqueness_check_failed(request)
-
- if created:
- self.logger.info('fc link created sub %s', self.sub)
- messages.info(request,
- _('Your FranceConnect account {} has been linked.').format(self.fc_display_name))
- hooks.call_hooks('event', name='fc-link', user=request.user, sub=self.sub, request=request)
- else:
- messages.info(request, _('Your local account has been updated.'))
- self.update_user_info()
+ return self.link_existing_user(request)
+
+ user = a2_utils.authenticate(request, sub=self.sub, user_info=self.user_info, token=self.token)
+
+ if not user:
+ return self.authentication_failed(request)
+ else:
+ # normal login
+ a2_utils.login(request, user, 'france-connect', service_slug=self.service_slug)
+ self.logger.debug('logged in using fc sub %s', self.sub)
return self.redirect(request)
- default_ou = get_default_ou()
- email_is_unique = a2_app_settings.A2_EMAIL_IS_UNIQUE or default_ou.email_is_unique
- user = a2_utils.authenticate(
- request,
- sub=self.sub,
- user_info=self.user_info,
- token=self.token)
- if not user and self.user_info.get('email') and email_is_unique:
- email = self.user_info['email']
- User = get_user_model()
- qs = User.objects.filter(email__iexact=email)
- if not a2_app_settings.A2_EMAIL_IS_UNIQUE and default_ou.email_is_unique:
- qs = qs.filter(ou=default_ou)
-
- if qs.exists():
- # there should not be multiple accounts with the same mail
- if len(qs) > 1:
- self.logger.error(u'multiple accounts with the same mail %s, %s', email,
- list(qs))
- # ok we have one account
- elif len(qs) == 1:
- user = qs[0]
- # but does he have already a link to an FC account ?
- if not user.fc_accounts.exists():
- try:
- self.fc_account, created = models.FcAccount.objects.get_or_create(
- defaults={'token': json.dumps(self.token)},
- sub=self.sub, user=user, order=0)
- except IntegrityError:
- # unique index check failed, find why.
- return self.uniqueness_check_failed(request)
-
- if created:
- self.logger.info(u'fc link created sub %s user %s', self.sub, user)
- hooks.call_hooks('event', name='fc-link', user=user, sub=self.sub,
- request=request)
- user = a2_utils.authenticate(
- request=request,
- sub=self.sub,
- user_info=self.user_info,
- token=self.token)
+ def authentication_failed(self, request):
+ # show message for special case of link by email of an already used
+ # email and auto-creation should be possible
+ email = self.user_info.get('email')
+ if email:
+ if a2_app_settings.A2_EMAIL_IS_UNIQUE:
+ users_email_qs = User.objects.filter(email__iexact=email)
+ else:
+ users_email_qs = User.objects.filter(email__iexact=email, ou=get_default_ou())
+ users_count = users_email_qs.count()
+
+ if users_count:
+ if app_settings.link_by_email:
+ if users_count > 1:
+ messages.warning(
+ request,
+ _('There are too many accounts with email "%s" we cannot link them automatically.')
+ % email)
else:
messages.warning(
request,
- _('Your FranceConnect email address \'%s\' is already used by another '
- 'account, so we cannot create an account for you. Please create an '
- 'account with another email address then link it to FranceConnect '
- 'using your account management page.') % email)
- return self.redirect(request)
- if user:
- a2_utils.login(request, user, 'france-connect', service_slug=self.service_slug)
- self.update_user_info()
- self.logger.info('logged in using fc sub %s', self.sub)
- return self.redirect(request)
+ _('Your email "%s" is already linked to another FranceConnect account,'
+ ' we cannot link it automatically.')
+ % email)
+ if app_settings.create:
+ messages.warning(
+ request,
+ _('Your FranceConnect email address \'%s\' is already used by another '
+ 'account, so we cannot create an account for you. Please create an '
+ 'account with another email address then link it to FranceConnect '
+ 'using your account management page.') % email)
+ if 'registration' in request.GET or not app_settings.link_on_login:
+ return self.register(request)
else:
- params = {}
- if self.service_slug:
- params[constants.SERVICE_FIELD_NAME] = self.service_slug
- if registration:
- return self.redirect_and_come_back(request,
- a2_utils.make_url('fc-registration',
- params=params),
- params=params)
- else:
- messages.info(request, _('If you already have an account, please log in, else '
- 'create your account.'))
-
- login_params = params.copy()
- if not app_settings.show_button_quick_account_creation:
- login_params['nofc'] = 1
-
- login_url = a2_utils.make_url(settings.LOGIN_URL, params=login_params)
- return self.redirect_and_come_back(request, login_url, params=params)
+ return self.link_on_login(request)
+
+ def link_on_login(self, request):
+ params = {}
+ if self.service_slug:
+ params[constants.SERVICE_FIELD_NAME] = self.service_slug
+ messages.info(request, _('If you already have an account, please log in, else '
+ 'create your account.'))
+
+ login_params = params.copy()
+ if not app_settings.show_button_quick_account_creation:
+ login_params['nofc'] = 1
+
+ login_url = a2_utils.make_url(settings.LOGIN_URL, params=login_params)
+ return self.redirect_and_come_back(request, login_url, params=params)
+
+ def register(self, request):
+ params = {}
+ if self.service_slug:
+ params[constants.SERVICE_FIELD_NAME] = self.service_slug
+
+ return self.redirect_and_come_back(request,
+ a2_utils.make_url('fc-registration',
+ params=params),
+ params=params)
class RegistrationView(PopupViewMixin, LoggerMixin, View):
diff --git a/tests/auth_fc/test_auth_fc.py b/tests/auth_fc/test_auth_fc.py
index bed15eb..fcd0e92 100644
--- a/tests/auth_fc/test_auth_fc.py
+++ b/tests/auth_fc/test_auth_fc.py
@@ -270,7 +270,7 @@ def test_login_email_is_unique_and_already_linked(app, fc_settings, caplog):
fc_settings.A2_EMAIL_IS_UNIQUE = True
with httmock.HTTMock(access_token_response, user_info_response):
response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
- assert 'is already used' in str(response)
+ assert 'is already linked' in str(response)
assert User.objects.count() == 1
assert '_auth_user_id' not in app.session
@@ -487,6 +487,8 @@ def test_can_change_password(app, fc_settings, caplog, hooks):
location = response['Location']
state = check_authorization_url(location)
+ sub = '1234'
+
@httmock.urlmatch(path=r'.*/token$')
def access_token_response(url, request):
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
@@ -498,7 +500,7 @@ def test_can_change_password(app, fc_settings, caplog, hooks):
assert parsed['grant_type'] == 'authorization_code'
assert callback in parsed['redirect_uri']
id_token = {
- 'sub': '1234',
+ 'sub': sub,
'aud': 'xxx',
'nonce': state,
'exp': exp,
@@ -514,7 +516,7 @@ def test_can_change_password(app, fc_settings, caplog, hooks):
def user_info_response(url, request):
assert request.headers['Authorization'] == 'Bearer uuu'
return json.dumps({
- 'sub': '1234',
+ 'sub': sub,
'family_name': u'Frédérique',
'given_name': u'Ÿuñe',
'email': 'john.doe@example.com',
@@ -577,6 +579,28 @@ def test_can_change_password(app, fc_settings, caplog, hooks):
response = response.follow()
assert len(response.pyquery('[href*="password/change"]')) == 0
+ # Try to relink
+ response = app.get('/fc/callback/?next=/accounts/')
+ location = response['Location']
+ state = check_authorization_url(location)
+ callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
+ with httmock.HTTMock(access_token_response, user_info_response):
+ response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
+ response = response.follow()
+ assert 'Your local account has been updated' in response.text
+
+ # Try to relink with another sub
+ response = app.get('/fc/callback/?next=/accounts/')
+ location = response['Location']
+ state = check_authorization_url(location)
+ callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
+ sub = 'xyz'
+ with httmock.HTTMock(access_token_response, user_info_response):
+ response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
+ response = response.follow()
+ assert 'Your account is already linked to a FranceConnect account' in response.text
+ sub = '1234'
+
# Unlink
response = response.click('Delete link')
response.form.set('new_password1', 'ikKL1234')
@@ -617,3 +641,68 @@ def test_migration_0002(migration):
FcAccount = new_apps.get_model('authentic2_auth_fc', 'FcAccount')
assert len(set(FcAccount.objects.values_list('user_id', 'order'))) == 5
assert len(set(FcAccount.objects.values_list('sub', 'order'))) == 5
+
+
+def test_link_with_already_linked_sub(app, simple_user, fc_settings, caplog, hooks):
+ sub = '1234'
+
+ other_user = User.objects.create()
+ models.FcAccount.objects.create(
+ user=other_user,
+ sub='1234',
+ user_info='',
+ order=0,
+ token='')
+
+ response = app.get('/accounts/').follow()
+ response.form.set('username', simple_user.username)
+ response.form.set('password', simple_user.username)
+ response = response.form.submit(name='login-password-submit').follow()
+ response = response.click('Link')
+
+ exp = timestamp_from_datetime(now() + datetime.timedelta(seconds=1000))
+ # 1. Try a login
+ # 2. Verify we come back to login page
+ # 3. Check presence of registration link
+ # 4. Follow it
+ location = response['Location']
+ state = check_authorization_url(location)
+
+ @httmock.urlmatch(path=r'.*/token$')
+ def access_token_response(url, request):
+ parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
+ assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
+ 'grant_type'])
+ assert parsed['code'] == 'zzz'
+ assert parsed['client_id'] == 'xxx'
+ assert parsed['client_secret'] == 'yyy'
+ assert parsed['grant_type'] == 'authorization_code'
+ assert callback in parsed['redirect_uri']
+ id_token = {
+ 'sub': sub,
+ 'aud': 'xxx',
+ 'nonce': state,
+ 'exp': exp,
+ 'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
+ 'email': 'john.doe@example.com',
+ }
+ return json.dumps({
+ 'access_token': 'uuu',
+ 'id_token': hmac_jwt(id_token, 'yyy')
+ })
+
+ @httmock.urlmatch(path=r'.*userinfo$')
+ def user_info_response(url, request):
+ assert request.headers['Authorization'] == 'Bearer uuu'
+ return json.dumps({
+ 'sub': sub,
+ 'family_name': u'Frédérique',
+ 'given_name': u'Ÿuñe',
+ 'email': 'john.doe@example.com',
+ })
+
+ callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
+ with httmock.HTTMock(access_token_response, user_info_response):
+ response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
+ response = response.follow()
+ assert 'is already linked with another account.' in response.text