api: add roles-based access restrictions (#48752)

This commit is contained in:
Frédéric Péters 2021-04-17 14:40:28 +02:00
parent 27756287a0
commit 674ab42b3a
7 changed files with 164 additions and 4 deletions

View File

@ -13,6 +13,7 @@ from django.utils.encoding import force_text
from quixote import get_publisher
from wcs import fields, qommon
from wcs.api_access import ApiAccess
from wcs.api_utils import sign_url
from wcs.carddef import CardDef
from wcs.categories import CardDefCategory
@ -218,6 +219,49 @@ def test_cards_import_csv(pub, local_user):
assert resp.json['data']['creation_time'] <= resp.json['data']['completion_time']
def test_cards_restricted_api(pub, local_user):
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
CardDef.wipe()
carddef = CardDef()
carddef.name = 'test'
carddef.fields = [fields.StringField(id='0', label='foobar', varname='foo')]
carddef.workflow_roles = {'_viewer': role.id}
carddef.store()
carddef.data_class().wipe()
formdata = carddef.data_class()()
formdata.data = {'0': 'blah'}
formdata.just_created()
formdata.store()
access = ApiAccess()
access.name = 'test'
access.access_identifier = 'test'
access.access_key = '12345'
access.store()
# no role restrictions, get it
resp = get_app(pub).get(sign_uri('/api/cards/test/list', orig='test', key='12345'))
assert len(resp.json['data']) == 1
# restricted to the correct role, get it
access.roles = [role]
access.store()
resp = get_app(pub).get(sign_uri('/api/cards/test/list', orig='test', key='12345'))
assert len(resp.json['data']) == 1
# restricted to another role, do not get it
role2 = pub.role_class(name='second')
role2.store()
access.roles = [role2]
access.store()
resp = get_app(pub).get(sign_uri('/api/cards/test/list', orig='test', key='12345'), status=403)
assert resp.json['err_desc'] == 'unsufficient roles'
def test_post_invalid_json(pub, local_user):
resp = get_app(pub).post(
'/api/cards/test/submit', params='not a json payload', content_type='application/json', status=400

View File

@ -8,6 +8,7 @@ from quixote import get_publisher
from wcs import fields
from wcs.admin.settings import UserFieldsFormDef
from wcs.api_access import ApiAccess
from wcs.categories import Category
from wcs.formdef import FormDef
from wcs.qommon.http_request import HTTPRequest
@ -335,6 +336,37 @@ def test_user_forms(pub, local_user):
assert resp2.json['data'][0] == resp.json['data'][1]
assert resp2.json['data'][1] == resp.json['data'][0]
# check there is no access with roles-limited API users
role = pub.role_class(name='test')
role.store()
access = ApiAccess()
access.name = 'test'
access.access_identifier = 'test'
access.access_key = '12345'
access.roles = [role]
access.store()
resp = get_app(pub).get(sign_uri('/api/user/forms', orig='test', key='12345'), status=403)
assert resp.json['err'] == 1
assert resp.json['err_desc'] == 'restricted API access'
def test_user_api_with_restricted_access(pub):
role = pub.role_class(name='test')
role.store()
access = ApiAccess()
access.name = 'test'
access.access_identifier = 'test'
access.access_key = '12345'
access.roles = [role]
access.store()
resp = get_app(pub).get(sign_uri('/api/user/', orig='test', key='12345'), status=403)
assert resp.json['err'] == 1
assert resp.json['err_desc'] == 'restricted API access'
def test_user_forms_limit_offset(pub, local_user):
if not pub.is_using_postgresql():

View File

@ -6,6 +6,7 @@ import pytest
from quixote import get_publisher
from wcs import fields
from wcs.api_access import ApiAccess
from wcs.formdef import FormDef
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.ident.password_accounts import PasswordAccount
@ -101,6 +102,7 @@ def test_workflow_trigger(pub, local_user):
get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX'), status=200)
assert formdef.data_class().get(formdata.id).status == 'wf-st2'
assert formdef.data_class().get(formdata.id).evolution[-1].who is None
# check with trailing slash
formdata.store() # reset
@ -263,6 +265,60 @@ def test_workflow_trigger_jump_once(pub, local_user):
assert formdef.data_class().get(formdata.id).status == 'wf-st3'
def test_workflow_trigger_api_access(pub, local_user):
pub.role_class.wipe()
role = pub.role_class(name='xxx')
role.store()
role2 = pub.role_class(name='xxx2')
role2.store()
workflow = Workflow(name='test')
st1 = workflow.add_status('Status1', 'st1')
jump = JumpWorkflowStatusItem()
jump.trigger = 'XXX'
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
workflow.add_status('Status2', 'st2')
workflow.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = []
formdef.workflow_id = workflow.id
formdef.store()
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
jump.by = [role.id]
workflow.store()
access = ApiAccess()
access.name = 'test'
access.access_identifier = 'test'
access.access_key = '12345'
access.roles = [role2]
access.store()
get_app(pub).post(
sign_uri(formdata.get_url() + 'jump/trigger/XXX/', orig='test', key='12345'), status=403
)
assert formdef.data_class().get(formdata.id).status == 'wf-st1' # no change
access.roles = [role]
access.store()
get_app(pub).post(
sign_uri(formdata.get_url() + 'jump/trigger/XXX/', orig='test', key='12345'), status=200
)
assert formdef.data_class().get(formdata.id).status == 'wf-st2'
assert formdef.data_class().get(formdata.id).evolution[-1].who is None
def test_workflow_global_webservice_trigger(pub, local_user):
workflow = Workflow(name='test')
workflow.add_status('Status1', 'st1')

View File

@ -313,7 +313,7 @@ class ApiCardPage(ApiFormPageMixin, BackofficeCardPage):
)
if formdata_user:
formdata.user_id = formdata_user[0].id
else:
elif user and not user.is_api_user:
formdata.user_id = user.id
formdata.store()
@ -557,8 +557,9 @@ class ApiFormdefDirectory(Directory):
)
if formdata_user:
formdata.user_id = formdata_user[0].id
elif user:
elif user and not user.is_api_user:
formdata.user_id = user.id
if json_input.get('context'):
formdata.submission_context = json_input['context']
formdata.submission_channel = formdata.submission_context.pop('channel', None)
@ -838,6 +839,8 @@ class ApiUserDirectory(Directory):
user = self.user or get_user_from_api_query_string() or get_request().user
if not user:
raise AccessForbiddenError('no user specified')
if user.is_api_user:
raise AccessForbiddenError('restricted API access')
user_info = user.get_substitution_variables(prefix='')
del user_info['user']
user_info['id'] = user.id
@ -896,6 +899,8 @@ class ApiUserDirectory(Directory):
return json.dumps({'err': 1, 'err_desc': 'unknown NameID', 'data': []})
if not user:
return json.dumps({'err': 1, 'err_desc': 'no user specified', 'data': []})
if user.is_api_user:
raise AccessForbiddenError('restricted API access')
forms = self.get_user_forms(user)

