rest_authentication: add api client authentication (#67085)
gitea-wip/hobo/pipeline/head There was a failure building this commit Details
gitea/hobo/pipeline/head Something is wrong with the build of this commit Details

This commit is contained in:
Emmanuel Cazenave 2022-08-22 14:23:04 +02:00
parent 57d8569376
commit bc959e928e
6 changed files with 206 additions and 1 deletions

View File

@ -397,7 +397,10 @@ STATICS_HASH_COUNTER = '/var/lib/publik/statics-counter'
if 'rest_framework' in INSTALLED_APPS:
if 'REST_FRAMEWORK' not in globals():
REST_FRAMEWORK = {}
REST_FRAMEWORK['DEFAULT_AUTHENTICATION_CLASSES'] = ('hobo.rest_authentication.PublikAuthentication',)
REST_FRAMEWORK['DEFAULT_AUTHENTICATION_CLASSES'] = (
'hobo.rest_authentication.PublikAuthentication',
'hobo.rest_authentication.APIClientAuthentication',
)
REST_FRAMEWORK['DEFAULT_PERMISSION_CLASSES'] = ('rest_framework.permissions.IsAuthenticated',)
REST_FRAMEWORK['DEFAULT_RENDERER_CLASSES'] = ('rest_framework.renderers.JSONRenderer',)

View File

@ -1,5 +1,6 @@
import logging
import requests
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AnonymousUser
@ -8,6 +9,7 @@ from django.utils.module_loading import import_string
from rest_framework import authentication, exceptions, status
from hobo import signature
from hobo.requests_wrapper import Requests
try:
from mellon.models import UserSAMLIdentifier
@ -51,6 +53,32 @@ class AnonymousAdminServiceUser(AnonymousUser):
return 'Publik Service Admin'
class APIClientUser:
is_active = True
is_anonymous = False
is_authenticated = True
is_superuser = False
roles = []
def __init__(self, is_active, is_anonymous, is_authenticated, is_superuser, roles):
self.is_active = is_active
self.is_anonymous = is_anonymous
self.is_authenticated = is_authenticated
self.is_superuser = is_superuser
self.roles = roles
@classmethod
def from_dict(cls, data):
return cls(
is_active=data['is_active'],
is_anonymous=data['is_anonymous'],
is_authenticated=data['is_authenticated'],
is_superuser=data['is_superuser'],
roles=data['roles'],
)
class PublikAuthenticationFailed(exceptions.APIException):
status_code = status.HTTP_401_UNAUTHORIZED
default_code = 'invalid-signature'
@ -125,3 +153,41 @@ class PublikAuthentication(authentication.BaseAuthentication):
user = self.resolve_user(request)
self.logger.info('user authenticated with signature %s', user)
return (user, None)
class APIClientAuthenticationUnavailable(exceptions.APIException):
status_code = status.HTTP_503_SERVICE_UNAVAILABLE
default_code = 'IDP temporarily unavailable, try again later.'
def __init__(self):
self.detail = {'err': 1, 'err_desc': self.default_code}
class APIClientAuthentication(authentication.BasicAuthentication):
def authenticate_credentials(self, identifier, password, request=None):
idp_services = list(getattr(settings, 'KNOWN_SERVICES', {}).get('authentic', {}).values())
if not idp_services:
return None
authentic = idp_services[0]
url = authentic['url'] + 'api/check-api-client/'
try:
response = Requests().post(url, json={'identifier': identifier, 'password': password})
except requests.Timeout:
raise APIClientAuthenticationUnavailable()
except requests.RequestException as err:
raise APIClientAuthenticationUnavailable()
try:
response.raise_for_status()
except requests.exceptions.RequestException:
return None
result = response.json()
if 'err' not in result or 'data' not in result or result['err'] == 1:
return None
try:
api_client = APIClientUser.from_dict(result['data'])
except Exception:
return None
return api_client, None

View File

@ -3,6 +3,9 @@ import logging
from django.conf.urls import url
from django.core.exceptions import PermissionDenied
from django.http import HttpResponse
from rest_framework import permissions
from rest_framework.response import Response
from rest_framework.views import APIView
def helloworld(request):
@ -16,6 +19,15 @@ def helloworld(request):
return HttpResponse('Hello world %s' % request.META['REMOTE_ADDR'])
class AuthenticatedTestView(APIView):
permission_classes = [permissions.IsAuthenticated]
def get(self, request):
return Response({'some': 'data'})
urlpatterns = [
url(r'^$', helloworld),
url(r'^authenticated-testview/', AuthenticatedTestView.as_view()),
]

View File

@ -26,3 +26,11 @@ DATABASES = {
}
TEMPLATES[0]['OPTIONS'].setdefault('builtins', []).append('hobo.templatetags.hobo')
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': (
'hobo.rest_authentication.PublikAuthentication',
'hobo.rest_authentication.APIClientAuthentication',
),
'DEFAULT_RENDERER_CLASSES': ('rest_framework.renderers.JSONRenderer',),
}

