rest_authentication: improve signature errors reporting (#57450)
This commit is contained in:
parent
1e2cf7f618
commit
9b46019149
|
@ -54,7 +54,7 @@ class PublikAuthenticationFailed(exceptions.APIException):
|
|||
default_code = 'invalid-signature'
|
||||
|
||||
def __init__(self, code):
|
||||
self.detail = {'err': code}
|
||||
self.detail = {'err': 1, 'err_desc': code}
|
||||
|
||||
|
||||
class PublikAuthentication(authentication.BaseAuthentication):
|
||||
|
@ -113,9 +113,13 @@ class PublikAuthentication(authentication.BaseAuthentication):
|
|||
if not request.GET.get('orig') or not request.GET.get('signature'):
|
||||
return None
|
||||
key = self.get_orig_key(request.GET['orig'])
|
||||
if not signature.check_url(full_path, key):
|
||||
self.logger.warning('invalid signature')
|
||||
raise PublikAuthenticationFailed('invalid-signature')
|
||||
try:
|
||||
assert signature.check_url(
|
||||
full_path, key, raise_on_error=True
|
||||
), 'signature.check_url should never return False with raise_on_error'
|
||||
except signature.SignatureError as e:
|
||||
self.logger.warning('publik rest-framework-authentication failed: %s', e)
|
||||
raise PublikAuthenticationFailed(str(e))
|
||||
user = self.resolve_user(request)
|
||||
self.logger.info('user authenticated with signature %s', user)
|
||||
return (user, None)
|
||||
|
|
|
@ -3,6 +3,7 @@ import datetime
|
|||
import hashlib
|
||||
import hmac
|
||||
import random
|
||||
import secrets
|
||||
|
||||
from django.utils import six
|
||||
from django.utils.encoding import smart_bytes
|
||||
|
@ -12,6 +13,10 @@ from django.utils.six.moves.urllib import parse as urlparse
|
|||
'''Simple signature scheme for query strings'''
|
||||
|
||||
|
||||
class SignatureError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
|
||||
parsed = urlparse.urlparse(url)
|
||||
new_query = sign_query(parsed.query, key, algo, timestamp, nonce)
|
||||
|
@ -43,36 +48,75 @@ def sign_string(s, key, algo='sha256'):
|
|||
return hash.digest()
|
||||
|
||||
|
||||
def check_url(url, key, known_nonce=None, timedelta=30):
|
||||
def check_url(url, key, known_nonce=None, timedelta=30, raise_on_error=False):
|
||||
parsed = urlparse.urlparse(url, 'https')
|
||||
return check_query(parsed.query, key, known_nonce=known_nonce, timedelta=timedelta)
|
||||
return check_query(
|
||||
parsed.query, key, known_nonce=known_nonce, timedelta=timedelta, raise_on_error=raise_on_error
|
||||
)
|
||||
|
||||
|
||||
def check_query(query, key, known_nonce=None, timedelta=30):
|
||||
def check_query(query, key, known_nonce=None, timedelta=30, raise_on_error=False):
|
||||
parsed = urlparse.parse_qs(query)
|
||||
if not ('signature' in parsed and 'algo' in parsed and 'timestamp' in parsed):
|
||||
parsed = {key: value[0] if len(value) == 1 else value for key, value in parsed.items()}
|
||||
signature = parsed.get('signature')
|
||||
if not signature or not isinstance(signature, str):
|
||||
if raise_on_error:
|
||||
raise SignatureError('multiple/missing signature')
|
||||
return False
|
||||
algo = parsed.get('algo')
|
||||
if not algo or not isinstance(algo, str):
|
||||
if raise_on_error:
|
||||
raise SignatureError('multiple/missing algo')
|
||||
return False
|
||||
if algo != 'sha256':
|
||||
if raise_on_error:
|
||||
raise SignatureError('invalid algo, must be sha256')
|
||||
return False
|
||||
timestamp = parsed.get('timestamp')
|
||||
if not timestamp or not isinstance(timestamp, str):
|
||||
if raise_on_error:
|
||||
raise SignatureError('multiple/missing timestamp')
|
||||
return False
|
||||
if known_nonce is not None:
|
||||
if ('nonce' not in parsed) or known_nonce(parsed['nonce'][0]):
|
||||
nonce = parsed.get('nonce')
|
||||
if not nonce or not isinstance(nonce, str):
|
||||
if raise_on_error:
|
||||
raise SignatureError('multiple/missing nonce')
|
||||
return False
|
||||
if known_nonce(nonce):
|
||||
if raise_on_error:
|
||||
raise SignatureError('nonce replayed')
|
||||
return False
|
||||
unsigned_query, signature_content = query.split('&signature=', 1)
|
||||
if '&' in signature_content:
|
||||
if raise_on_error:
|
||||
raise SignatureError('signature is not the last parameter')
|
||||
return False # signature must be the last parameter
|
||||
signature = base64.b64decode(parsed['signature'][0])
|
||||
algo = parsed['algo'][0]
|
||||
timestamp = parsed['timestamp'][0]
|
||||
timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
|
||||
if abs(datetime.datetime.utcnow() - timestamp) > datetime.timedelta(seconds=timedelta):
|
||||
try:
|
||||
signature = base64.b64decode(signature)
|
||||
except ValueError:
|
||||
if raise_on_error:
|
||||
raise SignatureError('signature is invalid base64')
|
||||
return False
|
||||
return check_string(unsigned_query, signature, key, algo=algo)
|
||||
try:
|
||||
timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
|
||||
except ValueError as e:
|
||||
if raise_on_error:
|
||||
raise SignatureError('invalid timestamp, %s' % e)
|
||||
return False
|
||||
delta = abs(datetime.datetime.utcnow() - timestamp)
|
||||
if delta > datetime.timedelta(seconds=timedelta):
|
||||
if raise_on_error:
|
||||
raise SignatureError('timestamp delta is more than %s seconds: %s' % (timedelta, delta))
|
||||
return False
|
||||
return check_string(unsigned_query, signature, key, algo=algo, raise_on_error=raise_on_error)
|
||||
|
||||
|
||||
def check_string(s, signature, key, algo='sha256'):
|
||||
def check_string(s, signature, key, algo='sha256', raise_on_error=False):
|
||||
# constant time compare
|
||||
signature2 = sign_string(s, key, algo=algo)
|
||||
if len(signature2) != len(signature):
|
||||
if not secrets.compare_digest(signature, signature2):
|
||||
if raise_on_error:
|
||||
raise SignatureError('HMAC hash is different')
|
||||
return False
|
||||
res = 0
|
||||
for a, b in zip(signature, signature2):
|
||||
res |= a ^ b
|
||||
return res == 0
|
||||
return True
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import re
|
||||
|
||||
import pytest
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.test import RequestFactory
|
||||
|
@ -88,7 +90,8 @@ def test_publik_authentication(tenant, settings):
|
|||
publik_authentication = rest_authentication.PublikAuthentication()
|
||||
with pytest.raises(rest_authentication.PublikAuthenticationFailed) as exc_info:
|
||||
publik_authentication.authenticate(request)
|
||||
assert exc_info.value.detail['err'] == 'invalid-signature'
|
||||
assert exc_info.value.detail['err'] == 1
|
||||
assert exc_info.value.detail['err_desc'] == 'HMAC hash is different'
|
||||
|
||||
|
||||
def test_response(rf, settings, tenant):
|
||||
|
@ -114,7 +117,8 @@ def test_response(rf, settings, tenant):
|
|||
|
||||
response = view(request)
|
||||
assert response.status_code == 401
|
||||
assert response.data['err'] == 'no-known-services-setting'
|
||||
assert response.data['err'] == 1
|
||||
assert response.data['err_desc'] == 'no-known-services-setting'
|
||||
|
||||
secret_key = 'bbb'
|
||||
settings.KNOWN_SERVICES = {
|
||||
|
@ -127,19 +131,33 @@ def test_response(rf, settings, tenant):
|
|||
|
||||
response = view(request)
|
||||
assert response.status_code == 401
|
||||
assert response.data['err'] == 'no-secret-found-for-orig'
|
||||
assert response.data['err'] == 1
|
||||
assert response.data['err_desc'] == 'no-secret-found-for-orig'
|
||||
|
||||
settings.KNOWN_SERVICES['whatever']['whatever']['secret'] = secret_key
|
||||
response = view(request)
|
||||
assert response.status_code == 401
|
||||
assert response.data['err'] == 'invalid-signature'
|
||||
assert response.data['err'] == 1
|
||||
assert response.data['err_desc'] == 'multiple/missing algo'
|
||||
|
||||
# User authentication
|
||||
request = rf.get(signature.sign_url('/?orig=zzz&NameID=1234', secret_key))
|
||||
|
||||
response = view(request)
|
||||
assert response.status_code == 401
|
||||
assert response.data == {'err': 'user-not-found'}
|
||||
assert response.data == {'err': 1, 'err_desc': 'user-not-found'}
|
||||
|
||||
# Service authentication, wrong timestamp
|
||||
request = rf.get(
|
||||
re.sub('timestamp=[^&]*', 'timestamp=xxx', signature.sign_url('/?orig=zzz', secret_key))
|
||||
)
|
||||
|
||||
response = view(request)
|
||||
assert response.status_code == 401
|
||||
assert response.data == {
|
||||
'err': 1,
|
||||
'err_desc': "invalid timestamp, time data 'xxx' does not match format '%Y-%m-%dT%H:%M:%SZ'",
|
||||
}
|
||||
|
||||
# Service authentication
|
||||
request = rf.get(signature.sign_url('/?orig=zzz', secret_key))
|
||||
|
@ -151,4 +169,4 @@ def test_response(rf, settings, tenant):
|
|||
del settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS
|
||||
response = view(request)
|
||||
assert response.status_code == 401
|
||||
assert response.data == {'err': 'no-user-for-orig'}
|
||||
assert response.data == {'err': 1, 'err_desc': 'no-user-for-orig'}
|
||||
|
|
Loading…
Reference in New Issue