rest_authentication: improve signature errors reporting (#57450)

This commit is contained in:
Benjamin Dauvergne 2021-09-30 13:19:39 +02:00
parent 1e2cf7f618
commit 9b46019149
3 changed files with 93 additions and 27 deletions

View File

@ -54,7 +54,7 @@ class PublikAuthenticationFailed(exceptions.APIException):
default_code = 'invalid-signature'
def __init__(self, code):
self.detail = {'err': code}
self.detail = {'err': 1, 'err_desc': code}
class PublikAuthentication(authentication.BaseAuthentication):
@ -113,9 +113,13 @@ class PublikAuthentication(authentication.BaseAuthentication):
if not request.GET.get('orig') or not request.GET.get('signature'):
return None
key = self.get_orig_key(request.GET['orig'])
if not signature.check_url(full_path, key):
self.logger.warning('invalid signature')
raise PublikAuthenticationFailed('invalid-signature')
try:
assert signature.check_url(
full_path, key, raise_on_error=True
), 'signature.check_url should never return False with raise_on_error'
except signature.SignatureError as e:
self.logger.warning('publik rest-framework-authentication failed: %s', e)
raise PublikAuthenticationFailed(str(e))
user = self.resolve_user(request)
self.logger.info('user authenticated with signature %s', user)
return (user, None)

View File

@ -3,6 +3,7 @@ import datetime
import hashlib
import hmac
import random
import secrets
from django.utils import six
from django.utils.encoding import smart_bytes
@ -12,6 +13,10 @@ from django.utils.six.moves.urllib import parse as urlparse
'''Simple signature scheme for query strings'''
class SignatureError(Exception):
pass
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
parsed = urlparse.urlparse(url)
new_query = sign_query(parsed.query, key, algo, timestamp, nonce)
@ -43,36 +48,75 @@ def sign_string(s, key, algo='sha256'):
return hash.digest()
def check_url(url, key, known_nonce=None, timedelta=30):
def check_url(url, key, known_nonce=None, timedelta=30, raise_on_error=False):
parsed = urlparse.urlparse(url, 'https')
return check_query(parsed.query, key, known_nonce=known_nonce, timedelta=timedelta)
return check_query(
parsed.query, key, known_nonce=known_nonce, timedelta=timedelta, raise_on_error=raise_on_error
)
def check_query(query, key, known_nonce=None, timedelta=30):
def check_query(query, key, known_nonce=None, timedelta=30, raise_on_error=False):
parsed = urlparse.parse_qs(query)
if not ('signature' in parsed and 'algo' in parsed and 'timestamp' in parsed):
parsed = {key: value[0] if len(value) == 1 else value for key, value in parsed.items()}
signature = parsed.get('signature')
if not signature or not isinstance(signature, str):
if raise_on_error:
raise SignatureError('multiple/missing signature')
return False
algo = parsed.get('algo')
if not algo or not isinstance(algo, str):
if raise_on_error:
raise SignatureError('multiple/missing algo')
return False
if algo != 'sha256':
if raise_on_error:
raise SignatureError('invalid algo, must be sha256')
return False
timestamp = parsed.get('timestamp')
if not timestamp or not isinstance(timestamp, str):
if raise_on_error:
raise SignatureError('multiple/missing timestamp')
return False
if known_nonce is not None:
if ('nonce' not in parsed) or known_nonce(parsed['nonce'][0]):
nonce = parsed.get('nonce')
if not nonce or not isinstance(nonce, str):
if raise_on_error:
raise SignatureError('multiple/missing nonce')
return False
if known_nonce(nonce):
if raise_on_error:
raise SignatureError('nonce replayed')
return False
unsigned_query, signature_content = query.split('&signature=', 1)
if '&' in signature_content:
if raise_on_error:
raise SignatureError('signature is not the last parameter')
return False # signature must be the last parameter
signature = base64.b64decode(parsed['signature'][0])
algo = parsed['algo'][0]
timestamp = parsed['timestamp'][0]
timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
if abs(datetime.datetime.utcnow() - timestamp) > datetime.timedelta(seconds=timedelta):
try:
signature = base64.b64decode(signature)
except ValueError:
if raise_on_error:
raise SignatureError('signature is invalid base64')
return False
return check_string(unsigned_query, signature, key, algo=algo)
try:
timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
except ValueError as e:
if raise_on_error:
raise SignatureError('invalid timestamp, %s' % e)
return False
delta = abs(datetime.datetime.utcnow() - timestamp)
if delta > datetime.timedelta(seconds=timedelta):
if raise_on_error:
raise SignatureError('timestamp delta is more than %s seconds: %s' % (timedelta, delta))
return False
return check_string(unsigned_query, signature, key, algo=algo, raise_on_error=raise_on_error)
def check_string(s, signature, key, algo='sha256'):
def check_string(s, signature, key, algo='sha256', raise_on_error=False):
# constant time compare
signature2 = sign_string(s, key, algo=algo)
if len(signature2) != len(signature):
if not secrets.compare_digest(signature, signature2):
if raise_on_error:
raise SignatureError('HMAC hash is different')
return False
res = 0
for a, b in zip(signature, signature2):
res |= a ^ b
return res == 0
return True

View File

@ -14,6 +14,8 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
import pytest
from django.contrib.auth import get_user_model
from django.test import RequestFactory
@ -88,7 +90,8 @@ def test_publik_authentication(tenant, settings):
publik_authentication = rest_authentication.PublikAuthentication()
with pytest.raises(rest_authentication.PublikAuthenticationFailed) as exc_info:
publik_authentication.authenticate(request)
assert exc_info.value.detail['err'] == 'invalid-signature'
assert exc_info.value.detail['err'] == 1
assert exc_info.value.detail['err_desc'] == 'HMAC hash is different'
def test_response(rf, settings, tenant):
@ -114,7 +117,8 @@ def test_response(rf, settings, tenant):
response = view(request)
assert response.status_code == 401
assert response.data['err'] == 'no-known-services-setting'
assert response.data['err'] == 1
assert response.data['err_desc'] == 'no-known-services-setting'
secret_key = 'bbb'
settings.KNOWN_SERVICES = {
@ -127,19 +131,33 @@ def test_response(rf, settings, tenant):
response = view(request)
assert response.status_code == 401
assert response.data['err'] == 'no-secret-found-for-orig'
assert response.data['err'] == 1
assert response.data['err_desc'] == 'no-secret-found-for-orig'
settings.KNOWN_SERVICES['whatever']['whatever']['secret'] = secret_key
response = view(request)
assert response.status_code == 401
assert response.data['err'] == 'invalid-signature'
assert response.data['err'] == 1
assert response.data['err_desc'] == 'multiple/missing algo'
# User authentication
request = rf.get(signature.sign_url('/?orig=zzz&NameID=1234', secret_key))
response = view(request)
assert response.status_code == 401
assert response.data == {'err': 'user-not-found'}
assert response.data == {'err': 1, 'err_desc': 'user-not-found'}
# Service authentication, wrong timestamp
request = rf.get(
re.sub('timestamp=[^&]*', 'timestamp=xxx', signature.sign_url('/?orig=zzz', secret_key))
)
response = view(request)
assert response.status_code == 401
assert response.data == {
'err': 1,
'err_desc': "invalid timestamp, time data 'xxx' does not match format '%Y-%m-%dT%H:%M:%SZ'",
}
# Service authentication
request = rf.get(signature.sign_url('/?orig=zzz', secret_key))
@ -151,4 +169,4 @@ def test_response(rf, settings, tenant):
del settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS
response = view(request)
assert response.status_code == 401
assert response.data == {'err': 'no-user-for-orig'}
assert response.data == {'err': 1, 'err_desc': 'no-user-for-orig'}