api: serialize user as attributes to support api clients in afterjobs (#88697)
gitea/wcs/pipeline/head This commit looks good Details

This commit is contained in:
Frédéric Péters 2024-03-26 15:45:01 +01:00
parent d0358afa40
commit 2e12fbad77
4 changed files with 76 additions and 13 deletions

View File

@ -10,6 +10,7 @@ import zipfile
from contextlib import contextmanager
import pytest
import responses
from django.utils.encoding import force_bytes
from django.utils.timezone import localtime, make_aware
from quixote import get_publisher
@ -49,6 +50,12 @@ def pub(emails):
'''\
[api-secrets]
coucou = 1234
[variables]
idp_api_url = https://authentic.example.invalid/api/'
[wscall-secrets]
authentic.example.invalid = 4460cf12e156d841c116fbebd52d7ebe41282c63ac2605740068ba5fd89b7316
'''
)
@ -2985,9 +2992,12 @@ def test_api_distance_filter(pub, local_user):
get_app(pub).get(sign_uri('/api/forms/test/list?filter-distance=150000', user=local_user), status=400)
@pytest.mark.parametrize('user', ['query-email', 'api-access'])
@pytest.mark.parametrize('user', ['query-email', 'api-access', 'idp-api-client'])
@pytest.mark.parametrize('auth', ['signature', 'http-basic'])
@responses.activate
def test_api_ods_formdata(pub, local_user, user, auth):
ApiAccess.wipe()
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
@ -3007,7 +3017,6 @@ def test_api_ods_formdata(pub, local_user, user, auth):
data_class.wipe()
if user == 'api-access':
ApiAccess.wipe()
access = ApiAccess()
access.name = 'test'
access.access_identifier = 'test'
@ -3025,6 +3034,29 @@ def test_api_ods_formdata(pub, local_user, user, auth):
def get_url(url, **kwargs):
return app.get(sign_uri(url, orig=access.access_identifier, key=access.access_key), **kwargs)
elif user == 'idp-api-client':
if auth == 'signature':
pytest.skip('signature authentication requires local user')
def get_url(url, **kwargs):
app.set_authorization(('Basic', ('test', '12345')))
return app.get(url, **kwargs)
responses.post(
'https://authentic.example.invalid/api/check-api-client/',
json={
'err': 0,
'data': {
'is_active': True,
'is_anonymous': False,
'is_authenticated': True,
'is_superuser': False,
'restrict_to_anonymised_data': False,
'roles': [],
},
},
)
else:
if auth == 'http-basic':
pytest.skip('http basic authentication requires ApiAccess')
@ -3053,6 +3085,21 @@ def test_api_ods_formdata(pub, local_user, user, auth):
if user == 'api-access':
access.roles = [role]
access.store()
elif user == 'idp-api-client':
responses.post(
'https://authentic.example.invalid/api/check-api-client/',
json={
'err': 0,
'data': {
'is_active': True,
'is_anonymous': False,
'is_authenticated': True,
'is_superuser': False,
'restrict_to_anonymised_data': False,
'roles': [role.id],
},
},
)
else:
local_user.roles = [role.id]
local_user.store()

View File

@ -75,6 +75,7 @@ class ApiAccess(XmlStorableObject):
id = Ellipsis # make sure it fails all over the place if used
is_admin = False
is_api_user = True
restrict_to_anonymised_data = False
def __init__(self, api_access):
self.api_access = api_access

View File

@ -4566,13 +4566,7 @@ class CsvExportAfterJob(AfterJob):
label = _('Exporting to CSV file')
def __init__(self, formdef, **kwargs):
user = kwargs.pop('user', None)
if user and user.is_api_user:
kwargs['user_is_api_user'] = True
kwargs['user_id'] = user.api_access.id
else:
kwargs['user_is_api_user'] = False
kwargs['user_id'] = user.id if user else None
kwargs.update(self.serialize_user(kwargs.pop('user', None)))
super().__init__(formdef_class=formdef.__class__, formdef_id=formdef.id, **kwargs)
self.file_name = '%s.csv' % formdef.url_name
@ -4643,10 +4637,7 @@ class CsvExportAfterJob(AfterJob):
query = self.kwargs['query']
criterias = self.kwargs['criterias']
order_by = self.kwargs['order_by']
if self.kwargs['user_is_api_user']:
user = ApiAccess.get(self.kwargs['user_id']).get_as_api_user()
else:
user = get_publisher().user_class.get(self.kwargs['user_id'])
user = self.unserialize_user(self.kwargs)
items, total_count = FormDefUI(formdef).get_listing_items(
fields,

View File

@ -84,6 +84,30 @@ class AfterJob(StorableObject):
def done_action_attributes(self):
return self.done_button_attributes_arg
def serialize_user(self, user):
obj = {}
if user and user.is_api_user:
obj['user_is_api_user'] = True
# serialize as a volatile object to match idp api clients
obj['user_role_ids'] = list(user.roles)
obj['user_restrict_to_anonymised_data'] = user.restrict_to_anonymised_data
else:
obj['user_is_api_user'] = False
obj['user_id'] = user.id if user else None
return obj
def unserialize_user(self, obj):
if obj['user_is_api_user']:
from wcs.api_access import ApiAccess
api_access = ApiAccess.volatile()
api_access._role_ids = obj['user_role_ids']
api_access.restrict_to_anonymised_data = obj['user_restrict_to_anonymised_data']
user = api_access.get_as_api_user()
else:
user = get_publisher().user_class.get(obj['user_id'])
return user
def increment_count(self, amount=1):
self.current_count = (self.current_count or 0) + amount
# delay storage to avoid repeated writes on slow storage