From 674ab42b3a4de3bcc790e5d71f5904f4259d7d90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20P=C3=A9ters?= Date: Sat, 17 Apr 2021 14:40:28 +0200 Subject: [PATCH] api: add roles-based access restrictions (#48752) --- tests/api/test_carddef.py | 44 ++++++++++++++++++++++++++++++ tests/api/test_user.py | 32 ++++++++++++++++++++++ tests/api/test_workflow.py | 56 ++++++++++++++++++++++++++++++++++++++ wcs/api.py | 9 ++++-- wcs/api_access.py | 14 ++++++++++ wcs/api_utils.py | 12 ++++++-- wcs/users.py | 1 + 7 files changed, 164 insertions(+), 4 deletions(-) diff --git a/tests/api/test_carddef.py b/tests/api/test_carddef.py index c29494448..8731e6f25 100644 --- a/tests/api/test_carddef.py +++ b/tests/api/test_carddef.py @@ -13,6 +13,7 @@ from django.utils.encoding import force_text from quixote import get_publisher from wcs import fields, qommon +from wcs.api_access import ApiAccess from wcs.api_utils import sign_url from wcs.carddef import CardDef from wcs.categories import CardDefCategory @@ -218,6 +219,49 @@ def test_cards_import_csv(pub, local_user): assert resp.json['data']['creation_time'] <= resp.json['data']['completion_time'] +def test_cards_restricted_api(pub, local_user): + pub.role_class.wipe() + role = pub.role_class(name='test') + role.store() + + CardDef.wipe() + carddef = CardDef() + carddef.name = 'test' + carddef.fields = [fields.StringField(id='0', label='foobar', varname='foo')] + carddef.workflow_roles = {'_viewer': role.id} + carddef.store() + + carddef.data_class().wipe() + formdata = carddef.data_class()() + formdata.data = {'0': 'blah'} + formdata.just_created() + formdata.store() + + access = ApiAccess() + access.name = 'test' + access.access_identifier = 'test' + access.access_key = '12345' + access.store() + + # no role restrictions, get it + resp = get_app(pub).get(sign_uri('/api/cards/test/list', orig='test', key='12345')) + assert len(resp.json['data']) == 1 + + # restricted to the correct role, get it + access.roles = [role] + access.store() + resp = get_app(pub).get(sign_uri('/api/cards/test/list', orig='test', key='12345')) + assert len(resp.json['data']) == 1 + + # restricted to another role, do not get it + role2 = pub.role_class(name='second') + role2.store() + access.roles = [role2] + access.store() + resp = get_app(pub).get(sign_uri('/api/cards/test/list', orig='test', key='12345'), status=403) + assert resp.json['err_desc'] == 'unsufficient roles' + + def test_post_invalid_json(pub, local_user): resp = get_app(pub).post( '/api/cards/test/submit', params='not a json payload', content_type='application/json', status=400 diff --git a/tests/api/test_user.py b/tests/api/test_user.py index bf299fe74..dd03335a4 100644 --- a/tests/api/test_user.py +++ b/tests/api/test_user.py @@ -8,6 +8,7 @@ from quixote import get_publisher from wcs import fields from wcs.admin.settings import UserFieldsFormDef +from wcs.api_access import ApiAccess from wcs.categories import Category from wcs.formdef import FormDef from wcs.qommon.http_request import HTTPRequest @@ -335,6 +336,37 @@ def test_user_forms(pub, local_user): assert resp2.json['data'][0] == resp.json['data'][1] assert resp2.json['data'][1] == resp.json['data'][0] + # check there is no access with roles-limited API users + role = pub.role_class(name='test') + role.store() + + access = ApiAccess() + access.name = 'test' + access.access_identifier = 'test' + access.access_key = '12345' + access.roles = [role] + access.store() + + resp = get_app(pub).get(sign_uri('/api/user/forms', orig='test', key='12345'), status=403) + assert resp.json['err'] == 1 + assert resp.json['err_desc'] == 'restricted API access' + + +def test_user_api_with_restricted_access(pub): + role = pub.role_class(name='test') + role.store() + + access = ApiAccess() + access.name = 'test' + access.access_identifier = 'test' + access.access_key = '12345' + access.roles = [role] + access.store() + + resp = get_app(pub).get(sign_uri('/api/user/', orig='test', key='12345'), status=403) + assert resp.json['err'] == 1 + assert resp.json['err_desc'] == 'restricted API access' + def test_user_forms_limit_offset(pub, local_user): if not pub.is_using_postgresql(): diff --git a/tests/api/test_workflow.py b/tests/api/test_workflow.py index cc52c8f0e..0b3904af0 100644 --- a/tests/api/test_workflow.py +++ b/tests/api/test_workflow.py @@ -6,6 +6,7 @@ import pytest from quixote import get_publisher from wcs import fields +from wcs.api_access import ApiAccess from wcs.formdef import FormDef from wcs.qommon.http_request import HTTPRequest from wcs.qommon.ident.password_accounts import PasswordAccount @@ -101,6 +102,7 @@ def test_workflow_trigger(pub, local_user): get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX'), status=200) assert formdef.data_class().get(formdata.id).status == 'wf-st2' + assert formdef.data_class().get(formdata.id).evolution[-1].who is None # check with trailing slash formdata.store() # reset @@ -263,6 +265,60 @@ def test_workflow_trigger_jump_once(pub, local_user): assert formdef.data_class().get(formdata.id).status == 'wf-st3' +def test_workflow_trigger_api_access(pub, local_user): + pub.role_class.wipe() + role = pub.role_class(name='xxx') + role.store() + role2 = pub.role_class(name='xxx2') + role2.store() + + workflow = Workflow(name='test') + st1 = workflow.add_status('Status1', 'st1') + jump = JumpWorkflowStatusItem() + jump.trigger = 'XXX' + jump.status = 'st2' + st1.items.append(jump) + jump.parent = st1 + workflow.add_status('Status2', 'st2') + workflow.store() + + FormDef.wipe() + formdef = FormDef() + formdef.name = 'test' + formdef.fields = [] + formdef.workflow_id = workflow.id + formdef.store() + + formdef.data_class().wipe() + formdata = formdef.data_class()() + formdata.just_created() + formdata.store() + + jump.by = [role.id] + workflow.store() + + access = ApiAccess() + access.name = 'test' + access.access_identifier = 'test' + access.access_key = '12345' + access.roles = [role2] + access.store() + + get_app(pub).post( + sign_uri(formdata.get_url() + 'jump/trigger/XXX/', orig='test', key='12345'), status=403 + ) + assert formdef.data_class().get(formdata.id).status == 'wf-st1' # no change + + access.roles = [role] + access.store() + + get_app(pub).post( + sign_uri(formdata.get_url() + 'jump/trigger/XXX/', orig='test', key='12345'), status=200 + ) + assert formdef.data_class().get(formdata.id).status == 'wf-st2' + assert formdef.data_class().get(formdata.id).evolution[-1].who is None + + def test_workflow_global_webservice_trigger(pub, local_user): workflow = Workflow(name='test') workflow.add_status('Status1', 'st1') diff --git a/wcs/api.py b/wcs/api.py index b27e01352..b6e03d206 100644 --- a/wcs/api.py +++ b/wcs/api.py @@ -313,7 +313,7 @@ class ApiCardPage(ApiFormPageMixin, BackofficeCardPage): ) if formdata_user: formdata.user_id = formdata_user[0].id - else: + elif user and not user.is_api_user: formdata.user_id = user.id formdata.store() @@ -557,8 +557,9 @@ class ApiFormdefDirectory(Directory): ) if formdata_user: formdata.user_id = formdata_user[0].id - elif user: + elif user and not user.is_api_user: formdata.user_id = user.id + if json_input.get('context'): formdata.submission_context = json_input['context'] formdata.submission_channel = formdata.submission_context.pop('channel', None) @@ -838,6 +839,8 @@ class ApiUserDirectory(Directory): user = self.user or get_user_from_api_query_string() or get_request().user if not user: raise AccessForbiddenError('no user specified') + if user.is_api_user: + raise AccessForbiddenError('restricted API access') user_info = user.get_substitution_variables(prefix='') del user_info['user'] user_info['id'] = user.id @@ -896,6 +899,8 @@ class ApiUserDirectory(Directory): return json.dumps({'err': 1, 'err_desc': 'unknown NameID', 'data': []}) if not user: return json.dumps({'err': 1, 'err_desc': 'no user specified', 'data': []}) + if user.is_api_user: + raise AccessForbiddenError('restricted API access') forms = self.get_user_forms(user) diff --git a/wcs/api_access.py b/wcs/api_access.py index c6713bcde..fd2e19855 100644 --- a/wcs/api_access.py +++ b/wcs/api_access.py @@ -80,3 +80,17 @@ class ApiAccess(XmlStorableObject): if role_name: criterias.append(Equal('name', role_name)) return get_publisher().role_class.select([Or(criterias)], order_by='name') + + def get_as_api_user(self): + class RestrictedApiUser: + # kept as inner class so cannot be pickled + id = Ellipsis # make sure it fails all over the place if used + is_admin = False + is_api_user = True + + def get_roles(self): + return self.roles + + user = RestrictedApiUser() + user.roles = [x.id for x in self.get_roles()] + return user diff --git a/wcs/api_utils.py b/wcs/api_utils.py index 01978c434..44b14bc03 100644 --- a/wcs/api_utils.py +++ b/wcs/api_utils.py @@ -121,13 +121,21 @@ def check_http_basic_auth(api_name): def get_user_from_api_query_string(api_name=None): + # check signature or auth header if not is_url_signed(): if api_name: check_http_basic_auth(api_name) else: return None - # Signature or auth header are ok. - # Look for the user, by email/NameID. + + # check access restriction defined in API access object + orig = get_request().form.get('orig') + if orig: + api_access = ApiAccess.get_by_identifier(orig) + if api_access and api_access.get_roles(): + return api_access.get_as_api_user() + + # get user reference from query string user = None if get_request().form.get('email'): email = get_request().form.get('email') diff --git a/wcs/users.py b/wcs/users.py index 587a4dedf..b8ca643c9 100644 --- a/wcs/users.py +++ b/wcs/users.py @@ -43,6 +43,7 @@ class User(StorableObject): deleted_timestamp = None last_seen = None + is_api_user = False default_search_result_template = """{{ user_email|default:"" }} {% if user_var_phone %} 📞 {{ user_var_phone }}{% endif %}