This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
authentic2-auth-fc/tests/test_auth_fc.py

450 lines
18 KiB
Python

# -*- coding: utf-8 -*-
import pytest
import re
import urlparse
import httmock
import mock
import json
import base64
from jwcrypto import jwk, jwt
import datetime
import requests
from django.core.urlresolvers import reverse
from django.contrib.auth import get_user_model
from django.utils.timezone import now
from authentic2.utils import timestamp_from_datetime
from authentic2_auth_fc import models
from authentic2_auth_fc.utils import requests_retry_session
# User model class returned by Django's get_user_model(); the tests below use
# it to create users and to count them.
User = get_user_model()
def path(url):
    '''Return only the path component of `url`.'''
    parts = urlparse.urlparse(url)
    return parts.path
def path_and_query(url):
    '''Return the path and the query string of `url`, joined by '?'.

    The '?' separator is always present in the result, even when `url` has
    no query string.
    '''
    pieces = urlparse.urlparse(url)
    return '%s?%s' % (pieces.path, pieces.query)
def get_links_from_mail(mail):
    '''Extract links from mail sent by Django.

    Returns the list of http(s) URLs found in `mail.body`, in order of
    appearance. A link ends at the first whitespace character, so a CRLF
    line ending or a tab right after a link is not captured as part of it.
    '''
    return re.findall(r'https?://\S*', mail.body)
def hmac_jwt(payload, key):
    '''Return `payload` serialized as a JWT signed with HS256.

    `key` is the shared secret as text; it is UTF-8 encoded and base64
    encoded before being wrapped in an 'oct' JWK.
    '''
    secret = base64.b64encode(key.encode('utf-8'))
    signing_key = jwk.JWK(kty='oct', k=secret)
    token = jwt.JWT(header={'alg': 'HS256'}, claims=payload)
    token.make_signed_token(signing_key)
    return token.serialize()
def test_login_redirect(app, fc_settings):
    '''A GET on the fc-login-or-link view answers with a 302 whose Location
    starts with https://fcp.integ01.'''
    response = app.get(reverse('fc-login-or-link'), status=302)
    target = response['Location']
    assert target.startswith('https://fcp.integ01')
def check_authorization_url(url):
    '''Validate an authorization URL and return its `state` parameter.

    Asserts that `url` starts with https://fcp.integ01 and that its query
    string carries the expected redirect_uri, client_id, scope, state, nonce
    and response_type parameters.
    '''
    callback = reverse('fc-login-or-link')
    assert url.startswith('https://fcp.integ01')
    raw_query = url.split('?')[1]
    # keep only the first value of every query parameter
    params = {name: values[0] for name, values in urlparse.parse_qs(raw_query).items()}
    assert 'redirect_uri' in params
    assert callback in params['redirect_uri']
    assert 'client_id' in params
    assert params['client_id'] == 'xxx'
    assert 'scope' in params
    requested_scopes = set(params['scope'].split())
    assert requested_scopes == {'openid', 'profile', 'birth', 'email'}
    for name in ('state', 'nonce'):
        assert name in params
    # the same value is expected for state and nonce
    assert params['state'] == params['nonce']
    assert 'response_type' in params
    assert params['response_type'] == 'code'
    return params['state']
