misc: move http utils in authentic2.utils.http (#85934)

This commit is contained in:
Benjamin Dauvergne 2024-01-22 11:16:54 +01:00
parent d50622cb81
commit 50c0bb7f56
5 changed files with 175 additions and 120 deletions

View File

@ -0,0 +1,94 @@
# authentic2 - authentic2 authentication for FranceConnect
# Copyright (C) Entr'ouvert
import requests
import requests.adapters
import requests.exceptions
from django.conf import settings
from django.utils.translation import gettext as _
try:
from urllib3.util import Retry
except ImportError:
from requests.packages.urllib3.util.retry import Retry
def retry_session(
retries=3,
backoff_factor=0.5,
status_forcelist=(500, 502, 504),
session=None,
):
'''Create a requests session which retries after 0.5s then 1s'''
session = session or requests.Session()
retry = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
)
adapter = requests.adapters.HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
# set proxies
session.proxies.update(getattr(settings, 'REQUESTS_PROXIES', {}))
return session
class HTTPError(Exception):
def __init__(self, message, **details):
super().__init__(message)
self.details = details
def __str__(self):
s = super().__str__()
if self.details:
s += ' ('
s += ' '.join('%s=%r' % (k, v) for k, v in self.details.items())
s += ')'
return s
def request(method, url, session=None, expected_statuses=None, **kwargs):
session = retry_session(session=session)
try:
response = getattr(session, method)(
url,
**kwargs,
)
response.raise_for_status()
except requests.exceptions.HTTPError:
if expected_statuses and response.status_code in expected_statuses:
return response
try:
content = response.json()
except ValueError:
content = response.text[:256]
raise HTTPError(_('Status code is not 200.'), status_code=response.status_code, content=content)
except requests.exceptions.RequestException as e:
raise HTTPError(_('URL is unreachable.'), exception=e)
return response
def parse_json_response(response):
try:
content = response.json()
except ValueError:
raise HTTPError(_('Document at URL is not JSON.'), content=response.content[:1024])
return content
def post_json(url, **kwargs):
response = request('post', url=url, **kwargs)
return parse_json_response(response)
def get(url, **kwargs):
return request('get', url=url, **kwargs)
def get_json(url, **kwargs):
response = request('get', url=url, **kwargs)
return parse_json_response(response)

View File

@ -25,7 +25,6 @@ import os
import urllib.parse
import uuid
import requests
from django.conf import settings
from django.core.exceptions import ValidationError
from django.shortcuts import resolve_url
@ -34,8 +33,6 @@ from django.utils.encoding import force_bytes, force_str
from django.utils.http import urlencode
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry # pylint: disable=import-error
from authentic2.validators import email_validator
from authentic2_auth_oidc.utils import parse_timestamp
@ -182,82 +179,6 @@ def apply_user_info_mappings(user, user_info):
user.save()
def requests_retry_session(
retries=3,
backoff_factor=0.5,
status_forcelist=(500, 502, 504),
session=None,
):
'''Create a requests session which retries after 0.5s then 1s'''
session = session or requests.Session()
retry = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
# set proxies
session.proxies.update(getattr(settings, 'REQUESTS_PROXIES', {}))
return session
class RequestError(Exception):
def __init__(self, message, **details):
super().__init__(message)
self.details = details
def __str__(self):
s = super().__str__()
if self.details:
s += ' ('
s += ' '.join('%s=%r' % (k, v) for k, v in self.details.items())
s += ')'
return s
def request_json(method, url, data=None, session=None, expected_statuses=None):
session = requests_retry_session(session=session)
try:
response = getattr(session, method)(
url,
data=data,
verify=app_settings.verify_certificate,
allow_redirects=False,
timeout=3,
)
response.raise_for_status()
except requests.exceptions.HTTPError:
try:
content = response.json()
except ValueError:
content = response.text[:256]
if expected_statuses and response.status_code in expected_statuses:
return content
raise RequestError('status code is not 200', status_code=response.status_code, content=content)
except requests.exceptions.RequestException as e:
raise RequestError('HTTP request failed', exception=e)
try:
content = response.json()
except ValueError:
raise RequestError('content is not JSON', content=response.content[:1024])
if not isinstance(content, dict):
raise RequestError('content is not a dict', content=content)
return content
def post_json(url, data, expected_statuses=None):
return request_json('post', url, data=data, expected_statuses=expected_statuses)
def get_json(url, session, expected_statuses=None):
return request_json('get', url, session=session, expected_statuses=expected_statuses)
def clean_fc_session(session):
session.pop('fc_id_token', None)
session.pop('fc_id_token_raw', None)

View File

