175 lines
6.3 KiB
Python
175 lines
6.3 KiB
Python
# hobo - portal to configure and deploy applications
|
|
# Copyright (C) 2015-2020 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import re
|
|
import urllib.parse
|
|
|
|
import pytest
|
|
from django.contrib.auth import get_user_model
|
|
from django.test import RequestFactory
|
|
from rest_framework import permissions
|
|
from rest_framework.response import Response
|
|
from rest_framework.views import APIView
|
|
from tenant_schemas.utils import tenant_context
|
|
|
|
from hobo import rest_authentication, signature
|
|
|
|
# Module-level pytest mark: applies pytest-django's ``django_db`` marker to
# every test function defined in this file.
pytestmark = pytest.mark.django_db
|
|
|
|
|
|
def test_publik_authentication(tenant, settings):
    """Exercise ``PublikAuthentication.authenticate()`` on signed requests.

    Four scenarios are checked, in order:

    * a signed request carrying the ``NameID`` of an existing user is
      expected to authenticate as that user;
    * a signed request with only ``orig``, made before a user named after
      ``orig`` is created, is expected to yield an
      ``AnonymousAdminServiceUser`` (the class configured through
      ``HOBO_ANONYMOUS_SERVICE_USER_CLASS``);
    * the same request, once a user whose username equals ``orig`` exists,
      is expected to authenticate as that user;
    * a request signed with a different key is expected to raise
      ``PublikAuthenticationFailed``.
    """
    settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS = 'hobo.rest_authentication.AnonymousAdminServiceUser'
    with tenant_context(tenant):
        key = settings.KNOWN_SERVICES['welco']['other']['secret']
        settings.HOBO_ROLE_EXPORT = False

        User = get_user_model()
        user = User.objects.create(username='foo', password='foo')
        orig = 'other.example.net'
        url = '/api/?coucou=zob'
        factory = RequestFactory()

        def authenticate(auth_query, secret):
            # Sign url + auth_query with `secret`, then run the resulting GET
            # request through a fresh authenticator instance.
            request = factory.get(signature.sign_url(url + auth_query, secret))
            return rest_authentication.PublikAuthentication().authenticate(request)

        def authenticated_user(result):
            # Check the (user, auth) pair shape shared by every successful
            # case and hand back the user part for case-specific assertions.
            assert result is not None
            assert isinstance(result, tuple)
            assert len(result) == 2
            assert result[1] is None
            return result[0]

        # Test regular user, identified by NameID
        user_query = '&NameID=%s&orig=%s' % (user.uuid, urllib.parse.quote(orig))
        assert authenticated_user(authenticate(user_query, key)) == user

        # Test anonymous user
        service_query = '&orig=%s' % urllib.parse.quote(orig)
        anonymous = authenticated_user(authenticate(service_query, key))
        assert anonymous.__class__ is rest_authentication.AnonymousAdminServiceUser
        assert anonymous.is_authenticated
        assert anonymous.is_anonymous
        assert anonymous.is_staff

        # Test user named after service orig
        service_user = User.objects.create(username=orig)
        assert authenticated_user(authenticate(service_query, key)) == service_user

        # Failure: the signing key does not match the one known for orig
        with pytest.raises(rest_authentication.PublikAuthenticationFailed) as exc_info:
            authenticate(service_query, key + 'zob')
        assert exc_info.value.detail['err'] == 1
        assert exc_info.value.detail['err_desc'] == 'HMAC hash is different'
|
|
|
|
|
|
def test_response(rf, settings, tenant):
    """Check the HTTP responses of a DRF view guarded by PublikAuthentication.

    A throwaway ``APIView`` requiring an authenticated user is called with a
    series of requests while ``KNOWN_SERVICES`` and
    ``HOBO_ANONYMOUS_SERVICE_USER_CLASS`` are progressively changed; each
    step asserts the status code and, for rejections, the ``err`` /
    ``err_desc`` payload the test expects.
    """

    def assert_rejected(resp, reason):
        # Shape this test expects for every authentication failure.
        assert resp.status_code == 401
        assert resp.data['err'] == 1
        assert resp.data['err_desc'] == reason

    with tenant_context(tenant):
        plain_request = rf.get('/')

        del settings.KNOWN_SERVICES

        class View(APIView):
            authentication_classes = (rest_authentication.PublikAuthentication,)
            permission_classes = (permissions.IsAuthenticated,)

            def get(self, request, format=None):
                return Response({'err': 0})

        endpoint = View.as_view()

        # Request without any signature parameters
        assert endpoint(plain_request).status_code == 403

        # Bogus signature; the same request is replayed while the
        # KNOWN_SERVICES setting is filled in step by step.
        bogus_request = rf.get('/?signature=aaa&orig=zzz')
        assert_rejected(endpoint(bogus_request), 'no-known-services-setting')

        shared_secret = 'bbb'
        settings.KNOWN_SERVICES = {
            'whatever': {
                'whatever': {
                    'verif_orig': 'zzz',
                }
            }
        }
        assert_rejected(endpoint(bogus_request), 'no-secret-found-for-orig')

        settings.KNOWN_SERVICES['whatever']['whatever']['secret'] = shared_secret
        assert_rejected(endpoint(bogus_request), 'multiple/missing algo')

        # User authentication
        user_request = rf.get(signature.sign_url('/?orig=zzz&NameID=1234', shared_secret))
        assert_rejected(endpoint(user_request), 'user-not-found')

        # Service authentication, wrong timestamp
        signed_url = signature.sign_url('/?orig=zzz', shared_secret)
        tampered_url = re.sub('timestamp=[^&]*', 'timestamp=xxx', signed_url)
        assert_rejected(
            endpoint(rf.get(tampered_url)),
            "invalid timestamp, time data 'xxx' does not match format '%Y-%m-%dT%H:%M:%SZ'",
        )

        # Service authentication
        service_request = rf.get(signature.sign_url('/?orig=zzz', shared_secret))
        accepted = endpoint(service_request)
        assert accepted.status_code == 200
        assert accepted.data == {'err': 0}

        # Same request replayed once the anonymous service user class setting is removed
        del settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS
        assert_rejected(endpoint(service_request), 'no-user-for-orig')
|