wcs/tests/test_api.py

2461 lines
95 KiB
Python

# -*- coding: utf-8 -*-
from StringIO import StringIO
import pytest
import json
import shutil
import os
import hmac
import base64
import hashlib
import mock
import re
import urllib
import urlparse
import datetime
import time
import json
import sys
from quixote import cleanup, get_publisher
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.form import PicklableUpload
from wcs.users import User
from wcs.roles import Role
from wcs.formdef import FormDef
from wcs.formdata import Evolution
from wcs.categories import Category
from wcs.data_sources import NamedDataSource
from wcs.workflows import Workflow, EditableWorkflowStatusItem, WorkflowBackofficeFieldsFormDef
from wcs.wf.jump import JumpWorkflowStatusItem
from wcs import fields, qommon
from wcs.api_utils import sign_url, get_secret_and_orig, is_url_signed, DEFAULT_DURATION
from wcs.qommon.errors import AccessForbiddenError
from utilities import get_app, create_temporary_pub, clean_temporary_pub
def pytest_generate_tests(metafunc):
if 'pub' in metafunc.fixturenames:
metafunc.parametrize('pub', ['pickle', 'sql'], indirect=True)
@pytest.fixture
def pub(request, emails):
pub = create_temporary_pub(sql_mode=(request.param == 'sql'))
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
pub.set_app_dir(req)
pub.cfg['identification'] = {'methods': ['password']}
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
file(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write('''\
[api-secrets]
coucou = 1234
''')
return pub
def teardown_module(module):
clean_temporary_pub()
@pytest.fixture
def local_user():
get_publisher().user_class.wipe()
user = get_publisher().user_class()
user.name = 'Jean Darmette'
user.email = 'jean.darmette@triffouilis.fr'
user.name_identifiers = ['0123456789']
user.store()
return user
@pytest.fixture
def admin_user():
get_publisher().user_class.wipe()
user = get_publisher().user_class()
user.name = 'John Doe Admin'
user.email = 'john.doe@example.com'
user.name_identifiers = ['0123456789']
user.is_admin = True
user.store()
return user
def sign_uri(uri, user=None, format='json'):
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri)
if query:
query += '&'
if format:
query += 'format=%s&' % format
query += 'orig=coucou&algo=sha256&timestamp=' + timestamp
if user:
query += '&email=' + urllib.quote(user.email)
query += '&signature=%s' % urllib.quote(
base64.b64encode(
hmac.new('1234',
query,
hashlib.sha256).digest()))
return urlparse.urlunparse((scheme, netloc, path, params, query, fragment))
def test_user_page_redirect(pub):
output = get_app(pub).get('/user')
assert output.headers.get('location') == 'http://example.net/myspace/'
def test_user_page_error(pub):
# check we get json as output for errors
output = get_app(pub).get('/api/user/', status=403)
assert output.json['err_desc'] == 'no user specified'
def test_user_page_error_when_json_and_no_user(pub):
output = get_app(pub).get('/api/user/?format=json', status=403)
assert output.json['err_desc'] == 'no user specified'
def test_get_user_from_api_query_string_error_missing_orig(pub):
output = get_app(pub).get('/api/user/?format=json&signature=xxx', status=403)
assert output.json['err_desc'] == 'missing/multiple orig field'
def test_get_user_from_api_query_string_error_invalid_orig(pub):
output = get_app(pub).get('/api/user/?format=json&orig=coin&signature=xxx', status=403)
assert output.json['err_desc'] == 'invalid orig'
def test_get_user_from_api_query_string_error_missing_algo(pub):
output = get_app(pub).get('/api/user/?format=json&orig=coucou&signature=xxx', status=403)
assert output.json['err_desc'] == 'missing/multiple algo field'
def test_get_user_from_api_query_string_error_invalid_algo(pub):
output = get_app(pub).get('/api/user/?format=json&orig=coucou&signature=xxx&algo=coin', status=403)
assert output.json['err_desc'] == 'invalid algo'
def test_get_user_from_api_query_string_error_invalid_signature(pub):
output = get_app(pub).get('/api/user/?format=json&orig=coucou&signature=xxx&algo=sha1', status=403)
assert output.json['err_desc'] == 'invalid signature'
def test_get_user_from_api_query_string_error_missing_timestamp(pub):
signature = urllib.quote(
base64.b64encode(
hmac.new('1234',
'format=json&orig=coucou&algo=sha1',
hashlib.sha1).digest()))
output = get_app(pub).get('/api/user/?format=json&orig=coucou&algo=sha1&signature=%s' % signature, status=403)
assert output.json['err_desc'] == 'missing/multiple timestamp field'
def test_get_user_from_api_query_string_error_missing_email(pub):
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = 'format=json&orig=coucou&algo=sha1&timestamp=' + timestamp
signature = urllib.quote(
base64.b64encode(
hmac.new('1234',
query,
hashlib.sha1).digest()))
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403)
assert output.json['err_desc'] == 'no user specified'
def test_get_user_from_api_query_string_error_unknown_nameid(pub):
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = 'format=json&orig=coucou&algo=sha1&NameID=xxx&timestamp=' + timestamp
signature = urllib.quote(
base64.b64encode(
hmac.new('1234',
query,
hashlib.sha1).digest()))
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403)
assert output.json['err_desc'] == 'unknown NameID'
def test_get_user_from_api_query_string_error_missing_email_valid_endpoint(pub):
# check it's ok to sign an URL without specifiying an user if the endpoint
# works fine without user.
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = 'format=json&orig=coucou&algo=sha1&timestamp=' + timestamp
signature = urllib.quote(
base64.b64encode(
hmac.new('1234',
query,
hashlib.sha1).digest()))
output = get_app(pub).get('/categories?%s&signature=%s' % (query, signature))
assert output.json == {'data': []}
output = get_app(pub).get('/json?%s&signature=%s' % (query, signature))
assert output.json == {'err': 0, 'data': []}
def test_get_user_from_api_query_string_error_unknown_nameid_valid_endpoint(pub):
# check the categories and forms endpoints accept an unknown NameID
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = 'format=json&NameID=xxx&orig=coucou&algo=sha1&timestamp=' + timestamp
signature = urllib.quote(
base64.b64encode(
hmac.new('1234',
query,
hashlib.sha1).digest()))
output = get_app(pub).get('/categories?%s&signature=%s' % (query, signature))
assert output.json == {'data': []}
output = get_app(pub).get('/json?%s&signature=%s' % (query, signature))
assert output.json == {'err': 0, 'data': []}
def test_get_user_from_api_query_string_error_success_sha1(pub, local_user):
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = 'format=json&orig=coucou&algo=sha1&email=' + urllib.quote(local_user.email) + '&timestamp=' + timestamp
signature = urllib.quote(
base64.b64encode(
hmac.new('1234',
query,
hashlib.sha1).digest()))
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature))
assert output.json['user_display_name'] == u'Jean Darmette'
def test_get_user_from_api_query_string_error_invalid_signature_algo_mismatch(pub, local_user):
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = 'format=json&orig=coucou&algo=sha256&email=' + urllib.quote(local_user.email) + '&timestamp=' + timestamp
signature = urllib.quote(
base64.b64encode(
hmac.new('1234',
query,
hashlib.sha1).digest()))
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403)
assert output.json['err_desc'] == 'invalid signature'
def test_get_user_from_api_query_string_error_success_sha256(pub, local_user):
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = 'format=json&orig=coucou&algo=sha256&email=' + urllib.quote(local_user.email) + '&timestamp=' + timestamp
signature = urllib.quote(
base64.b64encode(
hmac.new('1234',
query,
hashlib.sha256).digest()))
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature))
assert output.json['user_display_name'] == u'Jean Darmette'
def test_sign_url(pub, local_user):
signed_url = sign_url(
'http://example.net/api/user/?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
'1234'
)
url = signed_url[len('http://example.net'):]
output = get_app(pub).get(url)
assert output.json['user_display_name'] == u'Jean Darmette'
signed_url = sign_url(
'http://example.net/api/user/?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
'12345'
)
url = signed_url[len('http://example.net'):]
output = get_app(pub).get(url, status=403)
def test_get_user(pub, local_user):
Role.wipe()
role = Role(name='Foo bar')
role.store()
local_user.roles = [role.id]
local_user.store()
signed_url = sign_url(
'http://example.net/api/user/?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
'1234'
)
url = signed_url[len('http://example.net'):]
output = get_app(pub).get(url)
assert output.json['user_display_name'] == u'Jean Darmette'
assert [x['name'] for x in output.json['user_roles']] == ['Foo bar']
assert [x['slug'] for x in output.json['user_roles']] == ['foo-bar']
def test_is_url_signed_check_nonce(pub, local_user):
ORIG = 'xxx'
KEY = 'xxx'
pub.site_options.add_section('api-secrets')
pub.site_options.set('api-secrets', ORIG, KEY)
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
# test clean_nonces do not bark when nonces directory is empty
if os.path.exists(os.path.join(pub.app_dir, 'nonces')):
shutil.rmtree(os.path.join(pub.app_dir, 'nonces'))
pub.clean_nonces()
signed_url = sign_url('?format=json&orig=%s&email=%s'
% (ORIG, urllib.quote(local_user.email)), KEY)
req = HTTPRequest(None, {'SCRIPT_NAME': '/',
'SERVER_NAME': 'example.net',
'QUERY_STRING': signed_url[1:]})
req.process_inputs()
pub.set_app_dir(req)
pub._set_request(req)
assert is_url_signed()
with pytest.raises(AccessForbiddenError) as exc_info:
req.signed = False
is_url_signed()
assert exc_info.value.public_msg == 'nonce already used'
# test that clean nonces works
assert os.listdir(os.path.join(pub.app_dir, 'nonces'))
pub.clean_nonces(delta=0)
assert os.listdir(os.path.join(pub.app_dir, 'nonces'))
pub.clean_nonces(delta=0, now=time.time() + 2 * DEFAULT_DURATION)
assert not os.listdir(os.path.join(pub.app_dir, 'nonces'))
def test_get_user_compat_endpoint(pub, local_user):
signed_url = sign_url(
'http://example.net/user?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
'1234'
)
url = signed_url[len('http://example.net'):]
output = get_app(pub).get(url)
assert output.json['user_display_name'] == u'Jean Darmette'
def test_formdef_list(pub):
Role.wipe()
role = Role(name='Foo bar')
role.id = '14'
role.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.description = 'plop'
formdef.keywords = 'mobile, test'
formdef.workflow_roles = {'_receiver': str(role.id)}
formdef.fields = []
formdef.store()
resp1 = get_app(pub).get('/json')
resp2 = get_app(pub).get('/', headers={'Accept': 'application/json'})
resp3 = get_app(pub).get('/api/formdefs/')
assert resp1.json == resp2.json == resp3.json
assert resp1.json['data'][0]['title'] == 'test'
assert resp1.json['data'][0]['url'] == 'http://example.net/test/'
assert resp1.json['data'][0]['redirection'] == False
assert resp1.json['data'][0]['description'] == 'plop'
assert resp1.json['data'][0]['keywords'] == ['mobile', 'test']
assert resp1.json['data'][0]['functions'].keys() == ['_receiver']
assert resp1.json['data'][0]['functions']['_receiver']['label'] == 'Recipient'
assert resp1.json['data'][0]['functions']['_receiver']['role']['slug'] == role.slug
assert resp1.json['data'][0]['functions']['_receiver']['role']['name'] == role.name
assert 'count' not in resp1.json['data'][0]
# backoffice_submission formdef : none
resp1 = get_app(pub).get('/api/formdefs/?backoffice-submission=on')
assert resp1.json['err'] == 0
assert len(resp1.json['data']) == 0
def test_limited_formdef_list(pub, local_user):
Role.wipe()
role = Role(name='Foo bar')
role.id = '14'
role.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.description = 'plop'
formdef.workflow_roles = {'_receiver': str(role.id)}
formdef.fields = []
formdef.store()
resp = get_app(pub).get('/api/formdefs/')
assert resp.json['err'] == 0
assert len(resp.json['data']) == 1
assert resp.json['data'][0]['authentication_required'] is False
# not present in backoffice-submission formdefs
resp = get_app(pub).get('/api/formdefs/?backoffice-submission=on')
assert resp.json['err'] == 0
assert len(resp.json['data']) == 0
# check it's not advertised
formdef.roles = [role.id]
formdef.store()
resp = get_app(pub).get('/api/formdefs/')
resp2 = get_app(pub).get(sign_uri('/api/formdefs/?NameID='))
resp3 = get_app(pub).get(sign_uri('/api/formdefs/?NameID=XXX'))
resp4 = get_app(pub).get(sign_uri('/api/formdefs/?NameID=%s' % local_user.name_identifiers[0]))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 0
assert resp.json == resp2.json == resp3.json == resp4.json
# still not present in backoffice-submission formdefs
resp = get_app(pub).get('/api/formdefs/?backoffice-submission=on')
assert resp.json['err'] == 0
assert len(resp.json['data']) == 0
# unless user has correct roles
local_user.roles = [role.id]
local_user.store()
resp = get_app(pub).get(sign_uri('/api/formdefs/?NameID=%s' % local_user.name_identifiers[0]))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 1
local_user.roles = []
local_user.store()
# check it's also included in anonymous/signed calls, but marked for
# authentication
resp = get_app(pub).get(sign_uri('/api/formdefs/'))
assert resp.json['data'][0]
assert resp.json['data'][0]['authentication_required'] is True
# check it's advertised
formdef.always_advertise = True
formdef.store()
resp = get_app(pub).get('/api/formdefs/')
resp2 = get_app(pub).get(sign_uri('/api/formdefs/?NameID='))
resp3 = get_app(pub).get(sign_uri('/api/formdefs/?NameID=XXX'))
resp4 = get_app(pub).get(sign_uri('/api/formdefs/?NameID=%s' % local_user.name_identifiers[0]))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 1
assert resp.json['data'][0]['authentication_required']
assert resp.json == resp2.json == resp3.json == resp4.json
formdef.required_authentication_contexts = ['fedict']
formdef.store()
resp = get_app(pub).get('/api/formdefs/')
assert resp.json['data'][0]['required_authentication_contexts'] == ['fedict']
def test_formdef_list_redirection(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.disabled = True
formdef.disabled_redirection = 'http://example.net'
formdef.fields = []
formdef.store()
resp1 = get_app(pub).get('/json')
assert resp1.json['err'] == 0
assert resp1.json['data'][0]['title'] == 'test'
assert resp1.json['data'][0]['url'] == 'http://example.net/test/'
assert resp1.json['data'][0]['redirection'] == True
assert 'count' not in resp1.json['data'][0]
def test_backoffice_submission_formdef_list(pub, local_user):
Role.wipe()
role = Role(name='Foo bar')
role.id = '14'
role.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.description = 'plop'
formdef.workflow_roles = {'_receiver': str(role.id)}
formdef.fields = []
formdef.store()
formdef2 = FormDef()
formdef2.name = 'ignore me'
formdef2.fields = []
formdef2.store()
resp = get_app(pub).get('/api/formdefs/?backoffice-submission=on')
assert resp.json['err'] == 0
assert len(resp.json['data']) == 0
# check it's not advertised ...
formdef.backoffice_submission_roles = [role.id]
formdef.store()
resp = get_app(pub).get('/api/formdefs/?backoffice-submission=on')
assert resp.json['err'] == 0
assert len(resp.json['data']) == 0
# even if it's advertised on frontoffice
formdef.always_advertise = True
formdef.store()
resp = get_app(pub).get('/api/formdefs/?backoffice-submission=on')
assert resp.json['err'] == 0
assert len(resp.json['data']) == 0
# ... unless user is admin
local_user.is_admin = True
local_user.store()
resp = get_app(pub).get(sign_uri('/api/formdefs/?backoffice-submission=on&NameID=%s' %
local_user.name_identifiers[0]))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 1
local_user.is_admin = False
local_user.store()
# ... unless user has correct roles
local_user.roles = [role.id]
local_user.store()
resp = get_app(pub).get(sign_uri('/api/formdefs/?backoffice-submission=on&NameID=%s' %
local_user.name_identifiers[0]))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 1
assert 'backoffice_submission_url' in resp.json['data'][0]
# but not advertised if it's a redirection
formdef.disabled = True
formdef.disabled_redirection = 'http://example.net'
formdef.store()
resp = get_app(pub).get('/api/formdefs/?backoffice-submission=on')
assert resp.json['err'] == 0
assert len(resp.json['data']) == 0
def test_formdef_schema(pub):
Workflow.wipe()
workflow = Workflow(name='test')
workflow.add_status('Status1', 'st1')
workflow.add_status('Status2', 'st2')
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
workflow.backoffice_fields_formdef.fields = [
fields.StringField(id='bo1', label='1st backoffice field',
type='string', varname='backoffice_blah'),
]
workflow.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.StringField(id='0', label='foobar'),
fields.ItemField(id='1', label='foobar1', varname='foobar1',
data_source={
'type': 'json',
'value': 'http://datasource.com',
}),
fields.ItemsField(id='2', label='foobar2', varname='foobar2',
data_source={
'type': 'formula',
'value': '[dict(id=i, text=\'label %s\' % i, foo=i) for i in range(10)]',
}),
]
formdef.workflow_id = workflow.id
formdef.store()
with mock.patch('qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO('''\
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
{"id": 1, "text": "uné", "foo": "bar1"}, \
{"id": 2, "text": "deux", "foo": "bar2"}]}''')
resp = get_app(pub).get('/api/formdefs/test/schema')
resp2 = get_app(pub).get('/test/schema')
resp3 = get_app(pub).get(sign_url('/api/formdefs/test/schema?orig=coucou', '1234'))
resp4 = get_app(pub).get(sign_url('/api/formdefs/test/schema?orig=coucou', '1234'))
# check schema
assert resp.json == resp2.json
assert set(resp.json.keys()) >= set(['enable_tracking_codes', 'url_name', 'description',
'workflow', 'expiration_date', 'discussion',
'last_modification_time', 'has_captcha',
'always_advertise', 'name', 'disabled', 'only_allow_one',
'private_status_and_history', 'fields', 'keywords',
'publication_date', 'detailed_emails',
'disabled_redirection'])
assert resp.json['name'] == 'test'
# fields checks
assert resp.json['fields'][0]['label'] == 'foobar'
assert resp.json['fields'][0]['type'] == 'string'
assert resp.json['fields'][1]['label'] == 'foobar1'
assert resp.json['fields'][1]['type'] == 'item'
# check structured items are only exported for authenticated callers
assert resp.json['fields'][1]['items'] == []
assert resp.json['fields'][2]['items'] == []
assert 'structured_items' not in resp.json['fields'][1]
assert 'structured_items' not in resp.json['fields'][2]
assert len(resp3.json['fields'][1]['structured_items']) == 3
assert resp3.json['fields'][1]['structured_items'][0]['id'] == 0
assert resp3.json['fields'][1]['structured_items'][0]['text'] == u'zéro'
assert resp3.json['fields'][1]['structured_items'][0]['foo'] == 'bar'
assert resp3.json['fields'][1]['items'][0] == u'zéro'
assert resp3.json['fields'][2]['label'] == 'foobar2'
assert resp3.json['fields'][2]['type'] == 'items'
assert len(resp3.json['fields'][2]['structured_items']) == 10
assert resp3.json['fields'][2]['structured_items'][0]['id'] == 0
assert resp3.json['fields'][2]['structured_items'][0]['text'] == 'label 0'
assert resp3.json['fields'][2]['structured_items'][0]['foo'] == 0
assert resp3.json['fields'][2]['items'][0] == 'label 0'
# if structured_items fails no values
assert 'structured_items' not in resp4.json['fields'][1]
assert resp4.json['fields'][1]['items'] == []
# workflow checks
assert len(resp.json['workflow']['statuses']) == 2
assert len(resp.json['workflow']['fields']) == 1
assert resp.json['workflow']['fields'][0]['label'] == '1st backoffice field'
get_app(pub).get('/api/formdefs/xxx/schema', status=404)
def test_post_invalid_json(pub, local_user):
resp = get_app(pub).post('/api/formdefs/test/submit',
params='not a json payload',
content_type='application/json',
status=400)
assert resp.json['err'] == 1
assert resp.json['err_class'] == 'Invalid request'
def test_formdef_submit(pub, local_user):
Role.wipe()
role = Role(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [fields.StringField(id='0', label='foobar')]
formdef.store()
data_class = formdef.data_class()
resp = get_app(pub).post_json('/api/formdefs/test/submit', {'data': {}}, status=403)
assert resp.json['err'] == 1
assert resp.json['err_desc'] == 'unsigned API call'
def url():
signed_url = sign_url('http://example.net/api/formdefs/test/submit' +
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), '1234')
return signed_url[len('http://example.net'):]
resp = get_app(pub).post_json(url(), {'data': {}})
assert resp.json['err'] == 0
assert resp.json['data']['url'] == ('http://example.net/test/%s/' % resp.json['data']['id'])
assert resp.json['data']['backoffice_url'] == ('http://example.net/backoffice/management/test/%s/' % resp.json['data']['id'])
assert resp.json['data']['api_url'] == ('http://example.net/api/forms/test/%s/' % resp.json['data']['id'])
assert data_class.get(resp.json['data']['id']).status == 'wf-new'
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
assert data_class.get(resp.json['data']['id']).tracking_code is None
resp = get_app(pub).post(url(), json.dumps({'data': {}}), status=400) # missing Content-Type: application/json header
assert resp.json['err_desc'] == 'expected JSON but missing appropriate content-type'
formdef.disabled = True
formdef.store()
resp = get_app(pub).post_json(url(), {'data': {}}, status=403)
assert resp.json['err'] == 1
assert resp.json['err_desc'] == 'disabled form'
formdef.disabled = False
formdef.store()
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)
formdef.backoffice_submission_roles = ['xx']
formdef.store()
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)
formdef.backoffice_submission_roles = [role.id]
formdef.store()
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}})
assert data_class.get(resp.json['data']['id']).status == 'wf-new'
assert data_class.get(resp.json['data']['id']).backoffice_submission is True
assert data_class.get(resp.json['data']['id']).user_id is None
assert data_class.get(resp.json['data']['id']).submission_context.get('agent_id') == local_user.id
formdef.enable_tracking_codes = True
formdef.store()
resp = get_app(pub).post_json(url(), {'data': {}})
assert data_class.get(resp.json['data']['id']).tracking_code
resp = get_app(pub).post_json(url(), {'meta': {'draft': True}, 'data': {}})
assert data_class.get(resp.json['data']['id']).status == 'draft'
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {},
'context': {'channel': 'mail', 'comments': 'blah'} })
assert data_class.get(resp.json['data']['id']).status == 'wf-new'
assert data_class.get(resp.json['data']['id']).backoffice_submission is True
assert data_class.get(resp.json['data']['id']).user_id is None
assert data_class.get(resp.json['data']['id']).submission_context == {
'comments': 'blah', 'agent_id': local_user.id}
assert data_class.get(resp.json['data']['id']).submission_channel == 'mail'
data_class.wipe()
def test_formdef_submit_only_one(pub, local_user):
Role.wipe()
role = Role(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.only_allow_one = True
formdef.fields = [fields.StringField(id='0', label='foobar')]
formdef.store()
data_class = formdef.data_class()
def url():
signed_url = sign_url('http://example.net/api/formdefs/test/submit' +
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
'1234')
return signed_url[len('http://example.net'):]
resp = get_app(pub).post_json(url(), {'data': {}})
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
assert data_class.count() == 1
resp = get_app(pub).post_json(url(), {'data': {}}, status=403)
assert resp.json['err'] == 1
assert resp.json['err_desc'] == 'only one formdata by user is allowed'
formdata = data_class.select()[0]
formdata.user_id = '1000' # change owner
formdata.store()
resp = get_app(pub).post_json(url(), {'data': {}}, status=200)
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
assert data_class.count() == 2
def test_formdef_submit_with_varname(pub, local_user):
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
source = [{'id': '1', 'text': 'foo', 'more': 'XXX'},
{'id': '2', 'text': 'bar', 'more': 'YYY'}]
data_source.data_source = {'type': 'formula', 'value': repr(source)}
data_source.store()
data_source = NamedDataSource(name='foobar_jsonp')
data_source.data_source = {'type': 'formula', 'value': 'http://example.com/jsonp'}
data_source.store()
Role.wipe()
role = Role(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.StringField(id='0', label='foobar0', varname='foobar0'),
fields.ItemField(id='1', label='foobar1', varname='foobar1',
data_source={'type': 'foobar'}),
fields.ItemField(id='2', label='foobar2', varname='foobar2',
data_source={'type': 'foobar_jsonp'}),
fields.DateField(id='3', label='foobar3', varname='date'),
fields.FileField(id='4', label='foobar4', varname='file'),
fields.MapField(id='5', label='foobar5', varname='map'),
fields.StringField(id='6', label='foobar6', varname='foobar6'),
]
formdef.store()
data_class = formdef.data_class()
resp = get_app(pub).post_json('/api/formdefs/test/submit', {'data': {}}, status=403)
assert resp.json['err'] == 1
assert resp.json['err_desc'] == 'unsigned API call'
signed_url = sign_url('http://example.net/api/formdefs/test/submit' +
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), '1234')
url = signed_url[len('http://example.net'):]
payload = {
'data':
{
'foobar0': 'xxx',
'foobar1': '1',
'foobar1_structured': {
'id': '1',
'text': 'foo',
'more': 'XXX',
},
'foobar2': 'bar',
'foobar2_raw': '10',
'date': '1970-01-01',
'file': {
'filename': 'test.txt',
'content': base64.b64encode('test'),
},
'map': {
'lat': 1.5,
'lon': 2.25,
},
}
}
resp = get_app(pub).post_json(url, payload)
assert resp.json['err'] == 0
assert data_class.get(resp.json['data']['id']).status == 'wf-new'
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
assert data_class.get(resp.json['data']['id']).tracking_code is None
assert data_class.get(resp.json['data']['id']).data['0'] == 'xxx'
assert data_class.get(resp.json['data']['id']).data['1'] == '1'
assert data_class.get(resp.json['data']['id']).data['1_structured'] == source[0]
assert data_class.get(resp.json['data']['id']).data['2'] == '10'
assert data_class.get(resp.json['data']['id']).data['2_display'] == 'bar'
assert data_class.get(resp.json['data']['id']).data['3'] == time.struct_time(
(1970, 1, 1, 0, 0, 0, 3, 1, -1))
assert data_class.get(resp.json['data']['id']).data['4'].orig_filename == 'test.txt'
assert data_class.get(resp.json['data']['id']).data['4'].get_content() == 'test'
assert data_class.get(resp.json['data']['id']).data['5'] == '1.5;2.25'
# test bijectivity
assert (formdef.fields[3].get_json_value(data_class.get(resp.json['data']['id']).data['3']) ==
payload['data']['date'])
for k in payload['data']['file']:
data = data_class.get(resp.json['data']['id']).data['4']
assert formdef.fields[4].get_json_value(data)[k] == payload['data']['file'][k]
assert (formdef.fields[5].get_json_value(data_class.get(resp.json['data']['id']).data['5']) ==
payload['data']['map'])
data_class.wipe()
def test_formdef_submit_from_wscall(pub, local_user):
test_formdef_submit_with_varname(pub, local_user)
formdef = FormDef.select()[0]
workflow = Workflow.get_default_workflow()
workflow.id = '2'
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
workflow.backoffice_fields_formdef.fields = [
fields.StringField(id='bo1', label='1st backoffice field',
type='string', varname='backoffice_blah'),
]
workflow.store()
formdef.workflow = workflow
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
upload.receive(['test'])
formdata.data = {
'0': 'xxx',
'1': '1',
'1_display': '1',
'1_structured': {
'id': '1',
'text': 'foo',
'more': 'XXX',
},
'2': '10',
'2_display': 'bar',
'3': time.strptime('1970-01-01', '%Y-%m-%d'),
'4': upload,
'5': '1.5;2.25',
'bo1': 'backoffice field',
}
formdata.just_created()
formdata.evolution[-1].status = 'wf-new'
formdata.store()
payload = json.loads(
json.dumps(formdata.get_json_export_dict(),
cls=qommon.misc.JSONEncoder,
encoding=get_publisher().site_charset))
signed_url = sign_url('http://example.net/api/formdefs/test/submit?orig=coucou', '1234')
url = signed_url[len('http://example.net'):]
resp = get_app(pub).post_json(url, payload)
assert resp.json['err'] == 0
new_formdata = formdef.data_class().get(resp.json['data']['id'])
assert new_formdata.data['0'] == formdata.data['0']
assert new_formdata.data['1'] == formdata.data['1']
assert new_formdata.data['1_display'] == formdata.data['1_display']
assert new_formdata.data['1_structured'] == formdata.data['1_structured']
assert new_formdata.data['2'] == formdata.data['2']
assert new_formdata.data['2_display'] == formdata.data['2_display']
assert new_formdata.data['3'] == formdata.data['3']
assert new_formdata.data['4'].get_content() == formdata.data['4'].get_content()
assert new_formdata.data['5'] == formdata.data['5']
assert new_formdata.data['bo1'] == formdata.data['bo1']
assert not new_formdata.data.get('6')
assert new_formdata.user_id is None
# add an extra attribute
payload['extra'] = {'foobar6': 'YYY'}
signed_url = sign_url('http://example.net/api/formdefs/test/submit?orig=coucou', '1234')
url = signed_url[len('http://example.net'):]
resp = get_app(pub).post_json(url, payload)
assert resp.json['err'] == 0
new_formdata = formdef.data_class().get(resp.json['data']['id'])
assert new_formdata.data['0'] == formdata.data['0']
assert new_formdata.data['6'] == 'YYY'
# add user
formdata.user_id = local_user.id
formdata.store()
payload = json.loads(
json.dumps(formdata.get_json_export_dict(),
cls=qommon.misc.JSONEncoder,
encoding=get_publisher().site_charset))
signed_url = sign_url('http://example.net/api/formdefs/test/submit?orig=coucou', '1234')
url = signed_url[len('http://example.net'):]
resp = get_app(pub).post_json(url, payload)
assert resp.json['err'] == 0
new_formdata = formdef.data_class().get(resp.json['data']['id'])
assert str(new_formdata.user_id) == str(local_user.id)
# test missing map data
del formdata.data['5']
payload = json.loads(
json.dumps(formdata.get_json_export_dict(),
cls=qommon.misc.JSONEncoder,
encoding=get_publisher().site_charset))
signed_url = sign_url('http://example.net/api/formdefs/test/submit?orig=coucou', '1234')
url = signed_url[len('http://example.net'):]
resp = get_app(pub).post_json(url, payload)
assert resp.json['err'] == 0
new_formdata = formdef.data_class().get(resp.json['data']['id'])
assert new_formdata.data.get('5') is None
def test_categories(pub):
FormDef.wipe()
Category.wipe()
category = Category()
category.name = 'Category'
category.description = 'hello world'
category.store()
resp = get_app(pub).get('/api/categories/', headers={'Accept': 'application/json'})
assert resp.json['data'] == [] # no advertised forms
formdef = FormDef()
formdef.name = 'test'
formdef.category_id = category.id
formdef.fields = []
formdef.keywords = 'mobile, test'
formdef.store()
formdef.data_class().wipe()
formdef = FormDef()
formdef.name = 'test 2'
formdef.category_id = category.id
formdef.fields = []
formdef.keywords = 'foobar'
formdef.store()
formdef.data_class().wipe()
resp = get_app(pub).get('/api/categories/')
resp2 = get_app(pub).get('/categories', headers={'Accept': 'application/json'})
assert resp.json == resp2.json
assert resp.json['data'][0]['title'] == 'Category'
assert resp.json['data'][0]['url'] == 'http://example.net/category/'
assert resp.json['data'][0]['description'] == '<p>hello world</p>'
assert set(resp.json['data'][0]['keywords']) == set(['foobar', 'mobile', 'test'])
assert not 'forms' in resp.json['data'][0]
# check HTML description
category.description = '<p><strong>hello world</strong></p>'
category.store()
resp = get_app(pub).get('/api/categories/')
assert resp.json['data'][0]['description'] == category.description
def test_categories_private(pub, local_user):
FormDef.wipe()
Category.wipe()
category = Category()
category.name = 'Category'
category.description = 'hello world'
category.store()
formdef = FormDef()
formdef.name = 'test'
formdef.category_id = category.id
formdef.fields = []
formdef.store()
formdef.data_class().wipe()
# open form
resp = get_app(pub).get('/api/categories/')
assert len(resp.json['data']) == 1
# private form, the category doesn't appear anymore
formdef.roles = ['plop']
formdef.store()
resp = get_app(pub).get('/api/categories/')
assert len(resp.json['data']) == 0
# not even for a signed request specifying an user
resp = get_app(pub).get(sign_uri('http://example.net/api/categories/', local_user))
assert len(resp.json['data']) == 0
# but it appears if this is a signed request without user
resp = get_app(pub).get(sign_uri('http://example.net/api/categories/'))
assert len(resp.json['data']) == 1
# or signed with an authorised user
local_user.roles = ['plop']
local_user.store()
resp = get_app(pub).get(sign_uri('http://example.net/api/categories/', local_user))
assert len(resp.json['data']) == 1
def test_categories_formdefs(pub, local_user):
FormDef.wipe()
Category.wipe()
category = Category()
category.name = 'Category'
category.description = 'hello world'
category.store()
formdef = FormDef()
formdef.name = 'test'
formdef.category_id = category.id
formdef.fields = []
formdef.keywords = 'mobile, test'
formdef.store()
formdef.data_class().wipe()
formdef = FormDef()
formdef.name = 'test 2'
formdef.category_id = category.id
formdef.fields = []
formdef.keywords = 'foobar'
formdef.store()
formdef.data_class().wipe()
formdef2 = FormDef()
formdef2.name = 'other test'
formdef2.category_id = None
formdef2.fields = []
formdef2.store()
formdef2.data_class().wipe()
resp = get_app(pub).get('/api/categories/category/formdefs/')
resp2 = get_app(pub).get('/category/json')
assert resp.json == resp2.json
assert resp.json['err'] == 0
assert len(resp.json['data']) == 2
assert resp.json['data'][0]['title'] == 'test'
assert resp.json['data'][0]['url'] == 'http://example.net/test/'
assert resp.json['data'][0]['redirection'] == False
assert resp.json['data'][0]['category'] == 'Category'
assert resp.json['data'][0]['category_slug'] == 'category'
assert 'count' not in resp.json['data'][0]
resp = get_app(pub).get('/api/categories/category/formdefs/?include-count=on')
assert resp.json['data'][0]['title'] == 'test'
assert resp.json['data'][0]['url'] == 'http://example.net/test/'
assert resp.json['data'][0]['count'] == 0
get_app(pub).get('/api/categories/XXX/formdefs/', status=404)
resp = get_app(pub).get('/api/categories/category/formdefs/?backoffice-submission=on')
assert resp.json['err'] == 0
assert len(resp.json['data']) == 0
Role.wipe()
role = Role(name='test')
role.store()
local_user.roles = []
local_user.store()
# check it's not advertised ...
formdef.backoffice_submission_roles = [role.id]
formdef.store()
resp = get_app(pub).get('/api/categories/category/formdefs/?backoffice-submission=on')
assert resp.json['err'] == 0
assert len(resp.json['data']) == 0
resp = get_app(pub).get(sign_uri(
'/api/categories/category/formdefs/?backoffice-submission=on&NameID=%s' %
local_user.name_identifiers[0]))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 0
# ... unless user has correct roles
local_user.roles = [role.id]
local_user.store()
resp = get_app(pub).get(sign_uri(
'/api/categories/category/formdefs/?backoffice-submission=on&NameID=%s' %
local_user.name_identifiers[0]))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 1
def test_categories_full(pub):
test_categories(pub)
resp = get_app(pub).get('/api/categories/?full=on')
assert len(resp.json['data'][0]['forms']) == 2
assert resp.json['data'][0]['forms'][0]['title'] == 'test'
assert resp.json['data'][0]['forms'][1]['title'] == 'test 2'
def test_formdata(pub, local_user):
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {'type': 'formula',
'value': repr(
[{'id': '1', 'text': 'foo', 'more': 'XXX'},
{'id': '2', 'text': 'bar', 'more': 'YYY'}])}
data_source.store()
Role.wipe()
role = Role(name='test')
role.id = '123'
role.store()
another_role = Role(name='another')
another_role.id = '321'
another_role.store()
FormDef.wipe()
formdef = FormDef()
formdef.geolocations = {'base': 'blah'}
formdef.name = 'test'
formdef.fields = [
fields.StringField(id='0', label='foobar', varname='foobar'),
fields.StringField(id='1', label='foobar2'),
fields.DateField(id='2', label='foobar3', varname='date'),
fields.FileField(id='3', label='foobar4', varname='file'),
fields.ItemField(id='4', label='foobar5', varname='item',
data_source={'type': 'foobar'}),
]
workflow = Workflow.get_default_workflow()
workflow.roles['_foobar'] = 'Foobar'
workflow.id = '2'
workflow.store()
formdef.workflow_id = workflow.id
formdef.workflow_roles = {'_receiver': role.id,
'_foobar': another_role.id}
formdef.store()
item_field = formdef.fields[4]
formdef.data_class().wipe()
formdata = formdef.data_class()()
date = time.strptime('2014-01-20', '%Y-%m-%d')
upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
upload.receive(['base64me'])
formdata.data = {
'0': 'foo@localhost',
'1': 'xxx',
'2': date,
'3': upload,
'4': '1',
}
formdata.data['4_display'] = item_field.store_display_value(
formdata.data, item_field.id)
formdata.data['4_structured'] = item_field.store_structured_value(
formdata.data, item_field.id)
formdata.user_id = local_user.id
formdata.just_created()
formdata.status = 'wf-new'
formdata.evolution[-1].status = 'wf-new'
formdata.geolocations = {'base': {'lon': 10, 'lat': -12}}
formdata.store()
resp = get_app(pub).get(
sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user),
status=403)
local_user.roles = [role.id]
local_user.store()
resp = get_app(pub).get(
sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user),
status=200)
assert 'last_update_time' in resp.json
assert len(resp.json['fields']) == 6
assert 'foobar' in resp.json['fields']
assert 'foobar2' not in resp.json['fields'] # foobar2 has no varname, not in json
assert resp.json['user']['name'] == local_user.name
assert resp.json['fields']['foobar'] == 'foo@localhost'
assert resp.json['fields']['date'] == '2014-01-20'
assert resp.json['fields']['file']['content'] == 'YmFzZTY0bWU=' # base64('base64me')
assert resp.json['fields']['file']['filename'] == 'test.txt'
assert resp.json['fields']['file']['content_type'] == 'text/plain'
assert resp.json['fields']['item'] == 'foo'
assert resp.json['fields']['item_raw'] == '1'
assert resp.json['fields']['item_structured'] == {'id': '1', 'text': 'foo', 'more': 'XXX'}
assert resp.json['workflow']['status']['name'] == 'New'
assert resp.json['submission']['channel'] == 'web'
assert resp.json['geolocations']['base']['lon'] == 10
assert resp.json['geolocations']['base']['lat'] == -12
assert [x.get('id') for x in resp.json['roles']['_receiver']] == [str(role.id)]
assert [x.get('id') for x in resp.json['roles']['_foobar']] == [str(another_role.id)]
assert set([x.get('id') for x in resp.json['roles']['concerned']]) == set([str(role.id), str(another_role.id)])
assert [x.get('id') for x in resp.json['roles']['actions']] == [str(role.id)]
# check the ?format=json endpoint returns 403
resp = get_app(pub).get('/test/%s/?format=json' % formdata.id, status=403)
resp2 = get_app(pub).get(sign_uri('/test/%s/' % formdata.id,
user=local_user), status=403)
def test_formdata_backoffice_fields(pub, local_user):
test_formdata(pub, local_user)
workflow = Workflow.get(2)
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
workflow.backoffice_fields_formdef.fields = [
fields.StringField(id='bo1', label='1st backoffice field',
type='string', varname='backoffice_blah'),
]
workflow.store()
formdef = FormDef.select()[0]
formdata = formdef.data_class().select()[0]
formdata.data['bo1'] = 'Hello world'
formdata.store()
resp = get_app(pub).get(sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user))
assert resp.json['workflow']['fields']['backoffice_blah'] == 'Hello world'
def test_formdata_edit(pub, local_user):
test_formdata(pub, local_user)
formdef = FormDef.select()[0]
formdata = formdef.data_class().select()[0]
workflow = formdef.workflow
# not user
resp = get_app(pub).post_json(
sign_uri('/api/forms/test/%s/' % formdata.id),
{'data': {'0': 'bar@localhost'}},
status=403)
# no editable action
resp = get_app(pub).post_json(
sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user),
{'data': {'0': 'bar@localhost'}},
status=403)
wfedit = EditableWorkflowStatusItem()
wfedit.id = '_wfedit'
wfedit.by = [local_user.roles[0]]
workflow.possible_status[1].items.append(wfedit)
wfedit.parent = workflow.possible_status[1]
workflow.store()
resp = get_app(pub).post_json(
sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user),
{'data': {'0': 'bar@localhost'}},
status=200)
assert formdef.data_class().select()[0].data['0'] == 'bar@localhost'
# not editable by user role
wfedit.by = ['XX']
workflow.store()
resp = get_app(pub).post_json(
sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user),
{'data': {'0': 'bar@localhost'}},
status=403)
# edit + jump
wfedit.status = 'rejected'
wfedit.by = [local_user.roles[0]]
workflow.store()
resp = get_app(pub).post_json(
sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user),
{'data': {'0': 'bar2@localhost'}},
status=200)
assert formdef.data_class().select()[0].data['0'] == 'bar2@localhost'
assert formdef.data_class().select()[0].status == 'wf-rejected'
def test_formdata_with_workflow_data(pub, local_user):
Role.wipe()
role = Role(name='test')
role.id = '123'
role.store()
local_user.roles = [role.id]
local_user.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = []
workflow = Workflow.get_default_workflow()
workflow.id = '2'
workflow.store()
formdef.workflow_id = workflow.id
formdef.workflow_roles = {'_receiver': role.id}
formdef.store()
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.just_created()
formdata.status = 'wf-new'
formdata.evolution[-1].status = 'wf-new'
from qommon.form import PicklableUpload as PicklableUpload2
from wcs.qommon.form import PicklableUpload as PicklableUpload3
upload = PicklableUpload2('test.txt', 'text/plain', 'ascii')
upload.receive(['test'])
upload2 = PicklableUpload3('test.txt', 'text/plain', 'ascii')
upload2.receive(['test'])
formdata.workflow_data = {'blah': upload, 'blah2': upload2, 'xxx': 23}
formdata.store()
resp = get_app(pub).get(
sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user))
assert resp.json['workflow']['data']['xxx'] == 23
assert resp.json['workflow']['data']['blah']['filename'] == 'test.txt'
assert resp.json['workflow']['data']['blah']['content_type'] == 'text/plain'
assert base64.decodestring(resp.json['workflow']['data']['blah']['content']) == 'test'
assert base64.decodestring(resp.json['workflow']['data']['blah2']['content']) == 'test'
def test_user_by_nameid(pub, local_user):
resp = get_app(pub).get(sign_uri('/api/users/xyz/', user=local_user),
status=404)
local_user.name_identifiers = ['xyz']
local_user.store()
resp = get_app(pub).get(sign_uri('/api/users/xyz/', user=local_user))
assert str(resp.json['id']) == str(local_user.id)
def test_user_roles(pub, local_user):
local_user.name_identifiers = ['xyz']
local_user.store()
role = Role(name='Foo bar')
role.store()
local_user.roles = [role.id]
local_user.store()
resp = get_app(pub).get(sign_uri('/api/users/xyz/', user=local_user))
assert len(resp.json['user_roles']) == 1
assert resp.json['user_roles'][0]['name'] == 'Foo bar'
def test_user_forms(pub, local_user):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.StringField(id='0', label='foobar', varname='foobar'),
fields.StringField(id='1', label='foobar2'),]
formdef.keywords = 'hello, world'
formdef.disabled = False
formdef.enable_tracking_codes = True
formdef.store()
formdef.data_class().wipe()
resp = get_app(pub).get(sign_uri('/api/user/forms', user=local_user))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 0
formdata = formdef.data_class()()
formdata.data = {'0': 'foo@localhost', '1': 'xxx'}
formdata.user_id = local_user.id
formdata.just_created()
formdata.jump_status('new')
formdata.store()
resp = get_app(pub).get(sign_uri('/api/user/forms', user=local_user))
resp2 = get_app(pub).get(sign_uri('/myspace/forms', user=local_user))
resp3 = get_app(pub).get(sign_uri('/api/users/%s/forms' % local_user.id))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 1
assert resp.json['data'][0]['form_name'] == 'test'
assert resp.json['data'][0]['form_slug'] == 'test'
assert resp.json['data'][0]['form_status'] == 'New'
assert datetime.datetime.strptime(resp.json['data'][0]['form_receipt_datetime'],
'%Y-%m-%dT%H:%M:%S')
assert resp.json['data'][0]['keywords'] == ['hello', 'world']
assert resp.json == resp2.json == resp3.json
resp = get_app(pub).get(sign_uri('/api/user/forms?full=on', user=local_user))
assert resp.json['err'] == 0
assert resp.json['data'][0]['fields']['foobar'] == 'foo@localhost'
assert resp.json['data'][0]['keywords'] == ['hello', 'world']
resp2 = get_app(pub).get(sign_uri('/api/user/forms?&full=on', user=local_user))
assert resp.json == resp2.json
formdef.disabled = True
formdef.store()
resp = get_app(pub).get(sign_uri('/api/user/forms', user=local_user))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 1
# check digest is part of contents
formdef.digest_template = 'XYZ'
formdef.data_class().get(formdata.id).store()
assert formdef.data_class().get(formdata.id).digest == 'XYZ'
resp = get_app(pub).get(sign_uri('/api/user/forms', user=local_user))
assert resp.json['data'][0]['form_digest'] == 'XYZ'
resp = get_app(pub).get(sign_uri('/api/user/forms?NameID=xxx'))
assert resp.json == {'err': 1, 'err_desc': 'unknown NameID', 'data': []}
resp2 = get_app(pub).get(sign_uri('/api/user/forms?&NameID=xxx'))
assert resp.json == resp2.json
formdata = formdef.data_class()()
formdata.user_id = local_user.id
formdata.status = 'draft'
formdata.receipt_time = datetime.datetime(2015, 1, 1).timetuple()
formdata.store()
resp = get_app(pub).get(sign_uri('/api/user/forms', user=local_user))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 1
resp = get_app(pub).get(sign_uri('/api/user/forms?include-drafts=true', user=local_user))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 1
formdef.disabled = False
formdef.store()
resp = get_app(pub).get(sign_uri('/api/user/forms?include-drafts=true', user=local_user))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 2
draft_formdata = [x for x in resp.json['data'] if x['status'] == 'Draft'][0]
assert draft_formdata.get('url')[-1] != '/'
def test_user_forms_from_agent(pub, local_user):
Role.wipe()
role = Role(name='Foo bar')
role.store()
agent_user = get_publisher().user_class()
agent_user.name = 'Agent'
agent_user.email = 'agent@example.com'
agent_user.name_identifiers = ['ABCDE']
agent_user.roles = [role.id]
agent_user.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.StringField(id='0', label='foobar', varname='foobar'),
fields.StringField(id='1', label='foobar2'),]
formdef.store()
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.data = {'0': 'foo@localhost', '1': 'xxx'}
formdata.user_id = local_user.id
formdata.just_created()
formdata.jump_status('new')
formdata.store()
resp = get_app(pub).get(sign_uri('/api/users/%s/forms' % local_user.id, user=agent_user))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 1
assert resp.json['data'][0]['form_name'] == 'test'
assert resp.json['data'][0]['form_slug'] == 'test'
assert resp.json['data'][0]['form_status'] == 'New'
assert resp.json['data'][0]['readable'] is False
formdef.skip_from_360_view = True
formdef.store()
resp = get_app(pub).get(sign_uri('/api/users/%s/forms' % local_user.id, user=agent_user))
assert len(resp.json['data']) == 0
formdef.workflow_roles = {'_receiver': str(role.id)}
formdef.store()
formdef.data_class().rebuild_security()
resp = get_app(pub).get(sign_uri('/api/users/%s/forms' % local_user.id, user=agent_user))
assert len(resp.json['data']) == 1
agent_user.roles = []
agent_user.store()
get_app(pub).get(sign_uri('/api/users/%s/forms' % local_user.id, user=agent_user), status=403)
def test_user_drafts(pub, local_user):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.StringField(id='0', label='foobar', varname='foobar'),
fields.StringField(id='1', label='foobar2'),
fields.FileField(id='2', label='foobar3', varname='file'),]
formdef.keywords = 'hello, world'
formdef.disabled = False
formdef.enable_tracking_codes = True
formdef.store()
formdef.data_class().wipe()
resp = get_app(pub).get(sign_uri('/api/user/drafts', user=local_user))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 0
formdata = formdef.data_class()()
upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
upload.receive(['base64me'])
formdata.data = {'0': 'foo@localhost', '1': 'xxx', '2': upload}
formdata.user_id = local_user.id
formdata.page_no = 1
formdata.status = 'draft'
formdata.receipt_time = datetime.datetime(2015, 1, 1).timetuple()
formdata.store()
resp = get_app(pub).get(sign_uri('/api/user/drafts', user=local_user))
resp2 = get_app(pub).get(sign_uri('/myspace/drafts', user=local_user))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 1
assert resp.json == resp2.json
assert not 'fields' in resp.json['data'][0]
assert resp.json['data'][0]['keywords'] == ['hello', 'world']
resp = get_app(pub).get(sign_uri('/api/user/drafts?full=on', user=local_user))
assert resp.json['err'] == 0
assert 'fields' in resp.json['data'][0]
assert resp.json['data'][0]['fields']['foobar'] == 'foo@localhost'
assert 'file' not in resp.json['data'][0]['fields'] # no file export in full lists
assert resp.json['data'][0]['keywords'] == ['hello', 'world']
formdef.enable_tracking_codes = False
formdef.store()
resp = get_app(pub).get(sign_uri('/api/user/drafts', user=local_user))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 1
formdef.enable_tracking_codes = True
formdef.disabled = True
formdef.store()
resp = get_app(pub).get(sign_uri('/api/user/drafts', user=local_user))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 0
resp = get_app(pub).get(sign_uri('/api/user/drafts?NameID=xxx'))
assert resp.json == {'err': 1, 'err_desc': 'unknown NameID', 'data': []}
resp2 = get_app(pub).get(sign_uri('/api/user/drafts?&NameID=xxx'))
assert resp.json == resp2.json
def test_api_list_formdata(pub, local_user):
Role.wipe()
role = Role(name='test')
role.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.workflow_roles = {'_receiver': role.id}
formdef.fields = [
fields.StringField(id='0', label='foobar', varname='foobar'),
fields.ItemField(id='1', label='foobar3', varname='foobar3', type='item',
items=['foo', 'bar', 'baz']),
fields.FileField(id='2', label='foobar4', varname='file'),
]
formdef.store()
data_class = formdef.data_class()
data_class.wipe()
for i in range(30):
formdata = data_class()
date = time.strptime('2014-01-20', '%Y-%m-%d')
upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
upload.receive(['base64me'])
formdata.data = {'0': 'FOO BAR %d' % i, '2': upload}
formdata.user_id = local_user.id
if i%4 == 0:
formdata.data['1'] = 'foo'
formdata.data['1_display'] = 'foo'
elif i%4 == 1:
formdata.data['1'] = 'bar'
formdata.data['1_display'] = 'bar'
else:
formdata.data['1'] = 'baz'
formdata.data['1_display'] = 'baz'
formdata.just_created()
if i%3 == 0:
formdata.jump_status('new')
elif i%3 == 1:
formdata.jump_status('just_submitted')
else:
formdata.jump_status('finished')
if i%7 == 0:
formdata.backoffice_submission = True
formdata.submission_channel = 'mail'
formdata.store()
# check access is denied if the user has not the appropriate role
resp = get_app(pub).get(sign_uri('/api/forms/test/list', user=local_user), status=403)
# add proper role to user
local_user.roles = [role.id]
local_user.store()
# check it now gets the data
resp = get_app(pub).get(sign_uri('/api/forms/test/list', user=local_user))
assert len(resp.json) == 30
assert 'receipt_time' in resp.json[0]
assert not 'fields' in resp.json[0]
# check getting full formdata
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on', user=local_user))
assert len(resp.json) == 30
assert 'receipt_time' in resp.json[0]
assert 'fields' in resp.json[0]
assert 'file' not in resp.json[0]['fields'] # no file export in full lists
assert 'user' in resp.json[0]
assert 'evolution' in resp.json[0]
assert len(resp.json[0]['evolution']) == 2
assert 'status' in resp.json[0]['evolution'][0]
assert 'who' in resp.json[0]['evolution'][0]
assert 'time' in resp.json[0]['evolution'][0]
assert resp.json[0]['evolution'][0]['who']['id'] == local_user.id
assert all('status' in x['workflow'] for x in resp.json)
assert [x for x in resp.json if x['fields']['foobar'] == 'FOO BAR 0'][0]['submission']['backoffice'] is True
assert [x for x in resp.json if x['fields']['foobar'] == 'FOO BAR 0'][0]['submission']['channel'] == 'mail'
assert [x for x in resp.json if x['fields']['foobar'] == 'FOO BAR 1'][0]['submission']['backoffice'] is False
assert [x for x in resp.json if x['fields']['foobar'] == 'FOO BAR 1'][0]['submission']['channel'] == 'web'
# check filtered results
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-foobar3=foo', user=local_user))
assert len(resp.json) == 8
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-foobar3=bar', user=local_user))
assert len(resp.json) == 8
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-foobar3=baz', user=local_user))
assert len(resp.json) == 14
# check filter on status
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter=pending', user=local_user))
assert len(resp.json) == 20
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter=done', user=local_user))
assert len(resp.json) == 10
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter=all', user=local_user))
assert len(resp.json) == 30
# check limit and offset
resp_all = get_app(pub).get(sign_uri('/api/forms/test/list?filter=all', user=local_user))
assert len(resp_all.json) == 30
partial_resps = []
for i in range(0, 48, 12):
partial_resps.append(get_app(pub).get(
sign_uri(
'/api/forms/test/list?filter=all&offset=%s&limit=12' % i,
user=local_user)))
assert len(partial_resps[0].json) == 12
assert len(partial_resps[1].json) == 12
assert len(partial_resps[2].json) == 6
assert len(partial_resps[3].json) == 0
resp_all_ids = [x.get('id') for x in resp_all.json]
resp_partial_ids = []
for resp in partial_resps:
resp_partial_ids.extend([x.get('id') for x in resp.json])
assert resp_all_ids == resp_partial_ids
# check error handling
get_app(pub).get(sign_uri('/api/forms/test/list?filter=all&offset=plop', user=local_user), status=400)
get_app(pub).get(sign_uri('/api/forms/test/list?filter=all&limit=plop', user=local_user), status=400)
def test_api_anonymized_formdata(pub, local_user, admin_user):
Role.wipe()
role = Role(name='test')
role.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.workflow_roles = {'_receiver': role.id}
formdef.fields = [
fields.StringField(id='0', label='foobar', varname='foobar'),
fields.ItemField(id='1', label='foobar3', varname='foobar3', type='item',
items=['foo', 'bar', 'baz']),
fields.FileField(id='2', label='foobar4', varname='file'),
]
formdef.store()
data_class = formdef.data_class()
data_class.wipe()
for i in range(30):
formdata = data_class()
date = time.strptime('2014-01-20', '%Y-%m-%d')
upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
upload.receive(['base64me'])
formdata.data = {'0': 'FOO BAR %d' % i, '2': upload}
formdata.user_id = local_user.id
if i%4 == 0:
formdata.data['1'] = 'foo'
formdata.data['1_display'] = 'foo'
elif i%4 == 1:
formdata.data['1'] = 'bar'
formdata.data['1_display'] = 'bar'
else:
formdata.data['1'] = 'baz'
formdata.data['1_display'] = 'baz'
formdata.just_created()
if i%3 == 0:
formdata.jump_status('new')
else:
evo = Evolution()
evo.who = admin_user.id
evo.time = time.localtime()
evo.status = 'wf-%s' % 'finished'
formdata.evolution.append(evo)
formdata.status = evo.status
formdata.store()
# check access is granted even if the user has not the appropriate role
resp = get_app(pub).get(sign_uri('/api/forms/test/list?anonymise&full=on', user=local_user))
assert len(resp.json) == 30
assert 'receipt_time' in resp.json[0]
assert 'fields' in resp.json[0]
assert 'user' not in resp.json[0]
assert 'file' not in resp.json[0]['fields'] # no file export in full lists
assert 'foobar3' in resp.json[0]['fields']
assert 'foobar' not in resp.json[0]['fields']
assert 'evolution' in resp.json[0]
assert len(resp.json[0]['evolution']) == 2
assert 'status' in resp.json[0]['evolution'][0]
assert not 'who' in resp.json[0]['evolution'][0]
assert 'time' in resp.json[0]['evolution'][0]
# check evolution made by other than _submitter are exported
assert 'who' in resp.json[1]['evolution'][1]
assert 'id' in resp.json[1]['evolution'][1]['who']
assert 'email' in resp.json[1]['evolution'][1]['who']
assert 'NameID' in resp.json[1]['evolution'][1]['who']
assert 'name' in resp.json[1]['evolution'][1]['who']
# check access is granted event if there is no user
resp = get_app(pub).get(sign_uri('/api/forms/test/list?anonymise&full=on'))
assert len(resp.json) == 30
assert 'receipt_time' in resp.json[0]
assert 'fields' in resp.json[0]
assert 'user' not in resp.json[0]
assert 'file' not in resp.json[0]['fields'] # no file export in full lists
assert 'foobar3' in resp.json[0]['fields']
assert 'foobar' not in resp.json[0]['fields']
assert 'evolution' in resp.json[0]
assert len(resp.json[0]['evolution']) == 2
assert 'status' in resp.json[0]['evolution'][0]
assert not 'who' in resp.json[0]['evolution'][0]
assert 'time' in resp.json[0]['evolution'][0]
# check anonymise is enforced on detail view
resp = get_app(pub).get(sign_uri('/api/forms/test/%s/?anonymise&full=on' % resp.json[1]['id']))
assert 'receipt_time' in resp.json
assert 'fields' in resp.json
assert 'user' not in resp.json
assert 'file' not in resp.json['fields'] # no file export in detail
assert 'foobar3' in resp.json['fields']
assert 'foobar' not in resp.json['fields']
assert 'evolution' in resp.json
assert len(resp.json['evolution']) == 2
assert 'status' in resp.json['evolution'][0]
assert not 'who' in resp.json['evolution'][0]
assert 'time' in resp.json['evolution'][0]
# check evolution made by other than _submitter are exported
assert 'who' in resp.json['evolution'][1]
assert 'id' in resp.json['evolution'][1]['who']
assert 'email' in resp.json['evolution'][1]['who']
assert 'NameID' in resp.json['evolution'][1]['who']
assert 'name' in resp.json['evolution'][1]['who']
def test_api_geojson_formdata(pub, local_user):
Role.wipe()
role = Role(name='test')
role.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.workflow_roles = {'_receiver': role.id}
formdef.fields = [
fields.StringField(id='0', label='foobar', varname='foobar', type='string'),
fields.FileField(id='1', label='foobar1', type='file')
]
formdef.store()
data_class = formdef.data_class()
data_class.wipe()
formdef.geolocations = {'base': 'Location'}
formdef.store()
# check access is denied if the user has not the appropriate role
resp = get_app(pub).get(sign_uri('/api/forms/test/geojson', user=local_user), status=403)
# even if there's an anonymse parameter
resp = get_app(pub).get(sign_uri('/api/forms/test/geojson?anonymise', user=local_user), status=403)
upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
upload.receive(['base64me'])
foobar = '<font color="red">FOO BAR</font>'
username = '<font color="red">Jean Darmette</font>'
data = {'0': foobar, '1': upload}
local_user.name = username
local_user.store()
for i in range(30):
formdata = data_class()
date = time.strptime('2014-01-20', '%Y-%m-%d')
formdata.geolocations = {'base': {'lat': 48, 'lon': 2}}
formdata.data = data
formdata.user_id = local_user.id
formdata.just_created()
if i%3 == 0:
formdata.jump_status('new')
else:
formdata.jump_status('finished')
formdata.store()
# add proper role to user
local_user.roles = [role.id]
local_user.store()
# check it gets the data
resp = get_app(pub).get(sign_uri('/api/forms/test/geojson', user=local_user))
assert 'features' in resp.json
assert len(resp.json['features']) == 10
display_fields = resp.json['features'][0]['properties']['display_fields']
for field in display_fields:
if field['label'] == 'Number':
assert field['varname'] == 'id'
assert field['html_value'] == '1-28'
assert field['value'] == '1-28'
if field['label'] == 'User Label':
assert field['varname'] == 'user_label'
assert field['value'] == username
assert field['html_value'] == "&lt;font color=&quot;red&quot;&gt;Jean Darmette&lt;/font&gt;"
if field['label'] == 'foobar':
assert field['varname'] == 'foobar'
assert field['value'] == foobar
assert field['html_value'] == "&lt;font color=&quot;red&quot;&gt;FOO BAR&lt;/font&gt;"
if field['label'] == 'foobar1':
assert field['varname'] is None
assert field['value'] == "test.txt"
assert field['html_value'] == '<div class="file-field"><a download="test.txt" href="http://example.net/backoffice/management/test/28/download?f=1"><span>test.txt</span></a></div>'
# check with a filter
resp = get_app(pub).get(sign_uri('/api/forms/test/geojson?filter=done', user=local_user))
assert 'features' in resp.json
assert len(resp.json['features']) == 20
# check 404 if the formdef doesn't have geolocation support
formdef.geolocations = {}
formdef.store()
resp = get_app(pub).get(sign_uri('/api/forms/test/geojson', user=local_user), status=404)
def test_api_global_geojson(pub, local_user):
Role.wipe()
role = Role(name='test')
role.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.workflow_roles = {'_receiver': role.id}
formdef.fields = []
formdef.store()
data_class = formdef.data_class()
data_class.wipe()
formdef.geolocations = {'base': 'Location'}
formdef.store()
for i in range(30):
formdata = data_class()
date = time.strptime('2014-01-20', '%Y-%m-%d')
formdata.geolocations = {'base': {'lat': 48, 'lon': 2}}
formdata.user_id = local_user.id
formdata.just_created()
if i%3 == 0:
formdata.jump_status('new')
else:
formdata.jump_status('finished')
formdata.store()
if not pub.is_using_postgresql():
resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
pytest.skip('this requires SQL')
return
# check empty content if user doesn't have the appropriate role
resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user))
assert 'features' in resp.json
assert len(resp.json['features']) == 0
# add proper role to user
local_user.roles = [role.id]
local_user.store()
# check it gets the data
resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user))
assert 'features' in resp.json
assert len(resp.json['features']) == 10
# check with a filter
resp = get_app(pub).get(sign_uri('/api/forms/geojson?status=done', user=local_user))
assert 'features' in resp.json
assert len(resp.json['features']) == 20
def test_api_global_listing(pub, local_user):
if not pub.is_using_postgresql():
resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
pytest.skip('this requires SQL')
return
Role.wipe()
role = Role(name='test')
role.store()
# check there's no crash if there are no formdefs
resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user))
assert len(resp.json['data']) == 0
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.workflow_roles = {'_receiver': role.id}
formdef.fields = [
fields.StringField(id='0', label='foobar', varname='foobar'),
]
formdef.store()
data_class = formdef.data_class()
data_class.wipe()
formdef.store()
for i in range(30):
formdata = data_class()
date = time.strptime('2014-01-20', '%Y-%m-%d')
formdata.data = {'0': 'FOO BAR'}
formdata.user_id = local_user.id
formdata.just_created()
if i%3 == 0:
formdata.jump_status('new')
else:
formdata.jump_status('finished')
formdata.store()
# check empty content if user doesn't have the appropriate role
resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user))
assert len(resp.json['data']) == 0
# add proper role to user
local_user.roles = [role.id]
local_user.store()
# check it gets the data
resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user))
assert len(resp.json['data']) == 10
# check with a filter
resp = get_app(pub).get(sign_uri('/api/forms/?status=done', user=local_user))
assert len(resp.json['data']) == 20
# check limit/offset
resp = get_app(pub).get(sign_uri('/api/forms/?status=done&limit=5', user=local_user))
assert len(resp.json['data']) == 5
resp = get_app(pub).get(sign_uri('/api/forms/?status=done&offset=5&limit=5', user=local_user))
assert len(resp.json['data']) == 5
resp = get_app(pub).get(sign_uri('/api/forms/?status=done&offset=18&limit=5', user=local_user))
assert len(resp.json['data']) == 2
# check error handling
get_app(pub).get(sign_uri('/api/forms/?status=done&limit=plop', user=local_user), status=400)
get_app(pub).get(sign_uri('/api/forms/?status=done&offset=plop', user=local_user), status=400)
def test_api_global_listing_ignored_roles(pub, local_user):
test_api_global_listing(pub, local_user)
role = Role(name='test2')
role.store()
formdef = FormDef()
formdef.name = 'test2'
formdef.workflow_roles = {'_receiver': role.id}
formdef.fields = [
fields.StringField(id='0', label='foobar', varname='foobar'),
]
formdef.store()
data_class = formdef.data_class()
data_class.wipe()
for i in range(10):
formdata = data_class()
date = time.strptime('2014-01-20', '%Y-%m-%d')
formdata.data = {'0': 'FOO BAR'}
formdata.user_id = local_user.id
formdata.just_created()
formdata.jump_status('new')
formdata.store()
# considering roles
resp = get_app(pub).get(sign_uri('/api/forms/?status=all&limit=100', user=local_user))
assert len(resp.json['data']) == 30
# ignore roles
resp = get_app(pub).get(sign_uri('/api/forms/?status=all&limit=100&ignore-roles=on', user=local_user))
assert len(resp.json['data']) == 40
# check sensitive forms are not exposed
formdef.skip_from_360_view = True
formdef.store()
resp = get_app(pub).get(sign_uri('/api/forms/?status=all&limit=100&ignore-roles=on', user=local_user))
assert len(resp.json['data']) == 30
@pytest.fixture
def ics_data(local_user):
Role.wipe()
role = Role(name='test')
role.store()
FormDef.wipe()
formdef = FormDef()
formdef.url_name = 'test'
formdef.name = 'test\xc3\xa9'
formdef.workflow_roles = {'_receiver': role.id}
formdef.fields = [
fields.StringField(id='0', label='foobar', varname='foobar'),
fields.StringField(id='1', label='foobar2', varname='foobar2'),
]
formdef.store()
data_class = formdef.data_class()
data_class.wipe()
date = datetime.datetime(2014, 1, 20, 12, 00)
for i in range(30):
formdata = data_class()
formdata.data = {'0': (date + datetime.timedelta(days=i)).strftime('%Y-%m-%d %H:%M')}
formdata.data['1'] = (date + datetime.timedelta(days=i, minutes=i+1)).strftime('%Y-%m-%d %H:%M')
formdata.user_id = local_user.id
formdata.just_created()
if i%3 == 0:
formdata.jump_status('new')
else:
formdata.jump_status('finished')
formdata.store()
def test_api_ics_formdata(pub, local_user, ics_data):
role = Role.select()[0]
# check access is denied if the user has not the appropriate role
resp = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar', user=local_user), status=403)
# even if there's an anonymse parameter
resp = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar?anonymise', user=local_user), status=403)
# add proper role to user
local_user.roles = [role.id]
local_user.store()
def remove_dtstamp(body):
# remove dtstamp as the precise timing may vary between two consecutive
# calls and we shouldn't care.
return re.sub('DTSTAMP:.*', 'DTSTAMP:--', body)
# check it gets the data
resp = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar', user=local_user))
resp2 = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar/', user=local_user))
assert remove_dtstamp(resp.body) == remove_dtstamp(resp2.body)
assert resp.headers['content-type'] == 'text/calendar; charset=utf-8'
assert resp.body.count('BEGIN:VEVENT') == 10
# check that description contains form name, display id, workflow status,
# backoffice url and attached user
pattern = re.compile('DESCRIPTION:testé \| 1-\d+ \| New', re.MULTILINE)
m = pattern.findall(resp.body)
assert len(m) == 10
assert resp.body.count('Jean Darmette') == 10
assert resp.body.count('DTSTART') == 10
# check with a filter
resp = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar?filter=done', user=local_user))
assert resp.body.count('BEGIN:VEVENT') == 20
pattern = re.compile('DESCRIPTION:testé \| 1-\d+ \| Finished', re.MULTILINE)
m = pattern.findall(resp.body)
assert len(m) == 20
assert resp.body.count('Jean Darmette') == 20
# check 404 on erroneous field var
resp = get_app(pub).get(sign_uri('/api/forms/test/ics/xxx', user=local_user), status=404)
# check 404 on an erroneous field var for endtime
resp = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar/xxx', user=local_user), status=404)
# check 404 on too many path elements
resp = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar/foobar2/xxx', user=local_user), status=404)
# check ics data with start and end varnames
resp = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar/foobar2', user=local_user))
resp2 = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar/foobar2/', user=local_user))
assert remove_dtstamp(resp.body) == remove_dtstamp(resp2.body)
assert resp.body.count('DTSTART') == 10
assert resp.body.count('DTEND') == 10
def test_api_ics_formdata_http_auth(pub, local_user, ics_data):
role = Role.select()[0]
# no access
app = get_app(pub)
app.authorization = ('Basic', ('user', 'password'))
resp = app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=403)
# add authentication info
pub.load_site_options()
pub.site_options.add_section('api-http-auth-ics')
pub.site_options.set('api-http-auth-ics', 'user', 'password')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
# check access is denied if the user has not the appropriate role
resp = app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=403)
# add proper role to user
local_user.roles = [role.id]
local_user.store()
# check it gets the data
resp = app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=200)
assert resp.headers['content-type'] == 'text/calendar; charset=utf-8'
assert resp.body.count('BEGIN:VEVENT') == 10
# check it fails with a different password
app.authorization = ('Basic', ('user', 'password2'))
resp = app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=403)
def test_roles(pub, local_user):
Role.wipe()
role = Role(name='Hello World')
role.emails = ['toto@example.com', 'zozo@example.com']
role.details = 'kouign amann'
role.store()
resp = get_app(pub).get('/api/roles', status=403)
resp = get_app(pub).get(sign_uri('/api/roles'))
assert resp.json['data'][0]['text'] == 'Hello World'
assert resp.json['data'][0]['slug'] == 'hello-world'
assert resp.json['data'][0]['emails'] == ['toto@example.com', 'zozo@example.com']
assert resp.json['data'][0]['emails_to_members'] == False
assert resp.json['data'][0]['details'] == 'kouign amann'
# also check old endpoint, for compatibility
resp = get_app(pub).get(sign_uri('/roles'), headers={'Accept': 'application/json'})
assert resp.json['data'][0]['text'] == 'Hello World'
assert resp.json['data'][0]['slug'] == 'hello-world'
assert resp.json['data'][0]['emails'] == ['toto@example.com', 'zozo@example.com']
assert resp.json['data'][0]['emails_to_members'] == False
assert resp.json['data'][0]['details'] == 'kouign amann'
def test_users(pub, local_user):
resp = get_app(pub).get('/api/users/', status=403)
resp = get_app(pub).get(sign_uri('/api/users/'))
assert resp.json['data'][0]['user_display_name'] == local_user.name
assert resp.json['data'][0]['user_email'] == local_user.email
assert resp.json['data'][0]['user_id'] == local_user.id
role = Role(name='Foo bar')
role.store()
local_user.roles = [role.id]
local_user.store()
resp = get_app(pub).get(sign_uri('/api/users/?q=jean'))
assert resp.json['data'][0]['user_email'] == local_user.email
assert len(resp.json['data'][0]['user_roles']) == 1
assert resp.json['data'][0]['user_roles'][0]['name'] == 'Foo bar'
resp = get_app(pub).get(sign_uri('/api/users/?q=foobar'))
assert len(resp.json['data']) == 0
from wcs.admin.settings import UserFieldsFormDef
formdef = UserFieldsFormDef(pub)
formdef.fields.append(fields.StringField(id='3', label='test', type='string'))
formdef.store()
local_user.form_data = {'3': 'HELLO'}
local_user.set_attributes_from_formdata(local_user.form_data)
local_user.store()
resp = get_app(pub).get(sign_uri('/api/users/?q=HELLO'))
assert len(resp.json['data']) == 1
resp = get_app(pub).get(sign_uri('/api/users/?q=foobar'))
assert len(resp.json['data']) == 0
def test_users_unaccent(pub, local_user):
local_user.name = 'Jean Sénisme'
local_user.store()
resp = get_app(pub).get(sign_uri('/api/users/?q=jean'))
assert resp.json['data'][0]['user_email'] == local_user.email
resp = get_app(pub).get(sign_uri('/api/users/?q=senisme'))
assert resp.json['data'][0]['user_email'] == local_user.email
resp = get_app(pub).get(sign_uri('/api/users/?q=sénisme'))
assert resp.json['data'][0]['user_email'] == local_user.email
resp = get_app(pub).get(sign_uri('/api/users/?q=blah'))
assert len(resp.json['data']) == 0
def test_workflow_trigger(pub, local_user):
workflow = Workflow(name='test')
st1 = workflow.add_status('Status1', 'st1')
jump = JumpWorkflowStatusItem()
jump.trigger = 'XXX'
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
st2 = workflow.add_status('Status2', 'st2')
workflow.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = []
formdef.workflow_id = workflow.id
formdef.store()
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
assert formdef.data_class().get(formdata.id).status == 'wf-st1'
resp = get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX'),
status=200)
assert formdef.data_class().get(formdata.id).status == 'wf-st2'
Role.wipe()
role = Role(name='xxx')
role.store()
jump.by = [role.id]
workflow.store()
formdata.store() # (will get back to wf-st1)
resp = get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX'),
status=403)
resp = get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX', user=local_user),
status=403)
local_user.roles = [role.id]
local_user.store()
resp = get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX', user=local_user),
status=200)
def test_workflow_trigger_with_condition(pub, local_user):
workflow = Workflow(name='test')
st1 = workflow.add_status('Status1', 'st1')
jump = JumpWorkflowStatusItem()
jump.trigger = 'XXX'
jump.condition = {'type': 'django', 'value': 'form_var_foo == "bar"'}
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
st2 = workflow.add_status('Status2', 'st2')
workflow.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [fields.StringField(id='0', label='foo', varname='foo')]
formdef.workflow_id = workflow.id
formdef.store()
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.data = {'0': 'foo'}
formdata.just_created()
formdata.store()
assert formdef.data_class().get(formdata.id).status == 'wf-st1'
resp = get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX'), status=403)
assert resp.json == {'err_desc': 'unmet condition', 'err': 1}
assert formdef.data_class().get(formdata.id).status == 'wf-st1'
# check without json
resp = get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX', format=None), status=403)
assert resp.content_type == 'text/html'
formdata.data['0'] = 'bar'
formdata.store()
resp = get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX'))
assert resp.json == {'err': 0, 'url': None}
def test_workflow_trigger_jump_once(pub, local_user):
workflow = Workflow(name='test')
st1 = workflow.add_status('Status1', 'st1')
st2 = workflow.add_status('Status2', 'st2')
st3 = workflow.add_status('Status3', 'st3')
jump = JumpWorkflowStatusItem()
jump.trigger = 'XXX'
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
jump = JumpWorkflowStatusItem()
jump.trigger = 'XXX'
jump.status = 'st3'
st2.items.append(jump)
jump.parent = st2
workflow.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = []
formdef.workflow_id = workflow.id
formdef.store()
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
assert formdef.data_class().get(formdata.id).status == 'wf-st1'
resp = get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX'))
assert resp.json == {'err': 0, 'url': None}
assert formdef.data_class().get(formdata.id).status == 'wf-st2'
resp = get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX'))
assert resp.json == {'err': 0, 'url': None}
assert formdef.data_class().get(formdata.id).status == 'wf-st3'
def test_tracking_code(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = []
formdef.enable_tracking_codes = True
formdef.store()
data_class = formdef.data_class()
formdata = data_class()
formdata.store()
code = get_publisher().tracking_code_class()
code.formdata = formdata
code.store()
# missing signature
get_app(pub).get('/api/code/foobar', status=403)
resp = get_app(pub).get(sign_url('/api/code/foobar?orig=coucou', '1234'), status=404)
assert resp.json['err'] == 1
resp = get_app(pub).get(sign_url('/api/code/%s?orig=coucou' % code.id, '1234'), status=200)
assert resp.json['err'] == 0
assert resp.json['url'] == 'http://example.net/test/%s' % formdata.id
assert resp.json['load_url'] == 'http://example.net/code/%s/load' % code.id
formdef.enable_tracking_codes = False
formdef.store()
resp = get_app(pub).get(sign_url('/api/code/%s?orig=coucou' % code.id, '1234'), status=404)
formdef.enable_tracking_codes = True
formdef.store()
formdata.remove_self()
resp = get_app(pub).get(sign_url('/api/code/%s?orig=coucou' % code.id, '1234'), status=404)
def test_validate_expression(pub):
resp = get_app(pub).get('/api/validate-expression?expression=hello')
assert resp.json == {'klass': None, 'msg': ''}
resp = get_app(pub).get('/api/validate-expression?expression=[hello]')
assert resp.json == {'klass': None, 'msg': ''}
resp = get_app(pub).get('/api/validate-expression?expression==[hello')
assert resp.json['klass'] == 'error'
assert resp.json['msg'].startswith('syntax error')
resp = get_app(pub).get('/api/validate-expression?expression==[hello]')
assert resp.json['klass'] == 'warning'
assert resp.json['msg'].startswith('Make sure you want a Python expression,')
resp = get_app(pub).get('/api/validate-expression?expression==hello[0]')
assert resp.json == {'klass': None, 'msg': ''}
resp = get_app(pub).get('/api/validate-expression?expression==hello[\'plop\']')
assert resp.json == {'klass': None, 'msg': ''}
# django with unicode
resp = get_app(pub).get('/api/validate-expression?expression={{hello+%C3%A9l%C3%A9phant}}')
assert resp.json['klass'] == 'error'
assert resp.json['msg'].startswith('syntax error in Django template: Could not parse the remainder')
def test_validate_condition(pub):
resp = get_app(pub).get('/api/validate-condition?type=python&value_python=hello')
assert resp.json == {'klass': None, 'msg': ''}
resp = get_app(pub).get('/api/validate-condition?type=python&value_python=~2')
assert resp.json == {'klass': None, 'msg': ''}
resp = get_app(pub).get('/api/validate-condition?type=python&value_python=hello -')
assert resp.json['klass'] == 'error'
assert resp.json['msg'].startswith('syntax error')
resp = get_app(pub).get('/api/validate-condition?type=python&value_python={{form_number}}==3')
assert resp.json['klass'] == 'error'
assert 'Python condition cannot contain {{' in resp.json['msg']
resp = get_app(pub).get('/api/validate-condition?type=django&value_django=un+%C3%A9l%C3%A9phant')
assert resp.json['klass'] == 'error'
assert resp.json['msg'].startswith(u"syntax error: Unused 'éléphant'")
resp = get_app(pub).get('/api/validate-condition?type=django&value_django=~2')
assert resp.json['klass'] == 'error'
assert resp.json['msg'].startswith('syntax error')
resp = get_app(pub).get('/api/validate-condition?type=unknown&value_unknown=2')
assert resp.json['klass'] == 'error'
assert resp.json['msg'] == 'unknown condition type'
@pytest.fixture(params=['sql', 'pickle'])
def no_request_pub(request):
pub = create_temporary_pub(sql_mode=bool(request.param == 'sql'))
pub.app_dir = os.path.join(pub.APP_DIR, 'example.net')
pub.set_config()
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write('''
[wscall-secrets]
api.example.com = 1234
''')
return pub
def test_get_secret_and_orig(no_request_pub):
secret, orig = get_secret_and_orig('https://api.example.com/endpoint/')
assert secret == '1234'
assert orig == 'example.net'
def test_reverse_geocoding(pub):
with mock.patch('qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(json.dumps({'address': 'xxx'}))
get_app(pub).get('/api/reverse-geocoding', status=400)
resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
assert resp.content_type == 'application/json'
assert resp.body == json.dumps({'address': 'xxx'})
assert urlopen.call_args[0][0] == 'http://nominatim.openstreetmap.org/reverse?zoom=18&format=json&addressdetails=1&lat=0&lon=0&accept-language=en'
pub.site_options.add_section('options')
pub.site_options.set('options', 'nominatim_reverse_zoom_level', '16')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
assert urlopen.call_args[0][0] == 'http://nominatim.openstreetmap.org/reverse?zoom=16&format=json&addressdetails=1&lat=0&lon=0&accept-language=en'
pub.site_options.set('options', 'nominatim_key', 'KEY')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
assert urlopen.call_args[0][0] == 'http://nominatim.openstreetmap.org/reverse?zoom=16&key=KEY&format=json&addressdetails=1&lat=0&lon=0&accept-language=en'
pub.site_options.set('options', 'reverse_geocoding_service_url', 'http://reverse.example.net/?param=value')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
assert urlopen.call_args[0][0] == 'http://reverse.example.net/?param=value&format=json&addressdetails=1&lat=0&lon=0&accept-language=en'
def test_formdef_submit_structured(pub, local_user):
Role.wipe()
role = Role(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.ItemField(id='0', label='foobar', varname='foobar',
data_source={
'type': 'json',
'value': 'http://datasource.com',
}),
fields.ItemField(id='1', label='foobar1', varname='foobar1',
data_source={
'type': 'formula',
'value': '[dict(id=i, text=\'label %s\' % i, foo=i) for i in range(10)]',
}),
]
formdef.store()
data_class = formdef.data_class()
def url():
signed_url = sign_url(
'http://example.net/api/formdefs/test/submit'
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), '1234')
return signed_url[len('http://example.net'):]
with mock.patch('qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO('''\
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
{"id": 1, "text": "uné", "foo": "bar1"}, \
{"id": 2, "text": "deux", "foo": "bar2"}]}''')
resp = get_app(pub).post_json(url(), {'data': {
'0': '0',
"1": '3',
}})
formdata = data_class.get(resp.json['data']['id'])
assert formdata.status == 'wf-new'
assert formdata.data['0'] == '0'
assert formdata.data['0_display'] == 'zéro'
assert formdata.data['0_structured'] == {
'id': 0,
'text': 'zéro',
'foo': 'bar',
}
assert formdata.data['1'] == '3'
assert formdata.data['1_display'] == 'label 3'
assert formdata.data['1_structured'] == {
'id': 3,
'text': 'label 3',
'foo': 3,
}
data_class.wipe()