api: accept HTTP Basic authentication scheme for API accesses (#20624)

This commit is contained in:
Frédéric Péters 2021-05-02 17:28:09 +02:00
parent 674ab42b3a
commit d27d92dc4e
6 changed files with 132 additions and 1 deletions

View File

@ -262,6 +262,51 @@ def test_cards_restricted_api(pub, local_user):
assert resp.json['err_desc'] == 'unsufficient roles'
def test_cards_http_auth_access(pub, local_user):
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
CardDef.wipe()
carddef = CardDef()
carddef.name = 'test'
carddef.fields = [fields.StringField(id='0', label='foobar', varname='foo')]
carddef.workflow_roles = {'_viewer': role.id}
carddef.store()
carddef.data_class().wipe()
formdata = carddef.data_class()()
formdata.data = {'0': 'blah'}
formdata.just_created()
formdata.store()
access = ApiAccess()
access.name = 'test'
access.access_identifier = 'test'
access.access_key = '12345'
access.store()
app = get_app(pub)
app.set_authorization(('Basic', ('test', '12345')))
# no role restrictions, no admin
resp = app.get('/api/cards/test/list', status=403)
# restricted to the correct role, get it
access.roles = [role]
access.store()
resp = app.get('/api/cards/test/list')
assert len(resp.json['data']) == 1
# restricted to another role, do not get it
role2 = pub.role_class(name='second')
role2.store()
access.roles = [role2]
access.store()
resp = app.get('/api/cards/test/list', status=403)
assert resp.json['err_desc'] == 'unsufficient roles'
def test_post_invalid_json(pub, local_user):
resp = get_app(pub).post(
'/api/cards/test/submit', params='not a json payload', content_type='application/json', status=400

View File

@ -319,6 +319,58 @@ def test_workflow_trigger_api_access(pub, local_user):
assert formdef.data_class().get(formdata.id).evolution[-1].who is None
def test_workflow_trigger_http_auth_access(pub, local_user):
pub.role_class.wipe()
role = pub.role_class(name='xxx')
role.store()
role2 = pub.role_class(name='xxx2')
role2.store()
workflow = Workflow(name='test')
st1 = workflow.add_status('Status1', 'st1')
jump = JumpWorkflowStatusItem()
jump.trigger = 'XXX'
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
workflow.add_status('Status2', 'st2')
workflow.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = []
formdef.workflow_id = workflow.id
formdef.store()
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
jump.by = [role.id]
workflow.store()
access = ApiAccess()
access.name = 'test'
access.access_identifier = 'test'
access.access_key = '12345'
access.roles = [role2]
access.store()
app = get_app(pub)
app.set_authorization(('Basic', ('test', '12345')))
app.post(formdata.get_url() + 'jump/trigger/XXX/', status=403)
assert formdef.data_class().get(formdata.id).status == 'wf-st1' # no change
access.roles = [role]
access.store()
app.post(formdata.get_url() + 'jump/trigger/XXX/', headers={'accept': 'application/json'}, status=200)
assert formdef.data_class().get(formdata.id).status == 'wf-st2'
assert formdef.data_class().get(formdata.id).evolution[-1].who is None
def test_workflow_global_webservice_trigger(pub, local_user):
workflow = Workflow(name='test')
workflow.add_status('Status1', 'st1')

View File

@ -16,6 +16,7 @@ from quixote.http_request import Upload as QuixoteUpload
import wcs.qommon.storage as st
from wcs import fields
from wcs.api_access import ApiAccess
from wcs.blocks import BlockDef
from wcs.carddef import CardDef
from wcs.categories import Category
@ -6227,3 +6228,16 @@ def test_backoffice_table_varname_filter(pub):
resp = resp.forms['listing-settings'].submit()
assert resp.text.count('<tr') == 6
def test_backoffice_http_basic_auth(pub):
access = ApiAccess()
access.name = 'test'
access.access_identifier = 'test'
access.access_key = '12345'
access.store()
create_superuser(pub)
app = get_app(pub)
app.set_authorization(('Basic', ('test', '12345')))
app.get('/backoffice/', status=403)

View File

@ -88,9 +88,19 @@ class ApiAccess(XmlStorableObject):
is_admin = False
is_api_user = True
def can_go_in_backoffice(self):
return False
def get_roles(self):
return self.roles
user = RestrictedApiUser()
user.roles = [x.id for x in self.get_roles()]
return user
@classmethod
def get_with_credentials(cls, username, password):
api_access = cls.get_by_identifier(username)
if not api_access or api_access.access_key != password:
raise KeyError
return api_access.get_as_api_user()

View File

@ -123,6 +123,9 @@ def check_http_basic_auth(api_name):
def get_user_from_api_query_string(api_name=None):
# check signature or auth header
if not is_url_signed():
user = getattr(get_request(), 'user', None)
if user and user.is_api_user:
return user
if api_name:
check_http_basic_auth(api_name)
else:

View File

@ -61,12 +61,19 @@ class HTTPRequest(quixote.http_request.HTTPRequest):
# padding or invalid base64-encoded string).
self._user = None
return
from wcs.api_access import ApiAccess
from .ident.password_accounts import PasswordAccount
try:
self._user = PasswordAccount.get_with_credentials(username, password)
except KeyError:
self._user = None
try:
self._user = ApiAccess.get_with_credentials(username, password)
except KeyError:
self._user = None
return
try: