rest_authentication: raise APIError for signature errors (#39911)
This commit is contained in:
parent
8c29562d35
commit
9225efdb96
|
@ -1,6 +1,6 @@
|
|||
import logging
|
||||
|
||||
from rest_framework import authentication, exceptions
|
||||
from rest_framework import authentication, exceptions, status
|
||||
|
||||
from hobo import signature
|
||||
|
||||
|
@ -61,6 +61,14 @@ class AnonymousAdminServiceUser(AnonymousUser):
|
|||
return 'Publik Service Admin'
|
||||
|
||||
|
||||
class PublikAuthenticationFailed(exceptions.APIException):
|
||||
status_code = status.HTTP_401_UNAUTHORIZED
|
||||
default_code = 'invalid-signature'
|
||||
|
||||
def __init__(self, code):
|
||||
self.detail = {'err': code}
|
||||
|
||||
|
||||
class PublikAuthentication(authentication.BaseAuthentication):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
@ -80,16 +88,15 @@ class PublikAuthentication(authentication.BaseAuthentication):
|
|||
try:
|
||||
return User.objects.get(uuid=name_id)
|
||||
except User.DoesNotExist:
|
||||
raise exceptions.AuthenticationFailed('No user matches uuid=%r' % name_id)
|
||||
raise PublikAuthenticationFailed('user-not-found')
|
||||
|
||||
elif UserSAMLIdentifier:
|
||||
try:
|
||||
return UserSAMLIdentifier.objects.get(name_id=name_id).user
|
||||
except UserSAMLIdentifier.DoesNotExist:
|
||||
raise exceptions.AuthenticationFailed(
|
||||
'No user matches nameid=%r' % name_id)
|
||||
raise PublikAuthenticationFailed('user-not-found')
|
||||
else:
|
||||
raise exceptions.AuthenticationFailed(
|
||||
'No usable model to match nameid=%r' % name_id)
|
||||
raise PublikAuthenticationFailed('no-usable-model')
|
||||
else:
|
||||
orig = request.GET['orig']
|
||||
try:
|
||||
|
@ -100,18 +107,18 @@ class PublikAuthentication(authentication.BaseAuthentication):
|
|||
klass = import_string(settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS)
|
||||
self.logger.info('anonymous signature validated')
|
||||
return klass()
|
||||
raise exceptions.AuthenticationFailed('Anonymous service user is unsupported')
|
||||
raise PublikAuthenticationFailed('no-user-for-orig')
|
||||
|
||||
def get_orig_key(self, orig):
|
||||
if not hasattr(settings, 'KNOWN_SERVICES'):
|
||||
self.logger.warning('no known services')
|
||||
raise exceptions.AuthenticationFailed('No KNOWN_SERVICES setting')
|
||||
raise PublikAuthenticationFailed('no-known-services-setting')
|
||||
for service_id in settings.KNOWN_SERVICES:
|
||||
for slug, service in settings.KNOWN_SERVICES[service_id].items():
|
||||
if service.get('verif_orig') == orig and service.get('secret'):
|
||||
return service['secret']
|
||||
self.logger.warning('no secret found for origin %r', orig)
|
||||
raise exceptions.AuthenticationFailed('no secret found for origin %r' % orig)
|
||||
raise PublikAuthenticationFailed('no-secret-found-for-orig')
|
||||
|
||||
def authenticate(self, request):
|
||||
full_path = request.get_full_path()
|
||||
|
@ -120,7 +127,7 @@ class PublikAuthentication(authentication.BaseAuthentication):
|
|||
key = self.get_orig_key(request.GET['orig'])
|
||||
if not signature.check_url(full_path, key):
|
||||
self.logger.warning('invalid signature')
|
||||
raise exceptions.AuthenticationFailed('Invalid signature')
|
||||
raise PublikAuthenticationFailed('invalid-signature')
|
||||
user = self.resolve_user(request)
|
||||
self.logger.info('user authenticated with signature %s', user)
|
||||
return (user, None)
|
||||
|
|
|
@ -1,12 +1,30 @@
|
|||
# hobo - portal to configure and deploy applications
|
||||
# Copyright (C) 2015-2020 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import pytest
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
|
||||
from rest_framework.exceptions import AuthenticationFailed
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.test import RequestFactory
|
||||
from tenant_schemas.utils import tenant_context
|
||||
|
||||
from rest_framework.views import APIView
|
||||
from rest_framework.response import Response
|
||||
from rest_framework import permissions
|
||||
|
||||
from hobo import signature, rest_authentication
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
@ -71,6 +89,69 @@ def test_publik_authentication(tenant, settings):
|
|||
request = factory.get(signature.sign_url(URL + AUTH_QUERY, key + 'zob'))
|
||||
|
||||
publik_authentication = rest_authentication.PublikAuthentication()
|
||||
with pytest.raises(AuthenticationFailed):
|
||||
with pytest.raises(rest_authentication.PublikAuthenticationFailed) as exc_info:
|
||||
publik_authentication.authenticate(request)
|
||||
assert exc_info.value.detail['err'] == 'invalid-signature'
|
||||
|
||||
|
||||
def test_response(rf, settings, tenant):
|
||||
|
||||
with tenant_context(tenant):
|
||||
request = rf.get('/')
|
||||
|
||||
del settings.KNOWN_SERVICES
|
||||
|
||||
class View(APIView):
|
||||
authentication_classes = (rest_authentication.PublikAuthentication,)
|
||||
permission_classes = (permissions.IsAuthenticated,)
|
||||
|
||||
def get(self, request, format=None):
|
||||
return Response({'err': 0})
|
||||
|
||||
view = View.as_view()
|
||||
|
||||
response = view(request)
|
||||
assert response.status_code == 403
|
||||
|
||||
request = rf.get('/?signature=aaa&orig=zzz')
|
||||
|
||||
response = view(request)
|
||||
assert response.status_code == 401
|
||||
assert response.data['err'] == 'no-known-services-setting'
|
||||
|
||||
secret_key = 'bbb'
|
||||
settings.KNOWN_SERVICES = {
|
||||
'whatever': {
|
||||
'whatever': {
|
||||
'verif_orig': 'zzz',
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
response = view(request)
|
||||
assert response.status_code == 401
|
||||
assert response.data['err'] == 'no-secret-found-for-orig'
|
||||
|
||||
settings.KNOWN_SERVICES['whatever']['whatever']['secret'] = secret_key
|
||||
response = view(request)
|
||||
assert response.status_code == 401
|
||||
assert response.data['err'] == 'invalid-signature'
|
||||
|
||||
# User authentication
|
||||
request = rf.get(signature.sign_url('/?orig=zzz&NameID=1234', secret_key))
|
||||
|
||||
response = view(request)
|
||||
assert response.status_code == 401
|
||||
assert response.data == {'err': 'user-not-found'}
|
||||
|
||||
# Service authentication
|
||||
request = rf.get(signature.sign_url('/?orig=zzz', secret_key))
|
||||
|
||||
response = view(request)
|
||||
assert response.status_code == 200
|
||||
assert response.data == {'err': 0}
|
||||
|
||||
del settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS
|
||||
response = view(request)
|
||||
assert response.status_code == 401
|
||||
assert response.data == {'err': 'no-user-for-orig'}
|
||||
|
|
Loading…
Reference in New Issue