258 lines
10 KiB
Python
258 lines
10 KiB
Python
# -*- coding: utf-8 -*-
|
|
import pytest
|
|
import urlparse
|
|
import httmock
|
|
import mock
|
|
import json
|
|
import base64
|
|
from jwcrypto import jwk, jwt
|
|
import datetime
|
|
|
|
import requests
|
|
|
|
from django.core.urlresolvers import reverse
|
|
from django.contrib.auth import get_user_model
|
|
from django.utils.timezone import now
|
|
|
|
from authentic2.utils import timestamp_from_datetime
|
|
|
|
from authentic2_auth_fc import models
|
|
from authentic2_auth_fc.utils import requests_retry_session
|
|
|
|
|
|
# User model class returned by Django's get_user_model(); the tests below
# query and create accounts through it.
User = get_user_model()
|
|
|
|
|
|
def hmac_jwt(payload, key):
    """Build a JWT carrying ``payload`` and sign it with HS256.

    ``key`` is the shared secret given as text: it is UTF-8 encoded, then
    base64 encoded, and used as the ``k`` member of a symmetric (``oct``)
    JWK. The return value is whatever ``jwt.JWT.serialize()`` yields for the
    signed token.
    """
    encoded_secret = base64.b64encode(key.encode('utf-8'))
    signing_key = jwk.JWK(kty='oct', k=encoded_secret)
    token = jwt.JWT(header={'alg': 'HS256'}, claims=payload)
    token.make_signed_token(signing_key)
    return token.serialize()
|
|
|
|
|
|
def test_login_redirect(app, fc_settings):
    """GET on the ``fc-login-or-link`` view must answer with a 302 whose
    ``Location`` header starts with the expected provider prefix."""
    redirect = app.get(reverse('fc-login-or-link'), status=302)
    target = redirect['Location']
    assert target.startswith('https://fcp.integ01')
|
|
|
|
|
|
def check_authorization_url(url):
    """Check the authorization redirect URL and return its ``state``.

    The URL must start with ``https://fcp.integ01`` and its query string
    must carry ``redirect_uri`` (containing the ``fc-login-or-link`` path),
    ``client_id`` (``xxx``), ``scope`` (exactly openid, profile, birth and
    email), ``response_type`` (``code``), and matching ``state`` and
    ``nonce`` values.

    :param url: value of the ``Location`` header to validate.
    :returns: the ``state`` query parameter.
    :raises AssertionError: when any of the checks above fails.
    """
    callback = reverse('fc-login-or-link')
    assert url.startswith('https://fcp.integ01')

    # Let the URL parser isolate the query: unlike ``url.split('?')[1]`` this
    # does not raise IndexError when the query is missing (the assertions
    # below fail instead) and it leaves any ``#fragment`` out of the values.
    query_string = urlparse.urlparse(url).query
    # parse_qs maps each name to a list of values; keep the first of each.
    parsed = {name: values[0]
              for name, values in urlparse.parse_qs(query_string).items()}

    assert 'redirect_uri' in parsed
    assert callback in parsed['redirect_uri']
    assert 'client_id' in parsed
    assert parsed['client_id'] == 'xxx'
    assert 'scope' in parsed
    assert set(parsed['scope'].split()) == {'openid', 'profile', 'birth', 'email'}
    assert 'state' in parsed
    assert 'nonce' in parsed
    assert parsed['state'] == parsed['nonce']
    assert 'response_type' in parsed
    assert parsed['response_type'] == 'code'
    return parsed['state']
|
|
|
|
|
|
@pytest.mark.parametrize('exp', [timestamp_from_datetime(now() + datetime.timedelta(seconds=1000)),
                                 timestamp_from_datetime(now() - datetime.timedelta(seconds=1000))])
