This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
authentic2-auth-fc/tests/test_auth_fc.py

591 lines
24 KiB
Python

# -*- coding: utf-8 -*-
# authentic2-auth-fc - authentic2 authentication for FranceConnect
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest
import re
import urlparse
import httmock
import mock
import json
import base64
from jwcrypto import jwk, jwt
import datetime
import requests
from django.core.urlresolvers import reverse
from django.contrib.auth import get_user_model
from django.utils.timezone import now
from authentic2.utils import timestamp_from_datetime
from authentic2_auth_fc import models
from authentic2_auth_fc.utils import requests_retry_session
User = get_user_model()
def path(url):
    '''Return only the path component of `url`.'''
    components = urlparse.urlparse(url)
    return components.path
def path_and_query(url):
    '''Return the path and the query string of `url` joined by a '?'.

    The '?' separator is always emitted, even when `url` has no query string.
    '''
    pieces = urlparse.urlparse(url)
    return '?'.join([pieces.path, pieces.query])
def get_links_from_mail(mail):
    '''Extract the http(s) links from the body of a mail sent by Django.

    A link runs from its scheme up to the next space or newline.
    '''
    link_pattern = re.compile('https?://[^ \n]*')
    return link_pattern.findall(mail.body)
def hmac_jwt(payload, key):
    '''Serialize `payload` as a compact JWT signed with HMAC-SHA256.

    `payload` holds the claims of the token. `key` is the shared secret as
    text; it is UTF-8 encoded and wrapped in a symmetric ("oct") JWK before
    signing.
    '''
    # RFC 7518 section 6.4.1 defines the "k" member as the base64url encoding
    # of the key, without padding. The standard base64 alphabet only matches
    # it for secrets whose encoding contains neither '+', '/' nor '=', and on
    # Python 3 b64encode() returns bytes where a text value is expected.
    encoded_key = base64.urlsafe_b64encode(key.encode('utf-8')).rstrip(b'=').decode('ascii')
    signing_key = jwk.JWK(kty='oct', k=encoded_key)
    token = jwt.JWT(header={'alg': 'HS256'}, claims=payload)
    token.make_signed_token(signing_key)
    return token.serialize()
def test_login_redirect(app, fc_settings):
    '''A GET on the fc-login-or-link view answers with a 302 to the FranceConnect host.'''
    response = app.get(reverse('fc-login-or-link'), status=302)
    location = response['Location']
    assert location.startswith('https://fcp.integ01')
def check_authorization_url(url):
    '''Validate an authorization request URL and return its state parameter.

    Checks the host prefix of `url` and its redirect_uri, client_id, scope,
    state, nonce and response_type query parameters.
    '''
    expected_callback = reverse('fc-login-or-link')
    assert url.startswith('https://fcp.integ01')
    # keep only the first value of each query parameter
    raw_query = url.split('?')[1]
    params = {name: values[0] for name, values in urlparse.parse_qs(raw_query).items()}
    assert 'redirect_uri' in params
    assert expected_callback in params['redirect_uri']
    assert 'client_id' in params
    assert params['client_id'] == 'xxx'
    assert 'scope' in params
    requested_scopes = set(params['scope'].split())
    assert requested_scopes == {'openid', 'profile', 'birth', 'email'}
    assert 'state' in params
    assert 'nonce' in params
    # the same value is expected for the state and for the nonce
    assert params['state'] == params['nonce']
    assert 'response_type' in params
    assert params['response_type'] == 'code'
    return params['state']