@ -45,17 +45,11 @@ from authentic2.utils import hooks
from authentic2.utils import misc as utils_misc
from authentic2.utils import views as utils_views
from authentic2.utils.crypto import check_hmac_url, hash_chain, hmac_url
from authentic2.utils.http import HTTPError, get_json, post_json
from authentic2.validators import email_validator
from . import app_settings, models, utils
from .utils import (
RequestError,
apply_user_info_mappings,
build_logout_url,
clean_fc_session,
get_json,
post_json,
)
from .utils import apply_user_info_mappings, build_logout_url, clean_fc_session
logger = logging.getLogger(__name__)
User = get_user_model()
@ -318,8 +312,15 @@ class LoginOrLinkView(View):
logger.debug('auth_fc: resolve_access_token request params %s', data)
try:
token = post_json(self.authenticator.token_url, data, expected_statuses=[400])
except RequestError as e:
token = post_json(
url=self.authenticator.token_url,
data=data,
expected_statuses=[400],
verify=app_settings.verify_certificate,
allow_redirects=False,
timeout=3,
)
except HTTPError as e:
logger.warning('auth_fc: resolve_authorization_code error %s', e)
return None
else:
@ -329,10 +330,13 @@ class LoginOrLinkView(View):
def get_user_info(self):
try:
data = get_json(
self.authenticator.userinfo_url + '?schema=openid',
url=self.authenticator.userinfo_url + '?schema=openid',
session=OAuth2Session(self.authenticator.client_id, token=self.token),
verify=app_settings.verify_certificate,
allow_redirects=False,
timeout=3,
)
except RequestError as e:
except HTTPError as e:
logger.warning('auth_fc: get_user_info error %s', e)
return None
logger.debug('auth_fc: get_user_info returned %r', data)

View File

@ -17,13 +17,10 @@
import datetime
import json
import logging
import os
import re
import urllib.parse
from unittest import mock
import pytest
import requests
from django.contrib.auth import get_user_model
from django.core.exceptions import PermissionDenied
from django.urls import reverse
@ -37,7 +34,6 @@ from authentic2.custom_user.models import DeletedUser
from authentic2.models import Attribute, AttributeValue
from authentic2_auth_fc import models
from authentic2_auth_fc.backends import FcBackend
from authentic2_auth_fc.utils import requests_retry_session
from ..utils import assert_event, decode_cookie, get_link_from_mail, login, set_service
@ -302,31 +298,6 @@ def test_login_email_is_unique_and_already_linked(settings, app, franceconnect,
assert response.location == '/idp/'
def test_requests_proxies_support(settings, app, monkeypatch):
session = requests_retry_session()
assert not session.proxies
other_session = requests.Session()
other_session.proxies = {'http': 'http://example.net'}
session = requests_retry_session(session=other_session)
assert session is other_session
assert session.proxies == {'http': 'http://example.net'}
settings.REQUESTS_PROXIES = {'https': 'http://pubproxy.com/api/proxy'}
session = requests_retry_session()
assert session.proxies == {'https': 'http://pubproxy.com/api/proxy'}
# on local test execution 'NO_PROXY' env variable might be set
if 'NO_PROXY' in os.environ:
monkeypatch.delenv('NO_PROXY')
if 'no_proxy' in os.environ:
monkeypatch.delenv('no_proxy')
with mock.patch('authentic2_auth_fc.utils.requests.Session.send') as mocked_send:
mocked_send.return_value = mock.Mock(status_code=200, content='whatever')
session.get('https://example.net/')
assert mocked_send.call_args[1]['proxies'] == {'https': 'http://pubproxy.com/api/proxy'}
def test_no_password_with_fc_account_can_reset_password(app, db, mailoutbox):
user = User(email='john.doe@example.com')
user.set_unusable_password()

65
tests/test_utils_http.py Normal file
View File

@ -0,0 +1,65 @@
# authentic2 - authentic2 authentication for FranceConnect
# Copyright (C) Entr'ouvert
import os
import pytest
import requests
import requests.exceptions
import responses
from authentic2.utils.http import HTTPError, get_json, post_json, retry_session
@responses.activate
def test_requests_proxies(settings, monkeypatch):
session = retry_session()
assert not session.proxies
other_session = requests.Session()
other_session.proxies = {'http': 'http://example.net'}
session = retry_session(session=other_session)
assert session is other_session
assert session.proxies == {'http': 'http://example.net'}
settings.REQUESTS_PROXIES = {'https': 'http://pubproxy.com/api/proxy'}
session = retry_session()
assert session.proxies == {'https': 'http://pubproxy.com/api/proxy'}
# on local test execution 'NO_PROXY' env variable might be set
if 'NO_PROXY' in os.environ:
monkeypatch.delenv('NO_PROXY')
if 'no_proxy' in os.environ:
monkeypatch.delenv('no_proxy')
response = responses.get('https://example.net/', status=200, body=b'whatever')
session.get('https://example.net/')
assert response.calls[0][0].req_kwargs['proxies'] == {'https': 'http://pubproxy.com/api/proxy'}
@responses.activate
def test_get_json():
responses.get('http://example.net/json', json={'foo': 'bar'})
assert get_json('http://example.net/json') == {'foo': 'bar'}
@responses.activate
def test_post_json():
response = responses.post('http://example.net/json', json={'foo': 'bar'})
assert post_json('http://example.net/json', data={'bar': 'foo'}) == {'foo': 'bar'}
assert response.calls[0].request.body == 'bar=foo'
response = responses.post('http://example.net/json', json={'foo': 'bar'})
assert post_json('http://example.net/json', json={'bar': 'foo'}) == {'foo': 'bar'}
assert response.calls[0].request.body == b'{"bar": "foo"}'
@responses.activate
def test_http_error():
responses.get('http://example.net/connection-error', body=requests.exceptions.ConnectionError())
with pytest.raises(HTTPError):
assert get_json('http://example.net/connection-error')
responses.get('http://example.net/bad-json', body=b'{')
with pytest.raises(HTTPError):
assert get_json('http://example.net/connection-error')