View File

@ -0,0 +1,115 @@
import base64
import pytest
import requests
import responses
from django.test import RequestFactory
@pytest.fixture
def settings_with_idp(settings):
settings.ROOT_URLCONF = 'hobo.test_urls'
settings.KNOWN_SERVICES = {
'authentic': {
'idp': {
'title': 'Foobar',
'url': 'https://idp.example.invalid/',
'orig': 'example.org',
'secret': 'xxx',
}
}
}
return settings
@pytest.fixture
def app_with_auth(app):
app.authorization = ('Basic', ('foo', 'bar'))
return app
def test_no_known_services(app_with_auth, db, settings):
settings.ROOT_URLCONF = 'hobo.test_urls'
resp = app_with_auth.get('/authenticated-testview/', status=403)
def test_no_idp_in_known_services(app_with_auth, db, settings):
settings.ROOT_URLCONF = 'hobo.test_urls'
settings.KNOWN_SERVICES = {}
resp = app_with_auth.get('/authenticated-testview/', status=403)
def test_idp_connection_error(app_with_auth, db, settings_with_idp):
with responses.RequestsMock() as rsps:
rsps.post('https://idp.example.invalid/api/check-api-client/', status=403)
resp = app_with_auth.get('/authenticated-testview/', status=403)
def test_idp_timeout(app_with_auth, db, settings_with_idp):
with responses.RequestsMock() as rsps:
rsps.post('https://idp.example.invalid/api/check-api-client/', body=requests.Timeout('...'))
resp = app_with_auth.get('/authenticated-testview/', status=503)
assert resp.json == {'err': 1, 'err_desc': 'IDP temporarily unavailable, try again later.'}
def test_idp_connection_error(app_with_auth, db, settings_with_idp):
with responses.RequestsMock() as rsps:
rsps.post('https://idp.example.invalid/api/check-api-client/', body=requests.RequestException('...'))
resp = app_with_auth.get('/authenticated-testview/', status=503)
assert resp.json == {'err': 1, 'err_desc': 'IDP temporarily unavailable, try again later.'}
def test_idp_no_err_key(app_with_auth, db, settings_with_idp):
with responses.RequestsMock() as rsps:
rsps.post(
'https://idp.example.invalid/api/check-api-client/',
json={'foo': 'bar'},
status=200,
)
resp = app_with_auth.get('/authenticated-testview/', status=403)
def test_idp_app_error(app_with_auth, db, settings_with_idp):
with responses.RequestsMock() as rsps:
rsps.post(
'https://idp.example.invalid/api/check-api-client/',
json={'err': 1},
status=200,
)
resp = app_with_auth.get('/authenticated-testview/', status=403)
def test_idp_wrong_serialization(app_with_auth, db, settings_with_idp):
with responses.RequestsMock() as rsps:
rsps.post(
'https://idp.example.invalid/api/check-api-client/',
json={'err': 0, 'data': {'foo': 'bar'}},
status=200,
)
resp = app_with_auth.get('/authenticated-testview/', status=403)
def test_no_credentials(app, db, settings_with_idp):
# test that the '/authenticated-testview/' really requires authentication,
# otherwise all the others tests are meaningless.
resp = app.get('/authenticated-testview/', status=403)
def test_access_granted(app_with_auth, db, settings_with_idp):
with responses.RequestsMock() as rsps:
rsps.post(
'https://idp.example.invalid/api/check-api-client/',
json={
'err': 0,
'data': {
'is_active': True,
'is_anonymous': False,
'is_authenticated': True,
'is_superuser': False,
'restrict_to_anonymised_data': False,
'roles': [],
},
},
status=200,
)
resp = app_with_auth.get('/authenticated-testview/')

View File

@ -57,6 +57,7 @@ deps:
mock<4
httmock
requests
responses
pytest-freezegun
xmlschema<1.1
enum34<=1.1.6