@pytest.mark.parametrize(
    'exp',
    [
        timestamp_from_datetime(now() + datetime.timedelta(seconds=1000)),
        timestamp_from_datetime(now() - datetime.timedelta(seconds=1000)),
    ]
)
def test_login_simple(app, fc_settings, caplog, hooks, exp):
    '''Log in through FranceConnect with an id_token expiring at `exp`.

    Before A2_FC_CREATE is switched on no user is created. Afterwards a user
    is created only when `exp` is still in the future; that user is then
    checked to be logged in and linked, the link is deleted and the
    FranceConnect logout view is visited.
    '''
    login_page = app.get('/login/?service=portail&next=/idp/')
    response = login_page.click(href='callback')
    state = check_authorization_url(response['Location'])
    callback = reverse('fc-login-or-link')

    @httmock.urlmatch(path=r'.*/token$')
    def access_token_response(url, request):
        # fake token endpoint: validate the posted form then sign an id_token
        form = {name: values[0] for name, values in urlparse.parse_qs(request.body).items()}
        assert set(form) == {'code', 'client_id', 'client_secret', 'redirect_uri', 'grant_type'}
        assert form['code'] == 'zzz'
        assert form['client_id'] == 'xxx'
        assert form['client_secret'] == 'yyy'
        assert form['grant_type'] == 'authorization_code'
        assert callback in form['redirect_uri']
        claims = {
            'sub': '1234',
            'aud': 'xxx',
            'nonce': state,
            'exp': exp,
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
        }
        token_payload = {
            'access_token': 'uuu',
            'id_token': hmac_jwt(claims, 'yyy')
        }
        return json.dumps(token_payload)

    @httmock.urlmatch(path=r'.*userinfo$')
    def user_info_response(url, request):
        # fake userinfo endpoint: requires the access token issued above
        assert request.headers['Authorization'] == 'Bearer uuu'
        identity = {
            'sub': '1234',
            'family_name': u'Frédérique',
            'given_name': u'Ÿuñe',
        }
        return json.dumps(identity)

    authorization_response_url = callback + '?service=portail&next=/idp/&code=zzz&state=%s' % state

    # before A2_FC_CREATE is switched on, nobody must be created
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(authorization_response_url, status=302)
    assert User.objects.count() == 0

    fc_settings.A2_FC_CREATE = True
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(authorization_response_url, status=302)
    token_expired = exp < timestamp_from_datetime(now())
    assert User.objects.count() == (0 if token_expired else 1)

    # nothing more to verify when no account was created
    if not User.objects.count():
        return

    user = User.objects.get()
    assert user.verified_attributes.first_name == u'Ÿuñe'
    assert user.verified_attributes.last_name == u'Frédérique'
    assert path(response['Location']) == '/idp/'
    login_event = hooks.event[1]['kwargs']
    assert login_event['name'] == 'login'
    assert hooks.event[1]['kwargs']['service'] == 'portail'
    # we must be connected
    assert app.session['_auth_user_id']
    assert models.FcAccount.objects.count() == 1

    # delete the link; the unlink form asks for a new password
    response = app.get('/accounts/')
    response = response.click('Delete link')
    unlink_form = response.form
    unlink_form.set('new_password1', 'ikKL1234')
    unlink_form.set('new_password2', 'ikKL1234')
    response = unlink_form.submit(name='unlink')
    assert 'The link with the FranceConnect account has been deleted' in response.content
    assert models.FcAccount.objects.count() == 0

    # the continue link carries a state bound to the /accounts/ page
    continue_url = response.pyquery('a#a2-continue').attr['href']
    continue_query = urlparse.parse_qs(urlparse.urlparse(continue_url).query)
    state = continue_query['state'][0]
    assert app.session['fc_states'][state]['next'] == '/accounts/'
    response = app.get(reverse('fc-logout') + '?state=' + state)
    assert path(response['Location']) == '/accounts/'
def test_login_email_is_unique(app, fc_settings, caplog):
    '''Log in through FranceConnect when the returned email matches a user.

    With A2_EMAIL_IS_UNIQUE set, the email returned by userinfo differs only
    by case from the one of the existing user: no second user is created and
    the session is authenticated. After a password login, the unlink form has
    no new_password1 field.
    '''
    callback = reverse('fc-login-or-link')
    response = app.get(callback, status=302)
    state = check_authorization_url(response['Location'])

    @httmock.urlmatch(path=r'.*/token$')
    def access_token_response(url, request):
        # fake token endpoint: validate the posted form then sign an id_token
        form = {name: values[0] for name, values in urlparse.parse_qs(request.body).items()}
        assert set(form) == {'code', 'client_id', 'client_secret', 'redirect_uri', 'grant_type'}
        assert form['code'] == 'zzz'
        assert form['client_id'] == 'xxx'
        assert form['client_secret'] == 'yyy'
        assert form['grant_type'] == 'authorization_code'
        assert callback in form['redirect_uri']
        claims = {
            'sub': '1234',
            'aud': 'xxx',
            'nonce': state,
            'exp': timestamp_from_datetime(now() + datetime.timedelta(seconds=1000)),
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
        }
        token_payload = {
            'access_token': 'uuu',
            'id_token': hmac_jwt(claims, 'yyy')
        }
        return json.dumps(token_payload)

    @httmock.urlmatch(path=r'.*userinfo$')
    def user_info_response(url, request):
        # fake userinfo endpoint; the email only differs by case from the user's
        assert request.headers['Authorization'] == 'Bearer uuu'
        identity = {
            'sub': '1234',
            'family_name': u'Frédérique',
            'given_name': u'Ÿuñe',
            'email': 'jOhn.dOe@eXample.com',
        }
        return json.dumps(identity)

    existing_user = User.objects.create(email='john.doe@example.com', first_name='John', last_name='Doe')
    existing_user.set_password('toto')
    existing_user.save()

    fc_settings.A2_EMAIL_IS_UNIQUE = True
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
    assert User.objects.count() == 1
    assert app.session['_auth_user_id']

    # logout, test unlinking when logging with password
    app.session.flush()
    response = app.get('/login/')
    login_form = response.form
    login_form.set('username', User.objects.get().email)
    login_form.set('password', 'toto')
    response = login_form.submit(name='login-password-submit').follow()

    response = app.get('/accounts/')
    response = response.click('Delete link')
    unlink_form = response.form
    assert 'new_password1' not in unlink_form.fields
    response = unlink_form.submit(name='unlink').follow()
    assert 'The link with the FranceConnect account has been deleted' in response.content
    assert response.request.path == '/accounts/'