@pytest.mark.parametrize('exp', [
    timestamp_from_datetime(now() + datetime.timedelta(seconds=1000)),
    timestamp_from_datetime(now() - datetime.timedelta(seconds=1000)),
])
def test_login_simple(app, fc_settings, caplog, hooks, exp):
    '''Log in through mocked token and userinfo endpoints, then unlink.

    Runs once with an id_token `exp` in the future and once with one in the
    past; in the latter case no user may be created and the rest of the
    scenario is skipped.
    '''
    login_page = app.get('/login/?service=portail&next=/idp/')
    response = login_page.click(href='callback')
    state = check_authorization_url(response['Location'])

    @httmock.urlmatch(path=r'.*/token$')
    def access_token_response(url, request):
        # `callback` is read from the enclosing test when the mock is called
        form = {name: values[0] for name, values in urlparse.parse_qs(request.body).items()}
        assert set(form) == {'code', 'client_id', 'client_secret', 'redirect_uri', 'grant_type'}
        assert form['code'] == 'zzz'
        assert form['client_id'] == 'xxx'
        assert form['client_secret'] == 'yyy'
        assert form['grant_type'] == 'authorization_code'
        assert callback in form['redirect_uri']
        claims = {
            'sub': '1234',
            'aud': 'xxx',
            'nonce': state,
            'exp': exp,
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
        }
        return json.dumps({
            'access_token': 'uuu',
            'id_token': hmac_jwt(claims, 'yyy'),
        })

    @httmock.urlmatch(path=r'.*userinfo$')
    def user_info_response(url, request):
        assert request.headers['Authorization'] == 'Bearer uuu'
        return json.dumps({
            'sub': '1234',
            'family_name': u'Frédérique',
            'given_name': u'Ÿuñe',
        })

    callback = reverse('fc-login-or-link')
    callback_url = callback + '?service=portail&next=/idp/&code=zzz&state=%s' % state
    fc_mocks = (access_token_response, user_info_response)

    # before A2_FC_CREATE is switched on, the callback must not create a user
    with httmock.HTTMock(*fc_mocks):
        response = app.get(callback_url, status=302)
    assert User.objects.count() == 0

    fc_settings.A2_FC_CREATE = True
    with httmock.HTTMock(*fc_mocks):
        response = app.get(callback_url, status=302)

    expected_users = 0 if exp < timestamp_from_datetime(now()) else 1
    assert User.objects.count() == expected_users
    if not User.objects.count():
        return

    assert path(response['Location']) == '/idp/'
    login_event = hooks.event[1]['kwargs']
    assert login_event['name'] == 'login'
    assert login_event['service'] == 'portail'
    # we must be connected
    assert app.session['_auth_user_id']
    assert models.FcAccount.objects.count() == 1

    # remove the link from the account page, setting a password on the way
    response = app.get('/accounts/').click('Delete link')
    unlink_form = response.form
    unlink_form.set('new_password1', 'ikKL1234')
    unlink_form.set('new_password2', 'ikKL1234')
    response = unlink_form.submit(name='unlink')
    assert 'The link with the FranceConnect account has been deleted' in response.content
    assert models.FcAccount.objects.count() == 0

    continue_url = response.pyquery('a#a2-continue').attr['href']
    logout_state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
    assert app.session['fc_states'][logout_state]['next'] == '/accounts/'
    response = app.get(reverse('fc-logout') + '?state=' + logout_state)
    assert path(response['Location']) == '/accounts/'
