utils: add signature tools (#44159)

This commit is contained in:
Valentin Deniaud 2020-07-08 16:07:55 +02:00
parent 895ddbc712
commit 10caec88f6
5 changed files with 426 additions and 0 deletions

View File

@ -161,6 +161,10 @@ MELLON_IDENTITY_PROVIDERS = []
# (see http://docs.python-requests.org/en/master/user/advanced/#proxies)
REQUESTS_PROXIES = None
# timeout used in python-requests call, in seconds
# we use 28s by default: timeout just before web server, which is usually 30s
REQUESTS_TIMEOUT = 28
local_settings_file = os.environ.get(
'CHRONO_SETTINGS_FILE', os.path.join(os.path.dirname(__file__), 'local_settings.py')
)

View File

@ -0,0 +1,154 @@
# combo - content management system
# Copyright (C) 2015-2018 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import hashlib
import logging
from requests import Response, Session as RequestsSession
from requests.auth import AuthBase
from django.conf import settings
from django.core.cache import cache
from django.utils.encoding import smart_bytes
from django.utils.http import urlencode
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.six import BytesIO
from .signature import sign_url
class NothingInCacheException(Exception):
pass
class PublikSignature(AuthBase):
def __init__(self, secret):
self.secret = secret
def __call__(self, request):
request.url = sign_url(request.url, self.secret)
return request
class Requests(RequestsSession):
def request(self, method, url, **kwargs):
remote_service = kwargs.pop('remote_service', None)
cache_duration = kwargs.pop('cache_duration', 15)
invalidate_cache = kwargs.pop('invalidate_cache', False)
user = kwargs.pop('user', None)
django_request = kwargs.pop('django_request', None)
without_user = kwargs.pop('without_user', False)
federation_key = kwargs.pop('federation_key', 'auto') # 'auto', 'email', 'nameid'
raise_if_not_cached = kwargs.pop('raise_if_not_cached', False)
log_errors = kwargs.pop('log_errors', True)
# don't use persistent cookies
self.cookies.clear()
if remote_service == 'auto':
remote_service = None
scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
for services in settings.KNOWN_SERVICES.values():
for service in services.values():
remote_url = service.get('url')
remote_scheme, remote_netloc, r_path, r_params, r_query, r_fragment = urlparse.urlparse(
remote_url
)
if remote_scheme == scheme and remote_netloc == netloc:
remote_service = service
break
else:
continue
break
if remote_service:
# only keeps the path (URI) in url parameter, scheme and netloc are
# in remote_service
url = urlparse.urlunparse(('', '', path, params, query, fragment))
else:
logging.warning('service not found in settings.KNOWN_SERVICES for %s', url)
if remote_service:
if isinstance(user, dict):
query_params = user.copy()
elif not user or not user.is_authenticated:
if without_user:
query_params = {}
else:
query_params = {'NameID': '', 'email': ''}
else:
query_params = {}
if federation_key == 'nameid':
query_params['NameID'] = user.get_name_id()
elif federation_key == 'email':
query_params['email'] = user.email
else: # 'auto'
user_name_id = user.get_name_id()
if user_name_id:
query_params['NameID'] = user_name_id
else:
query_params['email'] = user.email
query_params['orig'] = remote_service.get('orig')
remote_service_base_url = remote_service.get('url')
scheme, netloc, old_path, params, old_query, fragment = urlparse.urlparse(remote_service_base_url)
query = urlencode(query_params)
if '?' in url:
path, old_query = url.split('?', 1)
query += '&' + old_query
else:
path = url
url = urlparse.urlunparse((scheme, netloc, path, params, query, fragment))
if method == 'GET' and cache_duration:
# handle cache
cache_key = hashlib.md5(smart_bytes(url)).hexdigest()
cache_content = cache.get(cache_key)
if cache_content and not invalidate_cache:
response = Response()
response.status_code = 200
response.raw = BytesIO(smart_bytes(cache_content))
return response
elif raise_if_not_cached:
raise NothingInCacheException()
if remote_service: # sign
kwargs['auth'] = PublikSignature(remote_service.get('secret'))
kwargs['timeout'] = kwargs.get('timeout') or settings.REQUESTS_TIMEOUT
response = super(Requests, self).request(method, url, **kwargs)
if log_errors and (response.status_code // 100 != 2):
extra = {}
if django_request:
extra['request'] = django_request
if log_errors == 'warn':
logging.warning(
'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra
)
else:
logging.error(
'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra
)
if method == 'GET' and cache_duration and (response.status_code // 100 == 2):
cache.set(cache_key, response.content, cache_duration)
return response
requests = Requests()

107
chrono/utils/signature.py Normal file
View File

@ -0,0 +1,107 @@
# combo - content management system
# Copyright (C) 2015-2018 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import datetime
import hmac
import hashlib
import random
from django.conf import settings
from django.utils.encoding import smart_bytes
from django.utils.http import quote, urlencode
from django.utils import six
from django.utils.six.moves.urllib import parse as urlparse
# Simple signature scheme for query strings
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
parsed = urlparse.urlparse(url)
new_query = sign_query(parsed.query, key, algo, timestamp, nonce)
return urlparse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
if timestamp is None:
timestamp = datetime.datetime.utcnow()
timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
if nonce is None:
nonce = hex(random.getrandbits(128))[2:]
new_query = query
if new_query:
new_query += '&'
new_query += urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
signature = base64.b64encode(sign_string(new_query, key, algo=algo))
new_query += '&signature=' + quote(signature)
return new_query
def sign_string(s, key, algo='sha256', timedelta=30):
digestmod = getattr(hashlib, algo)
hash = hmac.HMAC(smart_bytes(key), digestmod=digestmod, msg=smart_bytes(s))
return hash.digest()
def check_request_signature(django_request, keys=[]):
query_string = django_request.META['QUERY_STRING']
if not query_string:
return False
orig = django_request.GET.get('orig', '')
known_services = getattr(settings, 'KNOWN_SERVICES', None)
if known_services and orig:
for services in known_services.values():
for service in services.values():
if 'verif_orig' in service and service['verif_orig'] == orig:
keys.append(service['secret'])
break
return check_query(query_string, keys)
def check_query(query, keys, known_nonce=None, timedelta=30):
parsed = urlparse.parse_qs(query)
if not ('signature' in parsed and 'algo' in parsed and 'timestamp' in parsed and 'nonce' in parsed):
return False
unsigned_query, signature_content = query.split('&signature=', 1)
if '&' in signature_content:
return False # signature must be the last parameter
signature = base64.b64decode(parsed['signature'][0])
algo = parsed['algo'][0]
timestamp = parsed['timestamp'][0]
timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
nonce = parsed['nonce']
if known_nonce is not None and known_nonce(nonce):
return False
if abs(datetime.datetime.utcnow() - timestamp) > datetime.timedelta(seconds=timedelta):
return False
return check_string(unsigned_query, signature, keys, algo=algo)
def check_string(s, signature, keys, algo='sha256'):
if not isinstance(keys, list):
keys = [keys]
for key in keys:
signature2 = sign_string(s, key, algo=algo)
if len(signature2) != len(signature):
continue
res = 0
# constant time compare
for a, b in zip(signature, signature2):
res |= a ^ b
if res == 0:
return True
return False

View File

@ -13,3 +13,15 @@ DATABASES = {
'TEST': {'NAME': 'chrono-test-%s' % os.environ.get("BRANCH_NAME", "").replace('/', '-')[:63],},
}
}
KNOWN_SERVICES = {
'wcs': {
'default': {
'title': 'test',
'url': 'http://example.org',
'secret': 'chrono',
'orig': 'chrono',
'backoffice-menu-url': 'http://example.org/manage/',
}
},
}

149
tests/test_requests.py Normal file
View File

@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-
import mock
import pytest
from django.contrib.auth.models import AnonymousUser
from django.utils.six.moves.urllib import parse as urlparse
from chrono.utils.requests_wrapper import requests, NothingInCacheException
from chrono.utils.signature import check_query
class MockUser(object):
email = 'foo@example.net'
is_authenticated = True
def get_name_id(self):
if self.samlized:
return 'r2d2'
return None
def __init__(self, samlized=True):
self.samlized = samlized
def test_nosign():
with mock.patch('chrono.utils.requests_wrapper.RequestsSession.send') as send:
requests.get('http://example.org/foo/bar/')
assert send.call_args[0][0].url == 'http://example.org/foo/bar/'
def test_sign():
remote_service = {'url': 'http://example.org', 'secret': 'secret', 'orig': 'myself'}
with mock.patch('chrono.utils.requests_wrapper.RequestsSession.send') as send:
requests.get('/foo/bar/', remote_service=remote_service)
url = send.call_args[0][0].url
assert url.startswith('http://example.org/foo/bar/?')
scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
query = urlparse.parse_qs(querystring, keep_blank_values=True)
assert query['orig'][0] == 'myself'
assert query['email'][0] == ''
assert query['NameID'][0] == ''
assert check_query(querystring, 'secret') == True
requests.get('/foo/bar/', remote_service=remote_service, without_user=True)
url = send.call_args[0][0].url
assert url.startswith('http://example.org/foo/bar/?')
scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
query = urlparse.parse_qs(querystring, keep_blank_values=True)
assert query['orig'][0] == 'myself'
assert 'email' not in query
assert 'NameID' not in query
assert check_query(querystring, 'secret') == True
def test_auto_sign():
with mock.patch('chrono.utils.requests_wrapper.RequestsSession.send') as send:
requests.get('http://example.org/foo/bar/', remote_service='auto')
url = send.call_args[0][0].url
assert url.startswith('http://example.org/foo/bar/?')
scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
query = urlparse.parse_qs(querystring, keep_blank_values=True)
assert query['orig'][0] == 'chrono'
assert check_query(querystring, 'chrono') == True
requests.get('http://doesnotexist/foo/bar/', remote_service='auto')
assert send.call_args[0][0].url == 'http://doesnotexist/foo/bar/'
def test_sign_user():
remote_service = {'url': 'http://example.org', 'secret': 'secret', 'orig': 'myself'}
with mock.patch('chrono.utils.requests_wrapper.RequestsSession.send') as send:
user = MockUser(samlized=True)
requests.get('/foo/bar/', remote_service=remote_service, user=user)
url = send.call_args[0][0].url
assert url.startswith('http://example.org/foo/bar/?')
scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
query = urlparse.parse_qs(querystring, keep_blank_values=True)
assert query['NameID'][0] == 'r2d2'
assert 'email' not in query
assert query['orig'][0] == 'myself'
assert check_query(querystring, 'secret') == True
requests.get('/foo/bar/', remote_service=remote_service, user=user, federation_key='email')
url = send.call_args[0][0].url
assert url.startswith('http://example.org/foo/bar/?')
scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
query = urlparse.parse_qs(querystring, keep_blank_values=True)
assert query['email'][0] == 'foo@example.net'
assert 'NameID' not in query
assert query['orig'][0] == 'myself'
assert check_query(querystring, 'secret') == True
user = MockUser(samlized=False)
requests.get('/foo/bar/', remote_service=remote_service, user=user)
url = send.call_args[0][0].url
assert url.startswith('http://example.org/foo/bar/?')
scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
query = urlparse.parse_qs(querystring, keep_blank_values=True)
assert 'NameID' not in query
assert query['email'][0] == 'foo@example.net'
assert query['orig'][0] == 'myself'
assert check_query(querystring, 'secret') == True
def test_sign_anonymous_user():
remote_service = {'url': 'http://example.org', 'secret': 'secret', 'orig': 'myself'}
with mock.patch('chrono.utils.requests_wrapper.RequestsSession.send') as send:
user = AnonymousUser()
requests.get('/foo/bar/', remote_service=remote_service, user=user)
url = send.call_args[0][0].url
assert url.startswith('http://example.org/foo/bar/?')
scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
query = urlparse.parse_qs(querystring, keep_blank_values=True)
assert query['NameID'][0] == ''
assert query['email'][0] == ''
assert query['orig'][0] == 'myself'
assert check_query(querystring, 'secret') == True
def test_requests_cache():
with mock.patch('chrono.utils.requests_wrapper.RequestsSession.request') as requests_get:
requests_get.return_value = mock.Mock(content=b'hello world', status_code=200)
# default cache, nothing in there
assert requests.get('http://cache.example.org/').content == b'hello world'
assert requests_get.call_count == 1
# now there's something in cache
assert requests.get('http://cache.example.org/').content == b'hello world'
assert requests_get.call_count == 1
# value changed
requests_get.return_value = mock.Mock(content=b'hello second world', status_code=200)
assert requests.get('http://cache.example.org/').content == b'hello world'
assert requests_get.call_count == 1
# force cache invalidation
assert (
requests.get('http://cache.example.org/', invalidate_cache=True).content == b'hello second world'
)
assert requests_get.call_count == 2
# check raise_if_not_cached
with pytest.raises(NothingInCacheException):
requests.get('http://cache.example.org/other', raise_if_not_cached=True)
# check with unicode url
assert requests.get(u'http://cache.example.org/éléphant').content == b'hello second world'