api: accept HTTP Basic authentication scheme for API accesses (#20624)
This commit is contained in:
parent
674ab42b3a
commit
d27d92dc4e
|
@ -262,6 +262,51 @@ def test_cards_restricted_api(pub, local_user):
|
|||
assert resp.json['err_desc'] == 'unsufficient roles'
|
||||
|
||||
|
||||
def test_cards_http_auth_access(pub, local_user):
|
||||
pub.role_class.wipe()
|
||||
role = pub.role_class(name='test')
|
||||
role.store()
|
||||
|
||||
CardDef.wipe()
|
||||
carddef = CardDef()
|
||||
carddef.name = 'test'
|
||||
carddef.fields = [fields.StringField(id='0', label='foobar', varname='foo')]
|
||||
carddef.workflow_roles = {'_viewer': role.id}
|
||||
carddef.store()
|
||||
|
||||
carddef.data_class().wipe()
|
||||
formdata = carddef.data_class()()
|
||||
formdata.data = {'0': 'blah'}
|
||||
formdata.just_created()
|
||||
formdata.store()
|
||||
|
||||
access = ApiAccess()
|
||||
access.name = 'test'
|
||||
access.access_identifier = 'test'
|
||||
access.access_key = '12345'
|
||||
access.store()
|
||||
|
||||
app = get_app(pub)
|
||||
app.set_authorization(('Basic', ('test', '12345')))
|
||||
|
||||
# no role restrictions, no admin
|
||||
resp = app.get('/api/cards/test/list', status=403)
|
||||
|
||||
# restricted to the correct role, get it
|
||||
access.roles = [role]
|
||||
access.store()
|
||||
resp = app.get('/api/cards/test/list')
|
||||
assert len(resp.json['data']) == 1
|
||||
|
||||
# restricted to another role, do not get it
|
||||
role2 = pub.role_class(name='second')
|
||||
role2.store()
|
||||
access.roles = [role2]
|
||||
access.store()
|
||||
resp = app.get('/api/cards/test/list', status=403)
|
||||
assert resp.json['err_desc'] == 'unsufficient roles'
|
||||
|
||||
|
||||
def test_post_invalid_json(pub, local_user):
|
||||
resp = get_app(pub).post(
|
||||
'/api/cards/test/submit', params='not a json payload', content_type='application/json', status=400
|
||||
|
|
|
@ -319,6 +319,58 @@ def test_workflow_trigger_api_access(pub, local_user):
|
|||
assert formdef.data_class().get(formdata.id).evolution[-1].who is None
|
||||
|
||||
|
||||
def test_workflow_trigger_http_auth_access(pub, local_user):
|
||||
pub.role_class.wipe()
|
||||
role = pub.role_class(name='xxx')
|
||||
role.store()
|
||||
role2 = pub.role_class(name='xxx2')
|
||||
role2.store()
|
||||
|
||||
workflow = Workflow(name='test')
|
||||
st1 = workflow.add_status('Status1', 'st1')
|
||||
jump = JumpWorkflowStatusItem()
|
||||
jump.trigger = 'XXX'
|
||||
jump.status = 'st2'
|
||||
st1.items.append(jump)
|
||||
jump.parent = st1
|
||||
workflow.add_status('Status2', 'st2')
|
||||
workflow.store()
|
||||
|
||||
FormDef.wipe()
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test'
|
||||
formdef.fields = []
|
||||
formdef.workflow_id = workflow.id
|
||||
formdef.store()
|
||||
|
||||
formdef.data_class().wipe()
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
formdata.store()
|
||||
|
||||
jump.by = [role.id]
|
||||
workflow.store()
|
||||
|
||||
access = ApiAccess()
|
||||
access.name = 'test'
|
||||
access.access_identifier = 'test'
|
||||
access.access_key = '12345'
|
||||
access.roles = [role2]
|
||||
access.store()
|
||||
|
||||
app = get_app(pub)
|
||||
app.set_authorization(('Basic', ('test', '12345')))
|
||||
app.post(formdata.get_url() + 'jump/trigger/XXX/', status=403)
|
||||
assert formdef.data_class().get(formdata.id).status == 'wf-st1' # no change
|
||||
|
||||
access.roles = [role]
|
||||
access.store()
|
||||
|
||||
app.post(formdata.get_url() + 'jump/trigger/XXX/', headers={'accept': 'application/json'}, status=200)
|
||||
assert formdef.data_class().get(formdata.id).status == 'wf-st2'
|
||||
assert formdef.data_class().get(formdata.id).evolution[-1].who is None
|
||||
|
||||
|
||||
def test_workflow_global_webservice_trigger(pub, local_user):
|
||||
workflow = Workflow(name='test')
|
||||
workflow.add_status('Status1', 'st1')
|
||||
|
|
|
@ -16,6 +16,7 @@ from quixote.http_request import Upload as QuixoteUpload
|
|||
|
||||
import wcs.qommon.storage as st
|
||||
from wcs import fields
|
||||
from wcs.api_access import ApiAccess
|
||||
from wcs.blocks import BlockDef
|
||||
from wcs.carddef import CardDef
|
||||
from wcs.categories import Category
|
||||
|
@ -6227,3 +6228,16 @@ def test_backoffice_table_varname_filter(pub):
|
|||
|
||||
resp = resp.forms['listing-settings'].submit()
|
||||
assert resp.text.count('<tr') == 6
|
||||
|
||||
|
||||
def test_backoffice_http_basic_auth(pub):
|
||||
access = ApiAccess()
|
||||
access.name = 'test'
|
||||
access.access_identifier = 'test'
|
||||
access.access_key = '12345'
|
||||
access.store()
|
||||
|
||||
create_superuser(pub)
|
||||
app = get_app(pub)
|
||||
app.set_authorization(('Basic', ('test', '12345')))
|
||||
app.get('/backoffice/', status=403)
|
||||
|
|
|
@ -88,9 +88,19 @@ class ApiAccess(XmlStorableObject):
|
|||
is_admin = False
|
||||
is_api_user = True
|
||||
|
||||
def can_go_in_backoffice(self):
|
||||
return False
|
||||
|
||||
def get_roles(self):
|
||||
return self.roles
|
||||
|
||||
user = RestrictedApiUser()
|
||||
user.roles = [x.id for x in self.get_roles()]
|
||||
return user
|
||||
|
||||
@classmethod
|
||||
def get_with_credentials(cls, username, password):
|
||||
api_access = cls.get_by_identifier(username)
|
||||
if not api_access or api_access.access_key != password:
|
||||
raise KeyError
|
||||
return api_access.get_as_api_user()
|
||||
|
|
|
@ -123,6 +123,9 @@ def check_http_basic_auth(api_name):
|
|||
def get_user_from_api_query_string(api_name=None):
|
||||
# check signature or auth header
|
||||
if not is_url_signed():
|
||||
user = getattr(get_request(), 'user', None)
|
||||
if user and user.is_api_user:
|
||||
return user
|
||||
if api_name:
|
||||
check_http_basic_auth(api_name)
|
||||
else:
|
||||
|
|
|
@ -61,12 +61,19 @@ class HTTPRequest(quixote.http_request.HTTPRequest):
|
|||
# padding or invalid base64-encoded string).
|
||||
self._user = None
|
||||
return
|
||||
|
||||
from wcs.api_access import ApiAccess
|
||||
|
||||
from .ident.password_accounts import PasswordAccount
|
||||
|
||||
try:
|
||||
self._user = PasswordAccount.get_with_credentials(username, password)
|
||||
except KeyError:
|
||||
self._user = None
|
||||
try:
|
||||
self._user = ApiAccess.get_with_credentials(username, password)
|
||||
except KeyError:
|
||||
self._user = None
|
||||
|
||||
return
|
||||
|
||||
try:
|
||||
|
|
Loading…
Reference in New Issue