tests: replace httmock with responses for auth_oidc tests (#85702)
gitea/authentic/pipeline/head This commit looks good Details

This commit is contained in:
Yann Weber 2024-03-05 10:00:30 +01:00
parent 6b5c57ef9c
commit 851ac267b3
1 changed files with 60 additions and 73 deletions

View File

@ -24,6 +24,7 @@ import urllib.parse
from unittest import mock
import pytest
import responses
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.contrib.messages import constants as message_constants
@ -34,7 +35,6 @@ from django.test.utils import override_settings
from django.urls import reverse
from django.utils.encoding import force_str
from django.utils.timezone import now, utc
from httmock import HTTMock, urlmatch
from jwcrypto.common import base64url_decode, base64url_encode, json_encode
from jwcrypto.jwk import JWK, JWKSet
from jwcrypto.jws import JWS, InvalidJWSObject
@ -244,6 +244,11 @@ def _signature(oidc_provider):
return json.loads(jws.serialize())['signature']
def any_params_matcher(*args):
"""Wildcard matcher for responses URL parameters"""
return (True, '')
def oidc_provider_mock(
oidc_provider,
oidc_provider_jwkset,
@ -256,13 +261,9 @@ def oidc_provider_mock(
kid=None,
idtoken_algo=None,
):
token_endpoint = urllib.parse.urlparse(oidc_provider.token_endpoint)
userinfo_endpoint = urllib.parse.urlparse(oidc_provider.userinfo_endpoint)
token_revocation_endpoint = urllib.parse.urlparse(oidc_provider.token_revocation_endpoint)
idtoken_algo = idtoken_algo or oidc_provider.idtoken_algo
@urlmatch(netloc=token_endpoint.netloc, path=token_endpoint.path)
def token_endpoint_mock(url, request):
def token_endpoint_mock(request):
if urllib.parse.parse_qs(request.body).get('code') == [code]:
exp = now() + datetime.timedelta(seconds=10)
id_token = {
@ -308,29 +309,15 @@ def oidc_provider_mock(
'token_type': random.choice(['B', 'b']) + 'earer',
'id_token': jwt.serialize(),
}
return {
'content': json.dumps(content),
'headers': {
'content-type': 'application/json',
},
'status_code': 200,
}
return (200, {'Content-Type': 'application/json'}, json.dumps(content))
else:
return {
'content': json.dumps(
{
'error': 'invalid request',
'error_description': 'Requête invalide',
}
),
'headers': {
'content-type': 'application/json',
},
'status_code': 400,
}
return (
400,
{'Content-Type': 'application/json'},
json.dumps({'error': 'invalid request', 'error_description': 'Requête invalide'}),
)
@urlmatch(netloc=userinfo_endpoint.netloc, path=userinfo_endpoint.path)
def user_info_endpoint_mock(url, request):
def user_info_endpoint_mock(request):
user_info = {
'sub': sub,
'iss': oidc_provider.issuer,
@ -342,23 +329,32 @@ def oidc_provider_mock(
}
if extra_user_info:
user_info.update(extra_user_info)
return {
'content': json.dumps(user_info),
'headers': {
'content-type': 'application/json',
},
'status_code': 200,
}
@urlmatch(netloc=token_revocation_endpoint.netloc, path=token_revocation_endpoint.path)
def token_revocation_endpoint_mock(url, request):
return (200, {'Content-Type': 'application/json'}, json.dumps(user_info))
def token_revocation_endpoint_mock(request):
query = urllib.parse.parse_qs(request.body)
assert 'token' in query
return {
'status_code': 200,
}
return (200, {}, '')
return HTTMock(token_endpoint_mock, user_info_endpoint_mock, token_revocation_endpoint_mock)
rsps = responses.RequestsMock(assert_all_requests_are_fired=False)
rsps.add_callback(
'POST', url=oidc_provider.token_endpoint, match=[any_params_matcher], callback=token_endpoint_mock
)
rsps.add_callback(
'GET',
url=oidc_provider.userinfo_endpoint,
match=[any_params_matcher],
callback=user_info_endpoint_mock,
)
rsps.add_callback(
'POST',
url=oidc_provider.token_revocation_endpoint,
match=[any_params_matcher],
callback=token_revocation_endpoint_mock,
)
return rsps
def login_callback_url(oidc_provider):
@ -380,24 +376,16 @@ def test_oidc_provider_key_sig_consistency(db):
def test_oidc_provider_jwkset_url(db):
jwkset_url = urllib.parse.urlparse(JWKSET_URL)
@urlmatch(netloc=jwkset_url.netloc, path=jwkset_url.path)
def jwkset_url_mock(url, request):
def jwkset_url_mock(request):
key_rsa = JWK.generate(kty='RSA', size=512, kid=ANOTHER_KID_RSA)
key_ec = JWK.generate(kty='EC', size=256, kid=ANOTHER_KID_EC)
jwkset = JWKSet()
jwkset.add(key_rsa)
jwkset.add(key_ec)
return {
'content': json.dumps(jwkset.export(as_dict=True)),
'headers': {
'content-type': 'application/json',
},
'status_code': 200,
}
return (200, {'Content-Type': 'application/json'}, json.dumps(jwkset.export(as_dict=True)))
with HTTMock(jwkset_url_mock):
with responses.RequestsMock() as rsps:
rsps.add_callback('GET', url=JWKSET_URL, match=[any_params_matcher], callback=jwkset_url_mock)
issuer = ('https://www.example.com',)
provider = OIDCProvider(
ou=get_default_ou(),
@ -437,22 +425,15 @@ def test_claim_mapping_wrong_source(app, oidc_provider, rf):
claim.required = False
claim.save()
@urlmatch(netloc='server.example.com', path='/user_info')
def empty_user_info(url, request):
return {
'content': 'null',
'headers': {
'content-type': 'application/json',
},
'status_code': 200,
}
request = rf.get('/')
header = _header(oidc_provider)
signature = _signature(oidc_provider)
id_token = f'{header}.{payload}.{signature}'
with HTTMock(empty_user_info):
with responses.RequestsMock() as rsps:
rsps.add(
'GET', url=oidc_provider.userinfo_endpoint, match=[any_params_matcher], body='null', status=200
)
backend.authenticate(request, access_token='auietrns', id_token=id_token, provider=oidc_provider)
@ -546,6 +527,7 @@ def test_login_autorun(oidc_provider, app, settings):
assert response['Location'].startswith('https://server.example.com/authorize')
@responses.activate
def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
cassis = OrganizationalUnit.objects.create(name='Cassis', slug='cassis')
@ -929,13 +911,16 @@ def test_register_issuer(db, app, caplog, oidc_provider_jwkset):
config_file = os.path.join(config_dir, 'openid_configuration.json')
with open(config_file) as f:
oidc_conf = json.load(f)
jwks_uri = urllib.parse.urlparse(oidc_conf['jwks_uri'])
@urlmatch(netloc=jwks_uri.netloc, path=jwks_uri.path)
def jwks_mock(url, request):
return oidc_provider_jwkset.export()
with HTTMock(jwks_mock):
mock_args = {
'method': 'GET',
'url': oidc_conf['jwks_uri'],
'body': oidc_provider_jwkset.export(),
'match': [any_params_matcher],
'status': 200,
}
with responses.RequestsMock() as rsps:
rsps.add(**mock_args)
register_issuer(
name='test_issuer',
client_id='abc',
@ -945,7 +930,8 @@ def test_register_issuer(db, app, caplog, oidc_provider_jwkset):
)
oidc_conf['id_token_signing_alg_values_supported'] = ['HS256']
with HTTMock(jwks_mock):
with responses.RequestsMock() as rsps:
rsps.add(**mock_args)
register_issuer(
name='test_issuer_hmac_only',
client_id='ghi',
@ -1084,11 +1070,12 @@ def test_lost_state(app, caplog, code, oidc_provider, oidc_provider_jwkset, hook
caplog.clear()
@urlmatch()
def norequest(url, request):
def norequest(request):
assert False, 'no request should be done'
with HTTMock(norequest):
with responses.RequestsMock(assert_all_requests_are_fired=False) as rsps:
for meth in ('POST', 'GET', 'PATCH', 'PUT'):
rsps.add_callback(meth, url=re.compile('^.*$'), callback=norequest)
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
# not logged