163 lines
5.9 KiB
Python
163 lines
5.9 KiB
Python
# authentic2-auth-fc - authentic2 authentication for FranceConnect
|
|
# Copyright (C) 2019 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import base64
|
|
import contextlib
|
|
import datetime
|
|
import json
|
|
import urllib.parse as urlparse
|
|
import uuid
|
|
|
|
from jwcrypto import jwk, jwt
|
|
import httmock
|
|
import pytest
|
|
|
|
from django.http import QueryDict
|
|
from django.urls import reverse
|
|
from django.utils.http import urlencode
|
|
from django.utils.timezone import now
|
|
|
|
|
|
from authentic2.models import Service
|
|
from authentic2.utils import make_url
|
|
|
|
from ..utils import assert_equals_url
|
|
|
|
CLIENT_ID = 'xxx'
|
|
CLIENT_SECRET = 'yyy'
|
|
|
|
|
|
class FranceConnectMock:
|
|
exp = None
|
|
|
|
def __init__(self):
|
|
self.sub = '1234'
|
|
self.id_token = {
|
|
'aud': 'xxx',
|
|
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
|
|
}
|
|
self.user_info = {
|
|
'family_name': 'Frédérique',
|
|
'given_name': 'Ÿuñe',
|
|
'email': 'john.doe@example.com',
|
|
}
|
|
self.access_token = str(uuid.uuid4())
|
|
self.client_id = CLIENT_ID
|
|
self.client_secret = CLIENT_SECRET
|
|
self.scopes = {'openid', 'profile', 'email'}
|
|
self.callback_params = {'service': 'portail', 'next': '/idp/'}
|
|
|
|
def handle_authorization(self, app, url, **kwargs):
|
|
assert url.startswith('https://fcp.integ01')
|
|
parsed_url = urlparse.urlparse(url)
|
|
query = QueryDict(parsed_url.query)
|
|
assert_equals_url(query['redirect_uri'], self.callback_url)
|
|
assert query['client_id'] == self.client_id
|
|
assert set(query['scope'].split()) == self.scopes
|
|
assert query['state']
|
|
assert query['nonce']
|
|
assert query['response_type'] == 'code'
|
|
assert query['acr_values'] == 'eidas1'
|
|
self.state = query['state']
|
|
self.nonce = query['nonce']
|
|
self.code = str(uuid.uuid4().hex)
|
|
return app.get(
|
|
make_url(self.callback_url, params={'code': self.code, 'state': self.state}), **kwargs)
|
|
|
|
@property
|
|
def callback_url(self):
|
|
return 'http://testserver' + reverse('fc-login-or-link') + '?' + urlencode(self.callback_params)
|
|
|
|
def login_with_fc_fixed_params(self, app):
|
|
if app.session:
|
|
app.session.flush()
|
|
response = app.get('/login/?' + urlencode(self.callback_params))
|
|
response = response.click(href='callback')
|
|
return self.handle_authorization(app, response.location, status=302)
|
|
|
|
def login_with_fc(self, app, path):
|
|
if app.session:
|
|
app.session.flush()
|
|
response = app.get(path)
|
|
self.callback_params = {k: v for k, v in QueryDict(urlparse.urlparse(response.location).query).items()}
|
|
response = response.follow()
|
|
response = response.click(href='callback')
|
|
return self.handle_authorization(app, response.location, status=302).follow()
|
|
|
|
def access_token_response(self, url, request):
|
|
formdata = QueryDict(request.body)
|
|
assert set(formdata.keys()) == {'code', 'client_id', 'client_secret',
|
|
'redirect_uri', 'grant_type'}
|
|
assert formdata['code'] == self.code
|
|
assert formdata['client_id'] == self.client_id
|
|
assert formdata['client_secret'] == self.client_secret
|
|
assert formdata['grant_type'] == 'authorization_code'
|
|
assert_equals_url(formdata['redirect_uri'], self.callback_url)
|
|
|
|
# make response
|
|
id_token = self.id_token.copy()
|
|
id_token.update({
|
|
'sub': self.sub,
|
|
'nonce': self.nonce,
|
|
'exp': int((self.exp or (now() + datetime.timedelta(seconds=60))).timestamp()),
|
|
})
|
|
id_token.update(self.user_info)
|
|
return json.dumps({
|
|
'access_token': self.access_token,
|
|
'id_token': self.hmac_jwt(id_token, self.client_secret)
|
|
})
|
|
|
|
def hmac_jwt(self, payload, key):
|
|
header = {'alg': 'HS256'}
|
|
k = jwk.JWK(kty='oct', k=base64.b64encode(key.encode('utf-8')).decode('ascii'))
|
|
t = jwt.JWT(header=header, claims=payload)
|
|
t.make_signed_token(k)
|
|
return t.serialize()
|
|
|
|
def user_info_response(self, url, request):
|
|
assert request.headers['Authorization'] == 'Bearer %s' % self.access_token
|
|
user_info = self.user_info.copy()
|
|
user_info['sub'] = self.sub
|
|
return json.dumps(user_info)
|
|
|
|
@contextlib.contextmanager
|
|
def __call__(self):
|
|
with httmock.HTTMock(
|
|
httmock.urlmatch(path=r'.*/token$')(self.access_token_response),
|
|
httmock.urlmatch(path=r'.*userinfo$')(self.user_info_response)):
|
|
yield None
|
|
|
|
def handle_logout(self, app, url):
|
|
assert url.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/logout')
|
|
parsed_url = urlparse.urlparse(url)
|
|
query = QueryDict(parsed_url.query)
|
|
assert_equals_url(query['post_logout_redirect_uri'], 'http://testserver' + reverse('fc-logout'))
|
|
assert query['state']
|
|
self.state = query['state']
|
|
return app.get(reverse('fc-logout') + '?state=' + self.state)
|
|
|
|
|
|
@pytest.fixture
|
|
def franceconnect(settings, service):
|
|
settings.A2_FC_ENABLE = True
|
|
settings.A2_FC_CLIENT_ID = CLIENT_ID
|
|
settings.A2_FC_CLIENT_SECRET = CLIENT_SECRET
|
|
|
|
Service.objects.create(name='portail', slug='portail')
|
|
mock_object = FranceConnectMock()
|
|
with mock_object():
|
|
yield mock_object
|