def test_login_email_is_unique_and_already_linked(app, fc_settings, caplog):
    '''Refuse a FranceConnect login whose email belongs to an already linked user.

    The existing user is linked to the sub '4567' while FranceConnect answers
    with another sub and the same email: with A2_EMAIL_IS_UNIQUE set, the
    response mentions 'is already used', no user is created and nobody is
    logged in.
    '''
    EMAIL = 'john.doe@example.com'
    SUB = '1234'

    callback = reverse('fc-login-or-link')
    response = app.get(callback, status=302)
    state = check_authorization_url(response['Location'])

    linked_user = User.objects.create(email=EMAIL, first_name='John', last_name='Doe')
    models.FcAccount.objects.create(user=linked_user, sub='4567', token='xxx', user_info='{}')

    @httmock.urlmatch(path=r'.*/token$')
    def access_token_response(url, request):
        # fake token endpoint: validate the posted form then sign an id_token
        form = {name: values[0] for name, values in urlparse.parse_qs(request.body).items()}
        assert set(form) == {'code', 'client_id', 'client_secret', 'redirect_uri', 'grant_type'}
        assert form['code'] == 'zzz'
        assert form['client_id'] == 'xxx'
        assert form['client_secret'] == 'yyy'
        assert form['grant_type'] == 'authorization_code'
        assert callback in form['redirect_uri']
        claims = {
            'sub': SUB,
            'aud': 'xxx',
            'nonce': state,
            'exp': timestamp_from_datetime(now() + datetime.timedelta(seconds=1000)),
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
        }
        token_payload = {
            'access_token': 'uuu',
            'id_token': hmac_jwt(claims, 'yyy')
        }
        return json.dumps(token_payload)

    @httmock.urlmatch(path=r'.*userinfo$')
    def user_info_response(url, request):
        # fake userinfo endpoint: same email as the already linked user
        assert request.headers['Authorization'] == 'Bearer uuu'
        identity = {
            'sub': '1234',
            'family_name': u'Frédérique',
            'given_name': u'Ÿuñe',
            'email': EMAIL,
        }
        return json.dumps(identity)

    fc_settings.A2_EMAIL_IS_UNIQUE = True
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
    rendered = str(response)
    assert 'is already used' in rendered
    assert User.objects.count() == 1
    assert '_auth_user_id' not in app.session
def test_requests_proxies_support(app, fc_settings, caplog):
    '''Check how requests_retry_session() sets the proxies of its session.

    Covers a fresh session, a caller-provided session which must be returned
    untouched, and a fresh session once REQUESTS_PROXIES is set, in which
    case the proxies must also reach Session.send().
    '''
    # no setting, no proxies
    default_session = requests_retry_session()
    assert default_session.proxies == {}

    # a session given by the caller is returned as is, proxies included
    provided = requests.Session()
    provided.proxies = {'http': 'http://example.net'}
    returned = requests_retry_session(session=provided)
    assert returned is provided
    assert returned.proxies == {'http': 'http://example.net'}

    # proxies now come from the settings
    fc_settings.REQUESTS_PROXIES = {'https': 'http://pubproxy.com/api/proxy'}
    session = requests_retry_session()
    assert session.proxies == {'https': 'http://pubproxy.com/api/proxy'}

    with mock.patch('authentic2_auth_fc.utils.requests.Session.send') as mocked_send:
        mocked_send.return_value = mock.Mock(status_code=200, content='whatever')
        session.get('https://example.net/')
        send_kwargs = mocked_send.call_args[1]
        assert send_kwargs['proxies'] == {'https': 'http://pubproxy.com/api/proxy'}
