tests: replace httmock by responses in auth_fc tests (#85701)
gitea/authentic/pipeline/head This commit looks good Details

This commit is contained in:
Yann Weber 2024-02-29 11:12:22 +01:00 committed by Yann Weber
parent 2d9a56b43e
commit 6b5c57ef9c
2 changed files with 47 additions and 33 deletions

View File

@ -18,11 +18,12 @@ import base64
import contextlib
import datetime
import json
import re
import urllib.parse
import uuid
import httmock
import pytest
import responses
from django.http import QueryDict
from django.urls import reverse
from django.utils.http import urlencode
@ -100,7 +101,7 @@ class FranceConnectMock:
response = response.click(href='callback')
return self.handle_authorization(app, response.location, status=302)
def access_token_response(self, url, request):
def access_token_response(self, request):
if self.token_endpoint_response:
return self.token_endpoint_response
@ -122,8 +123,12 @@ class FranceConnectMock:
}
)
id_token.update(self.user_info)
return json.dumps(
{'access_token': self.access_token, 'id_token': self.hmac_jwt(id_token, self.client_secret)}
return (
200,
{'Content-Type': 'application/json'},
json.dumps(
{'access_token': self.access_token, 'id_token': self.hmac_jwt(id_token, self.client_secret)}
),
)
def hmac_jwt(self, payload, key):
@ -133,21 +138,25 @@ class FranceConnectMock:
t.make_signed_token(k)
return t.serialize()
def user_info_response(self, url, request):
def user_info_response(self, request):
if self.user_info_endpoint_response:
return self.user_info_endpoint_response
assert request.headers['Authorization'] == 'Bearer %s' % self.access_token
user_info = self.user_info.copy()
user_info['sub'] = self.sub
return json.dumps(user_info)
return (200, {'Content-Type': 'application/json'}, json.dumps(user_info))
@contextlib.contextmanager
def __call__(self):
with httmock.HTTMock(
httmock.urlmatch(path=r'.*/token$')(self.access_token_response),
httmock.urlmatch(path=r'.*userinfo$')(self.user_info_response),
):
with responses.RequestsMock(assert_all_requests_are_fired=False) as rsps:
for method in ('GET', 'POST'):
rsps.add_callback(
method, url=re.compile(r'.*/token(\?.*)?$'), callback=self.access_token_response
)
rsps.add_callback(
method, url=re.compile(r'.*userinfo(\?.*)?$'), callback=self.user_info_response
)
yield None
def handle_logout(self, app, url):

View File

@ -507,10 +507,11 @@ def test_order_1_is_returned(app, db, rf):
def test_resolve_authorization_code_http_400(app, franceconnect, caplog):
franceconnect.token_endpoint_response = {
'status_code': 400,
'content': json.dumps({'error': 'invalid_request'}),
}
franceconnect.token_endpoint_response = (
400,
{'Content-Type': 'application/json'},
json.dumps({'error': 'invalid_request'}),
)
response = franceconnect.login_with_fc(app, path='/accounts/')
assert re.match(r'WARNING.*token request failed.*invalid_request', caplog.text)
@ -520,10 +521,11 @@ def test_resolve_authorization_code_http_400(app, franceconnect, caplog):
def test_resolve_authorization_code_http_400_error_description(app, franceconnect, caplog):
franceconnect.token_endpoint_response = {
'status_code': 400,
'content': json.dumps({'error': 'invalid_request', 'error_description': 'Requête invalide'}),
}
franceconnect.token_endpoint_response = (
400,
{'Content-Type': 'application/json'},
json.dumps({'error': 'invalid_request', 'error_description': 'Requête invalide'}),
)
response = franceconnect.login_with_fc(app, path='/accounts/')
assert re.match(r'WARNING.*token request failed.*invalid_request', caplog.text)
@ -534,41 +536,44 @@ def test_resolve_authorization_code_http_400_error_description(app, franceconnec
def test_resolve_authorization_code_not_json(app, franceconnect, caplog):
franceconnect.token_endpoint_response = 'not json'
franceconnect.token_endpoint_response = (200, {}, 'not json')
franceconnect.login_with_fc(app, path='/accounts/').follow()
assert re.match(r'WARNING.*resolve_authorization_code.*not JSON.*not json', caplog.text)
def test_get_user_info_http_400(app, franceconnect, caplog):
franceconnect.user_info_endpoint_response = {
'status_code': 400,
'content': json.dumps({'error': 'invalid_request'}),
}
franceconnect.user_info_endpoint_response = (
400,
{'Content-Type': 'application/json'},
json.dumps({'error': 'invalid_request'}),
)
franceconnect.login_with_fc(app, path='/accounts/').follow()
assert re.match(r'WARNING.*get_user_info.*is not 200.*status_code=400.*invalid_request', caplog.text)
def test_get_user_info_http_400_text_content(app, franceconnect, caplog):
franceconnect.user_info_endpoint_response = {
'status_code': 400,
'content': 'coin',
}
franceconnect.user_info_endpoint_response = (
400,
{},
'coin',
)
franceconnect.login_with_fc(app, path='/accounts/').follow()
assert re.match(r'WARNING.*get_user_info.*is not 200.*status_code=400.*coin', caplog.text)
def test_get_user_info_not_json(app, franceconnect, caplog):
franceconnect.user_info_endpoint_response = {
'status_code': 200,
'content': 'coin',
}
franceconnect.user_info_endpoint_response = (
200,
{},
'coin',
)
franceconnect.login_with_fc(app, path='/accounts/').follow()
assert re.match(r'WARNING.*get_user_info.*not JSON.*coin', caplog.text)
def test_fc_is_down(app, franceconnect, freezer, caplog):
franceconnect.token_endpoint_response = {'status_code': 500, 'content': 'Internal server error'}
franceconnect.token_endpoint_response = (500, {}, 'Internal server error')
# first error -> warning
response = franceconnect.login_with_fc(app, path='/accounts/')
@ -610,7 +615,7 @@ def test_fc_is_down(app, franceconnect, freezer, caplog):
# such that 5 minutes later only a warning is emitted
freezer.move_to(datetime.timedelta(seconds=310))
franceconnect.token_endpoint_response = {'status_code': 500, 'content': 'Internal server error'}
franceconnect.token_endpoint_response = (500, {}, 'Internal server error')
response = franceconnect.login_with_fc(app, path='/accounts/')
assert len(caplog.records) == 10
assert caplog.records[-1].levelname == 'WARNING'