def test_login_email_is_unique(app, fc_settings, caplog):
    '''With A2_EMAIL_IS_UNIQUE set, check that a callback whose userinfo
    carries the email of an existing user opens a session without creating a
    second user, and that the link can then be deleted after a password
    login without being asked for a new password.'''
    callback = reverse('fc-login-or-link')
    redirect = app.get(callback, status=302)
    state = check_authorization_url(redirect['Location'])

    @httmock.urlmatch(path=r'.*/token$')
    def access_token_response(url, request):
        form = {name: values[0] for name, values in urlparse.parse_qs(request.body).items()}
        assert set(form) == {'code', 'client_id', 'client_secret', 'redirect_uri', 'grant_type'}
        assert form['code'] == 'zzz'
        assert form['client_id'] == 'xxx'
        assert form['client_secret'] == 'yyy'
        assert form['grant_type'] == 'authorization_code'
        assert callback in form['redirect_uri']
        claims = {
            'sub': '1234',
            'aud': 'xxx',
            'nonce': state,
            'exp': timestamp_from_datetime(now() + datetime.timedelta(seconds=1000)),
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
        }
        return json.dumps({
            'access_token': 'uuu',
            'id_token': hmac_jwt(claims, 'yyy'),
        })

    @httmock.urlmatch(path=r'.*userinfo$')
    def user_info_response(url, request):
        assert request.headers['Authorization'] == 'Bearer uuu'
        return json.dumps({
            'sub': '1234',
            'family_name': u'Frédérique',
            'given_name': u'Ÿuñe',
            'email': 'john.doe@example.com',
        })

    existing = User.objects.create(email='john.doe@example.com', first_name='John', last_name='Doe')
    existing.set_password('toto')
    existing.save()
    fc_settings.A2_EMAIL_IS_UNIQUE = True

    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get('%s?code=zzz&state=%s' % (callback, state), status=302)
    assert User.objects.count() == 1
    assert app.session['_auth_user_id']

    # logout, test unlinking when logging with password
    app.session.flush()
    login_page = app.get('/login/')
    login_page.form.set('username', User.objects.get().email)
    login_page.form.set('password', 'toto')
    login_page.form.submit(name='login-password-submit').follow()

    response = app.get('/accounts/').click('Delete link')
    assert 'new_password1' not in response.form.fields
    response = response.form.submit(name='unlink').follow()
    assert 'The link with the FranceConnect account has been deleted' in response.content
    assert response.request.path == '/accounts/'
def test_login_email_is_unique_and_already_linked(app, fc_settings, caplog):
    '''With A2_EMAIL_IS_UNIQUE set, check that a callback for sub '1234' whose
    email belongs to a user already linked to sub '4567' answers with
    'is already used', creates no user and opens no session.'''
    callback = reverse('fc-login-or-link')
    redirect = app.get(callback, status=302)
    state = check_authorization_url(redirect['Location'])

    email = 'john.doe@example.com'
    fc_sub = '1234'
    owner = User.objects.create(email=email, first_name='John', last_name='Doe')
    models.FcAccount.objects.create(user=owner, sub='4567', token='xxx', user_info='{}')

    @httmock.urlmatch(path=r'.*/token$')
    def access_token_response(url, request):
        form = {name: values[0] for name, values in urlparse.parse_qs(request.body).items()}
        assert set(form) == {'code', 'client_id', 'client_secret', 'redirect_uri', 'grant_type'}
        assert form['code'] == 'zzz'
        assert form['client_id'] == 'xxx'
        assert form['client_secret'] == 'yyy'
        assert form['grant_type'] == 'authorization_code'
        assert callback in form['redirect_uri']
        claims = {
            'sub': fc_sub,
            'aud': 'xxx',
            'nonce': state,
            'exp': timestamp_from_datetime(now() + datetime.timedelta(seconds=1000)),
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
        }
        return json.dumps({
            'access_token': 'uuu',
            'id_token': hmac_jwt(claims, 'yyy'),
        })

    @httmock.urlmatch(path=r'.*userinfo$')
    def user_info_response(url, request):
        assert request.headers['Authorization'] == 'Bearer uuu'
        return json.dumps({
            'sub': '1234',
            'family_name': u'Frédérique',
            'given_name': u'Ÿuñe',
            'email': email,
        })

    fc_settings.A2_EMAIL_IS_UNIQUE = True
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get('%s?code=zzz&state=%s' % (callback, state), status=302)
    assert 'is already used' in str(response)
    assert User.objects.count() == 1
    assert '_auth_user_id' not in app.session