def test_login(app, fc_settings, caplog, exp):
    """Play the authorization-code callback against stubbed provider endpoints.

    ``exp`` is the expiry claim put in the ID token: one parametrization is
    1000 seconds in the future, the other 1000 seconds in the past. No user
    may exist after the first callback; after ``A2_FC_CREATE`` is set to
    True, a user is expected only when ``exp`` is not in the past. When a
    user was created, the test goes on to unlink the account and follow the
    logout URL.
    """
    callback = reverse('fc-login-or-link')
    state = check_authorization_url(app.get(callback, status=302)['Location'])

    @httmock.urlmatch(path=r'.*/token$')
    def access_token_response(url, request):
        # Stub for the token endpoint: validate the posted form, then answer
        # with an access token and an HS256-signed ID token.
        form = {name: values[0]
                for name, values in urlparse.parse_qs(request.body).items()}
        assert set(form.keys()) == {'code', 'client_id', 'client_secret', 'redirect_uri',
                                    'grant_type'}
        assert form['code'] == 'zzz'
        assert form['client_id'] == 'xxx'
        assert form['client_secret'] == 'yyy'
        assert form['grant_type'] == 'authorization_code'
        assert callback in form['redirect_uri']
        claims = {
            'sub': '1234',
            'aud': 'xxx',
            'nonce': state,
            'exp': exp,
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
        }
        body = {
            'access_token': 'uuu',
            'id_token': hmac_jwt(claims, 'yyy'),
        }
        return json.dumps(body)

    @httmock.urlmatch(path=r'.*userinfo$')
    def user_info_response(url, request):
        # Stub for the user-info endpoint: requires the bearer token issued
        # by the token stub.
        assert request.headers['Authorization'] == 'Bearer uuu'
        profile = {
            'sub': '1234',
            'family_name': u'Frédérique',
            'given_name': u'Ÿuñe',
        }
        return json.dumps(profile)

    callback_with_code = callback + '?code=zzz&state=%s' % state

    # First callback, before A2_FC_CREATE is switched on below: no user yet.
    with httmock.HTTMock(access_token_response, user_info_response):
        app.get(callback_with_code, status=302)
    assert User.objects.count() == 0

    fc_settings.A2_FC_CREATE = True
    with httmock.HTTMock(access_token_response, user_info_response):
        app.get(callback_with_code, status=302)

    token_expired = exp < timestamp_from_datetime(now())
    assert User.objects.count() == (0 if token_expired else 1)

    if not User.objects.count():
        # Nothing was created, so there is no account to unlink.
        return

    # we must be connected
    assert app.session['_auth_user_id']
    assert models.FcAccount.objects.count() == 1
    # by default we set a random password on new users, so they can use the
    # recover my password form
    assert User.objects.get().has_usable_password()

    unlink_page = app.get('/accounts/').click('Delete link')
    for field in ('new_password1', 'new_password2'):
        unlink_page.form.set(field, 'ikKL1234')
    unlinked = unlink_page.form.submit(name='unlink')
    assert 'The link with the FranceConnect account has been deleted' in unlinked.content
    assert models.FcAccount.objects.count() == 0

    # The "continue" link carries a state whose stored next URL is /accounts/.
    continue_url = unlinked.pyquery('a#a2-continue').attr['href']
    continue_query = urlparse.urlparse(continue_url).query
    logout_state = urlparse.parse_qs(continue_query)['state'][0]
    assert app.session['fc_states'][logout_state]['next'] == '/accounts/'

    logged_out = app.get(reverse('fc-logout') + '?state=' + logout_state)
    assert logged_out['Location'] == 'http://testserver/accounts/'
|
|
|
|
|
|
def test_login_email_is_unique(app, fc_settings, caplog):
    """Callback with ``A2_EMAIL_IS_UNIQUE`` set and a pre-existing user whose
    email equals the one returned by the user-info stub.

    The test expects the user count to stay at one and the session to be
    authenticated. It then flushes the session, logs in with the password,
    and unlinks the account through a form that must not offer a
    ``new_password1`` field.
    """
    callback = reverse('fc-login-or-link')
    state = check_authorization_url(app.get(callback, status=302)['Location'])

    @httmock.urlmatch(path=r'.*/token$')
    def access_token_response(url, request):
        # Stub for the token endpoint: validate the posted form, then answer
        # with an access token and an HS256-signed ID token.
        form = {name: values[0]
                for name, values in urlparse.parse_qs(request.body).items()}
        assert set(form.keys()) == {'code', 'client_id', 'client_secret', 'redirect_uri',
                                    'grant_type'}
        assert form['code'] == 'zzz'
        assert form['client_id'] == 'xxx'
        assert form['client_secret'] == 'yyy'
        assert form['grant_type'] == 'authorization_code'
        assert callback in form['redirect_uri']
        claims = {
            'sub': '1234',
            'aud': 'xxx',
            'nonce': state,
            'exp': timestamp_from_datetime(now() + datetime.timedelta(seconds=1000)),
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
        }
        body = {
            'access_token': 'uuu',
            'id_token': hmac_jwt(claims, 'yyy'),
        }
        return json.dumps(body)

    @httmock.urlmatch(path=r'.*userinfo$')
    def user_info_response(url, request):
        # Stub for the user-info endpoint: requires the bearer token issued
        # by the token stub.
        assert request.headers['Authorization'] == 'Bearer uuu'
        profile = {
            'sub': '1234',
            'family_name': u'Frédérique',
            'given_name': u'Ÿuñe',
            'email': 'john.doe@example.com',
        }
        return json.dumps(profile)

    existing = User.objects.create(
        email='john.doe@example.com', first_name='John', last_name='Doe')
    existing.set_password('toto')
    existing.save()

    fc_settings.A2_EMAIL_IS_UNIQUE = True
    with httmock.HTTMock(access_token_response, user_info_response):
        app.get(callback + '?code=zzz&state=%s' % state, status=302)
    assert User.objects.count() == 1
    assert app.session['_auth_user_id']

    # logout, test unlinking when logging with password
    app.session.flush()
    login_page = app.get('/login/')
    login_page.form.set('username', User.objects.get().email)
    login_page.form.set('password', 'toto')
    login_page.form.submit(name='login-password-submit').follow()

    unlink_page = app.get('/accounts/').click('Delete link')
    assert 'new_password1' not in unlink_page.form.fields
    landing = unlink_page.form.submit(name='unlink').follow()
    assert 'The link with the FranceConnect account has been deleted' in landing.content
    assert landing.request.path == '/accounts/'
