api: limit /schema API to signed or authenticated calls (#47418)

This commit is contained in:
Frédéric Péters 2021-10-23 11:02:30 +02:00
parent 366e1166ef
commit 661ea424c3
8 changed files with 82 additions and 57 deletions

View File

@ -19,12 +19,6 @@ w.c.s expose une API permettant aux logiciels tiers de connaître les différent
formulaires et leurs schémas de données.
</p>
<note><p>Toutes ces URL sont conformes à la spécification de remontée dinformation du
<em>Portail citoyen</em>, acceptent ainsi un paramètre <code>email</code> ou
<code>NameID</code>, et nécessitent alors un paramètre <code>orig</code>.
</p></note>
<section id="forms">
<title>Formulaires</title>
@ -176,8 +170,9 @@ La liste des rôles est disponible à lURL <code>/api/roles</code>.
<p>
Le schéma de données dun formulaire est accessible à ladresse
<code>/api/formdefs/<em>slug</em>/schema</code>.
<code>/api/formdefs/<em>slug</em>/schema</code>; lappel doit obligatoirement
être signé ou réalisé avec un accès disposant des rôles de gestion sur le
formulaire.
</p>
<code mime="application/json">

View File

@ -219,6 +219,8 @@ def test_cards(pub, local_user):
assert resp.json['fields'][0]['label'] == 'foobar'
assert resp.json['fields'][0]['varname'] == 'foo'
resp = get_app(pub).get('/api/cards/test/@schema', status=403)
@pytest.mark.parametrize('auth', ['signature', 'http-basic'])
def test_cards_import_csv(pub, local_user, auth):

View File

@ -402,7 +402,7 @@ def test_backoffice_submission_formdef_list(pub, local_user, access, auth):
assert len(resp.json['data']) == 0
def test_formdef_schema(pub):
def test_formdef_schema(pub, access):
Workflow.wipe()
workflow = Workflow(name='test')
st1 = workflow.add_status('Status1', 'st1')
@ -420,6 +420,11 @@ def test_formdef_schema(pub):
fields.StringField(id='bo1', label='1st backoffice field', type='string', varname='backoffice_blah'),
]
workflow.store()
Category.wipe()
cat = Category(name='Bar')
cat.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
@ -445,6 +450,7 @@ def test_formdef_schema(pub):
),
]
formdef.category_id = cat.id
formdef.workflow_id = workflow.id
formdef.store()
@ -455,13 +461,31 @@ def test_formdef_schema(pub):
{"id": 1, "text": "uné", "foo": "bar1"}, \
{"id": 2, "text": "deux", "foo": "bar2"}]}'''
)
resp = get_app(pub).get('/api/formdefs/test/schema')
resp2 = get_app(pub).get('/test/schema')
resp3 = get_app(pub).get(sign_url('/api/formdefs/test/schema?orig=coucou', '1234'))
resp4 = get_app(pub).get(sign_url('/api/formdefs/test/schema?orig=coucou', '1234'))
# fails for unauthenticated users
get_app(pub).get('/api/formdefs/test/schema', status=403)
get_app(pub).get('/test/schema', status=403)
# always ok for signed requests
resp = get_app(pub).get(sign_url('/api/formdefs/test/schema?orig=coucou', '1234'))
# ok for basic auth only if attached role is appropriate for management
get_url = partial(_get_url, app=get_app(pub), access=access, auth='http-basic', user=None)
get_url('/api/formdefs/test/schema', status=403)
role = pub.role_class(name='Foo bar')
role.store()
access.roles = [role]
access.store()
get_url('/api/formdefs/test/schema', status=403)
cat.management_roles = [role]
cat.store()
assert get_url('/api/formdefs/test/schema').json == resp.json
# resp2 won't have detailed list items
resp2 = get_app(pub).get(sign_url('/api/formdefs/test/schema?orig=coucou', '1234'))
# check schema
assert resp.json == resp2.json
assert set(resp.json.keys()) >= {
'enable_tracking_codes',
'url_name',
@ -489,29 +513,24 @@ def test_formdef_schema(pub):
assert resp.json['fields'][1]['label'] == 'foobar1'
assert resp.json['fields'][1]['type'] == 'item'
# check structured items are only exported for authenticated callers
assert resp.json['fields'][1]['items'] == []
assert resp.json['fields'][2]['items'] == []
assert 'structured_items' not in resp.json['fields'][1]
assert 'structured_items' not in resp.json['fields'][2]
# check structured items
assert len(resp.json['fields'][1]['structured_items']) == 3
assert resp.json['fields'][1]['structured_items'][0]['id'] == 0
assert resp.json['fields'][1]['structured_items'][0]['text'] == 'zéro'
assert resp.json['fields'][1]['structured_items'][0]['foo'] == 'bar'
assert resp.json['fields'][1]['items'][0] == 'zéro'
assert len(resp3.json['fields'][1]['structured_items']) == 3
assert resp3.json['fields'][1]['structured_items'][0]['id'] == 0
assert resp3.json['fields'][1]['structured_items'][0]['text'] == 'zéro'
assert resp3.json['fields'][1]['structured_items'][0]['foo'] == 'bar'
assert resp3.json['fields'][1]['items'][0] == 'zéro'
assert resp3.json['fields'][2]['label'] == 'foobar2'
assert resp3.json['fields'][2]['type'] == 'items'
assert len(resp3.json['fields'][2]['structured_items']) == 10
assert resp3.json['fields'][2]['structured_items'][0]['id'] == 0
assert resp3.json['fields'][2]['structured_items'][0]['text'] == 'label 0'
assert resp3.json['fields'][2]['structured_items'][0]['foo'] == 0
assert resp3.json['fields'][2]['items'][0] == 'label 0'
assert resp.json['fields'][2]['label'] == 'foobar2'
assert resp.json['fields'][2]['type'] == 'items'
assert len(resp.json['fields'][2]['structured_items']) == 10
assert resp.json['fields'][2]['structured_items'][0]['id'] == 0
assert resp.json['fields'][2]['structured_items'][0]['text'] == 'label 0'
assert resp.json['fields'][2]['structured_items'][0]['foo'] == 0
assert resp.json['fields'][2]['items'][0] == 'label 0'
# if structured_items fails no values
assert 'structured_items' not in resp4.json['fields'][1]
assert resp4.json['fields'][1]['items'] == []
assert 'structured_items' not in resp2.json['fields'][1]
assert resp2.json['fields'][1]['items'] == []
# workflow checks
assert len(resp.json['workflow']['statuses']) == 3

View File

@ -1842,16 +1842,8 @@ class FormsDirectory(AccessControlled, Directory):
def _q_lookup(self, component):
directory = self.formdef_page_class(component)
global_access = is_global_accessible(self.section)
if not global_access:
user_roles = set(get_request().user.get_roles())
management_roles = set()
if directory.formdef.category:
management_roles = {
x.id for x in getattr(directory.formdef.category, 'management_roles') or []
}
if not management_roles.intersection(user_roles):
raise AccessForbiddenError()
if not directory.formdef.is_managed_by(get_request().user):
raise AccessForbiddenError()
return directory
def p_import(self):

View File

@ -271,8 +271,10 @@ class ApiCardPage(ApiFormPageMixin, BackofficeCardPage):
return super().check_access(api_name=api_name)
def schema(self):
get_response().set_content_type('application/json')
return self.formdef.export_to_json(anonymise=not is_url_signed())
if is_url_signed() or self.formdef.is_managed_by(get_user_from_api_query_string()):
get_response().set_content_type('application/json')
return self.formdef.export_to_json()
raise AccessForbiddenError()
def submit(self):
get_response().set_content_type('application/json')
@ -507,8 +509,10 @@ class ApiFormdefDirectory(Directory):
self.formdef = formdef
def schema(self):
get_response().set_content_type('application/json')
return self.formdef.export_to_json(anonymise=not is_url_signed())
if is_url_signed() or self.formdef.is_managed_by(get_user_from_api_query_string()):
get_response().set_content_type('application/json')
return self.formdef.export_to_json()
raise AccessForbiddenError()
def submit(self):
# expects json as input

View File

@ -108,6 +108,8 @@ class RootDirectory(BackofficeRootDirectory):
def is_global_accessible(cls, subdirectory):
if cls.check_admin_for_all():
return True
if not get_request().user:
return False
user_roles = set(get_request().user.get_roles())
authorised_roles = set(get_cfg('admin-permissions', {}).get(subdirectory) or [])
if authorised_roles:

View File

@ -297,7 +297,7 @@ class Field:
def get_admin_attributes(self):
return ['label', 'type', 'condition']
def export_to_json(self, include_id=False, anonymise=True):
def export_to_json(self, include_id=False):
field = {}
if include_id:
extra_fields = ['id']
@ -2175,9 +2175,9 @@ class ItemField(WidgetField, MapOptionsMixin, ItemFieldMixin):
values.append(display_value)
return values
def export_to_json(self, include_id=False, anonymise=True):
field = super().export_to_json(include_id=include_id, anonymise=anonymise)
if self.data_source and not anonymise:
def export_to_json(self, include_id=False):
field = super().export_to_json(include_id=include_id)
if self.data_source:
structured_items = data_sources.get_structured_items(self.data_source)
if structured_items:
field['structured_items'] = structured_items
@ -2407,9 +2407,9 @@ class ItemsField(WidgetField, ItemFieldMixin):
structured_value.append(structured_option)
return structured_value
def export_to_json(self, include_id=False, anonymise=True):
field = super().export_to_json(include_id=include_id, anonymise=True)
if self.data_source and not anonymise:
def export_to_json(self, include_id=False):
field = super().export_to_json(include_id=include_id)
if self.data_source:
structured_items = data_sources.get_structured_items(self.data_source)
if structured_items:
field['structured_items'] = structured_items

View File

@ -383,6 +383,17 @@ class FormDef(StorableObject):
return sql.get_formdef_new_id(id_start=id_start)
def is_managed_by(self, user):
if get_publisher().get_backoffice_root().is_global_accessible(self.backoffice_section):
return True
if not user:
return False
if not self.category_id:
return False
management_roles = {x.id for x in getattr(self.category, 'management_roles') or []}
user_roles = set(user.get_roles())
return management_roles.intersection(user_roles)
@classmethod
def wipe(cls):
super().wipe()
@ -824,7 +835,7 @@ class FormDef(StorableObject):
d.update(self.get_field_data(field, widget))
return d
def export_to_json(self, include_id=False, indent=None, anonymise=True):
def export_to_json(self, include_id=False, indent=None):
charset = get_publisher().site_charset
root = {}
root['name'] = force_text(self.name, charset)
@ -853,7 +864,7 @@ class FormDef(StorableObject):
root['fields'] = []
if self.fields:
for field in self.fields:
root['fields'].append(field.export_to_json(include_id=include_id, anonymise=anonymise))
root['fields'].append(field.export_to_json(include_id=include_id))
if self.geolocations:
root['geolocations'] = self.geolocations.copy()