api: raise 401 on authenticated API access where basic auth is allowed (#41766)
This commit is contained in:
parent
c6fb3b312a
commit
84fe1caa53
|
@ -24,6 +24,7 @@ from django.utils.six.moves.urllib import parse as urlparse
|
|||
from quixote import cleanup, get_publisher
|
||||
from wcs.qommon.http_request import HTTPRequest
|
||||
from wcs.qommon.form import PicklableUpload
|
||||
from wcs.qommon.ident.password_accounts import PasswordAccount
|
||||
from wcs.qommon import ods
|
||||
from wcs.users import User
|
||||
from wcs.roles import Role
|
||||
|
@ -39,7 +40,7 @@ from wcs import fields, qommon
|
|||
from wcs.api_utils import sign_url, get_secret_and_orig, is_url_signed, DEFAULT_DURATION
|
||||
from wcs.qommon.errors import AccessForbiddenError
|
||||
|
||||
from utilities import get_app, create_temporary_pub, clean_temporary_pub
|
||||
from utilities import get_app, create_temporary_pub, clean_temporary_pub, login
|
||||
|
||||
|
||||
def pytest_generate_tests(metafunc):
|
||||
|
@ -89,6 +90,12 @@ def admin_user():
|
|||
user.name_identifiers = ['0123456789']
|
||||
user.is_admin = True
|
||||
user.store()
|
||||
|
||||
account = PasswordAccount(id='admin')
|
||||
account.set_password('admin')
|
||||
account.user_id = user.id
|
||||
account.store()
|
||||
|
||||
return user
|
||||
|
||||
|
||||
|
@ -2442,13 +2449,22 @@ def test_api_ics_formdata(pub, local_user, ics_data):
|
|||
assert resp.text.count('DTEND') == 10
|
||||
|
||||
|
||||
def test_api_ics_formdata_http_auth(pub, local_user, ics_data):
|
||||
def test_api_ics_formdata_http_auth(pub, local_user, admin_user, ics_data):
|
||||
role = Role.select()[0]
|
||||
|
||||
# check as admin
|
||||
app = login(get_app(pub))
|
||||
resp = app.get('/api/forms/test/ics/foobar', status=200)
|
||||
|
||||
# no access
|
||||
app = get_app(pub)
|
||||
resp = app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=401)
|
||||
assert resp.headers['Www-Authenticate']
|
||||
|
||||
# auth but no access
|
||||
app = get_app(pub)
|
||||
app.authorization = ('Basic', ('user', 'password'))
|
||||
resp = app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=403)
|
||||
resp = app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=401)
|
||||
|
||||
# add authentication info
|
||||
pub.load_site_options()
|
||||
|
@ -2459,6 +2475,9 @@ def test_api_ics_formdata_http_auth(pub, local_user, ics_data):
|
|||
# check access is denied if the user has not the appropriate role
|
||||
resp = app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=403)
|
||||
|
||||
# check access is denied if the user is not specified
|
||||
resp = app.get('/api/forms/test/ics/foobar', status=403)
|
||||
|
||||
# add proper role to user
|
||||
local_user.roles = [role.id]
|
||||
local_user.store()
|
||||
|
@ -2470,7 +2489,7 @@ def test_api_ics_formdata_http_auth(pub, local_user, ics_data):
|
|||
|
||||
# check it fails with a different password
|
||||
app.authorization = ('Basic', ('user', 'password2'))
|
||||
resp = app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=403)
|
||||
resp = app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=401)
|
||||
|
||||
|
||||
def test_roles(pub, local_user):
|
||||
|
|
|
@ -28,8 +28,8 @@ from django.http import HttpResponse, HttpResponseBadRequest
|
|||
|
||||
from .qommon import _
|
||||
from .qommon import misc
|
||||
from .qommon.errors import (AccessForbiddenError, QueryError, TraversalError,
|
||||
UnknownNameIdAccessForbiddenError, RequestError)
|
||||
from .qommon.errors import (AccessForbiddenError, HttpResponse401Error,
|
||||
QueryError, TraversalError, UnknownNameIdAccessForbiddenError, RequestError)
|
||||
from .qommon.form import ComputedExpressionWidget, ConditionWidget
|
||||
from .qommon.storage import Equal
|
||||
|
||||
|
@ -179,10 +179,10 @@ class ApiFormPage(BackofficeFormPage):
|
|||
if not is_url_signed() or (get_request().user and get_request().user.is_admin):
|
||||
raise AccessForbiddenError('user not authenticated')
|
||||
else:
|
||||
if get_request().user and get_request().user.is_admin:
|
||||
return # grant access to admins, to ease debug
|
||||
api_user = get_user_from_api_query_string(api_name=api_name)
|
||||
if not api_user:
|
||||
if get_request().user and get_request().user.is_admin:
|
||||
return # grant access to admins, to ease debug
|
||||
raise AccessForbiddenError('user not authenticated')
|
||||
if not self.formdef.is_of_concern_for_user(api_user):
|
||||
raise AccessForbiddenError('unsufficient roles')
|
||||
|
|
|
@ -29,7 +29,7 @@ from django.utils.six.moves.urllib import parse as urllib
|
|||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
from quixote import get_request, get_publisher
|
||||
from .qommon.errors import (AccessForbiddenError, UnknownNameIdAccessForbiddenError)
|
||||
from .qommon.errors import (AccessForbiddenError, HttpResponse401Error, UnknownNameIdAccessForbiddenError)
|
||||
import qommon.misc
|
||||
|
||||
DEFAULT_DURATION = 30
|
||||
|
@ -102,20 +102,28 @@ def is_url_signed(utcnow=None, duration=DEFAULT_DURATION):
|
|||
return True
|
||||
|
||||
|
||||
def get_user_from_api_query_string(api_name=None):
|
||||
def check_http_basic_auth(api_name):
|
||||
auth_header = get_request().get_header('Authorization', '')
|
||||
if auth_header and api_name:
|
||||
if not auth_header.startswith('Basic '):
|
||||
# we do not handle other authentication schemes
|
||||
raise AccessForbiddenError('unhandled authorization header')
|
||||
auth_header = auth_header.split(' ', 1)[1]
|
||||
if not auth_header.startswith('Basic '):
|
||||
# we do not handle other authentication schemes
|
||||
raise HttpResponse401Error(api_name, 'unhandled authorization header')
|
||||
auth_header = auth_header.split(' ', 1)[1]
|
||||
try:
|
||||
username, password = force_text(base64.decodestring(force_bytes(auth_header))).split(':', 1)
|
||||
configured_password = get_publisher().get_site_option(
|
||||
username, section='api-http-auth-%s' % api_name)
|
||||
if configured_password != password:
|
||||
raise AccessForbiddenError('invalid authorization')
|
||||
elif not is_url_signed():
|
||||
return None
|
||||
except ValueError: # invalid base64 or not enough values to unpack
|
||||
raise HttpResponse401Error(api_name, 'invalid authorization header')
|
||||
configured_password = get_publisher().get_site_option(
|
||||
username, section='api-http-auth-%s' % api_name)
|
||||
if configured_password != password:
|
||||
raise HttpResponse401Error(api_name, 'invalid authorization')
|
||||
|
||||
|
||||
def get_user_from_api_query_string(api_name=None):
|
||||
if not is_url_signed():
|
||||
if api_name:
|
||||
check_http_basic_auth(api_name)
|
||||
else:
|
||||
return None
|
||||
# Signature or auth header are ok.
|
||||
# Look for the user, by email/NameID.
|
||||
user = None
|
||||
|
|
|
@ -2067,7 +2067,9 @@ class FormPage(Directory):
|
|||
raise errors.AccessForbiddenError()
|
||||
charset = get_publisher().site_charset
|
||||
self.check_access('ics')
|
||||
user = get_user_from_api_query_string('ics') or get_request().user
|
||||
user = get_request().user
|
||||
if not (user and user.is_admin):
|
||||
user = get_user_from_api_query_string('ics') or user
|
||||
|
||||
formdef = self.formdef
|
||||
selected_filter = self.get_filter_from_query()
|
||||
|
|
|
@ -59,6 +59,14 @@ class AccessUnauthorizedError(AccessForbiddenError):
|
|||
return quixote.redirect(login_url)
|
||||
|
||||
|
||||
class HttpResponse401Error(AccessError):
|
||||
status_code = 401
|
||||
|
||||
def __init__(self, realm, public_msg=None):
|
||||
self.realm = realm
|
||||
super(HttpResponse401Error, self).__init__(public_msg=public_msg)
|
||||
|
||||
|
||||
class EmailError(Exception):
|
||||
pass
|
||||
|
||||
|
|
|
@ -175,6 +175,9 @@ class QommonPublisher(Publisher, object):
|
|||
request = get_request()
|
||||
original_response = request.response
|
||||
request.response = HTTPResponse(status=exc.status_code)
|
||||
if exc.status_code == 401:
|
||||
# include WWW-Authenticate header
|
||||
request.response.headers['WWW-Authenticate'] = 'Basic realm="%s"' % exc.realm
|
||||
if request.is_json():
|
||||
request.response.set_content_type('application/json')
|
||||
return json.dumps({'err': 1, 'err_class': exc.title, 'err_desc': exc.public_msg})
|
||||
|
|
Loading…
Reference in New Issue