api: card data submit (#49520)

This commit is contained in:
Lauréline Guérin 2021-02-22 16:15:12 +01:00
parent 69e97b1c42
commit 113089e4b8
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
3 changed files with 436 additions and 19 deletions

View File

@ -1,16 +1,27 @@
# -*- coding: utf-8 -*-
import base64
import json
import os
import time
import mock
import pytest
from django.utils.encoding import force_text
from django.utils.six import StringIO
from django.utils.six.moves.urllib import parse as urllib
from quixote import get_publisher
from utilities import clean_temporary_pub, create_temporary_pub, get_app
from wcs import fields
from wcs import fields, qommon
from wcs.api_utils import sign_url
from wcs.carddef import CardDef
from wcs.categories import CardDefCategory
from wcs.data_sources import NamedDataSource
from wcs.qommon.form import PicklableUpload
from wcs.qommon.http_request import HTTPRequest
from wcs.roles import Role
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef
from .utils import sign_uri
@ -182,3 +193,364 @@ def test_cards_import_csv(pub, local_user):
)
assert carddef.data_class().count() == 2
assert set([x.data['0'] for x in carddef.data_class().select()]) == {'first entry', 'second entry'}
def test_post_invalid_json(pub, local_user):
resp = get_app(pub).post(
'/api/cards/test/submit', params='not a json payload', content_type='application/json', status=400
)
assert resp.json['err'] == 1
assert resp.json['err_class'] == 'Invalid request'
def test_card_submit(pub, local_user):
Role.wipe()
role = Role(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
CardDef.wipe()
carddef = CardDef()
carddef.name = 'test'
carddef.fields = [fields.StringField(id='0', label='foobar')]
carddef.store()
data_class = carddef.data_class()
resp = get_app(pub).post_json('/api/cards/test/submit', {'data': {}}, status=403)
assert resp.json['err'] == 1
assert resp.json['err_desc'] == 'unsigned API call'
def url():
signed_url = sign_url(
'http://example.net/api/cards/test/submit'
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
'1234',
)
return signed_url[len('http://example.net') :]
resp = get_app(pub).post_json(url(), {'data': {}}, status=403)
assert resp.json['err'] == 1
assert resp.json['err_desc'] == 'cannot create card'
carddef.backoffice_submission_roles = [role.id]
carddef.store()
resp = get_app(pub).post_json(url(), {'data': {}})
assert resp.json['err'] == 0
assert resp.json['data']['url'] == (
'http://example.net/backoffice/data/test/%s/' % resp.json['data']['id']
)
assert resp.json['data']['backoffice_url'] == (
'http://example.net/backoffice/data/test/%s/' % resp.json['data']['id']
)
assert resp.json['data']['api_url'] == ('http://example.net/api/cards/test/%s/' % resp.json['data']['id'])
assert data_class.get(resp.json['data']['id']).status == 'wf-recorded'
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
assert data_class.get(resp.json['data']['id']).tracking_code is None
local_user2 = get_publisher().user_class()
local_user2.name = 'Test'
local_user2.email = 'foo@localhost'
local_user2.store()
resp = get_app(pub).post_json(url(), {'data': {}, 'user': {'NameID': [], 'email': local_user2.email}})
assert data_class.get(resp.json['data']['id']).user.email == local_user2.email
resp = get_app(pub).post(
url(), json.dumps({'data': {}}), status=400
) # missing Content-Type: application/json header
assert resp.json['err_desc'] == 'expected JSON but missing appropriate content-type'
# check qualified content type are recognized
resp = get_app(pub).post(url(), json.dumps({'data': {}}), content_type='application/json; charset=utf-8')
assert resp.json['data']['url']
def test_carddef_submit_with_varname(pub, local_user):
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
source = [{'id': '1', 'text': 'foo', 'more': 'XXX'}, {'id': '2', 'text': 'bar', 'more': 'YYY'}]
data_source.data_source = {'type': 'formula', 'value': repr(source)}
data_source.store()
data_source = NamedDataSource(name='foobar_jsonp')
data_source.data_source = {'type': 'formula', 'value': 'http://example.com/jsonp'}
data_source.store()
Role.wipe()
role = Role(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
CardDef.wipe()
carddef = CardDef()
carddef.name = 'test'
carddef.fields = [
fields.StringField(id='0', label='foobar0', varname='foobar0'),
fields.ItemField(id='1', label='foobar1', varname='foobar1', data_source={'type': 'foobar'}),
fields.ItemField(id='2', label='foobar2', varname='foobar2', data_source={'type': 'foobar_jsonp'}),
fields.DateField(id='3', label='foobar3', varname='date'),
fields.FileField(id='4', label='foobar4', varname='file'),
fields.MapField(id='5', label='foobar5', varname='map'),
fields.StringField(id='6', label='foobar6', varname='foobar6'),
]
carddef.backoffice_submission_roles = [role.id]
carddef.store()
data_class = carddef.data_class()
signed_url = sign_url(
'http://example.net/api/cards/test/submit'
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
'1234',
)
url = signed_url[len('http://example.net') :]
payload = {
'data': {
'foobar0': 'xxx',
'foobar1': '1',
'foobar1_structured': {
'id': '1',
'text': 'foo',
'more': 'XXX',
},
'foobar2': 'bar',
'foobar2_raw': '10',
'date': '1970-01-01',
'file': {
'filename': 'test.txt',
'content': force_text(base64.b64encode(b'test')),
},
'map': {
'lat': 1.5,
'lon': 2.25,
},
}
}
resp = get_app(pub).post_json(url, payload)
assert resp.json['err'] == 0
assert data_class.get(resp.json['data']['id']).status == 'wf-recorded'
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
assert data_class.get(resp.json['data']['id']).tracking_code is None
assert data_class.get(resp.json['data']['id']).data['0'] == 'xxx'
assert data_class.get(resp.json['data']['id']).data['1'] == '1'
assert data_class.get(resp.json['data']['id']).data['1_structured'] == source[0]
assert data_class.get(resp.json['data']['id']).data['2'] == '10'
assert data_class.get(resp.json['data']['id']).data['2_display'] == 'bar'
assert data_class.get(resp.json['data']['id']).data['3'] == time.struct_time(
(1970, 1, 1, 0, 0, 0, 3, 1, -1)
)
assert data_class.get(resp.json['data']['id']).data['4'].orig_filename == 'test.txt'
assert data_class.get(resp.json['data']['id']).data['4'].get_content() == b'test'
assert data_class.get(resp.json['data']['id']).data['5'] == '1.5;2.25'
# test bijectivity
assert (
carddef.fields[3].get_json_value(data_class.get(resp.json['data']['id']).data['3'])
== payload['data']['date']
)
for k in payload['data']['file']:
data = data_class.get(resp.json['data']['id']).data['4']
assert carddef.fields[4].get_json_value(data)[k] == payload['data']['file'][k]
assert (
carddef.fields[5].get_json_value(data_class.get(resp.json['data']['id']).data['5'])
== payload['data']['map']
)
def test_carddef_submit_from_wscall(pub, local_user):
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
source = [{'id': '1', 'text': 'foo', 'more': 'XXX'}, {'id': '2', 'text': 'bar', 'more': 'YYY'}]
data_source.data_source = {'type': 'formula', 'value': repr(source)}
data_source.store()
data_source = NamedDataSource(name='foobar_jsonp')
data_source.data_source = {'type': 'formula', 'value': 'http://example.com/jsonp'}
data_source.store()
Role.wipe()
role = Role(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
local_user2 = get_publisher().user_class()
local_user2.name = 'Jean Darmette 2'
local_user2.email = 'jean.darmette2@triffouilis.fr'
local_user2.name_identifiers = ['0123456789bis']
local_user2.store()
workflow = Workflow.get_default_workflow()
workflow.id = '2'
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
workflow.backoffice_fields_formdef.fields = [
fields.StringField(id='bo1', label='1st backoffice field', type='string', varname='backoffice_blah'),
]
workflow.store()
CardDef.wipe()
carddef = CardDef()
carddef.name = 'test'
carddef.fields = [
fields.StringField(id='0', label='foobar0', varname='foobar0'),
fields.ItemField(id='1', label='foobar1', varname='foobar1', data_source={'type': 'foobar'}),
fields.ItemField(id='2', label='foobar2', varname='foobar2', data_source={'type': 'foobar_jsonp'}),
fields.DateField(id='3', label='foobar3', varname='date'),
fields.FileField(id='4', label='foobar4', varname='file'),
fields.MapField(id='5', label='foobar5', varname='map'),
fields.StringField(id='6', label='foobar6', varname='foobar6'),
]
carddef.backoffice_submission_roles = [role.id]
carddef.workflow = workflow
carddef.store()
carddata = carddef.data_class()()
upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
upload.receive([b'test'])
carddata.data = {
'0': 'xxx',
'1': '1',
'1_display': '1',
'1_structured': {
'id': '1',
'text': 'foo',
'more': 'XXX',
},
'2': '10',
'2_display': 'bar',
'3': time.strptime('1970-01-01', '%Y-%m-%d'),
'4': upload,
'5': '1.5;2.25',
'bo1': 'backoffice field',
}
carddata.just_created()
carddata.store()
def url():
signed_url = sign_url(
'http://example.net/api/cards/test/submit?orig=coucou&email=%s' % urllib.quote(local_user.email),
'1234',
)
return signed_url[len('http://example.net') :]
payload = json.loads(json.dumps(carddata.get_json_export_dict(), cls=qommon.misc.JSONEncoder))
resp = get_app(pub).post_json(url(), payload)
assert resp.json['err'] == 0
new_carddata = carddef.data_class().get(resp.json['data']['id'])
assert new_carddata.data['0'] == carddata.data['0']
assert new_carddata.data['1'] == carddata.data['1']
assert new_carddata.data['1_display'] == carddata.data['1_display']
assert new_carddata.data['1_structured'] == carddata.data['1_structured']
assert new_carddata.data['2'] == carddata.data['2']
assert new_carddata.data['2_display'] == carddata.data['2_display']
assert new_carddata.data['3'] == carddata.data['3']
assert new_carddata.data['4'].get_content() == carddata.data['4'].get_content()
assert new_carddata.data['5'] == carddata.data['5']
assert new_carddata.data['bo1'] == carddata.data['bo1']
assert not new_carddata.data.get('6')
assert new_carddata.user_id == str(local_user.id)
# add an extra attribute
payload['extra'] = {'foobar6': 'YYY'}
resp = get_app(pub).post_json(url(), payload)
assert resp.json['err'] == 0
new_carddata = carddef.data_class().get(resp.json['data']['id'])
assert new_carddata.data['0'] == carddata.data['0']
assert new_carddata.data['6'] == 'YYY'
# add user
carddata.user_id = local_user2.id
carddata.store()
payload = json.loads(json.dumps(carddata.get_json_export_dict(), cls=qommon.misc.JSONEncoder))
resp = get_app(pub).post_json(url(), payload)
assert resp.json['err'] == 0
new_carddata = carddef.data_class().get(resp.json['data']['id'])
assert str(new_carddata.user_id) == str(local_user2.id)
# test missing map data
del carddata.data['5']
payload = json.loads(json.dumps(carddata.get_json_export_dict(), cls=qommon.misc.JSONEncoder))
resp = get_app(pub).post_json(url(), payload)
assert resp.json['err'] == 0
new_carddata = carddef.data_class().get(resp.json['data']['id'])
assert new_carddata.data.get('5') is None
def test_formdef_submit_structured(pub, local_user):
Role.wipe()
role = Role(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
CardDef.wipe()
carddef = CardDef()
carddef.name = 'test'
carddef.fields = [
fields.ItemField(
id='0',
label='foobar',
varname='foobar',
data_source={
'type': 'json',
'value': 'http://datasource.com',
},
),
fields.ItemField(
id='1',
label='foobar1',
varname='foobar1',
data_source={
'type': 'formula',
'value': '[dict(id=i, text=\'label %s\' % i, foo=i) for i in range(10)]',
},
),
]
carddef.backoffice_submission_roles = [role.id]
carddef.store()
data_class = carddef.data_class()
signed_url = sign_url(
'http://example.net/api/cards/test/submit'
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
'1234',
)
url = signed_url[len('http://example.net') :]
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
'''\
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
{"id": 1, "text": "uné", "foo": "bar1"}, \
{"id": 2, "text": "deux", "foo": "bar2"}]}'''
)
resp = get_app(pub).post_json(
url,
{
'data': {
'0': '0',
"1": '3',
}
},
)
formdata = data_class.get(resp.json['data']['id'])
assert formdata.status == 'wf-recorded'
assert formdata.data['0'] == '0'
assert formdata.data['0_display'] == 'zéro'
assert formdata.data['0_structured'] == {
'id': 0,
'text': 'zéro',
'foo': 'bar',
}
assert formdata.data['1'] == '3'
assert formdata.data['1_display'] == 'label 3'
assert formdata.data['1_structured'] == {
'id': 3,
'text': 'label 3',
'foo': 3,
}

View File

@ -543,12 +543,6 @@ def test_formdef_submit(pub, local_user):
def test_formdef_submit_only_one(pub, local_user):
Role.wipe()
role = Role(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
@ -594,12 +588,6 @@ def test_formdef_submit_with_varname(pub, local_user):
data_source.data_source = {'type': 'formula', 'value': 'http://example.com/jsonp'}
data_source.store()
Role.wipe()
role = Role(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
@ -776,12 +764,6 @@ def test_formdef_submit_from_wscall(pub, local_user):
def test_formdef_submit_structured(pub, local_user):
Role.wipe()
role = Role(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'

View File

@ -245,6 +245,7 @@ class ApiCardPage(ApiFormPageMixin, BackofficeCardPage):
'geojson',
'ods',
('@schema', 'schema'),
'submit',
]
def check_access(self, api_name=None):
@ -262,6 +263,68 @@ class ApiCardPage(ApiFormPageMixin, BackofficeCardPage):
get_response().set_content_type('application/json')
return self.formdef.export_to_json(anonymise=not is_url_signed())
def submit(self):
get_response().set_content_type('application/json')
if not is_url_signed():
raise AccessForbiddenError('unsigned API call')
user = get_user_from_api_query_string()
get_request()._user = user
json_input = get_request().json
formdata = self.formdef.data_class()()
if not (user and self.can_user_add_cards()):
raise AccessForbiddenError('cannot create card')
if 'data' in json_input:
# the published API expects data in 'data'.
data = json_input['data']
elif 'fields' in json_input:
# but the API also supports data in 'fields', to match the json
# output produded by wf/wscall.py.
data = json_input['fields']
if 'workflow' in json_input and json_input['workflow'].get('fields'):
# handle workflow fields, put them all in the same data dictionary.
data.update(json_input['workflow']['fields'])
if 'extra' in json_input:
data.update(json_input['extra'])
else:
data = {}
formdata.data = posted_json_data_to_formdata_data(self.formdef, data)
if 'user' in json_input:
formdata_user = None
for name_id in json_input['user'].get('NameID') or []:
formdata_user = get_publisher().user_class.get_users_with_name_identifier(name_id)
if formdata_user:
break
else:
if json_input['user'].get('email'):
formdata_user = get_publisher().user_class.get_users_with_email(
json_input['user'].get('email')
)
if formdata_user:
formdata.user_id = formdata_user[0].id
else:
formdata.user_id = user.id
formdata.store()
formdata.just_created()
formdata.store()
formdata.perform_workflow()
formdata.store()
return json.dumps(
{
'err': 0,
'data': {
'id': formdata.id,
'url': formdata.get_url(),
'backoffice_url': formdata.get_url(backoffice=True),
'api_url': formdata.get_api_url(),
},
}
)
def import_csv(self):
if get_request().get_method() != 'PUT':
raise MethodNotAllowedError(allowed_methods=['PUT'])