def test_requests_proxies_support(app, fc_settings, caplog):
    '''Check how requests_retry_session() sets up proxies: none by default,
    untouched on a session passed in, and taken from the REQUESTS_PROXIES
    setting once it is defined, down to the call to Session.send.'''
    default_session = requests_retry_session()
    assert default_session.proxies == {}

    # a session given by the caller is returned as is, proxies included
    own_proxies = {'http': 'http://example.net'}
    other_session = requests.Session()
    other_session.proxies = own_proxies
    reused = requests_retry_session(session=other_session)
    assert reused is other_session
    assert reused.proxies == {'http': 'http://example.net'}

    configured_proxies = {'https': 'http://pubproxy.com/api/proxy'}
    fc_settings.REQUESTS_PROXIES = configured_proxies
    session = requests_retry_session()
    assert session.proxies == {'https': 'http://pubproxy.com/api/proxy'}

    with mock.patch('authentic2_auth_fc.utils.requests.Session.send') as mocked_send:
        mocked_send.return_value = mock.Mock(status_code=200, content='whatever')
        session.get('https://example.net/')
        send_kwargs = mocked_send.call_args[1]
        assert send_kwargs['proxies'] == {'https': 'http://pubproxy.com/api/proxy'}
def test_password_reset(app, mailoutbox):
    '''Request a password reset by mail and check that the mailed link
    answers with a redirect at first, then with a form holding a
    new_password1 field once an FcAccount exists for the user.'''
    user = User.objects.create(email='john.doe@example.com')

    reset_page = app.get('/login/').click('Reset it!').maybe_follow()
    reset_page.form['email'] = user.email
    assert len(mailoutbox) == 0
    reset_page.form.submit()
    assert len(mailoutbox) == 1

    links = get_links_from_mail(mailoutbox[0])
    reset_url = links[0]
    app.get(reset_url, status=302)

    models.FcAccount.objects.create(user=user, sub='xxx', token='aaa')
    response = app.get(reset_url)
    assert 'new_password1' in response.form.fields
def test_registration1(app, fc_settings, caplog, hooks):
    '''Register through the link shown on the login page after a callback
    that matched no user, link the new account, then delete the link.'''
    exp = timestamp_from_datetime(now() + datetime.timedelta(seconds=1000))
    response = app.get('/login/?service=portail&next=/idp/')
    response = response.click(href="callback")
    # 1. Try a login
    # 2. Verify we come back to login page
    # 3. Check presence of registration link
    # 4. Follow it
    location = response['Location']
    state = check_authorization_url(location)

    @httmock.urlmatch(path=r'.*/token$')
    def access_token_response(url, request):
        # `state` and `callback` are read from the enclosing test at call
        # time, so the value assigned before each callback request is used
        parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
        assert set(parsed.keys()) == {'code', 'client_id', 'client_secret', 'redirect_uri',
                                      'grant_type'}
        assert parsed['code'] == 'zzz'
        assert parsed['client_id'] == 'xxx'
        assert parsed['client_secret'] == 'yyy'
        assert parsed['grant_type'] == 'authorization_code'
        assert callback in parsed['redirect_uri']
        id_token = {
            'sub': '1234',
            'aud': 'xxx',
            'nonce': state,
            'exp': exp,
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
            'email': 'john.doe@example.com',
        }
        return json.dumps({
            'access_token': 'uuu',
            'id_token': hmac_jwt(id_token, 'yyy')
        })

    @httmock.urlmatch(path=r'.*userinfo$')
    def user_info_response(url, request):
        assert request.headers['Authorization'] == 'Bearer uuu'
        return json.dumps({
            'sub': '1234',
            'family_name': u'Frédérique',
            'given_name': u'Ÿuñe',
            'email': 'john.doe@example.com',
        })

    # the redirect_uri already carries a query string, hence the '&' below
    callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
    assert User.objects.count() == 0
    assert path(response['Location']) == '/login/'
    response = response.follow()
    response = response.click('Create your account with FranceConnect')
    location = response['Location']
    # compare the path only, so the check holds whether the Location header
    # is absolute or relative
    assert path(location).startswith('/accounts/activate/')
    response = response.follow()
    assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
    # we must be connected
    assert app.session['_auth_user_id']
    assert path_and_query(response['Location']) == path_and_query(callback)
    response = response.follow()
    location = response['Location']
    state = check_authorization_url(location)
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
    assert models.FcAccount.objects.count() == 1

    # delete the link from the account page, setting a password on the way
    response = app.get('/accounts/')
    response = response.click('Delete link')
    response.form.set('new_password1', 'ikKL1234')
    response.form.set('new_password2', 'ikKL1234')
    response = response.form.submit(name='unlink')
    assert 'The link with the FranceConnect account has been deleted' in response.content
    assert models.FcAccount.objects.count() == 0
    continue_url = response.pyquery('a#a2-continue').attr['href']
    state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
    assert app.session['fc_states'][state]['next'] == '/accounts/'
    response = app.get(reverse('fc-logout') + '?state=' + state)
    assert path(response['Location']) == '/accounts/'
