api: add option to restrict access to anonymised data (#52960)
This commit is contained in:
parent
28a9f746c3
commit
7e11c14272
|
@ -14,6 +14,7 @@ from django.utils.encoding import force_bytes
|
|||
from quixote import get_publisher
|
||||
|
||||
from wcs import fields
|
||||
from wcs.api_access import ApiAccess
|
||||
from wcs.blocks import BlockDef
|
||||
from wcs.data_sources import NamedDataSource
|
||||
from wcs.formdata import Evolution
|
||||
|
@ -730,6 +731,72 @@ def test_api_anonymized_formdata(pub, local_user, admin_user):
|
|||
assert 'name' in resp.json['evolution'][1]['who']
|
||||
|
||||
|
||||
def test_api_access_restrict_to_anonymised_data(pub, local_user):
|
||||
pub.role_class.wipe()
|
||||
role = pub.role_class(name='test')
|
||||
role.store()
|
||||
|
||||
local_user.roles = [role.id]
|
||||
local_user.store()
|
||||
|
||||
FormDef.wipe()
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test'
|
||||
formdef.workflow_roles = {'_receiver': role.id}
|
||||
formdef.fields = [
|
||||
fields.StringField(id='1', label='foobar', varname='foobar'),
|
||||
fields.StringField(id='2', label='foobar2', varname='foobar2', anonymise=False),
|
||||
]
|
||||
formdef.store()
|
||||
|
||||
data_class = formdef.data_class()
|
||||
data_class.wipe()
|
||||
|
||||
for _i in range(10):
|
||||
formdata = data_class()
|
||||
formdata.data = {'1': 'FOO BAR1', '2': 'FOO BAR 2'}
|
||||
formdata.user_id = local_user.id
|
||||
formdata.just_created()
|
||||
formdata.store()
|
||||
|
||||
# check normal API behaviour: get all data
|
||||
access = ApiAccess()
|
||||
access.name = 'test'
|
||||
access.access_identifier = 'test'
|
||||
access.access_key = '12345'
|
||||
access.store()
|
||||
|
||||
resp = get_app(pub).get(
|
||||
sign_uri(
|
||||
'/api/forms/test/list?full=on',
|
||||
user=local_user,
|
||||
orig=access.access_identifier,
|
||||
key=access.access_key,
|
||||
)
|
||||
)
|
||||
assert len(resp.json) == 10
|
||||
assert resp.json[0]['fields']['foobar'] == 'FOO BAR1'
|
||||
assert resp.json[0]['fields']['foobar2'] == 'FOO BAR 2'
|
||||
assert resp.json[0].get('user')
|
||||
|
||||
# restrict API access to anonymised data
|
||||
access.restrict_to_anonymised_data = True
|
||||
access.store()
|
||||
|
||||
resp = get_app(pub).get(
|
||||
sign_uri(
|
||||
'/api/forms/test/list?full=on',
|
||||
user=local_user,
|
||||
orig=access.access_identifier,
|
||||
key=access.access_key,
|
||||
)
|
||||
)
|
||||
assert len(resp.json) == 10
|
||||
assert 'foobar' not in resp.json[0]['fields']
|
||||
assert resp.json[0]['fields']['foobar2'] == 'FOO BAR 2'
|
||||
assert not resp.json[0].get('user')
|
||||
|
||||
|
||||
def test_api_geojson_formdata(pub, local_user):
|
||||
pub.role_class.wipe()
|
||||
role = pub.role_class(name='test')
|
||||
|
|
|
@ -9,17 +9,17 @@ import urllib.parse
|
|||
from django.utils.encoding import force_bytes
|
||||
|
||||
|
||||
def sign_uri(uri, user=None, format='json'):
|
||||
def sign_uri(uri, user=None, format='json', orig='coucou', key='1234'):
|
||||
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
|
||||
scheme, netloc, path, params, query, fragment = urllib.parse.urlparse(uri)
|
||||
if query:
|
||||
query += '&'
|
||||
if format:
|
||||
query += 'format=%s&' % format
|
||||
query += 'orig=coucou&algo=sha256×tamp=' + timestamp
|
||||
query += 'orig=%s&algo=sha256×tamp=%s' % (orig, timestamp)
|
||||
if user:
|
||||
query += '&email=' + urllib.parse.quote(user.email)
|
||||
query += '&signature=%s' % urllib.parse.quote(
|
||||
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha256).digest())
|
||||
base64.b64encode(hmac.new(force_bytes(key), force_bytes(query), hashlib.sha256).digest())
|
||||
)
|
||||
return urllib.parse.urlunparse((scheme, netloc, path, params, query, fragment))
|
||||
|
|
|
@ -23,7 +23,7 @@ from quixote.html import TemplateIO, htmltext
|
|||
from wcs.api_access import ApiAccess
|
||||
from wcs.qommon import _, errors, template
|
||||
from wcs.qommon.backoffice.menu import html_top
|
||||
from wcs.qommon.form import Form, HtmlWidget, StringWidget, TextWidget
|
||||
from wcs.qommon.form import CheckboxWidget, Form, HtmlWidget, StringWidget, TextWidget
|
||||
|
||||
|
||||
class ApiAccessUI:
|
||||
|
@ -59,6 +59,12 @@ class ApiAccessUI:
|
|||
size=30,
|
||||
value=self.api_access.access_key or str(uuid.uuid4()),
|
||||
)
|
||||
form.add(
|
||||
CheckboxWidget,
|
||||
'restrict_to_anonymised_data',
|
||||
title=_('Restrict to anonymised data'),
|
||||
value=self.api_access.restrict_to_anonymised_data,
|
||||
)
|
||||
if not self.api_access.is_readonly():
|
||||
form.add_submit('submit', _('Submit'))
|
||||
form.add_submit('cancel', _('Cancel'))
|
||||
|
@ -79,9 +85,9 @@ class ApiAccessUI:
|
|||
raise ValueError()
|
||||
|
||||
self.api_access.name = name
|
||||
self.api_access.description = form.get_widget('description').parse()
|
||||
self.api_access.access_identifier = access_identifier
|
||||
self.api_access.access_key = form.get_widget('access_key').parse()
|
||||
for attribute in ('description', 'access_key', 'restrict_to_anonymised_data'):
|
||||
setattr(self.api_access, attribute, form.get_widget(attribute).parse())
|
||||
self.api_access.store()
|
||||
|
||||
|
||||
|
|
|
@ -189,7 +189,7 @@ class ApiFormPageMixin:
|
|||
raise TraversalError()
|
||||
|
||||
def check_access(self, api_name=None):
|
||||
if 'anonymise' in get_request().form:
|
||||
if get_request().has_anonymised_data_api_restriction():
|
||||
if not is_url_signed() or (get_request().user and get_request().user.is_admin):
|
||||
raise AccessForbiddenError('user not authenticated')
|
||||
else:
|
||||
|
|
|
@ -24,6 +24,7 @@ class ApiAccess(XmlStorableObject):
|
|||
access_identifier = None
|
||||
access_key = None
|
||||
description = None
|
||||
restrict_to_anonymised_data = False
|
||||
|
||||
# declarations for serialization
|
||||
XML_NODES = [
|
||||
|
@ -31,11 +32,19 @@ class ApiAccess(XmlStorableObject):
|
|||
('description', 'str'),
|
||||
('access_identifier', 'str'),
|
||||
('access_key', 'str'),
|
||||
('restrict_to_anonymised_data', 'bool'),
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def get_access_key(cls, access_identifier):
|
||||
def get_by_identifier(cls, access_identifier):
|
||||
for api_access in cls.select():
|
||||
if api_access.access_identifier == access_identifier:
|
||||
return api_access.access_key
|
||||
return api_access
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def get_access_key(cls, access_identifier):
|
||||
api_access = cls.get_by_identifier(access_identifier)
|
||||
if api_access:
|
||||
return api_access.access_key
|
||||
return None
|
||||
|
|
|
@ -2184,7 +2184,7 @@ class FormPage(Directory):
|
|||
|
||||
def ods(self):
|
||||
self.check_access()
|
||||
if 'anonymise' in get_request().form:
|
||||
if get_request().has_anonymised_data_api_restriction():
|
||||
# api/ will let this pass but we don't want that.
|
||||
raise errors.AccessForbiddenError()
|
||||
fields = self.get_fields_from_query()
|
||||
|
@ -2219,7 +2219,7 @@ class FormPage(Directory):
|
|||
return job.file_content
|
||||
|
||||
def json(self):
|
||||
anonymise = 'anonymise' in get_request().form
|
||||
anonymise = get_request().has_anonymised_data_api_restriction()
|
||||
self.check_access()
|
||||
get_response().set_content_type('application/json')
|
||||
user = get_user_from_api_query_string() or get_request().user if not anonymise else None
|
||||
|
@ -2276,7 +2276,7 @@ class FormPage(Directory):
|
|||
def geojson(self):
|
||||
if not self.formdef.geolocations:
|
||||
raise errors.TraversalError()
|
||||
if 'anonymise' in get_request().form:
|
||||
if get_request().has_anonymised_data_api_restriction():
|
||||
# api/ will let this pass but we don't want that.
|
||||
raise errors.AccessForbiddenError()
|
||||
self.check_access('geojson')
|
||||
|
@ -2301,7 +2301,7 @@ class FormPage(Directory):
|
|||
return json.dumps(geojson_formdatas(items, fields=fields))
|
||||
|
||||
def ics(self):
|
||||
if 'anonymise' in get_request().form:
|
||||
if get_request().has_anonymised_data_api_restriction():
|
||||
# api/ will let this pass but we don't want that.
|
||||
raise errors.AccessForbiddenError()
|
||||
charset = get_publisher().site_charset
|
||||
|
|
|
@ -158,7 +158,7 @@ class FormStatusPage(Directory, FormTemplateMixin):
|
|||
session = get_session()
|
||||
mine = False
|
||||
if api_call:
|
||||
if 'anonymise' in get_request().form:
|
||||
if get_request().has_anonymised_data_api_restriction():
|
||||
if is_url_signed() or (get_request().user and get_request().user.is_admin):
|
||||
return None
|
||||
else:
|
||||
|
@ -179,7 +179,7 @@ class FormStatusPage(Directory, FormTemplateMixin):
|
|||
|
||||
def json(self):
|
||||
self.check_auth(api_call=True)
|
||||
anonymise = 'anonymise' in get_request().form
|
||||
anonymise = get_request().has_anonymised_data_api_restriction()
|
||||
return self.export_to_json(anonymise=anonymise)
|
||||
|
||||
def workflow_messages(self, position='top'):
|
||||
|
|
|
@ -211,6 +211,18 @@ class HTTPRequest(quixote.http_request.HTTPRequest):
|
|||
or user_agent.startswith('Wget')
|
||||
)
|
||||
|
||||
def has_anonymised_data_api_restriction(self):
|
||||
from wcs.api_access import ApiAccess
|
||||
|
||||
if 'anonymise' in self.form:
|
||||
return True
|
||||
orig = self.form.get('orig')
|
||||
if orig:
|
||||
api_access = ApiAccess.get_by_identifier(orig)
|
||||
if api_access:
|
||||
return api_access.restrict_to_anonymised_data
|
||||
return False
|
||||
|
||||
@property
|
||||
def META(self):
|
||||
return self.environ
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
<ul>
|
||||
<li>{% trans "Access identifier:" %} {{ api_access.access_identifier }}</li>
|
||||
<li>{% trans "Access key:" %} {{ api_access.access_key }}</li>
|
||||
{% if api_access.restrict_to_anonymised_data %}<li>{% trans "Restricted to anonymised data" %}</li>{% endif %}
|
||||
</ul>
|
||||
</div>
|
||||
{% endblock %}
|
||||
|
|
Loading…
Reference in New Issue