utils.requests: remote_service can be guessed (#13125)

This commit is contained in:
Thomas NOËL 2016-09-09 16:02:41 +02:00
parent 9115f7f3cd
commit e9e2f34ed9
2 changed files with 146 additions and 0 deletions

View File

@ -32,6 +32,7 @@ from Crypto import Random
from requests import Response, Session as RequestsSession
from django.conf import settings
from django.core.cache import cache
from django.utils.html import strip_tags
from django.utils.http import urlencode, quote
@ -87,6 +88,27 @@ class Requests(RequestsSession):
raise_if_not_cached = kwargs.pop('raise_if_not_cached', False)
log_errors = kwargs.pop('log_errors', True)
if remote_service == 'auto':
remote_service = None
scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
for services in settings.KNOWN_SERVICES.values():
for service in services.values():
remote_url = service.get('url')
remote_scheme, remote_netloc, r_path, r_params, r_query, r_fragment = \
urlparse.urlparse(remote_url)
if remote_scheme == scheme and remote_netloc == netloc:
remote_service = service
break
else:
continue
break
if remote_service:
# only keeps the path (URI) in url parameter, scheme and netloc are
# in remote_service
url = urlparse.urlunparse(('', '', path, params, query, fragment))
else:
logging.warning('service not found in settings.KNOWN_SERVICES for %s', url)
if remote_service:
if isinstance(user, dict):
query_params = user.copy()

124
tests/test_requests.py Normal file
View File

@ -0,0 +1,124 @@
import mock
import urlparse
from django.contrib.auth.models import AnonymousUser
from combo.utils import requests, check_query
class MockSAMLUser(object):
name_id = 'r2d2'
class MockUser(object):
email = 'foo@example.net'
def is_authenticated(self):
return True
def __init__(self, samlized=True):
class MockSAMLUsers(object):
def exists(self):
return True
def first(self):
return MockSAMLUser()
if samlized:
self.saml_identifiers = MockSAMLUsers()
def test_nosign():
with mock.patch('combo.utils.RequestsSession.request') as request:
requests.get('http://example.org/foo/bar/')
assert request.call_args[0][1] == 'http://example.org/foo/bar/'
def test_sign():
remote_service = {'url': 'http://example.org', 'secret': 'secret', 'orig': 'myself'}
with mock.patch('combo.utils.RequestsSession.request') as request:
requests.get('/foo/bar/', remote_service=remote_service)
url = request.call_args[0][1]
assert url.startswith('http://example.org/foo/bar/?')
scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
query = urlparse.parse_qs(querystring, keep_blank_values=True)
assert query['orig'][0] == 'myself'
assert query['email'][0]== ''
assert query['NameID'][0]== ''
assert check_query(querystring, 'secret') == True
requests.get('/foo/bar/', remote_service=remote_service, without_user=True)
url = request.call_args[0][1]
assert url.startswith('http://example.org/foo/bar/?')
scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
query = urlparse.parse_qs(querystring, keep_blank_values=True)
assert query['orig'][0] == 'myself'
assert 'email' not in query
assert 'NameID' not in query
assert check_query(querystring, 'secret') == True
def test_auto_sign():
with mock.patch('combo.utils.RequestsSession.request') as request:
requests.get('http://example.org/foo/bar/', remote_service='auto')
url = request.call_args[0][1]
assert url.startswith('http://example.org/foo/bar/?')
scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
query = urlparse.parse_qs(querystring, keep_blank_values=True)
assert query['orig'][0] == 'combo'
assert check_query(querystring, 'combo') == True
requests.get('http://doesnotexist/foo/bar/', remote_service='auto')
assert request.call_args[0][1] == 'http://doesnotexist/foo/bar/'
def test_sign_user():
remote_service = {'url': 'http://example.org', 'secret': 'secret', 'orig': 'myself'}
with mock.patch('combo.utils.RequestsSession.request') as request:
user = MockUser(samlized=True)
requests.get('/foo/bar/', remote_service=remote_service, user=user)
url = request.call_args[0][1]
assert url.startswith('http://example.org/foo/bar/?')
scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
query = urlparse.parse_qs(querystring, keep_blank_values=True)
assert query['NameID'][0] == 'r2d2'
assert 'email' not in query
assert query['orig'][0] == 'myself'
assert check_query(querystring, 'secret') == True
requests.get('/foo/bar/', remote_service=remote_service, user=user,
federation_key='email')
url = request.call_args[0][1]
assert url.startswith('http://example.org/foo/bar/?')
scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
query = urlparse.parse_qs(querystring, keep_blank_values=True)
assert query['email'][0] == 'foo@example.net'
assert 'NameID' not in query
assert query['orig'][0] == 'myself'
assert check_query(querystring, 'secret') == True
user = MockUser(samlized=False)
requests.get('/foo/bar/', remote_service=remote_service, user=user)
url = request.call_args[0][1]
assert url.startswith('http://example.org/foo/bar/?')
scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
query = urlparse.parse_qs(querystring, keep_blank_values=True)
assert 'NameID' not in query
assert query['email'][0] == 'foo@example.net'
assert query['orig'][0] == 'myself'
assert check_query(querystring, 'secret') == True
def test_sign_anonymous_user():
remote_service = {'url': 'http://example.org', 'secret': 'secret', 'orig': 'myself'}
with mock.patch('combo.utils.RequestsSession.request') as request:
user = AnonymousUser()
requests.get('/foo/bar/', remote_service=remote_service, user=user)
url = request.call_args[0][1]
assert url.startswith('http://example.org/foo/bar/?')
scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
query = urlparse.parse_qs(querystring, keep_blank_values=True)
assert query['NameID'][0] == ''
assert query['email'][0] == ''
assert query['orig'][0] == 'myself'
assert check_query(querystring, 'secret') == True