|
|
|
|
|
|
def test_login_email_is_unique_and_already_linked(app, fc_settings, caplog):
    """Callback with ``A2_EMAIL_IS_UNIQUE`` set when the matching user is
    already linked to another subject.

    A user with email ``EMAIL`` exists and owns an ``FcAccount`` with sub
    ``4567``. The stubbed provider identifies the visitor as ``SUB`` with the
    same email. The test expects the response to contain ``is already used``,
    the user count to stay at one, and the session to remain unauthenticated.
    """
    callback = reverse('fc-login-or-link')
    response = app.get(callback, status=302)
    state = check_authorization_url(response['Location'])

    EMAIL = 'john.doe@example.com'
    # Subject identifier shared by the ID token and the user-info payload.
    SUB = '1234'
    user = User.objects.create(email=EMAIL, first_name='John', last_name='Doe')
    models.FcAccount.objects.create(user=user, sub='4567', token='xxx', user_info='{}')

    @httmock.urlmatch(path=r'.*/token$')
    def access_token_response(url, request):
        # Stub for the token endpoint: validate the posted form, then answer
        # with an access token and an HS256-signed ID token.
        parsed = {name: values[0]
                  for name, values in urlparse.parse_qs(request.body).items()}
        assert set(parsed.keys()) == {'code', 'client_id', 'client_secret', 'redirect_uri',
                                      'grant_type'}
        assert parsed['code'] == 'zzz'
        assert parsed['client_id'] == 'xxx'
        assert parsed['client_secret'] == 'yyy'
        assert parsed['grant_type'] == 'authorization_code'
        assert callback in parsed['redirect_uri']
        id_token = {
            'sub': SUB,
            'aud': 'xxx',
            'nonce': state,
            'exp': timestamp_from_datetime(now() + datetime.timedelta(seconds=1000)),
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
        }
        return json.dumps({
            'access_token': 'uuu',
            'id_token': hmac_jwt(id_token, 'yyy'),
        })

    @httmock.urlmatch(path=r'.*userinfo$')
    def user_info_response(url, request):
        # Stub for the user-info endpoint: requires the bearer token issued
        # by the token stub.
        assert request.headers['Authorization'] == 'Bearer uuu'
        return json.dumps({
            # Reuse SUB rather than repeating the literal, so the user-info
            # subject cannot drift away from the one in the ID token.
            'sub': SUB,
            'family_name': u'Frédérique',
            'given_name': u'Ÿuñe',
            'email': EMAIL,
        })

    fc_settings.A2_EMAIL_IS_UNIQUE = True
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
    assert 'is already used' in str(response)
    assert User.objects.count() == 1
    assert '_auth_user_id' not in app.session
|
|
|
|
|
|
def test_requests_proxies_support(app, fc_settings, caplog):
    """Check how ``requests_retry_session`` deals with proxies.

    Covered cases: a fresh session has no proxies; a session passed in is
    returned as-is with its own proxies; once ``REQUESTS_PROXIES`` is set on
    the settings, a fresh session carries them and they reach
    ``Session.send`` as the ``proxies`` keyword argument.
    """
    # No setting and no session given: empty proxy mapping.
    assert requests_retry_session().proxies == {}

    # A caller-provided session is handed back untouched.
    preconfigured = requests.Session()
    preconfigured.proxies = {'http': 'http://example.net'}
    returned = requests_retry_session(session=preconfigured)
    assert returned is preconfigured
    assert returned.proxies == {'http': 'http://example.net'}

    # Proxies configured through the settings end up on new sessions.
    fc_settings.REQUESTS_PROXIES = {'https': 'http://pubproxy.com/api/proxy'}
    session = requests_retry_session()
    assert session.proxies == {'https': 'http://pubproxy.com/api/proxy'}

    with mock.patch('authentic2_auth_fc.utils.requests.Session.send') as mocked_send:
        mocked_send.return_value = mock.Mock(status_code=200, content='whatever')
        session.get('https://example.net/')
        # call_args unpacks to (positional args, keyword args).
        _, send_kwargs = mocked_send.call_args
        assert send_kwargs['proxies'] == {'https': 'http://pubproxy.com/api/proxy'}
|