# Source: authentic/tests/auth_fc/conftest.py (172 lines, 6.2 KiB, Python)

# authentic2-auth-fc - authentic2 authentication for FranceConnect
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import contextlib
import datetime
import json
import urllib.parse
import uuid
import httmock
import pytest
from django.http import QueryDict
from django.urls import reverse
from django.utils.http import urlencode
from django.utils.timezone import now
from jwcrypto import jwk, jwt
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.models import Service
from authentic2.utils import make_url
from ..utils import assert_equals_url
# OAuth2 client credentials used by the tests: FranceConnectMock checks that
# the application sends them to the token endpoint, and the `franceconnect`
# fixture installs the same values in the Django settings.
CLIENT_ID = 'xxx'
CLIENT_SECRET = 'yyy'
class FranceConnectMock:
    """Test double standing in for the FranceConnect OpenID Connect provider.

    The instance plays the provider side of the authorization-code flow:
    it validates the authorization request emitted by the application,
    calls the application's callback, and (when used as a context manager
    through ``with mock():``) answers the token and userinfo HTTP requests.

    Tests can tweak the instance attributes (``sub``, ``user_info``,
    ``scopes``, ...) or set one of the class-level overrides below to
    control what the provider returns.
    """

    # Optional overrides, all disabled by default:
    # - exp: object with a .timestamp() method (presumably a datetime) used
    #   as the id_token expiration instead of "now + 60 seconds";
    # - token_endpoint_response / user_info_endpoint_response: canned value
    #   returned as-is by the matching endpoint, skipping every check.
    exp = None
    token_endpoint_response = None
    user_info_endpoint_response = None

    # Values captured from the application's requests by
    # handle_authorization() / handle_logout(). Declared here so that an
    # endpoint hit before the authorization step fails on a readable
    # assertion instead of an AttributeError.
    state = None
    nonce = None
    code = None

    def __init__(self):
        self.sub = '1234'
        self.id_token = {
            'aud': CLIENT_ID,
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
        }
        self.user_info = {
            'family_name': 'Frédérique',
            'given_name': 'Ÿuñe',
            'email': 'john.doe@example.com',
        }
        self.access_token = str(uuid.uuid4())
        self.client_id = CLIENT_ID
        self.client_secret = CLIENT_SECRET
        self.scopes = {'openid', 'profile', 'email'}
        self.callback_params = {'service': 'portail', 'next': '/idp/'}

    @staticmethod
    def _flush_session(app):
        """Drop the current session of *app*, if any, to start from a clean state."""
        if app.session:
            app.session.flush()

    def handle_authorization(self, app, url, **kwargs):
        """Validate the authorization request *url* and call back the application.

        The query string of *url* is checked against the expected client id,
        scopes, redirect URI, response type and ACR value. The ``state`` and
        ``nonce`` it carries are remembered, a fresh authorization ``code``
        is generated, and the application's callback URL is requested with
        that code and state.

        Extra keyword arguments are forwarded to ``app.get`` (for example
        ``status=302``). Returns the response of that request.
        """
        assert url.startswith('https://fcp.integ01')
        parsed_url = urllib.parse.urlparse(url)
        query = QueryDict(parsed_url.query)
        assert_equals_url(query['redirect_uri'], self.callback_url)
        assert query['client_id'] == self.client_id
        assert set(query['scope'].split()) == self.scopes
        assert query['state']
        assert query['nonce']
        assert query['response_type'] == 'code'
        assert query['acr_values'] == 'eidas1'
        self.state = query['state']
        self.nonce = query['nonce']
        # .hex is already a str, no conversion needed.
        self.code = uuid.uuid4().hex
        return app.get(make_url(self.callback_url, params={'code': self.code, 'state': self.state}), **kwargs)

    @property
    def callback_url(self):
        """Absolute URL of the application's ``fc-login-or-link`` view."""
        return 'http://testserver' + reverse('fc-login-or-link')

    def login_with_fc_fixed_params(self, app):
        """Start a login from ``/login/`` using ``self.callback_params`` as query.

        Any existing session is flushed first. The link whose href matches
        ``callback`` is followed and the resulting redirection is handed to
        :meth:`handle_authorization`, whose response (expected to be a 302)
        is returned without being followed.
        """
        self._flush_session(app)
        response = app.get('/login/?' + urlencode(self.callback_params))
        response = response.click(href='callback')
        return self.handle_authorization(app, response.location, status=302)

    def login_with_fc(self, app, path):
        """Start a login by requesting *path* and go through the whole flow.

        Any existing session is flushed first. The query parameters of the
        redirection returned for *path* are stored in
        ``self.callback_params``; the redirection is then followed, the link
        whose href matches ``callback`` is clicked and the authorization
        step is played. Returns the response obtained after following the
        final 302.
        """
        self._flush_session(app)
        response = app.get(path)
        redirect_query = urllib.parse.urlparse(response.location).query
        # QueryDict.dict() keeps the last value of each key, exactly like
        # iterating over QueryDict.items().
        self.callback_params = QueryDict(redirect_query).dict()
        response = response.follow()
        response = response.click(href='callback')
        return self.handle_authorization(app, response.location, status=302).follow()

    def access_token_response(self, url, request):
        """Answer a request to the token endpoint (httmock handler).

        Returns ``token_endpoint_response`` untouched when it is set.
        Otherwise the form-encoded body of *request* is checked against the
        code, client credentials and redirect URI expected at this point of
        the flow, and a JSON document holding the access token and a signed
        id_token is returned.
        """
        if self.token_endpoint_response:
            return self.token_endpoint_response
        formdata = QueryDict(request.body)
        assert set(formdata.keys()) == {'code', 'client_id', 'client_secret', 'redirect_uri', 'grant_type'}
        assert formdata['code'] == self.code
        assert formdata['client_id'] == self.client_id
        assert formdata['client_secret'] == self.client_secret
        assert formdata['grant_type'] == 'authorization_code'
        assert_equals_url(formdata['redirect_uri'], self.callback_url)
        # Build the id_token: static claims, then per-login claims, then the
        # user attributes (which therefore win over the previous ones).
        id_token = self.id_token.copy()
        id_token.update(
            {
                'sub': self.sub,
                'nonce': self.nonce,
                'exp': int((self.exp or (now() + datetime.timedelta(seconds=60))).timestamp()),
            }
        )
        id_token.update(self.user_info)
        return json.dumps(
            {'access_token': self.access_token, 'id_token': self.hmac_jwt(id_token, self.client_secret)}
        )

    def hmac_jwt(self, payload, key):
        """Return *payload* serialized as a JWT signed with HS256 using *key*.

        *key* is a text secret; it is UTF-8 encoded to obtain the HMAC key.
        """
        header = {'alg': 'HS256'}
        # The "k" member of an "oct" JWK is the base64url encoding of the key
        # bytes, without padding (RFC 7518, section 6.4.1). The decoded key
        # is the same as with standard base64, so signatures do not change.
        encoded_key = base64.urlsafe_b64encode(key.encode('utf-8')).rstrip(b'=').decode('ascii')
        k = jwk.JWK(kty='oct', k=encoded_key)
        t = jwt.JWT(header=header, claims=payload)
        t.make_signed_token(k)
        return t.serialize()

    def user_info_response(self, url, request):
        """Answer a request to the userinfo endpoint (httmock handler).

        Returns ``user_info_endpoint_response`` untouched when it is set.
        Otherwise the bearer token carried by *request* is checked and the
        user attributes, completed with ``sub``, are returned as JSON.
        ``self.user_info`` itself is left unmodified.
        """
        if self.user_info_endpoint_response:
            return self.user_info_endpoint_response
        assert request.headers['Authorization'] == 'Bearer %s' % self.access_token
        user_info = self.user_info.copy()
        user_info['sub'] = self.sub
        return json.dumps(user_info)

    @contextlib.contextmanager
    def __call__(self):
        """Context manager routing token and userinfo HTTP requests to this mock.

        While the context is active, requests whose URL path ends with
        ``/token`` or ``userinfo`` are answered by
        :meth:`access_token_response` and :meth:`user_info_response`.
        """
        with httmock.HTTMock(
            httmock.urlmatch(path=r'.*/token$')(self.access_token_response),
            httmock.urlmatch(path=r'.*userinfo$')(self.user_info_response),
        ):
            yield None

    def handle_logout(self, app, url):
        """Validate the logout request *url* and call back the application.

        The ``post_logout_redirect_uri`` of *url* must designate the
        application's ``fc-logout`` view and a ``state`` must be present.
        That state is remembered and sent back to the ``fc-logout`` view;
        the response of that request is returned.
        """
        assert url.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/logout')
        parsed_url = urllib.parse.urlparse(url)
        query = QueryDict(parsed_url.query)
        assert_equals_url(query['post_logout_redirect_uri'], 'http://testserver' + reverse('fc-logout'))
        assert query['state']
        self.state = query['state']
        # Encode the state properly instead of concatenating it raw, so that
        # a state containing reserved characters is transmitted unchanged.
        return app.get(reverse('fc-logout') + '?' + urlencode({'state': self.state}))
@pytest.fixture
def franceconnect(settings, db):
    """Enable FranceConnect authentication and yield an active provider mock.

    The FranceConnect settings are switched on with the test client
    credentials, a ``portail`` service is created in the default
    organizational unit, and a FranceConnectMock is yielded while its HTTP
    handlers are installed.
    """
    fc_settings = (
        ('A2_FC_ENABLE', True),
        ('A2_FC_CLIENT_ID', CLIENT_ID),
        ('A2_FC_CLIENT_SECRET', CLIENT_SECRET),
    )
    for setting_name, setting_value in fc_settings:
        setattr(settings, setting_name, setting_value)

    Service.objects.create(name='portail', slug='portail', ou=get_default_ou())

    fc_mock = FranceConnectMock()
    # Calling the mock returns the context manager that intercepts the
    # token and userinfo requests for the duration of the test.
    with fc_mock():
        yield fc_mock