def test_registration2(app, fc_settings, caplog, hooks):
    '''Register by starting the FranceConnect flow from the page behind the
    "Register" link, link the new account, then delete the link.'''
    exp = timestamp_from_datetime(now() + datetime.timedelta(seconds=1000))
    response = app.get('/login/?service=portail&next=/idp/')
    response = response.click("Register")
    response = response.click(href='callback')
    # 1. Open the page behind the "Register" link of the login page
    # 2. Start the FranceConnect flow from there
    # 3. Verify the callback redirects to /accounts/fc/register/
    # 4. Follow it
    location = response['Location']
    state = check_authorization_url(location)

    @httmock.urlmatch(path=r'.*/token$')
    def access_token_response(url, request):
        # `state` and `callback` are read from the enclosing test at call
        # time, so the value assigned before each callback request is used
        parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
        assert set(parsed.keys()) == {'code', 'client_id', 'client_secret', 'redirect_uri',
                                      'grant_type'}
        assert parsed['code'] == 'zzz'
        assert parsed['client_id'] == 'xxx'
        assert parsed['client_secret'] == 'yyy'
        assert parsed['grant_type'] == 'authorization_code'
        assert callback in parsed['redirect_uri']
        id_token = {
            'sub': '1234',
            'aud': 'xxx',
            'nonce': state,
            'exp': exp,
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
            'email': 'john.doe@example.com',
        }
        return json.dumps({
            'access_token': 'uuu',
            'id_token': hmac_jwt(id_token, 'yyy')
        })

    @httmock.urlmatch(path=r'.*userinfo$')
    def user_info_response(url, request):
        assert request.headers['Authorization'] == 'Bearer uuu'
        return json.dumps({
            'sub': '1234',
            'family_name': u'Frédérique',
            'given_name': u'Ÿuñe',
            'email': 'john.doe@example.com',
        })

    # the redirect_uri already carries a query string, hence the '&' below
    callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
    assert User.objects.count() == 0
    assert path(response['Location']) == '/accounts/fc/register/'
    response = response.follow()
    location = response['Location']
    # compare the path only, so the check holds whether the Location header
    # is absolute or relative
    assert path(location).startswith('/accounts/activate/')
    response = response.follow()
    assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
    assert hooks.calls['event'][1]['kwargs']['service'] == 'portail'
    # we must be connected
    assert app.session['_auth_user_id']
    # remove the registration parameter
    callback = callback.replace('&registration=', '')
    callback = callback.replace('?registration=', '?')
    callback = callback.replace('?&', '?')
    assert path_and_query(response['Location']) == path_and_query(callback)
    response = response.follow()
    location = response['Location']
    state = check_authorization_url(location)
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
    assert models.FcAccount.objects.count() == 1

    # delete the link from the account page, setting a password on the way
    response = app.get('/accounts/')
    response = response.click('Delete link')
    response.form.set('new_password1', 'ikKL1234')
    response.form.set('new_password2', 'ikKL1234')
    response = response.form.submit(name='unlink')
    assert 'The link with the FranceConnect account has been deleted' in response.content
    assert models.FcAccount.objects.count() == 0
    continue_url = response.pyquery('a#a2-continue').attr['href']
    state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
    assert app.session['fc_states'][state]['next'] == '/accounts/'
    response = app.get(reverse('fc-logout') + '?state=' + state)
    assert path(response['Location']) == '/accounts/'