diff --git a/tests/api/test_carddef.py b/tests/api/test_carddef.py index db94f405e..cb0dd6913 100644 --- a/tests/api/test_carddef.py +++ b/tests/api/test_carddef.py @@ -1,16 +1,27 @@ # -*- coding: utf-8 -*- +import base64 +import json import os +import time +import mock import pytest +from django.utils.encoding import force_text +from django.utils.six import StringIO +from django.utils.six.moves.urllib import parse as urllib from quixote import get_publisher from utilities import clean_temporary_pub, create_temporary_pub, get_app -from wcs import fields +from wcs import fields, qommon +from wcs.api_utils import sign_url from wcs.carddef import CardDef from wcs.categories import CardDefCategory +from wcs.data_sources import NamedDataSource +from wcs.qommon.form import PicklableUpload from wcs.qommon.http_request import HTTPRequest from wcs.roles import Role +from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef from .utils import sign_uri @@ -182,3 +193,364 @@ def test_cards_import_csv(pub, local_user): ) assert carddef.data_class().count() == 2 assert set([x.data['0'] for x in carddef.data_class().select()]) == {'first entry', 'second entry'} + + +def test_post_invalid_json(pub, local_user): + resp = get_app(pub).post( + '/api/cards/test/submit', params='not a json payload', content_type='application/json', status=400 + ) + assert resp.json['err'] == 1 + assert resp.json['err_class'] == 'Invalid request' + + +def test_card_submit(pub, local_user): + Role.wipe() + role = Role(name='test') + role.store() + local_user.roles = [role.id] + local_user.store() + + CardDef.wipe() + carddef = CardDef() + carddef.name = 'test' + carddef.fields = [fields.StringField(id='0', label='foobar')] + carddef.store() + + data_class = carddef.data_class() + + resp = get_app(pub).post_json('/api/cards/test/submit', {'data': {}}, status=403) + assert resp.json['err'] == 1 + assert resp.json['err_desc'] == 'unsigned API call' + + def url(): + signed_url = sign_url( + 'http://example.net/api/cards/test/submit' + + '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), + '1234', + ) + return signed_url[len('http://example.net') :] + + resp = get_app(pub).post_json(url(), {'data': {}}, status=403) + assert resp.json['err'] == 1 + assert resp.json['err_desc'] == 'cannot create card' + + carddef.backoffice_submission_roles = [role.id] + carddef.store() + resp = get_app(pub).post_json(url(), {'data': {}}) + assert resp.json['err'] == 0 + assert resp.json['data']['url'] == ( + 'http://example.net/backoffice/data/test/%s/' % resp.json['data']['id'] + ) + assert resp.json['data']['backoffice_url'] == ( + 'http://example.net/backoffice/data/test/%s/' % resp.json['data']['id'] + ) + assert resp.json['data']['api_url'] == ('http://example.net/api/cards/test/%s/' % resp.json['data']['id']) + assert data_class.get(resp.json['data']['id']).status == 'wf-recorded' + assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id) + assert data_class.get(resp.json['data']['id']).tracking_code is None + + local_user2 = get_publisher().user_class() + local_user2.name = 'Test' + local_user2.email = 'foo@localhost' + local_user2.store() + resp = get_app(pub).post_json(url(), {'data': {}, 'user': {'NameID': [], 'email': local_user2.email}}) + assert data_class.get(resp.json['data']['id']).user.email == local_user2.email + + resp = get_app(pub).post( + url(), json.dumps({'data': {}}), status=400 + ) # missing Content-Type: application/json header + assert resp.json['err_desc'] == 'expected JSON but missing appropriate content-type' + + # check qualified content type are recognized + resp = get_app(pub).post(url(), json.dumps({'data': {}}), content_type='application/json; charset=utf-8') + assert resp.json['data']['url'] + + +def test_carddef_submit_with_varname(pub, local_user): + NamedDataSource.wipe() + data_source = NamedDataSource(name='foobar') + source = [{'id': '1', 'text': 'foo', 'more': 'XXX'}, {'id': '2', 'text': 'bar', 'more': 'YYY'}] + data_source.data_source = {'type': 'formula', 'value': repr(source)} + data_source.store() + + data_source = NamedDataSource(name='foobar_jsonp') + data_source.data_source = {'type': 'formula', 'value': 'http://example.com/jsonp'} + data_source.store() + + Role.wipe() + role = Role(name='test') + role.store() + local_user.roles = [role.id] + local_user.store() + + CardDef.wipe() + carddef = CardDef() + carddef.name = 'test' + carddef.fields = [ + fields.StringField(id='0', label='foobar0', varname='foobar0'), + fields.ItemField(id='1', label='foobar1', varname='foobar1', data_source={'type': 'foobar'}), + fields.ItemField(id='2', label='foobar2', varname='foobar2', data_source={'type': 'foobar_jsonp'}), + fields.DateField(id='3', label='foobar3', varname='date'), + fields.FileField(id='4', label='foobar4', varname='file'), + fields.MapField(id='5', label='foobar5', varname='map'), + fields.StringField(id='6', label='foobar6', varname='foobar6'), + ] + carddef.backoffice_submission_roles = [role.id] + carddef.store() + data_class = carddef.data_class() + + signed_url = sign_url( + 'http://example.net/api/cards/test/submit' + + '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), + '1234', + ) + url = signed_url[len('http://example.net') :] + payload = { + 'data': { + 'foobar0': 'xxx', + 'foobar1': '1', + 'foobar1_structured': { + 'id': '1', + 'text': 'foo', + 'more': 'XXX', + }, + 'foobar2': 'bar', + 'foobar2_raw': '10', + 'date': '1970-01-01', + 'file': { + 'filename': 'test.txt', + 'content': force_text(base64.b64encode(b'test')), + }, + 'map': { + 'lat': 1.5, + 'lon': 2.25, + }, + } + } + resp = get_app(pub).post_json(url, payload) + assert resp.json['err'] == 0 + assert data_class.get(resp.json['data']['id']).status == 'wf-recorded' + assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id) + assert data_class.get(resp.json['data']['id']).tracking_code is None + assert data_class.get(resp.json['data']['id']).data['0'] == 'xxx' + assert data_class.get(resp.json['data']['id']).data['1'] == '1' + assert data_class.get(resp.json['data']['id']).data['1_structured'] == source[0] + assert data_class.get(resp.json['data']['id']).data['2'] == '10' + assert data_class.get(resp.json['data']['id']).data['2_display'] == 'bar' + assert data_class.get(resp.json['data']['id']).data['3'] == time.struct_time( + (1970, 1, 1, 0, 0, 0, 3, 1, -1) + ) + + assert data_class.get(resp.json['data']['id']).data['4'].orig_filename == 'test.txt' + assert data_class.get(resp.json['data']['id']).data['4'].get_content() == b'test' + assert data_class.get(resp.json['data']['id']).data['5'] == '1.5;2.25' + # test bijectivity + assert ( + carddef.fields[3].get_json_value(data_class.get(resp.json['data']['id']).data['3']) + == payload['data']['date'] + ) + for k in payload['data']['file']: + data = data_class.get(resp.json['data']['id']).data['4'] + assert carddef.fields[4].get_json_value(data)[k] == payload['data']['file'][k] + assert ( + carddef.fields[5].get_json_value(data_class.get(resp.json['data']['id']).data['5']) + == payload['data']['map'] + ) + + +def test_carddef_submit_from_wscall(pub, local_user): + NamedDataSource.wipe() + data_source = NamedDataSource(name='foobar') + source = [{'id': '1', 'text': 'foo', 'more': 'XXX'}, {'id': '2', 'text': 'bar', 'more': 'YYY'}] + data_source.data_source = {'type': 'formula', 'value': repr(source)} + data_source.store() + + data_source = NamedDataSource(name='foobar_jsonp') + data_source.data_source = {'type': 'formula', 'value': 'http://example.com/jsonp'} + data_source.store() + + Role.wipe() + role = Role(name='test') + role.store() + local_user.roles = [role.id] + local_user.store() + + local_user2 = get_publisher().user_class() + local_user2.name = 'Jean Darmette 2' + local_user2.email = 'jean.darmette2@triffouilis.fr' + local_user2.name_identifiers = ['0123456789bis'] + local_user2.store() + + workflow = Workflow.get_default_workflow() + workflow.id = '2' + workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow) + workflow.backoffice_fields_formdef.fields = [ + fields.StringField(id='bo1', label='1st backoffice field', type='string', varname='backoffice_blah'), + ] + workflow.store() + + CardDef.wipe() + carddef = CardDef() + carddef.name = 'test' + carddef.fields = [ + fields.StringField(id='0', label='foobar0', varname='foobar0'), + fields.ItemField(id='1', label='foobar1', varname='foobar1', data_source={'type': 'foobar'}), + fields.ItemField(id='2', label='foobar2', varname='foobar2', data_source={'type': 'foobar_jsonp'}), + fields.DateField(id='3', label='foobar3', varname='date'), + fields.FileField(id='4', label='foobar4', varname='file'), + fields.MapField(id='5', label='foobar5', varname='map'), + fields.StringField(id='6', label='foobar6', varname='foobar6'), + ] + carddef.backoffice_submission_roles = [role.id] + carddef.workflow = workflow + carddef.store() + + carddata = carddef.data_class()() + upload = PicklableUpload('test.txt', 'text/plain', 'ascii') + upload.receive([b'test']) + carddata.data = { + '0': 'xxx', + '1': '1', + '1_display': '1', + '1_structured': { + 'id': '1', + 'text': 'foo', + 'more': 'XXX', + }, + '2': '10', + '2_display': 'bar', + '3': time.strptime('1970-01-01', '%Y-%m-%d'), + '4': upload, + '5': '1.5;2.25', + 'bo1': 'backoffice field', + } + carddata.just_created() + carddata.store() + + def url(): + signed_url = sign_url( + 'http://example.net/api/cards/test/submit?orig=coucou&email=%s' % urllib.quote(local_user.email), + '1234', + ) + return signed_url[len('http://example.net') :] + + payload = json.loads(json.dumps(carddata.get_json_export_dict(), cls=qommon.misc.JSONEncoder)) + + resp = get_app(pub).post_json(url(), payload) + assert resp.json['err'] == 0 + new_carddata = carddef.data_class().get(resp.json['data']['id']) + assert new_carddata.data['0'] == carddata.data['0'] + assert new_carddata.data['1'] == carddata.data['1'] + assert new_carddata.data['1_display'] == carddata.data['1_display'] + assert new_carddata.data['1_structured'] == carddata.data['1_structured'] + assert new_carddata.data['2'] == carddata.data['2'] + assert new_carddata.data['2_display'] == carddata.data['2_display'] + assert new_carddata.data['3'] == carddata.data['3'] + assert new_carddata.data['4'].get_content() == carddata.data['4'].get_content() + assert new_carddata.data['5'] == carddata.data['5'] + assert new_carddata.data['bo1'] == carddata.data['bo1'] + assert not new_carddata.data.get('6') + assert new_carddata.user_id == str(local_user.id) + + # add an extra attribute + payload['extra'] = {'foobar6': 'YYY'} + resp = get_app(pub).post_json(url(), payload) + assert resp.json['err'] == 0 + new_carddata = carddef.data_class().get(resp.json['data']['id']) + assert new_carddata.data['0'] == carddata.data['0'] + assert new_carddata.data['6'] == 'YYY' + + # add user + carddata.user_id = local_user2.id + carddata.store() + payload = json.loads(json.dumps(carddata.get_json_export_dict(), cls=qommon.misc.JSONEncoder)) + + resp = get_app(pub).post_json(url(), payload) + assert resp.json['err'] == 0 + new_carddata = carddef.data_class().get(resp.json['data']['id']) + assert str(new_carddata.user_id) == str(local_user2.id) + + # test missing map data + del carddata.data['5'] + payload = json.loads(json.dumps(carddata.get_json_export_dict(), cls=qommon.misc.JSONEncoder)) + + resp = get_app(pub).post_json(url(), payload) + assert resp.json['err'] == 0 + new_carddata = carddef.data_class().get(resp.json['data']['id']) + assert new_carddata.data.get('5') is None + + +def test_formdef_submit_structured(pub, local_user): + Role.wipe() + role = Role(name='test') + role.store() + local_user.roles = [role.id] + local_user.store() + + CardDef.wipe() + carddef = CardDef() + carddef.name = 'test' + carddef.fields = [ + fields.ItemField( + id='0', + label='foobar', + varname='foobar', + data_source={ + 'type': 'json', + 'value': 'http://datasource.com', + }, + ), + fields.ItemField( + id='1', + label='foobar1', + varname='foobar1', + data_source={ + 'type': 'formula', + 'value': '[dict(id=i, text=\'label %s\' % i, foo=i) for i in range(10)]', + }, + ), + ] + carddef.backoffice_submission_roles = [role.id] + carddef.store() + data_class = carddef.data_class() + + signed_url = sign_url( + 'http://example.net/api/cards/test/submit' + '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), + '1234', + ) + url = signed_url[len('http://example.net') :] + + with mock.patch('wcs.qommon.misc.urlopen') as urlopen: + urlopen.side_effect = lambda *args: StringIO( + '''\ +{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \ +{"id": 1, "text": "uné", "foo": "bar1"}, \ +{"id": 2, "text": "deux", "foo": "bar2"}]}''' + ) + resp = get_app(pub).post_json( + url, + { + 'data': { + '0': '0', + "1": '3', + } + }, + ) + + formdata = data_class.get(resp.json['data']['id']) + assert formdata.status == 'wf-recorded' + assert formdata.data['0'] == '0' + assert formdata.data['0_display'] == 'zéro' + assert formdata.data['0_structured'] == { + 'id': 0, + 'text': 'zéro', + 'foo': 'bar', + } + assert formdata.data['1'] == '3' + assert formdata.data['1_display'] == 'label 3' + assert formdata.data['1_structured'] == { + 'id': 3, + 'text': 'label 3', + 'foo': 3, + } diff --git a/tests/api/test_formdef.py b/tests/api/test_formdef.py index 1cc466d08..836cc1670 100644 --- a/tests/api/test_formdef.py +++ b/tests/api/test_formdef.py @@ -543,12 +543,6 @@ def test_formdef_submit(pub, local_user): def test_formdef_submit_only_one(pub, local_user): - Role.wipe() - role = Role(name='test') - role.store() - local_user.roles = [role.id] - local_user.store() - FormDef.wipe() formdef = FormDef() formdef.name = 'test' @@ -594,12 +588,6 @@ def test_formdef_submit_with_varname(pub, local_user): data_source.data_source = {'type': 'formula', 'value': 'http://example.com/jsonp'} data_source.store() - Role.wipe() - role = Role(name='test') - role.store() - local_user.roles = [role.id] - local_user.store() - FormDef.wipe() formdef = FormDef() formdef.name = 'test' @@ -776,12 +764,6 @@ def test_formdef_submit_from_wscall(pub, local_user): def test_formdef_submit_structured(pub, local_user): - Role.wipe() - role = Role(name='test') - role.store() - local_user.roles = [role.id] - local_user.store() - FormDef.wipe() formdef = FormDef() formdef.name = 'test' diff --git a/wcs/api.py b/wcs/api.py index 13afcdca3..5e579e4bb 100644 --- a/wcs/api.py +++ b/wcs/api.py @@ -245,6 +245,7 @@ class ApiCardPage(ApiFormPageMixin, BackofficeCardPage): 'geojson', 'ods', ('@schema', 'schema'), + 'submit', ] def check_access(self, api_name=None): @@ -262,6 +263,68 @@ class ApiCardPage(ApiFormPageMixin, BackofficeCardPage): get_response().set_content_type('application/json') return self.formdef.export_to_json(anonymise=not is_url_signed()) + def submit(self): + get_response().set_content_type('application/json') + if not is_url_signed(): + raise AccessForbiddenError('unsigned API call') + user = get_user_from_api_query_string() + get_request()._user = user + json_input = get_request().json + formdata = self.formdef.data_class()() + + if not (user and self.can_user_add_cards()): + raise AccessForbiddenError('cannot create card') + + if 'data' in json_input: + # the published API expects data in 'data'. + data = json_input['data'] + elif 'fields' in json_input: + # but the API also supports data in 'fields', to match the json + # output produded by wf/wscall.py. + data = json_input['fields'] + if 'workflow' in json_input and json_input['workflow'].get('fields'): + # handle workflow fields, put them all in the same data dictionary. + data.update(json_input['workflow']['fields']) + if 'extra' in json_input: + data.update(json_input['extra']) + else: + data = {} + + formdata.data = posted_json_data_to_formdata_data(self.formdef, data) + + if 'user' in json_input: + formdata_user = None + for name_id in json_input['user'].get('NameID') or []: + formdata_user = get_publisher().user_class.get_users_with_name_identifier(name_id) + if formdata_user: + break + else: + if json_input['user'].get('email'): + formdata_user = get_publisher().user_class.get_users_with_email( + json_input['user'].get('email') + ) + if formdata_user: + formdata.user_id = formdata_user[0].id + else: + formdata.user_id = user.id + + formdata.store() + formdata.just_created() + formdata.store() + formdata.perform_workflow() + formdata.store() + return json.dumps( + { + 'err': 0, + 'data': { + 'id': formdata.id, + 'url': formdata.get_url(), + 'backoffice_url': formdata.get_url(backoffice=True), + 'api_url': formdata.get_api_url(), + }, + } + ) + def import_csv(self): if get_request().get_method() != 'PUT': raise MethodNotAllowedError(allowed_methods=['PUT'])