api: add basic auth support to cards submit and import from csv APIs (#54350)

This commit is contained in:
Frédéric Péters 2021-05-30 22:10:19 +02:00
parent 57544f2059
commit 6bf202a98d
2 changed files with 99 additions and 27 deletions

View File

@ -169,13 +169,46 @@ def test_cards(pub, local_user):
assert resp.json['fields'][0]['varname'] == 'foo'
def test_cards_import_csv(pub, local_user):
@pytest.mark.parametrize('auth', ['signature', 'http-basic'])
def test_cards_import_csv(pub, local_user, auth):
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
access = ApiAccess()
access.name = 'test'
access.access_identifier = 'test'
access.access_key = '12345'
access.store()
app = get_app(pub)
if auth == 'http-basic':
access.roles = [role]
access.store()
def get_url(url, **kwargs):
app.set_authorization(('Basic', ('test', '12345')))
return app.get(url, **kwargs)
def put_url(url, **kwargs):
app.set_authorization(('Basic', ('test', '12345')))
return app.put(url, **kwargs)
else:
def get_url(url, **kwargs):
return app.get(
sign_uri(url, user=local_user, orig=access.access_identifier, key=access.access_key), **kwargs
)
def put_url(url, **kwargs):
return app.put(
sign_uri(url, user=local_user, orig=access.access_identifier, key=access.access_key), **kwargs
)
CardDef.wipe()
carddef = CardDef()
carddef.name = 'test'
@ -192,8 +225,8 @@ def test_cards_import_csv(pub, local_user):
get_app(pub).get(sign_uri('/api/cards/test/import-csv'), status=405)
get_app(pub).put(sign_uri('/api/cards/test/import-csv'), status=403)
resp = get_app(pub).put(
sign_uri('/api/cards/test/import-csv', user=local_user),
resp = put_url(
'/api/cards/test/import-csv',
params=b'foobar;foobar2\nfirst entry;plop\nsecond entry;plop\n',
headers={'content-type': 'text/csv'},
)
@ -204,8 +237,8 @@ def test_cards_import_csv(pub, local_user):
# async mode
carddef.data_class().wipe()
assert carddef.data_class().count() == 0
resp = get_app(pub).put(
sign_uri('/api/cards/test/import-csv?async=on', user=local_user),
resp = put_url(
'/api/cards/test/import-csv?async=on',
params=b'foobar;foobar2\nfirst entry;plop\nsecond entry;plop\n',
headers={'content-type': 'text/csv'},
)
@ -217,7 +250,7 @@ def test_cards_import_csv(pub, local_user):
job_id = resp.json['data']['job']['id']
assert AfterJob.get(job_id).status == 'completed'
# get job status from its api url
resp = get_app(pub).get(sign_uri(resp.json['data']['job']['url'], user=local_user))
resp = get_url(resp.json['data']['job']['url'])
assert resp.json['err'] == 0
assert resp.json['data']['label'] == 'Importing data into cards'
assert resp.json['data']['status'] == 'completed'
@ -328,13 +361,50 @@ def test_post_invalid_json(pub, local_user):
assert resp.json['err_class'] == 'Invalid request'
def test_card_submit(pub, local_user):
@pytest.mark.parametrize('auth', ['signature', 'http-basic'])
def test_card_submit(pub, local_user, auth):
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
access = ApiAccess()
access.name = 'test'
access.access_identifier = 'test'
access.access_key = '12345'
access.store()
app = get_app(pub)
if auth == 'http-basic':
access.roles = [role]
access.store()
def post_url(url, *args, **kwargs):
app.set_authorization(('Basic', ('test', '12345')))
return app.post(url, *args, **kwargs)
def post_json_url(url, *args, **kwargs):
app.set_authorization(('Basic', ('test', '12345')))
return app.post_json(url, *args, **kwargs)
else:
def post_url(url, *args, **kwargs):
return app.post(
sign_uri(url, user=local_user, orig=access.access_identifier, key=access.access_key),
*args,
**kwargs,
)
def post_json_url(url, *args, **kwargs):
return app.post_json(
sign_uri(url, user=local_user, orig=access.access_identifier, key=access.access_key),
*args,
**kwargs,
)
CardDef.wipe()
carddef = CardDef()
carddef.name = 'test'
@ -345,23 +415,15 @@ def test_card_submit(pub, local_user):
resp = get_app(pub).post_json('/api/cards/test/submit', {'data': {}}, status=403)
assert resp.json['err'] == 1
assert resp.json['err_desc'] == 'unsigned API call'
assert resp.json['err_desc'] == 'cannot create card'
def url():
signed_url = sign_url(
'http://example.net/api/cards/test/submit'
+ '?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
'1234',
)
return signed_url[len('http://example.net') :]
resp = get_app(pub).post_json(url(), {'data': {}}, status=403)
resp = post_json_url('/api/cards/test/submit', {'data': {}}, status=403)
assert resp.json['err'] == 1
assert resp.json['err_desc'] == 'cannot create card'
carddef.backoffice_submission_roles = [role.id]
carddef.store()
resp = get_app(pub).post_json(url(), {'data': {}})
resp = post_json_url('/api/cards/test/submit', {'data': {}})
assert resp.json['err'] == 0
assert resp.json['data']['url'] == (
'http://example.net/backoffice/data/test/%s/' % resp.json['data']['id']
@ -371,23 +433,28 @@ def test_card_submit(pub, local_user):
)
assert resp.json['data']['api_url'] == ('http://example.net/api/cards/test/%s/' % resp.json['data']['id'])
assert data_class.get(resp.json['data']['id']).status == 'wf-recorded'
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
if auth == 'signature':
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
assert data_class.get(resp.json['data']['id']).tracking_code is None
local_user2 = get_publisher().user_class()
local_user2.name = 'Test'
local_user2.email = 'foo@localhost'
local_user2.store()
resp = get_app(pub).post_json(url(), {'data': {}, 'user': {'NameID': [], 'email': local_user2.email}})
resp = post_json_url(
'/api/cards/test/submit', {'data': {}, 'user': {'NameID': [], 'email': local_user2.email}}
)
assert data_class.get(resp.json['data']['id']).user.email == local_user2.email
resp = get_app(pub).post(
url(), json.dumps({'data': {}}), status=400
resp = post_url(
'/api/cards/test/submit', json.dumps({'data': {}}), status=400
) # missing Content-Type: application/json header
assert resp.json['err_desc'] == 'expected JSON but missing appropriate content-type'
# check qualified content type are recognized
resp = get_app(pub).post(url(), json.dumps({'data': {}}), content_type='application/json; charset=utf-8')
resp = post_url(
'/api/cards/test/submit', json.dumps({'data': {}}), content_type='application/json; charset=utf-8'
)
assert resp.json['data']['url']

View File

@ -276,10 +276,12 @@ class ApiCardPage(ApiFormPageMixin, BackofficeCardPage):
def submit(self):
get_response().set_content_type('application/json')
if not is_url_signed():
raise AccessForbiddenError('unsigned API call')
user = get_user_from_api_query_string()
get_request()._user = user
if user and user.is_api_user:
pass # API users are ok
else:
get_request()._user = user
json_input = get_request().json
formdata = self.formdef.data_class()()
@ -1168,7 +1170,10 @@ class AfterJobsDirectory(Directory):
_q_exports = ['']
def _q_lookup(self, component):
if not (is_url_signed() or (get_request().user and get_request().user.is_admin)):
api_user = get_user_from_api_query_string()
if api_user and api_user.is_api_user:
pass # API users are ok
elif not (is_url_signed() or (get_request().user and get_request().user.is_admin)):
raise AccessForbiddenError('unsigned request or user is not admin')
try:
afterjob = AfterJob.get(component, ignore_errors=False)