836 lines
30 KiB
Python
836 lines
30 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import base64
|
|
import datetime
|
|
import json
|
|
import os
|
|
import time
|
|
|
|
import mock
|
|
import pytest
|
|
from django.utils.encoding import force_text
|
|
from django.utils.six import StringIO
|
|
from django.utils.six.moves.urllib import parse as urllib
|
|
from quixote import get_publisher
|
|
from utilities import clean_temporary_pub, create_temporary_pub, get_app
|
|
|
|
from wcs import fields, qommon
|
|
from wcs.api_utils import sign_url
|
|
from wcs.data_sources import NamedDataSource
|
|
from wcs.formdef import FormDef
|
|
from wcs.qommon.form import PicklableUpload
|
|
from wcs.qommon.http_request import HTTPRequest
|
|
from wcs.roles import Role
|
|
from wcs.wf.jump import JumpWorkflowStatusItem
|
|
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef
|
|
|
|
from .utils import sign_uri
|
|
|
|
|
|
def pytest_generate_tests(metafunc):
|
|
if 'pub' in metafunc.fixturenames:
|
|
metafunc.parametrize('pub', ['pickle', 'sql'], indirect=True)
|
|
|
|
|
|
@pytest.fixture
|
|
def pub(request, emails):
|
|
pub = create_temporary_pub(sql_mode=(request.param == 'sql'))
|
|
|
|
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
|
|
pub.set_app_dir(req)
|
|
pub.cfg['identification'] = {'methods': ['password']}
|
|
pub.cfg['language'] = {'language': 'en'}
|
|
pub.write_cfg()
|
|
|
|
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
|
|
'''\
|
|
[api-secrets]
|
|
coucou = 1234
|
|
'''
|
|
)
|
|
|
|
return pub
|
|
|
|
|
|
def teardown_module(module):
|
|
clean_temporary_pub()
|
|
|
|
|
|
@pytest.fixture
|
|
def local_user():
|
|
get_publisher().user_class.wipe()
|
|
user = get_publisher().user_class()
|
|
user.name = 'Jean Darmette'
|
|
user.email = 'jean.darmette@triffouilis.fr'
|
|
user.name_identifiers = ['0123456789']
|
|
user.store()
|
|
return user
|
|
|
|
|
|
def test_formdef_list(pub):
|
|
Role.wipe()
|
|
role = Role(name='Foo bar')
|
|
role.id = '14'
|
|
role.store()
|
|
|
|
FormDef.wipe()
|
|
formdef = FormDef()
|
|
formdef.name = 'test'
|
|
formdef.description = 'plop'
|
|
formdef.keywords = 'mobile, test'
|
|
formdef.workflow_roles = {'_receiver': str(role.id)}
|
|
formdef.fields = []
|
|
formdef.store()
|
|
|
|
# anonymous access -> 403
|
|
resp1 = get_app(pub).get('/json', status=403)
|
|
resp2 = get_app(pub).get('/', headers={'Accept': 'application/json'}, status=403)
|
|
resp3 = get_app(pub).get('/api/formdefs/', status=403)
|
|
|
|
# signed request
|
|
resp1 = get_app(pub).get(sign_uri('/json'))
|
|
resp2 = get_app(pub).get(sign_uri('/'), headers={'Accept': 'application/json'})
|
|
resp3 = get_app(pub).get(sign_uri('/api/formdefs/'))
|
|
assert resp1.json == resp2.json == resp3.json
|
|
assert resp1.json['data'][0]['title'] == 'test'
|
|
assert resp1.json['data'][0]['url'] == 'http://example.net/test/'
|
|
assert resp1.json['data'][0]['redirection'] is False
|
|
assert resp1.json['data'][0]['always_advertise'] is False
|
|
assert resp1.json['data'][0]['description'] == 'plop'
|
|
assert resp1.json['data'][0]['keywords'] == ['mobile', 'test']
|
|
assert list(resp1.json['data'][0]['functions'].keys()) == ['_receiver']
|
|
assert resp1.json['data'][0]['functions']['_receiver']['label'] == 'Recipient'
|
|
assert resp1.json['data'][0]['functions']['_receiver']['role']['slug'] == role.slug
|
|
assert resp1.json['data'][0]['functions']['_receiver']['role']['name'] == role.name
|
|
assert 'count' not in resp1.json['data'][0]
|
|
|
|
# backoffice_submission formdef : none
|
|
resp1 = get_app(pub).get('/api/formdefs/?backoffice-submission=on', status=403)
|
|
resp1 = get_app(pub).get(sign_uri('/api/formdefs/?backoffice-submission=on'))
|
|
assert resp1.json['err'] == 0
|
|
assert len(resp1.json['data']) == 0
|
|
|
|
formdef.data_class().wipe()
|
|
|
|
# a draft
|
|
formdata = formdef.data_class()()
|
|
formdata.data = {}
|
|
formdata.just_created()
|
|
formdata.status = 'draft'
|
|
formdata.store()
|
|
|
|
other_formdef = FormDef()
|
|
other_formdef.name = 'test 2'
|
|
other_formdef.fields = []
|
|
other_formdef.store()
|
|
other_formdata = other_formdef.data_class()()
|
|
other_formdata.data = {}
|
|
other_formdata.just_created()
|
|
other_formdata.store()
|
|
|
|
# formdata created:
|
|
# - 1 day ago (=3*4)
|
|
# - 7 days ago (=2*2)
|
|
# - 29 days ago (=1*1)
|
|
# - 31 days ago (=0)
|
|
for days in [1, 1, 1, 7, 7, 29, 31]:
|
|
formdata = formdef.data_class()()
|
|
formdata.data = {}
|
|
formdata.just_created()
|
|
formdata.receipt_time = (datetime.datetime.now() - datetime.timedelta(days=days)).timetuple()
|
|
formdata.store()
|
|
|
|
resp = get_app(pub).get(sign_uri('/api/formdefs/?include-count=on'))
|
|
if not pub.is_using_postgresql():
|
|
assert resp.json['data'][0]['count'] == 8
|
|
else:
|
|
# 3*4 + 2*2 + 1*1
|
|
assert resp.json['data'][0]['count'] == 17
|
|
|
|
|
|
def test_limited_formdef_list(pub, local_user):
|
|
Role.wipe()
|
|
role = Role(name='Foo bar')
|
|
role.id = '14'
|
|
role.store()
|
|
|
|
FormDef.wipe()
|
|
formdef = FormDef()
|
|
formdef.name = 'test'
|
|
formdef.description = 'plop'
|
|
formdef.workflow_roles = {'_receiver': str(role.id)}
|
|
formdef.fields = []
|
|
formdef.store()
|
|
|
|
resp = get_app(pub).get(sign_uri('/api/formdefs/'))
|
|
assert resp.json['err'] == 0
|
|
assert len(resp.json['data']) == 1
|
|
assert resp.json['data'][0]['authentication_required'] is False
|
|
# not present in backoffice-submission formdefs
|
|
resp = get_app(pub).get(sign_uri('/api/formdefs/?backoffice-submission=on'))
|
|
assert resp.json['err'] == 0
|
|
assert len(resp.json['data']) == 0
|
|
|
|
# check it's not advertised
|
|
formdef.roles = [role.id]
|
|
formdef.store()
|
|
resp = get_app(pub).get(sign_uri('/api/formdefs/'))
|
|
resp2 = get_app(pub).get(sign_uri('/api/formdefs/?NameID='))
|
|
resp3 = get_app(pub).get(sign_uri('/api/formdefs/?NameID=XXX'))
|
|
resp4 = get_app(pub).get(sign_uri('/api/formdefs/?NameID=%s' % local_user.name_identifiers[0]))
|
|
assert resp.json['err'] == 0
|
|
assert len(resp.json['data']) == 1 # advertised in naked calls (as done from combo)
|
|
assert len(resp2.json['data']) == 0 # not advertised otherwise
|
|
assert resp2.json == resp3.json == resp4.json
|
|
# still not present in backoffice-submission formdefs
|
|
resp = get_app(pub).get(sign_uri('/api/formdefs/?backoffice-submission=on'))
|
|
assert resp.json['err'] == 0
|
|
assert len(resp.json['data']) == 0
|
|
|
|
# unless user has correct roles
|
|
local_user.roles = [role.id]
|
|
local_user.store()
|
|
resp = get_app(pub).get(sign_uri('/api/formdefs/?NameID=%s' % local_user.name_identifiers[0]))
|
|
assert resp.json['err'] == 0
|
|
assert len(resp.json['data']) == 1
|
|
|
|
local_user.roles = []
|
|
local_user.store()
|
|
|
|
# check it's also included in anonymous/signed calls, but marked for
|
|
# authentication
|
|
resp = get_app(pub).get(sign_uri('/api/formdefs/'))
|
|
assert resp.json['data'][0]
|
|
assert resp.json['data'][0]['authentication_required'] is True
|
|
|
|
# check it's advertised
|
|
formdef.always_advertise = True
|
|
formdef.store()
|
|
resp = get_app(pub).get(sign_uri('/api/formdefs/'))
|
|
resp2 = get_app(pub).get(sign_uri('/api/formdefs/?NameID='))
|
|
resp3 = get_app(pub).get(sign_uri('/api/formdefs/?NameID=XXX'))
|
|
resp4 = get_app(pub).get(sign_uri('/api/formdefs/?NameID=%s' % local_user.name_identifiers[0]))
|
|
assert resp.json['err'] == 0
|
|
assert len(resp.json['data']) == 1
|
|
assert resp.json['data'][0]['authentication_required']
|
|
assert resp.json == resp2.json == resp3.json == resp4.json
|
|
|
|
formdef.required_authentication_contexts = ['fedict']
|
|
formdef.store()
|
|
resp = get_app(pub).get(sign_uri('/api/formdefs/'))
|
|
assert resp.json['data'][0]['required_authentication_contexts'] == ['fedict']
|
|
|
|
|
|
def test_formdef_list_redirection(pub):
|
|
FormDef.wipe()
|
|
formdef = FormDef()
|
|
formdef.name = 'test'
|
|
formdef.disabled = True
|
|
formdef.disabled_redirection = 'http://example.net'
|
|
formdef.fields = []
|
|
formdef.store()
|
|
|
|
resp1 = get_app(pub).get(sign_uri('/json'))
|
|
assert resp1.json['err'] == 0
|
|
assert resp1.json['data'][0]['title'] == 'test'
|
|
assert resp1.json['data'][0]['url'] == 'http://example.net/test/'
|
|
assert resp1.json['data'][0]['redirection'] is True
|
|
assert 'count' not in resp1.json['data'][0]
|
|
|
|
|
|
def test_backoffice_submission_formdef_list(pub, local_user):
|
|
Role.wipe()
|
|
role = Role(name='Foo bar')
|
|
role.id = '14'
|
|
role.store()
|
|
|
|
FormDef.wipe()
|
|
formdef = FormDef()
|
|
formdef.name = 'test'
|
|
formdef.description = 'plop'
|
|
formdef.workflow_roles = {'_receiver': str(role.id)}
|
|
formdef.fields = []
|
|
formdef.store()
|
|
|
|
formdef2 = FormDef()
|
|
formdef2.name = 'ignore me'
|
|
formdef2.fields = []
|
|
formdef2.store()
|
|
|
|
resp = get_app(pub).get(sign_uri('/api/formdefs/?backoffice-submission=on'))
|
|
assert resp.json['err'] == 0
|
|
assert len(resp.json['data']) == 0
|
|
|
|
# check it's not advertised ...
|
|
formdef.backoffice_submission_roles = [role.id]
|
|
formdef.store()
|
|
resp = get_app(pub).get(sign_uri('/api/formdefs/?backoffice-submission=on'))
|
|
assert resp.json['err'] == 0
|
|
assert len(resp.json['data']) == 0
|
|
|
|
# even if it's advertised on frontoffice
|
|
formdef.always_advertise = True
|
|
formdef.store()
|
|
resp = get_app(pub).get(sign_uri('/api/formdefs/?backoffice-submission=on'))
|
|
assert resp.json['err'] == 0
|
|
assert len(resp.json['data']) == 0
|
|
|
|
# even if user is admin
|
|
local_user.is_admin = True
|
|
local_user.store()
|
|
resp = get_app(pub).get(
|
|
sign_uri('/api/formdefs/?backoffice-submission=on&NameID=%s' % local_user.name_identifiers[0])
|
|
)
|
|
assert resp.json['err'] == 0
|
|
assert len(resp.json['data']) == 0
|
|
local_user.is_admin = False
|
|
local_user.store()
|
|
|
|
# ... unless user has correct roles
|
|
local_user.roles = [role.id]
|
|
local_user.store()
|
|
resp = get_app(pub).get(
|
|
sign_uri('/api/formdefs/?backoffice-submission=on&NameID=%s' % local_user.name_identifiers[0])
|
|
)
|
|
assert resp.json['err'] == 0
|
|
assert len(resp.json['data']) == 1
|
|
assert 'backoffice_submission_url' in resp.json['data'][0]
|
|
|
|
# but not advertised if it's a redirection
|
|
formdef.disabled = True
|
|
formdef.disabled_redirection = 'http://example.net'
|
|
formdef.store()
|
|
resp = get_app(pub).get(sign_uri('/api/formdefs/?backoffice-submission=on'))
|
|
assert resp.json['err'] == 0
|
|
assert len(resp.json['data']) == 0
|
|
|
|
|
|
def test_formdef_schema(pub):
|
|
Workflow.wipe()
|
|
workflow = Workflow(name='test')
|
|
st1 = workflow.add_status('Status1', 'st1')
|
|
jump = JumpWorkflowStatusItem()
|
|
jump.status = 'st2'
|
|
jump.timeout = 100
|
|
st1.items.append(jump)
|
|
st2 = workflow.add_status('Status2', 'st2')
|
|
jump = JumpWorkflowStatusItem()
|
|
jump.status = 'st3'
|
|
st2.items.append(jump)
|
|
st2 = workflow.add_status('Status3', 'st3')
|
|
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
|
|
workflow.backoffice_fields_formdef.fields = [
|
|
fields.StringField(id='bo1', label='1st backoffice field', type='string', varname='backoffice_blah'),
|
|
]
|
|
workflow.store()
|
|
FormDef.wipe()
|
|
formdef = FormDef()
|
|
formdef.name = 'test'
|
|
formdef.fields = [
|
|
fields.StringField(id='0', label='foobar'),
|
|
fields.ItemField(
|
|
id='1',
|
|
label='foobar1',
|
|
varname='foobar1',
|
|
data_source={
|
|
'type': 'json',
|
|
'value': 'http://datasource.com',
|
|
},
|
|
),
|
|
fields.ItemsField(
|
|
id='2',
|
|
label='foobar2',
|
|
varname='foobar2',
|
|
data_source={
|
|
'type': 'formula',
|
|
'value': '[dict(id=i, text=\'label %s\' % i, foo=i) for i in range(10)]',
|
|
},
|
|
),
|
|
]
|
|
|
|
formdef.workflow_id = workflow.id
|
|
formdef.store()
|
|
|
|
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
|
urlopen.side_effect = lambda *args: StringIO(
|
|
'''\
|
|
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
|
|
{"id": 1, "text": "uné", "foo": "bar1"}, \
|
|
{"id": 2, "text": "deux", "foo": "bar2"}]}'''
|
|
)
|
|
resp = get_app(pub).get('/api/formdefs/test/schema')
|
|
resp2 = get_app(pub).get('/test/schema')
|
|
resp3 = get_app(pub).get(sign_url('/api/formdefs/test/schema?orig=coucou', '1234'))
|
|
resp4 = get_app(pub).get(sign_url('/api/formdefs/test/schema?orig=coucou', '1234'))
|
|
|
|
# check schema
|
|
assert resp.json == resp2.json
|
|
assert set(resp.json.keys()) >= set(
|
|
[
|
|
'enable_tracking_codes',
|
|
'url_name',
|
|
'description',
|
|
'workflow',
|
|
'expiration_date',
|
|
'discussion',
|
|
'has_captcha',
|
|
'always_advertise',
|
|
'name',
|
|
'disabled',
|
|
'only_allow_one',
|
|
'fields',
|
|
'keywords',
|
|
'publication_date',
|
|
'detailed_emails',
|
|
'disabled_redirection',
|
|
]
|
|
)
|
|
assert resp.json['name'] == 'test'
|
|
|
|
# fields checks
|
|
assert resp.json['fields'][0]['label'] == 'foobar'
|
|
assert resp.json['fields'][0]['type'] == 'string'
|
|
|
|
assert resp.json['fields'][1]['label'] == 'foobar1'
|
|
assert resp.json['fields'][1]['type'] == 'item'
|
|
|
|
# check structured items are only exported for authenticated callers
|
|
assert resp.json['fields'][1]['items'] == []
|
|
assert resp.json['fields'][2]['items'] == []
|
|
assert 'structured_items' not in resp.json['fields'][1]
|
|
assert 'structured_items' not in resp.json['fields'][2]
|
|
|
|
assert len(resp3.json['fields'][1]['structured_items']) == 3
|
|
assert resp3.json['fields'][1]['structured_items'][0]['id'] == 0
|
|
assert resp3.json['fields'][1]['structured_items'][0]['text'] == u'zéro'
|
|
assert resp3.json['fields'][1]['structured_items'][0]['foo'] == 'bar'
|
|
assert resp3.json['fields'][1]['items'][0] == u'zéro'
|
|
|
|
assert resp3.json['fields'][2]['label'] == 'foobar2'
|
|
assert resp3.json['fields'][2]['type'] == 'items'
|
|
assert len(resp3.json['fields'][2]['structured_items']) == 10
|
|
assert resp3.json['fields'][2]['structured_items'][0]['id'] == 0
|
|
assert resp3.json['fields'][2]['structured_items'][0]['text'] == 'label 0'
|
|
assert resp3.json['fields'][2]['structured_items'][0]['foo'] == 0
|
|
assert resp3.json['fields'][2]['items'][0] == 'label 0'
|
|
|
|
# if structured_items fails no values
|
|
assert 'structured_items' not in resp4.json['fields'][1]
|
|
assert resp4.json['fields'][1]['items'] == []
|
|
|
|
# workflow checks
|
|
assert len(resp.json['workflow']['statuses']) == 3
|
|
assert resp.json['workflow']['statuses'][0]['id'] == 'st1'
|
|
assert resp.json['workflow']['statuses'][0]['endpoint'] is False
|
|
assert resp.json['workflow']['statuses'][0]['waitpoint'] is True
|
|
assert resp.json['workflow']['statuses'][1]['id'] == 'st2'
|
|
assert resp.json['workflow']['statuses'][1]['endpoint'] is False
|
|
assert resp.json['workflow']['statuses'][1]['waitpoint'] is False
|
|
assert resp.json['workflow']['statuses'][2]['id'] == 'st3'
|
|
assert resp.json['workflow']['statuses'][2]['endpoint'] is True
|
|
assert resp.json['workflow']['statuses'][2]['waitpoint'] is True
|
|
assert len(resp.json['workflow']['fields']) == 1
|
|
|
|
assert resp.json['workflow']['fields'][0]['label'] == '1st backoffice field'
|
|
|
|
get_app(pub).get('/api/formdefs/xxx/schema', status=404)
|
|
|
|
|
|
def test_post_invalid_json(pub, local_user):
|
|
resp = get_app(pub).post(
|
|
'/api/formdefs/test/submit', params='not a json payload', content_type='application/json', status=400
|
|
)
|
|
assert resp.json['err'] == 1
|
|
assert resp.json['err_class'] == 'Invalid request'
|
|
|
|
|
|
def test_formdef_submit(pub, local_user):
|
|
Role.wipe()
|
|
role = Role(name='test')
|
|
role.store()
|
|
local_user.roles = [role.id]
|
|
local_user.store()
|
|
|
|
FormDef.wipe()
|
|
formdef = FormDef()
|
|
formdef.name = 'test'
|
|
formdef.fields = [fields.StringField(id='0', label='foobar')]
|
|
formdef.store()
|
|
data_class = formdef.data_class()
|
|
|
|
resp = get_app(pub).post_json('/api/formdefs/test/submit', {'data': {}}, status=403)
|
|
assert resp.json['err'] == 1
|
|
assert resp.json['err_desc'] == 'unsigned API call'
|
|
|
|
def url():
|
|
signed_url = sign_url(
|
|
'http://example.net/api/formdefs/test/submit'
|
|
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
|
'1234',
|
|
)
|
|
return signed_url[len('http://example.net') :]
|
|
|
|
resp = get_app(pub).post_json(url(), {'data': {}})
|
|
assert resp.json['err'] == 0
|
|
assert resp.json['data']['url'] == ('http://example.net/test/%s/' % resp.json['data']['id'])
|
|
assert resp.json['data']['backoffice_url'] == (
|
|
'http://example.net/backoffice/management/test/%s/' % resp.json['data']['id']
|
|
)
|
|
assert resp.json['data']['api_url'] == ('http://example.net/api/forms/test/%s/' % resp.json['data']['id'])
|
|
assert data_class.get(resp.json['data']['id']).status == 'wf-new'
|
|
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
|
|
assert data_class.get(resp.json['data']['id']).tracking_code is None
|
|
|
|
local_user2 = get_publisher().user_class()
|
|
local_user2.name = 'Test'
|
|
local_user2.email = 'foo@localhost'
|
|
local_user2.store()
|
|
resp = get_app(pub).post_json(url(), {'data': {}, 'user': {'NameID': [], 'email': local_user2.email}})
|
|
assert data_class.get(resp.json['data']['id']).user.email == local_user2.email
|
|
|
|
resp = get_app(pub).post(
|
|
url(), json.dumps({'data': {}}), status=400
|
|
) # missing Content-Type: application/json header
|
|
assert resp.json['err_desc'] == 'expected JSON but missing appropriate content-type'
|
|
|
|
# check qualified content type are recognized
|
|
resp = get_app(pub).post(url(), json.dumps({'data': {}}), content_type='application/json; charset=utf-8')
|
|
assert resp.json['data']['url']
|
|
|
|
formdef.disabled = True
|
|
formdef.store()
|
|
resp = get_app(pub).post_json(url(), {'data': {}}, status=403)
|
|
assert resp.json['err'] == 1
|
|
assert resp.json['err_desc'] == 'disabled form'
|
|
|
|
formdef.disabled = False
|
|
formdef.store()
|
|
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)
|
|
formdef.backoffice_submission_roles = ['xx']
|
|
formdef.store()
|
|
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)
|
|
formdef.backoffice_submission_roles = [role.id]
|
|
formdef.store()
|
|
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}})
|
|
assert data_class.get(resp.json['data']['id']).status == 'wf-new'
|
|
assert data_class.get(resp.json['data']['id']).backoffice_submission is True
|
|
assert data_class.get(resp.json['data']['id']).user_id is None
|
|
assert data_class.get(resp.json['data']['id']).submission_agent_id == str(local_user.id)
|
|
|
|
formdef.enable_tracking_codes = True
|
|
formdef.store()
|
|
resp = get_app(pub).post_json(url(), {'data': {}})
|
|
assert data_class.get(resp.json['data']['id']).tracking_code
|
|
|
|
resp = get_app(pub).post_json(url(), {'meta': {'draft': True}, 'data': {}})
|
|
assert data_class.get(resp.json['data']['id']).status == 'draft'
|
|
|
|
resp = get_app(pub).post_json(
|
|
url(),
|
|
{
|
|
'meta': {'backoffice-submission': True},
|
|
'data': {},
|
|
'context': {'channel': 'mail', 'comments': 'blah'},
|
|
},
|
|
)
|
|
assert data_class.get(resp.json['data']['id']).status == 'wf-new'
|
|
assert data_class.get(resp.json['data']['id']).backoffice_submission is True
|
|
assert data_class.get(resp.json['data']['id']).user_id is None
|
|
assert data_class.get(resp.json['data']['id']).submission_context == {'comments': 'blah'}
|
|
assert data_class.get(resp.json['data']['id']).submission_channel == 'mail'
|
|
|
|
data_class.wipe()
|
|
|
|
|
|
def test_formdef_submit_only_one(pub, local_user):
|
|
FormDef.wipe()
|
|
formdef = FormDef()
|
|
formdef.name = 'test'
|
|
formdef.only_allow_one = True
|
|
formdef.fields = [fields.StringField(id='0', label='foobar')]
|
|
formdef.store()
|
|
data_class = formdef.data_class()
|
|
|
|
def url():
|
|
signed_url = sign_url(
|
|
'http://example.net/api/formdefs/test/submit'
|
|
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
|
'1234',
|
|
)
|
|
return signed_url[len('http://example.net') :]
|
|
|
|
resp = get_app(pub).post_json(url(), {'data': {}})
|
|
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
|
|
|
|
assert data_class.count() == 1
|
|
|
|
resp = get_app(pub).post_json(url(), {'data': {}}, status=403)
|
|
assert resp.json['err'] == 1
|
|
assert resp.json['err_desc'] == 'only one formdata by user is allowed'
|
|
|
|
formdata = data_class.select()[0]
|
|
formdata.user_id = '1000' # change owner
|
|
formdata.store()
|
|
|
|
resp = get_app(pub).post_json(url(), {'data': {}}, status=200)
|
|
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
|
|
assert data_class.count() == 2
|
|
|
|
|
|
def test_formdef_submit_with_varname(pub, local_user):
|
|
NamedDataSource.wipe()
|
|
data_source = NamedDataSource(name='foobar')
|
|
source = [{'id': '1', 'text': 'foo', 'more': 'XXX'}, {'id': '2', 'text': 'bar', 'more': 'YYY'}]
|
|
data_source.data_source = {'type': 'formula', 'value': repr(source)}
|
|
data_source.store()
|
|
|
|
data_source = NamedDataSource(name='foobar_jsonp')
|
|
data_source.data_source = {'type': 'formula', 'value': 'http://example.com/jsonp'}
|
|
data_source.store()
|
|
|
|
FormDef.wipe()
|
|
formdef = FormDef()
|
|
formdef.name = 'test'
|
|
formdef.fields = [
|
|
fields.StringField(id='0', label='foobar0', varname='foobar0'),
|
|
fields.ItemField(id='1', label='foobar1', varname='foobar1', data_source={'type': 'foobar'}),
|
|
fields.ItemField(id='2', label='foobar2', varname='foobar2', data_source={'type': 'foobar_jsonp'}),
|
|
fields.DateField(id='3', label='foobar3', varname='date'),
|
|
fields.FileField(id='4', label='foobar4', varname='file'),
|
|
fields.MapField(id='5', label='foobar5', varname='map'),
|
|
fields.StringField(id='6', label='foobar6', varname='foobar6'),
|
|
]
|
|
formdef.store()
|
|
data_class = formdef.data_class()
|
|
|
|
resp = get_app(pub).post_json('/api/formdefs/test/submit', {'data': {}}, status=403)
|
|
assert resp.json['err'] == 1
|
|
assert resp.json['err_desc'] == 'unsigned API call'
|
|
|
|
signed_url = sign_url(
|
|
'http://example.net/api/formdefs/test/submit'
|
|
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
|
'1234',
|
|
)
|
|
url = signed_url[len('http://example.net') :]
|
|
payload = {
|
|
'data': {
|
|
'foobar0': 'xxx',
|
|
'foobar1': '1',
|
|
'foobar1_structured': {
|
|
'id': '1',
|
|
'text': 'foo',
|
|
'more': 'XXX',
|
|
},
|
|
'foobar2': 'bar',
|
|
'foobar2_raw': '10',
|
|
'date': '1970-01-01',
|
|
'file': {
|
|
'filename': 'test.txt',
|
|
'content': force_text(base64.b64encode(b'test')),
|
|
},
|
|
'map': {
|
|
'lat': 1.5,
|
|
'lon': 2.25,
|
|
},
|
|
}
|
|
}
|
|
resp = get_app(pub).post_json(url, payload)
|
|
assert resp.json['err'] == 0
|
|
assert data_class.get(resp.json['data']['id']).status == 'wf-new'
|
|
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
|
|
assert data_class.get(resp.json['data']['id']).tracking_code is None
|
|
assert data_class.get(resp.json['data']['id']).data['0'] == 'xxx'
|
|
assert data_class.get(resp.json['data']['id']).data['1'] == '1'
|
|
assert data_class.get(resp.json['data']['id']).data['1_structured'] == source[0]
|
|
assert data_class.get(resp.json['data']['id']).data['2'] == '10'
|
|
assert data_class.get(resp.json['data']['id']).data['2_display'] == 'bar'
|
|
assert data_class.get(resp.json['data']['id']).data['3'] == time.struct_time(
|
|
(1970, 1, 1, 0, 0, 0, 3, 1, -1)
|
|
)
|
|
|
|
assert data_class.get(resp.json['data']['id']).data['4'].orig_filename == 'test.txt'
|
|
assert data_class.get(resp.json['data']['id']).data['4'].get_content() == b'test'
|
|
assert data_class.get(resp.json['data']['id']).data['5'] == '1.5;2.25'
|
|
# test bijectivity
|
|
assert (
|
|
formdef.fields[3].get_json_value(data_class.get(resp.json['data']['id']).data['3'])
|
|
== payload['data']['date']
|
|
)
|
|
for k in payload['data']['file']:
|
|
data = data_class.get(resp.json['data']['id']).data['4']
|
|
assert formdef.fields[4].get_json_value(data)[k] == payload['data']['file'][k]
|
|
assert (
|
|
formdef.fields[5].get_json_value(data_class.get(resp.json['data']['id']).data['5'])
|
|
== payload['data']['map']
|
|
)
|
|
|
|
data_class.wipe()
|
|
|
|
|
|
def test_formdef_submit_from_wscall(pub, local_user):
|
|
test_formdef_submit_with_varname(pub, local_user)
|
|
formdef = FormDef.select()[0]
|
|
workflow = Workflow.get_default_workflow()
|
|
workflow.id = '2'
|
|
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
|
|
workflow.backoffice_fields_formdef.fields = [
|
|
fields.StringField(id='bo1', label='1st backoffice field', type='string', varname='backoffice_blah'),
|
|
]
|
|
workflow.store()
|
|
formdef.workflow = workflow
|
|
formdef.store()
|
|
|
|
formdata = formdef.data_class()()
|
|
formdata.just_created()
|
|
|
|
upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
|
|
upload.receive([b'test'])
|
|
|
|
formdata.data = {
|
|
'0': 'xxx',
|
|
'1': '1',
|
|
'1_display': '1',
|
|
'1_structured': {
|
|
'id': '1',
|
|
'text': 'foo',
|
|
'more': 'XXX',
|
|
},
|
|
'2': '10',
|
|
'2_display': 'bar',
|
|
'3': time.strptime('1970-01-01', '%Y-%m-%d'),
|
|
'4': upload,
|
|
'5': '1.5;2.25',
|
|
'bo1': 'backoffice field',
|
|
}
|
|
formdata.just_created()
|
|
formdata.evolution[-1].status = 'wf-new'
|
|
formdata.store()
|
|
|
|
payload = json.loads(json.dumps(formdata.get_json_export_dict(), cls=qommon.misc.JSONEncoder))
|
|
signed_url = sign_url('http://example.net/api/formdefs/test/submit?orig=coucou', '1234')
|
|
url = signed_url[len('http://example.net') :]
|
|
|
|
resp = get_app(pub).post_json(url, payload)
|
|
assert resp.json['err'] == 0
|
|
new_formdata = formdef.data_class().get(resp.json['data']['id'])
|
|
assert new_formdata.data['0'] == formdata.data['0']
|
|
assert new_formdata.data['1'] == formdata.data['1']
|
|
assert new_formdata.data['1_display'] == formdata.data['1_display']
|
|
assert new_formdata.data['1_structured'] == formdata.data['1_structured']
|
|
assert new_formdata.data['2'] == formdata.data['2']
|
|
assert new_formdata.data['2_display'] == formdata.data['2_display']
|
|
assert new_formdata.data['3'] == formdata.data['3']
|
|
assert new_formdata.data['4'].get_content() == formdata.data['4'].get_content()
|
|
assert new_formdata.data['5'] == formdata.data['5']
|
|
assert new_formdata.data['bo1'] == formdata.data['bo1']
|
|
assert not new_formdata.data.get('6')
|
|
assert new_formdata.user_id is None
|
|
|
|
# add an extra attribute
|
|
payload['extra'] = {'foobar6': 'YYY'}
|
|
signed_url = sign_url('http://example.net/api/formdefs/test/submit?orig=coucou', '1234')
|
|
url = signed_url[len('http://example.net') :]
|
|
resp = get_app(pub).post_json(url, payload)
|
|
assert resp.json['err'] == 0
|
|
new_formdata = formdef.data_class().get(resp.json['data']['id'])
|
|
assert new_formdata.data['0'] == formdata.data['0']
|
|
assert new_formdata.data['6'] == 'YYY'
|
|
|
|
# add user
|
|
formdata.user_id = local_user.id
|
|
formdata.store()
|
|
|
|
payload = json.loads(json.dumps(formdata.get_json_export_dict(), cls=qommon.misc.JSONEncoder))
|
|
signed_url = sign_url('http://example.net/api/formdefs/test/submit?orig=coucou', '1234')
|
|
url = signed_url[len('http://example.net') :]
|
|
|
|
resp = get_app(pub).post_json(url, payload)
|
|
assert resp.json['err'] == 0
|
|
new_formdata = formdef.data_class().get(resp.json['data']['id'])
|
|
assert str(new_formdata.user_id) == str(local_user.id)
|
|
|
|
# test missing map data
|
|
del formdata.data['5']
|
|
|
|
payload = json.loads(json.dumps(formdata.get_json_export_dict(), cls=qommon.misc.JSONEncoder))
|
|
signed_url = sign_url('http://example.net/api/formdefs/test/submit?orig=coucou', '1234')
|
|
url = signed_url[len('http://example.net') :]
|
|
|
|
resp = get_app(pub).post_json(url, payload)
|
|
assert resp.json['err'] == 0
|
|
new_formdata = formdef.data_class().get(resp.json['data']['id'])
|
|
assert new_formdata.data.get('5') is None
|
|
|
|
|
|
def test_formdef_submit_structured(pub, local_user):
|
|
FormDef.wipe()
|
|
formdef = FormDef()
|
|
formdef.name = 'test'
|
|
formdef.fields = [
|
|
fields.ItemField(
|
|
id='0',
|
|
label='foobar',
|
|
varname='foobar',
|
|
data_source={
|
|
'type': 'json',
|
|
'value': 'http://datasource.com',
|
|
},
|
|
),
|
|
fields.ItemField(
|
|
id='1',
|
|
label='foobar1',
|
|
varname='foobar1',
|
|
data_source={
|
|
'type': 'formula',
|
|
'value': '[dict(id=i, text=\'label %s\' % i, foo=i) for i in range(10)]',
|
|
},
|
|
),
|
|
]
|
|
formdef.store()
|
|
data_class = formdef.data_class()
|
|
|
|
def url():
|
|
signed_url = sign_url(
|
|
'http://example.net/api/formdefs/test/submit'
|
|
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
|
'1234',
|
|
)
|
|
return signed_url[len('http://example.net') :]
|
|
|
|
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
|
urlopen.side_effect = lambda *args: StringIO(
|
|
'''\
|
|
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
|
|
{"id": 1, "text": "uné", "foo": "bar1"}, \
|
|
{"id": 2, "text": "deux", "foo": "bar2"}]}'''
|
|
)
|
|
resp = get_app(pub).post_json(
|
|
url(),
|
|
{
|
|
'data': {
|
|
'0': '0',
|
|
"1": '3',
|
|
}
|
|
},
|
|
)
|
|
|
|
formdata = data_class.get(resp.json['data']['id'])
|
|
assert formdata.status == 'wf-new'
|
|
assert formdata.data['0'] == '0'
|
|
assert formdata.data['0_display'] == 'zéro'
|
|
assert formdata.data['0_structured'] == {
|
|
'id': 0,
|
|
'text': 'zéro',
|
|
'foo': 'bar',
|
|
}
|
|
assert formdata.data['1'] == '3'
|
|
assert formdata.data['1_display'] == 'label 3'
|
|
assert formdata.data['1_structured'] == {
|
|
'id': 3,
|
|
'text': 'label 3',
|
|
'foo': 3,
|
|
}
|
|
|
|
data_class.wipe()
|