def test_password_reset(app, mailoutbox):
    '''Follow a password reset link before and after linking to FranceConnect.

    The link received by mail answers with a redirect while the user has no
    FcAccount; once one exists, the same link shows a form with a
    new_password1 field.
    '''
    user = User.objects.create(email='john.doe@example.com')

    # ask for a reset from the login page
    login_page = app.get('/login/')
    response = login_page.click('Reset it!').maybe_follow()
    response.form['email'] = user.email
    assert len(mailoutbox) == 0
    response = response.form.submit()
    assert len(mailoutbox) == 1

    reset_links = get_links_from_mail(mailoutbox[0])
    url = reset_links[0]
    app.get(url, status=302)

    models.FcAccount.objects.create(user=user, sub='xxx', token='aaa')
    response = app.get(url)
    assert 'new_password1' in response.form.fields
def test_registration1(app, fc_settings, caplog, hooks):
    '''Register through FranceConnect after a login attempt without account.

    Scenario:
      1. try a FranceConnect login while no user exists,
      2. verify we come back to the login page,
      3. check the presence of the registration link,
      4. follow it through the activation URL, which must log the user in,
      5. complete a second FranceConnect round trip which links the account,
      6. delete the link and visit the FranceConnect logout view.
    '''
    exp = timestamp_from_datetime(now() + datetime.timedelta(seconds=1000))
    response = app.get('/login/?service=portail&next=/idp/')
    response = response.click(href="callback")
    location = response['Location']
    state = check_authorization_url(location)
    # code and state are appended below with '&', so this redirect_uri is
    # expected to already carry a query string
    callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]

    @httmock.urlmatch(path=r'.*/token$')
    def access_token_response(url, request):
        # fake token endpoint; `callback`, `state` and `exp` are read from the
        # enclosing scope when the endpoint is called
        parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
        assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
                                          'grant_type'])
        assert parsed['code'] == 'zzz'
        assert parsed['client_id'] == 'xxx'
        assert parsed['client_secret'] == 'yyy'
        assert parsed['grant_type'] == 'authorization_code'
        assert callback in parsed['redirect_uri']
        id_token = {
            'sub': '1234',
            'aud': 'xxx',
            'nonce': state,
            'exp': exp,
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
            'email': 'john.doe@example.com',
        }
        return json.dumps({
            'access_token': 'uuu',
            'id_token': hmac_jwt(id_token, 'yyy')
        })

    @httmock.urlmatch(path=r'.*userinfo$')
    def user_info_response(url, request):
        # fake userinfo endpoint: requires the access token issued above
        assert request.headers['Authorization'] == 'Bearer uuu'
        return json.dumps({
            'sub': '1234',
            'family_name': u'Frédérique',
            'given_name': u'Ÿuñe',
            'email': 'john.doe@example.com',
        })

    # 1. try a login: nobody is created
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
    assert User.objects.count() == 0
    # 2. we come back to the login page
    assert path(response['Location']) == '/login/'
    response = response.follow()
    # 3. and 4. the registration link is there, follow it
    response = response.click(href='/accounts/fc/register')
    location = response['Location']
    # this check used to be a bare expression and never ran; only the path is
    # compared so that absolute and relative redirects are both accepted
    assert path(location).startswith('/accounts/activate/')
    response = response.follow()
    assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
    # we must be connected
    assert app.session['_auth_user_id']
    assert path_and_query(response['Location']) == path_and_query(callback)

    # 5. second round trip; rebinding `state` also changes the nonce emitted
    # by the fake token endpoint
    response = response.follow()
    location = response['Location']
    state = check_authorization_url(location)
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
    assert models.FcAccount.objects.count() == 1
    user = User.objects.get()
    assert user.verified_attributes.first_name == u'Ÿuñe'
    assert user.verified_attributes.last_name == u'Frédérique'

    # 6. delete the link; the unlink form asks for a new password
    response = app.get('/accounts/')
    response = response.click('Delete link')
    response.form.set('new_password1', 'ikKL1234')
    response.form.set('new_password2', 'ikKL1234')
    response = response.form.submit(name='unlink')
    assert 'The link with the FranceConnect account has been deleted' in response.content
    assert models.FcAccount.objects.count() == 0
    continue_url = response.pyquery('a#a2-continue').attr['href']
    state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
    assert app.session['fc_states'][state]['next'] == '/accounts/'
    response = app.get(reverse('fc-logout') + '?state=' + state)
    assert path(response['Location']) == '/accounts/'