View File

@ -80,3 +80,17 @@ class ApiAccess(XmlStorableObject):
if role_name:
criterias.append(Equal('name', role_name))
return get_publisher().role_class.select([Or(criterias)], order_by='name')
def get_as_api_user(self):
class RestrictedApiUser:
# kept as inner class so cannot be pickled
id = Ellipsis # make sure it fails all over the place if used
is_admin = False
is_api_user = True
def get_roles(self):
return self.roles
user = RestrictedApiUser()
user.roles = [x.id for x in self.get_roles()]
return user

View File

@ -121,13 +121,21 @@ def check_http_basic_auth(api_name):
def get_user_from_api_query_string(api_name=None):
# check signature or auth header
if not is_url_signed():
if api_name:
check_http_basic_auth(api_name)
else:
return None
# Signature or auth header are ok.
# Look for the user, by email/NameID.
# check access restriction defined in API access object
orig = get_request().form.get('orig')
if orig:
api_access = ApiAccess.get_by_identifier(orig)
if api_access and api_access.get_roles():
return api_access.get_as_api_user()
# get user reference from query string
user = None
if get_request().form.get('email'):
email = get_request().form.get('email')

View File

@ -43,6 +43,7 @@ class User(StorableObject):
deleted_timestamp = None
last_seen = None
is_api_user = False
default_search_result_template = """{{ user_email|default:"" }}
{% if user_var_phone %} 📞 {{ user_var_phone }}{% endif %}