def test_registration2(app, fc_settings, caplog, hooks):
    '''Register through FranceConnect starting from the registration page.

    Scenario:
      1. open the login page, click "Register" then the FranceConnect link,
      2. come back from FranceConnect: nobody exists yet and we are sent to
         /accounts/fc/register/,
      3. follow the activation URL, which must log the user in,
      4. complete a second FranceConnect round trip which links the account,
      5. delete the link and visit the FranceConnect logout view.
    '''
    exp = timestamp_from_datetime(now() + datetime.timedelta(seconds=1000))
    response = app.get('/login/?service=portail&next=/idp/')
    response = response.click("Register")
    response = response.click(href='callback')
    location = response['Location']
    state = check_authorization_url(location)
    # code and state are appended below with '&', so this redirect_uri is
    # expected to already carry a query string
    callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]

    @httmock.urlmatch(path=r'.*/token$')
    def access_token_response(url, request):
        # fake token endpoint; `callback`, `state` and `exp` are read from the
        # enclosing scope when the endpoint is called
        parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
        assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
                                          'grant_type'])
        assert parsed['code'] == 'zzz'
        assert parsed['client_id'] == 'xxx'
        assert parsed['client_secret'] == 'yyy'
        assert parsed['grant_type'] == 'authorization_code'
        assert callback in parsed['redirect_uri']
        id_token = {
            'sub': '1234',
            'aud': 'xxx',
            'nonce': state,
            'exp': exp,
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
            'email': 'john.doe@example.com',
        }
        return json.dumps({
            'access_token': 'uuu',
            'id_token': hmac_jwt(id_token, 'yyy')
        })

    @httmock.urlmatch(path=r'.*userinfo$')
    def user_info_response(url, request):
        # fake userinfo endpoint: requires the access token issued above
        assert request.headers['Authorization'] == 'Bearer uuu'
        return json.dumps({
            'sub': '1234',
            'family_name': u'Frédérique',
            'given_name': u'Ÿuñe',
            'email': 'john.doe@example.com',
        })

    # 2. back from FranceConnect: nobody is created, registration goes on
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
    assert User.objects.count() == 0
    assert path(response['Location']) == '/accounts/fc/register/'
    response = response.follow()
    location = response['Location']
    # this check used to be a bare expression and never ran; only the path is
    # compared so that absolute and relative redirects are both accepted
    assert path(location).startswith('/accounts/activate/')
    # 3. activation
    response = response.follow()
    assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
    assert hooks.calls['event'][1]['kwargs']['service'] == 'portail'
    # we must be connected
    assert app.session['_auth_user_id']
    # remove the registration parameter
    callback = callback.replace('&registration=', '')
    callback = callback.replace('?registration=', '?')
    callback = callback.replace('?&', '?')
    assert path_and_query(response['Location']) == path_and_query(callback)

    # 4. second round trip; rebinding `state` also changes the nonce emitted
    # by the fake token endpoint
    response = response.follow()
    location = response['Location']
    state = check_authorization_url(location)
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
    assert models.FcAccount.objects.count() == 1
    user = User.objects.get()
    assert user.verified_attributes.first_name == u'Ÿuñe'
    assert user.verified_attributes.last_name == u'Frédérique'

    # 5. delete the link; the unlink form asks for a new password
    response = app.get('/accounts/')
    response = response.click('Delete link')
    response.form.set('new_password1', 'ikKL1234')
    response.form.set('new_password2', 'ikKL1234')
    response = response.form.submit(name='unlink')
    assert 'The link with the FranceConnect account has been deleted' in response.content
    assert models.FcAccount.objects.count() == 0
    continue_url = response.pyquery('a#a2-continue').attr['href']
    state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
    assert app.session['fc_states'][state]['next'] == '/accounts/'
    response = app.get(reverse('fc-logout') + '?state=' + state)
    assert path(response['Location']) == '/accounts/'
def test_can_change_password(app, fc_settings, caplog, hooks):
    '''Check when the account page offers a password change link.

    Scenario:
      1. register through FranceConnect from the registration page: the
         account page has no link containing "password/change",
      2. set a password and log in with it: the link is shown,
      3. log in again through FranceConnect: the link is hidden,
      4. delete the FranceConnect link, giving a new password, and go
         through the FranceConnect logout: the link is shown again.
    '''
    exp = timestamp_from_datetime(now() + datetime.timedelta(seconds=1000))
    response = app.get('/login/?service=portail&next=/idp/')
    response = response.click("Register")
    response = response.click(href='callback')
    location = response['Location']
    state = check_authorization_url(location)
    # code and state are appended below with '&', so this redirect_uri is
    # expected to already carry a query string
    callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]

    @httmock.urlmatch(path=r'.*/token$')
    def access_token_response(url, request):
        # fake token endpoint; `callback`, `state` and `exp` are read from the
        # enclosing scope when the endpoint is called
        parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
        assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
                                          'grant_type'])
        assert parsed['code'] == 'zzz'
        assert parsed['client_id'] == 'xxx'
        assert parsed['client_secret'] == 'yyy'
        assert parsed['grant_type'] == 'authorization_code'
        assert callback in parsed['redirect_uri']
        id_token = {
            'sub': '1234',
            'aud': 'xxx',
            'nonce': state,
            'exp': exp,
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
            'email': 'john.doe@example.com',
        }
        return json.dumps({
            'access_token': 'uuu',
            'id_token': hmac_jwt(id_token, 'yyy')
        })

    @httmock.urlmatch(path=r'.*userinfo$')
    def user_info_response(url, request):
        # fake userinfo endpoint: requires the access token issued above
        assert request.headers['Authorization'] == 'Bearer uuu'
        return json.dumps({
            'sub': '1234',
            'family_name': u'Frédérique',
            'given_name': u'Ÿuñe',
            'email': 'john.doe@example.com',
        })

    # 1. registration through FranceConnect
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
    assert User.objects.count() == 0
    assert path(response['Location']) == '/accounts/fc/register/'
    response = response.follow()
    location = response['Location']
    # this check used to be a bare expression and never ran; only the path is
    # compared so that absolute and relative redirects are both accepted
    assert path(location).startswith('/accounts/activate/')
    response = response.follow()
    assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
    assert hooks.calls['event'][1]['kwargs']['service'] == 'portail'
    # we must be connected
    assert app.session['_auth_user_id']
    # remove the registration parameter
    callback = callback.replace('&registration=', '')
    callback = callback.replace('?registration=', '?')
    callback = callback.replace('?&', '?')
    assert path_and_query(response['Location']) == path_and_query(callback)
    # second round trip; rebinding `state` also changes the nonce emitted by
    # the fake token endpoint
    response = response.follow()
    location = response['Location']
    state = check_authorization_url(location)
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
    assert models.FcAccount.objects.count() == 1
    user = User.objects.get()
    assert user.verified_attributes.first_name == u'Ÿuñe'
    assert user.verified_attributes.last_name == u'Frédérique'
    response = app.get('/accounts/')
    assert len(response.pyquery('[href*="password/change"]')) == 0

    # 2. Login with password
    user = User.objects.get()
    user.set_password('test')
    user.save()
    app.session.flush()
    response = app.get('/login/')
    response.form.set('username', User.objects.get().email)
    response.form.set('password', 'test')
    response = response.form.submit(name='login-password-submit').follow()
    response = app.get('/accounts/')
    assert len(response.pyquery('[href*="password/change"]')) > 0

    # 3. Relogin with FC
    app.session.flush()
    response = app.get('/login/?service=portail&next=/accounts/')
    response = response.click(href='callback')
    location = response['Location']
    state = check_authorization_url(location)
    callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
    # we must be connected
    assert app.session['_auth_user_id']
    assert path(response['Location']) == '/accounts/'
    response = response.follow()
    assert len(response.pyquery('[href*="password/change"]')) == 0

    # 4. Unlink
    response = response.click('Delete link')
    response.form.set('new_password1', 'ikKL1234')
    response.form.set('new_password2', 'ikKL1234')
    response = response.form.submit(name='unlink')
    continue_url = response.pyquery('a#a2-continue').attr['href']
    state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
    assert app.session['fc_states'][state]['next'] == '/accounts/'
    response = app.get(reverse('fc-logout') + '?state=' + state)
    assert path(response['Location']) == '/accounts/'
    response = response.follow()
    assert len(response.pyquery('[href*="password/change"]')) > 0