wcs/tests/test_api.py

3276 lines
123 KiB
Python

# -*- coding: utf-8 -*-
import pytest
import json
import shutil
import os
import hmac
import base64
import hashlib
import mock
import re
import datetime
import time
import json
import sys
import xml.etree.ElementTree as ET
import zipfile
from django.utils.encoding import force_bytes, force_text
from django.utils.six import StringIO, BytesIO
from django.utils.six.moves.urllib import parse as urllib
from django.utils.six.moves.urllib import parse as urlparse
from quixote import cleanup, get_publisher
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.form import PicklableUpload
from wcs.qommon.ident.password_accounts import PasswordAccount
from wcs.qommon import ods
from wcs.users import User
from wcs.roles import Role
from wcs.carddef import CardDef
from wcs.formdef import FormDef
from wcs.formdata import Evolution
from wcs.categories import Category
from wcs.data_sources import NamedDataSource
from wcs.workflows import Workflow, EditableWorkflowStatusItem, WorkflowBackofficeFieldsFormDef, WorkflowVariablesFieldsFormDef
from wcs.wf.jump import JumpWorkflowStatusItem
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
from wcs import fields, qommon
from wcs.api_utils import sign_url, get_secret_and_orig, is_url_signed, DEFAULT_DURATION
from wcs.qommon.errors import AccessForbiddenError
from utilities import get_app, create_temporary_pub, clean_temporary_pub, login
def pytest_generate_tests(metafunc):
    """Parametrize the ``pub`` fixture for every test that requests it.

    Tests using ``pub`` are generated twice, once with the ``'pickle'``
    parameter and once with ``'sql'``; the value is passed indirectly, i.e.
    handed to the fixture rather than to the test function.
    """
    if 'pub' not in metafunc.fixturenames:
        return
    storage_modes = ['pickle', 'sql']
    metafunc.parametrize('pub', storage_modes, indirect=True)
@pytest.fixture
def pub(request, emails):
    """Build a temporary publisher configured for the API tests.

    The publisher is created in SQL mode when the fixture parameter is
    ``'sql'``.  Password identification and the English language are
    configured, and a ``site-options.cfg`` file declaring the ``coucou`` API
    secret (``1234``) is written in the application directory.

    Returns the configured publisher.
    """
    pub = create_temporary_pub(sql_mode=(request.param == 'sql'))

    req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
    pub.set_app_dir(req)
    pub.cfg['identification'] = {'methods': ['password']}
    pub.cfg['language'] = {'language': 'en'}
    pub.write_cfg()

    # Use a context manager so the file is flushed and closed before the
    # tests start, instead of relying on garbage collection.
    site_options_path = os.path.join(pub.app_dir, 'site-options.cfg')
    with open(site_options_path, 'w') as fd:
        fd.write('''\
[api-secrets]
coucou = 1234
''')

    return pub
def teardown_module(module):
    """Module-level teardown hook; delegates to ``clean_temporary_pub()``."""
    clean_temporary_pub()
@pytest.fixture
def local_user():
    """Wipe the stored users, then create, store and return a regular user."""
    user_class = get_publisher().user_class
    user_class.wipe()

    jean = user_class()
    jean.name = 'Jean Darmette'
    jean.email = 'jean.darmette@triffouilis.fr'
    jean.name_identifiers = ['0123456789']
    jean.store()
    return jean
@pytest.fixture
def admin_user():
    """Wipe the stored users, then create and return an administrator.

    A password account (login ``admin``, password ``admin``) attached to the
    new user is stored as well.
    """
    user_class = get_publisher().user_class
    user_class.wipe()

    admin = user_class()
    admin.name = 'John Doe Admin'
    admin.email = 'john.doe@example.com'
    admin.name_identifiers = ['0123456789']
    admin.is_admin = True
    admin.store()

    password_account = PasswordAccount(id='admin')
    password_account.set_password('admin')
    password_account.user_id = admin.id
    password_account.store()

    return admin
def sign_uri(uri, user=None, format='json'):
    """Return *uri* with signature parameters appended to its query string.

    The query string receives, in order: an optional ``format`` parameter,
    ``orig=coucou``, ``algo=sha256``, the current UTC ``timestamp``, an
    optional ``email`` (taken from *user*) and finally the ``signature``: the
    base64-encoded HMAC-SHA256 of everything that precedes it, computed with
    the ``1234`` key.

    :param uri: URI to sign; an existing query string is kept as a prefix.
    :param user: optional object whose ``email`` attribute is added to the query.
    :param format: value of the ``format`` parameter; skipped when falsy.
    """
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri)
    timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'

    # assemble the part of the query string covered by the signature
    pieces = [query]
    if query:
        pieces.append('&')
    if format:
        pieces.append('format=%s&' % format)
    pieces.append('orig=coucou&algo=sha256&timestamp=' + timestamp)
    if user:
        pieces.append('&email=' + urllib.quote(user.email))
    unsigned_query = ''.join(pieces)

    digest = hmac.new(b'1234', force_bytes(unsigned_query), hashlib.sha256).digest()
    signature = urllib.quote(base64.b64encode(digest))
    signed_query = unsigned_query + '&signature=%s' % signature

    return urlparse.urlunparse((scheme, netloc, path, params, signed_query, fragment))
def test_user_page_redirect(pub):
    """``/user`` answers with a ``Location`` header pointing at ``/myspace/``."""
    resp = get_app(pub).get('/user')
    location = resp.headers.get('location')
    assert location == 'http://example.net/myspace/'
def test_user_page_error(pub):
    """An anonymous call to ``/api/user/`` gets a 403 with a JSON error body."""
    # check we get json as output for errors
    resp = get_app(pub).get('/api/user/', status=403)
    error = resp.json['err_desc']
    assert error == 'no user specified'
def test_user_page_error_when_json_and_no_user(pub):
    """Same 403 error when JSON is requested explicitly with ``format=json``."""
    resp = get_app(pub).get('/api/user/?format=json', status=403)
    error = resp.json['err_desc']
    assert error == 'no user specified'
def test_get_user_from_api_query_string_error_missing_orig(pub):
    """A signature without an ``orig`` parameter is rejected with a 403."""
    url = '/api/user/?format=json&signature=xxx'
    resp = get_app(pub).get(url, status=403)
    assert resp.json['err_desc'] == 'missing/multiple orig field'
def test_get_user_from_api_query_string_error_invalid_orig(pub):
    """An ``orig`` value other than the configured one is rejected with a 403."""
    url = '/api/user/?format=json&orig=coin&signature=xxx'
    resp = get_app(pub).get(url, status=403)
    assert resp.json['err_desc'] == 'invalid orig'
def test_get_user_from_api_query_string_error_missing_algo(pub):
    """A signature without an ``algo`` parameter is rejected with a 403."""
    url = '/api/user/?format=json&orig=coucou&signature=xxx'
    resp = get_app(pub).get(url, status=403)
    assert resp.json['err_desc'] == 'missing/multiple algo field'
def test_get_user_from_api_query_string_error_invalid_algo(pub):
    """Unsupported ``algo`` values are rejected with a 403.

    The second URL uses ``__getattribute__``, the name of an attribute
    rather than of a hash algorithm; it must be rejected in the same way.
    """
    urls = (
        '/api/user/?format=json&orig=coucou&signature=xxx&algo=coin',
        '/api/user/?format=json&orig=coucou&signature=xxx&algo=__getattribute__',
    )
    for url in urls:
        resp = get_app(pub).get(url, status=403)
        assert resp.json['err_desc'] == 'invalid algo'
def test_get_user_from_api_query_string_error_invalid_signature(pub):
    """A bogus signature value is rejected with a 403."""
    url = '/api/user/?format=json&orig=coucou&signature=xxx&algo=sha1'
    resp = get_app(pub).get(url, status=403)
    assert resp.json['err_desc'] == 'invalid signature'
def test_get_user_from_api_query_string_error_missing_timestamp(pub):
    """A correctly signed query lacking a ``timestamp`` is rejected with a 403."""
    digest = hmac.new(
        b'1234',
        b'format=json&orig=coucou&algo=sha1',
        hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))

    url = '/api/user/?format=json&orig=coucou&algo=sha1&signature=%s' % signature
    resp = get_app(pub).get(url, status=403)
    assert resp.json['err_desc'] == 'missing/multiple timestamp field'
def test_get_user_from_api_query_string_error_missing_email(pub):
    """A valid signature that designates no user gets a 403 on ``/api/user/``."""
    stamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
    query = 'format=json&orig=coucou&algo=sha1&timestamp=' + stamp

    digest = hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))

    url = '/api/user/?%s&signature=%s' % (query, signature)
    resp = get_app(pub).get(url, status=403)
    assert resp.json['err_desc'] == 'no user specified'
def test_get_user_from_api_query_string_error_unknown_nameid(pub):
    """A valid signature carrying an unknown ``NameID`` gets a 403 on ``/api/user/``."""
    stamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
    query = 'format=json&orig=coucou&algo=sha1&NameID=xxx&timestamp=' + stamp

    digest = hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))

    url = '/api/user/?%s&signature=%s' % (query, signature)
    resp = get_app(pub).get(url, status=403)
    assert resp.json['err_desc'] == 'unknown NameID'
def test_get_user_from_api_query_string_error_missing_email_valid_endpoint(pub):
    """Signed calls without a user are accepted by ``/categories`` and ``/json``."""
    # check it's ok to sign an URL without specifying a user if the endpoint
    # works fine without user.
    stamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
    query = 'format=json&orig=coucou&algo=sha1&timestamp=' + stamp

    digest = hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))

    # the same signed query string is sent to both endpoints
    expectations = (
        ('/categories?%s&signature=%s', {'data': []}),
        ('/json?%s&signature=%s', {'err': 0, 'data': []}),
    )
    for url_template, expected in expectations:
        resp = get_app(pub).get(url_template % (query, signature))
        assert resp.json == expected
def test_get_user_from_api_query_string_error_unknown_nameid_valid_endpoint(pub):
    """Signed calls with an unknown ``NameID`` still work on listing endpoints."""
    # check the categories and forms endpoints accept an unknown NameID
    stamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
    query = 'format=json&NameID=xxx&orig=coucou&algo=sha1&timestamp=' + stamp

    digest = hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))

    # the same signed query string is sent to both endpoints
    expectations = (
        ('/categories?%s&signature=%s', {'data': []}),
        ('/json?%s&signature=%s', {'err': 0, 'data': []}),
    )
    for url_template, expected in expectations:
        resp = get_app(pub).get(url_template % (query, signature))
        assert resp.json == expected
def test_get_user_from_api_query_string_error_success_sha1(pub, local_user):
    """A query signed with HMAC-SHA1 and designating a user by email is accepted."""
    stamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
    query = ''.join([
        'format=json&orig=coucou&algo=sha1&email=',
        urllib.quote(local_user.email),
        '&timestamp=',
        stamp,
    ])

    digest = hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))

    resp = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature))
    assert resp.json['user_display_name'] == u'Jean Darmette'
def test_get_user_from_api_query_string_error_invalid_signature_algo_mismatch(pub, local_user):
    """Announcing ``sha256`` while signing with SHA1 yields an invalid signature."""
    stamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
    query = ''.join([
        'format=json&orig=coucou&algo=sha256&email=',
        urllib.quote(local_user.email),
        '&timestamp=',
        stamp,
    ])

    # deliberately sign with sha1 although the query announces sha256
    digest = hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))

    resp = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403)
    assert resp.json['err_desc'] == 'invalid signature'
def test_get_user_from_api_query_string_error_success_sha256(pub, local_user):
    """A query signed with HMAC-SHA256 and designating a user by email is accepted."""
    stamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
    query = ''.join([
        'format=json&orig=coucou&algo=sha256&email=',
        urllib.quote(local_user.email),
        '&timestamp=',
        stamp,
    ])

    digest = hmac.new(b'1234', force_bytes(query), hashlib.sha256).digest()
    signature = urllib.quote(base64.b64encode(digest))

    resp = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature))
    assert resp.json['user_display_name'] == u'Jean Darmette'
def test_sign_url(pub, local_user):
    """URLs produced by ``sign_url`` are accepted only untouched and with the right key."""
    host_prefix_length = len('http://example.net')
    unsigned_url = ('http://example.net/api/user/?format=json&orig=coucou&email=%s'
                    % urllib.quote(local_user.email))

    # signed with the configured key: accepted
    url = sign_url(unsigned_url, '1234')[host_prefix_length:]
    resp = get_app(pub).get(url)
    assert resp.json['user_display_name'] == u'Jean Darmette'

    # try to add something after signed url
    get_app(pub).get('%s&foo=bar' % url, status=403)

    # signed with another key: rejected
    url = sign_url(unsigned_url, '12345')[host_prefix_length:]
    get_app(pub).get(url, status=403)
def test_get_user(pub, local_user):
    """``/api/user/`` exposes the user's display name and roles (name and slug)."""
    Role.wipe()
    role = Role(name='Foo bar')
    role.store()
    local_user.roles = [role.id]
    local_user.store()

    unsigned_url = ('http://example.net/api/user/?format=json&orig=coucou&email=%s'
                    % urllib.quote(local_user.email))
    url = sign_url(unsigned_url, '1234')[len('http://example.net'):]

    resp = get_app(pub).get(url)
    assert resp.json['user_display_name'] == u'Jean Darmette'
    user_roles = resp.json['user_roles']
    assert [x['name'] for x in user_roles] == ['Foo bar']
    assert [x['slug'] for x in user_roles] == ['foo-bar']
def test_is_url_signed_check_nonce(pub, local_user, freezer):
    """Check that a signed URL cannot be replayed and that nonces get cleaned.

    A new API secret is registered, a URL is signed with it and verified
    twice through ``is_url_signed()``: the first check succeeds, the second
    one must raise ``AccessForbiddenError`` with the 'nonce already used'
    message.  ``clean_nonces()`` is then called at several (frozen) points in
    time to check when the ``nonces`` directory gets emptied.
    """
    ORIG = 'xxx'
    KEY = 'xxx'
    nonce_dir = os.path.join(pub.app_dir, 'nonces')

    pub.site_options.add_section('api-secrets')
    pub.site_options.set('api-secrets', ORIG, KEY)
    # Use a context manager so the options file is flushed and closed right
    # away instead of leaking the file handle.
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
        pub.site_options.write(fd)

    # test clean_nonces do not bark when nonces directory is empty
    if os.path.exists(nonce_dir):
        shutil.rmtree(nonce_dir)
    pub.clean_nonces(now=0)
    assert not os.path.exists(nonce_dir) or not os.listdir(nonce_dir)

    signed_url = sign_url('?format=json&orig=%s&email=%s'
                          % (ORIG, urllib.quote(local_user.email)), KEY)
    req = HTTPRequest(None, {'SCRIPT_NAME': '/',
                             'SERVER_NAME': 'example.net',
                             # drop the leading '?' of the signed URL
                             'QUERY_STRING': signed_url[1:]})
    req.process_inputs()
    pub.set_app_dir(req)
    pub._set_request(req)

    assert is_url_signed()
    with pytest.raises(AccessForbiddenError) as exc_info:
        # reset the flag set on the request so the signature is checked again
        req.signed = False
        is_url_signed()
    # must stay outside the "with" block, code after the raising call would
    # never run
    assert exc_info.value.public_msg == 'nonce already used'

    # test that clean nonces works
    pub.clean_nonces()
    assert os.listdir(nonce_dir)

    # 80 seconds in the future, nothing should be cleaned
    freezer.move_to(datetime.timedelta(seconds=80))
    pub.clean_nonces()
    assert os.listdir(nonce_dir)

    # 90 seconds in the future, nonces should be removed
    freezer.move_to(datetime.timedelta(seconds=10))
    pub.clean_nonces()
    assert not os.listdir(nonce_dir)
def test_get_user_compat_endpoint(pub, local_user):
    """The ``/user`` endpoint also answers signed JSON requests."""
    unsigned_url = ('http://example.net/user?format=json&orig=coucou&email=%s'
                    % urllib.quote(local_user.email))
    url = sign_url(unsigned_url, '1234')[len('http://example.net'):]

    resp = get_app(pub).get(url)
    assert resp.json['user_display_name'] == u'Jean Darmette'
def test_formdef_list(pub):
    """The three formdef listing endpoints require a signature and agree on content."""
    json_accept = {'Accept': 'application/json'}

    Role.wipe()
    role = Role(name='Foo bar')
    role.id = '14'
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.description = 'plop'
    formdef.keywords = 'mobile, test'
    formdef.workflow_roles = {'_receiver': str(role.id)}
    formdef.fields = []
    formdef.store()

    # anonymous access -> 403
    get_app(pub).get('/json', status=403)
    get_app(pub).get('/', headers=json_accept, status=403)
    get_app(pub).get('/api/formdefs/', status=403)

    # signed request
    resp1 = get_app(pub).get(sign_uri('/json'))
    resp2 = get_app(pub).get(sign_uri('/'), headers=json_accept)
    resp3 = get_app(pub).get(sign_uri('/api/formdefs/'))
    assert resp1.json == resp2.json == resp3.json

    entry = resp1.json['data'][0]
    assert entry['title'] == 'test'
    assert entry['url'] == 'http://example.net/test/'
    assert entry['redirection'] is False
    assert entry['always_advertise'] is False
    assert entry['description'] == 'plop'
    assert entry['keywords'] == ['mobile', 'test']

    functions = entry['functions']
    assert list(functions.keys()) == ['_receiver']
    receiver = functions['_receiver']
    assert receiver['label'] == 'Recipient'
    assert receiver['role']['slug'] == role.slug
    assert receiver['role']['name'] == role.name
    assert 'count' not in entry

    # backoffice_submission formdef : none
    get_app(pub).get('/api/formdefs/?backoffice-submission=on', status=403)
    resp1 = get_app(pub).get(sign_uri('/api/formdefs/?backoffice-submission=on'))
    assert resp1.json['err'] == 0
    assert len(resp1.json['data']) == 0
def test_limited_formdef_list(pub, local_user):
    """Check how a role-restricted formdef is advertised by ``/api/formdefs/``.

    The listing is queried without user, with an empty / unknown / known
    ``NameID``, and for backoffice submission, while the formdef roles, the
    user roles and the ``always_advertise`` flag are changed along the way.
    """
    def signed_get(uri):
        # every call goes through a new app and a newly signed URI
        return get_app(pub).get(sign_uri(uri))

    def fetch_variants():
        # naked call, empty NameID, unknown NameID, NameID of local_user
        return (
            signed_get('/api/formdefs/'),
            signed_get('/api/formdefs/?NameID='),
            signed_get('/api/formdefs/?NameID=XXX'),
            signed_get('/api/formdefs/?NameID=%s' % local_user.name_identifiers[0]),
        )

    Role.wipe()
    role = Role(name='Foo bar')
    role.id = '14'
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.description = 'plop'
    formdef.workflow_roles = {'_receiver': str(role.id)}
    formdef.fields = []
    formdef.store()

    resp = signed_get('/api/formdefs/')
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 1
    assert resp.json['data'][0]['authentication_required'] is False

    # not present in backoffice-submission formdefs
    resp = signed_get('/api/formdefs/?backoffice-submission=on')
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 0

    # check it's not advertised
    formdef.roles = [role.id]
    formdef.store()
    resp, resp2, resp3, resp4 = fetch_variants()
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 1  # advertised in naked calls (as done from combo)
    assert len(resp2.json['data']) == 0  # not advertised otherwise
    assert resp2.json == resp3.json == resp4.json

    # still not present in backoffice-submission formdefs
    resp = signed_get('/api/formdefs/?backoffice-submission=on')
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 0

    # unless user has correct roles
    local_user.roles = [role.id]
    local_user.store()
    resp = signed_get('/api/formdefs/?NameID=%s' % local_user.name_identifiers[0])
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 1

    local_user.roles = []
    local_user.store()

    # check it's also included in anonymous/signed calls, but marked for
    # authentication
    resp = signed_get('/api/formdefs/')
    assert resp.json['data'][0]
    assert resp.json['data'][0]['authentication_required'] is True

    # check it's advertised
    formdef.always_advertise = True
    formdef.store()
    resp, resp2, resp3, resp4 = fetch_variants()
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 1
    assert resp.json['data'][0]['authentication_required']
    assert resp.json == resp2.json == resp3.json == resp4.json

    formdef.required_authentication_contexts = ['fedict']
    formdef.store()
    resp = signed_get('/api/formdefs/')
    assert resp.json['data'][0]['required_authentication_contexts'] == ['fedict']
def test_formdef_list_redirection(pub):
    """A disabled formdef with a redirection URL is listed with ``redirection`` set."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.disabled = True
    formdef.disabled_redirection = 'http://example.net'
    formdef.fields = []
    formdef.store()

    resp1 = get_app(pub).get(sign_uri('/json'))
    assert resp1.json['err'] == 0

    entry = resp1.json['data'][0]
    assert entry['title'] == 'test'
    assert entry['url'] == 'http://example.net/test/'
    # identity check, consistent with the "is False" checks of the other
    # listing tests; "== True" would also accept 1
    assert entry['redirection'] is True
    assert 'count' not in entry
def test_backoffice_submission_formdef_list(pub, local_user):
    """Check which formdefs are listed with ``backoffice-submission=on``.

    A formdef is expected in the listing only when the call designates a
    user holding one of its backoffice submission roles.
    """
    def listed(uri):
        # signed call that must succeed; returns the listed formdefs
        resp = get_app(pub).get(sign_uri(uri))
        assert resp.json['err'] == 0
        return resp.json['data']

    def listed_for_local_user():
        return listed('/api/formdefs/?backoffice-submission=on&NameID=%s' %
                      local_user.name_identifiers[0])

    Role.wipe()
    role = Role(name='Foo bar')
    role.id = '14'
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.description = 'plop'
    formdef.workflow_roles = {'_receiver': str(role.id)}
    formdef.fields = []
    formdef.store()

    formdef2 = FormDef()
    formdef2.name = 'ignore me'
    formdef2.fields = []
    formdef2.store()

    assert len(listed('/api/formdefs/?backoffice-submission=on')) == 0

    # check it's not advertised ...
    formdef.backoffice_submission_roles = [role.id]
    formdef.store()
    assert len(listed('/api/formdefs/?backoffice-submission=on')) == 0

    # even if it's advertised on frontoffice
    formdef.always_advertise = True
    formdef.store()
    assert len(listed('/api/formdefs/?backoffice-submission=on')) == 0

    # even if user is admin
    local_user.is_admin = True
    local_user.store()
    assert len(listed_for_local_user()) == 0
    local_user.is_admin = False
    local_user.store()

    # ... unless user has correct roles
    local_user.roles = [role.id]
    local_user.store()
    entries = listed_for_local_user()
    assert len(entries) == 1
    assert 'backoffice_submission_url' in entries[0]

    # but not advertised if it's a redirection
    formdef.disabled = True
    formdef.disabled_redirection = 'http://example.net'
    formdef.store()
    assert len(listed('/api/formdefs/?backoffice-submission=on')) == 0
def test_formdef_schema(pub):
    """Check the JSON schema exported for a formdef and its workflow.

    The schema is fetched anonymously (API and compatibility URLs) and
    through signed calls; structured items of the data source fields must
    only show up in the signed calls, and only when the data source could be
    read.
    """
    Workflow.wipe()
    workflow = Workflow(name='test')

    status1 = workflow.add_status('Status1', 'st1')
    timed_jump = JumpWorkflowStatusItem()
    timed_jump.status = 'st2'
    timed_jump.timeout = 100
    status1.items.append(timed_jump)

    status2 = workflow.add_status('Status2', 'st2')
    plain_jump = JumpWorkflowStatusItem()
    plain_jump.status = 'st3'
    status2.items.append(plain_jump)

    workflow.add_status('Status3', 'st3')

    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    workflow.backoffice_fields_formdef.fields = [
        fields.StringField(id='bo1', label='1st backoffice field',
                           type='string', varname='backoffice_blah'),
    ]
    workflow.store()

    json_source = {
        'type': 'json',
        'value': 'http://datasource.com',
    }
    formula_source = {
        'type': 'formula',
        'value': '[dict(id=i, text=\'label %s\' % i, foo=i) for i in range(10)]',
    }

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        fields.StringField(id='0', label='foobar'),
        fields.ItemField(id='1', label='foobar1', varname='foobar1',
                         data_source=json_source),
        fields.ItemsField(id='2', label='foobar2', varname='foobar2',
                          data_source=formula_source),
    ]
    formdef.workflow_id = workflow.id
    formdef.store()

    with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
        urlopen.side_effect = lambda *args: StringIO('''\
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
{"id": 1, "text": "uné", "foo": "bar1"}, \
{"id": 2, "text": "deux", "foo": "bar2"}]}''')
        resp = get_app(pub).get('/api/formdefs/test/schema')
        resp2 = get_app(pub).get('/test/schema')
        resp3 = get_app(pub).get(sign_url('/api/formdefs/test/schema?orig=coucou', '1234'))
    # issued once urlopen is no longer patched: the assertions on resp4
    # below expect the remote data source not to be readable
    resp4 = get_app(pub).get(sign_url('/api/formdefs/test/schema?orig=coucou', '1234'))

    # check schema
    assert resp.json == resp2.json
    expected_keys = set(['enable_tracking_codes', 'url_name', 'description',
                         'workflow', 'expiration_date', 'discussion',
                         'last_modification_time', 'has_captcha',
                         'always_advertise', 'name', 'disabled', 'only_allow_one',
                         'fields', 'keywords',
                         'publication_date', 'detailed_emails',
                         'disabled_redirection'])
    assert not (expected_keys - set(resp.json.keys()))
    assert resp.json['name'] == 'test'

    # fields checks
    anonymous_fields = resp.json['fields']
    assert anonymous_fields[0]['label'] == 'foobar'
    assert anonymous_fields[0]['type'] == 'string'
    assert anonymous_fields[1]['label'] == 'foobar1'
    assert anonymous_fields[1]['type'] == 'item'

    # check structured items are only exported for authenticated callers
    assert anonymous_fields[1]['items'] == []
    assert anonymous_fields[2]['items'] == []
    assert 'structured_items' not in anonymous_fields[1]
    assert 'structured_items' not in anonymous_fields[2]

    item_field = resp3.json['fields'][1]
    assert len(item_field['structured_items']) == 3
    assert item_field['structured_items'][0]['id'] == 0
    assert item_field['structured_items'][0]['text'] == u'zéro'
    assert item_field['structured_items'][0]['foo'] == 'bar'
    assert item_field['items'][0] == u'zéro'

    items_field = resp3.json['fields'][2]
    assert items_field['label'] == 'foobar2'
    assert items_field['type'] == 'items'
    assert len(items_field['structured_items']) == 10
    assert items_field['structured_items'][0]['id'] == 0
    assert items_field['structured_items'][0]['text'] == 'label 0'
    assert items_field['structured_items'][0]['foo'] == 0
    assert items_field['items'][0] == 'label 0'

    # if structured_items fails no values
    assert 'structured_items' not in resp4.json['fields'][1]
    assert resp4.json['fields'][1]['items'] == []

    # workflow checks
    statuses = resp.json['workflow']['statuses']
    assert len(statuses) == 3
    assert statuses[0]['id'] == 'st1'
    assert statuses[0]['endpoint'] is False
    assert statuses[0]['waitpoint'] is True
    assert statuses[1]['id'] == 'st2'
    assert statuses[1]['endpoint'] is False
    assert statuses[1]['waitpoint'] is False
    assert statuses[2]['id'] == 'st3'
    assert statuses[2]['endpoint'] is True
    assert statuses[2]['waitpoint'] is True

    workflow_fields = resp.json['workflow']['fields']
    assert len(workflow_fields) == 1
    assert workflow_fields[0]['label'] == '1st backoffice field'

    get_app(pub).get('/api/formdefs/xxx/schema', status=404)
def test_post_invalid_json(pub, local_user):
    """Posting a body that is not JSON with a JSON content type gives a 400."""
    resp = get_app(pub).post(
        '/api/formdefs/test/submit',
        params='not a json payload',
        content_type='application/json',
        status=400)
    body = resp.json
    assert body['err'] == 1
    assert body['err_class'] == 'Invalid request'
def test_formdef_submit(pub, local_user):
    """Exercise form submission through ``/api/formdefs/test/submit``.

    Covers: unsigned call, plain signed submission, submission on behalf of
    another user, content-type handling, disabled form, backoffice
    submission and its roles, tracking codes, drafts and submission context.
    """
    Role.wipe()
    role = Role(name='test')
    role.store()
    local_user.roles = [role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [fields.StringField(id='0', label='foobar')]
    formdef.store()
    data_class = formdef.data_class()

    def submit_url():
        # sign_url() is invoked again for every request
        signed = sign_url(
            'http://example.net/api/formdefs/test/submit' +
            '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
            '1234')
        return signed[len('http://example.net'):]

    def stored(response):
        # formdata created by the given submission response
        return data_class.get(response.json['data']['id'])

    resp = get_app(pub).post_json('/api/formdefs/test/submit', {'data': {}}, status=403)
    assert resp.json['err'] == 1
    assert resp.json['err_desc'] == 'unsigned API call'

    resp = get_app(pub).post_json(submit_url(), {'data': {}})
    assert resp.json['err'] == 0
    created = resp.json['data']
    assert created['url'] == ('http://example.net/test/%s/' % created['id'])
    assert created['backoffice_url'] == (
        'http://example.net/backoffice/management/test/%s/' % created['id'])
    assert created['api_url'] == ('http://example.net/api/forms/test/%s/' % created['id'])
    formdata = stored(resp)
    assert formdata.status == 'wf-new'
    assert formdata.user_id == str(local_user.id)
    assert formdata.tracking_code is None

    other_user = get_publisher().user_class()
    other_user.name = 'Test'
    other_user.email = 'foo@localhost'
    other_user.store()
    resp = get_app(pub).post_json(
        submit_url(),
        {'data': {}, 'user': {'NameID': [], 'email': other_user.email}})
    assert stored(resp).user.email == other_user.email

    # missing Content-Type: application/json header
    resp = get_app(pub).post(submit_url(), json.dumps({'data': {}}), status=400)
    assert resp.json['err_desc'] == 'expected JSON but missing appropriate content-type'

    # check qualified content type are recognized
    resp = get_app(pub).post(
        submit_url(),
        json.dumps({'data': {}}),
        content_type='application/json; charset=utf-8')
    assert resp.json['data']['url']

    formdef.disabled = True
    formdef.store()
    resp = get_app(pub).post_json(submit_url(), {'data': {}}, status=403)
    assert resp.json['err'] == 1
    assert resp.json['err_desc'] == 'disabled form'

    formdef.disabled = False
    formdef.store()
    get_app(pub).post_json(
        submit_url(), {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)

    formdef.backoffice_submission_roles = ['xx']
    formdef.store()
    get_app(pub).post_json(
        submit_url(), {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)

    formdef.backoffice_submission_roles = [role.id]
    formdef.store()
    resp = get_app(pub).post_json(
        submit_url(), {'meta': {'backoffice-submission': True}, 'data': {}})
    formdata = stored(resp)
    assert formdata.status == 'wf-new'
    assert formdata.backoffice_submission is True
    assert formdata.user_id is None
    assert formdata.submission_agent_id == str(local_user.id)

    formdef.enable_tracking_codes = True
    formdef.store()
    resp = get_app(pub).post_json(submit_url(), {'data': {}})
    assert stored(resp).tracking_code

    resp = get_app(pub).post_json(submit_url(), {'meta': {'draft': True}, 'data': {}})
    assert stored(resp).status == 'draft'

    resp = get_app(pub).post_json(
        submit_url(),
        {'meta': {'backoffice-submission': True}, 'data': {},
         'context': {'channel': 'mail', 'comments': 'blah'}})
    formdata = stored(resp)
    assert formdata.status == 'wf-new'
    assert formdata.backoffice_submission is True
    assert formdata.user_id is None
    assert formdata.submission_context == {'comments': 'blah'}
    assert formdata.submission_channel == 'mail'

    data_class.wipe()
def test_formdef_submit_only_one(pub, local_user):
    """With ``only_allow_one`` set, a user cannot submit a second formdata."""
    Role.wipe()
    role = Role(name='test')
    role.store()
    local_user.roles = [role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.only_allow_one = True
    formdef.fields = [fields.StringField(id='0', label='foobar')]
    formdef.store()
    data_class = formdef.data_class()

    def submit_url():
        # sign_url() is invoked again for every request
        signed = sign_url(
            'http://example.net/api/formdefs/test/submit' +
            '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
            '1234')
        return signed[len('http://example.net'):]

    # first submission is accepted
    resp = get_app(pub).post_json(submit_url(), {'data': {}})
    assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
    assert data_class.count() == 1

    # second one is refused
    resp = get_app(pub).post_json(submit_url(), {'data': {}}, status=403)
    assert resp.json['err'] == 1
    assert resp.json['err_desc'] == 'only one formdata by user is allowed'

    existing = data_class.select()[0]
    existing.user_id = '1000'  # change owner
    existing.store()

    # the user no longer owns a formdata, a new submission is accepted
    resp = get_app(pub).post_json(submit_url(), {'data': {}}, status=200)
    assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
    assert data_class.count() == 2
def test_formdef_submit_with_varname(pub, local_user):
    """Submit a form whose payload is keyed by field varnames.

    The stored data is then compared with the payload, field by field, and
    converted back with ``get_json_value()`` for the date, file and map
    fields.
    """
    NamedDataSource.wipe()
    source = [{'id': '1', 'text': 'foo', 'more': 'XXX'},
              {'id': '2', 'text': 'bar', 'more': 'YYY'}]
    formula_source = NamedDataSource(name='foobar')
    formula_source.data_source = {'type': 'formula', 'value': repr(source)}
    formula_source.store()

    jsonp_source = NamedDataSource(name='foobar_jsonp')
    jsonp_source.data_source = {'type': 'formula', 'value': 'http://example.com/jsonp'}
    jsonp_source.store()

    Role.wipe()
    role = Role(name='test')
    role.store()
    local_user.roles = [role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        fields.StringField(id='0', label='foobar0', varname='foobar0'),
        fields.ItemField(id='1', label='foobar1', varname='foobar1',
                         data_source={'type': 'foobar'}),
        fields.ItemField(id='2', label='foobar2', varname='foobar2',
                         data_source={'type': 'foobar_jsonp'}),
        fields.DateField(id='3', label='foobar3', varname='date'),
        fields.FileField(id='4', label='foobar4', varname='file'),
        fields.MapField(id='5', label='foobar5', varname='map'),
        fields.StringField(id='6', label='foobar6', varname='foobar6'),
    ]
    formdef.store()
    data_class = formdef.data_class()

    resp = get_app(pub).post_json('/api/formdefs/test/submit', {'data': {}}, status=403)
    assert resp.json['err'] == 1
    assert resp.json['err_desc'] == 'unsigned API call'

    signed_url = sign_url(
        'http://example.net/api/formdefs/test/submit' +
        '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
        '1234')
    submit_url = signed_url[len('http://example.net'):]

    payload = {
        'data': {
            'foobar0': 'xxx',
            'foobar1': '1',
            'foobar1_structured': {
                'id': '1',
                'text': 'foo',
                'more': 'XXX',
            },
            'foobar2': 'bar',
            'foobar2_raw': '10',
            'date': '1970-01-01',
            'file': {
                'filename': 'test.txt',
                'content': force_text(base64.b64encode(b'test')),
            },
            'map': {
                'lat': 1.5,
                'lon': 2.25,
            },
        }
    }
    resp = get_app(pub).post_json(submit_url, payload)
    assert resp.json['err'] == 0

    formdata = data_class.get(resp.json['data']['id'])
    assert formdata.status == 'wf-new'
    assert formdata.user_id == str(local_user.id)
    assert formdata.tracking_code is None

    stored_data = formdata.data
    assert stored_data['0'] == 'xxx'
    assert stored_data['1'] == '1'
    assert stored_data['1_structured'] == source[0]
    assert stored_data['2'] == '10'
    assert stored_data['2_display'] == 'bar'
    assert stored_data['3'] == time.struct_time(
        (1970, 1, 1, 0, 0, 0, 3, 1, -1))
    assert stored_data['4'].orig_filename == 'test.txt'
    assert stored_data['4'].get_content() == b'test'
    assert stored_data['5'] == '1.5;2.25'

    # test bijectivity
    date_field, file_field, map_field = formdef.fields[3:6]
    assert date_field.get_json_value(stored_data['3']) == payload['data']['date']
    for key in payload['data']['file']:
        exported_file = file_field.get_json_value(stored_data['4'])
        assert exported_file[key] == payload['data']['file'][key]
    assert map_field.get_json_value(stored_data['5']) == payload['data']['map']

    data_class.wipe()
def test_formdef_submit_from_wscall(pub, local_user):
    """Post JSON exports of a stored formdata back to the submit API.

    Each round trip serialises ``formdata`` with the project JSON encoder,
    posts it to the signed ``/api/formdefs/test/submit`` endpoint and checks
    the formdata created from it.
    """
    test_formdef_submit_with_varname(pub, local_user)
    formdef = FormDef.select()[0]

    # give the form a workflow that declares one backoffice field
    workflow = Workflow.get_default_workflow()
    workflow.id = '2'
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    workflow.backoffice_fields_formdef.fields = [
        fields.StringField(
            id='bo1', label='1st backoffice field', type='string', varname='backoffice_blah'),
    ]
    workflow.store()
    formdef.workflow = workflow
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()

    upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
    upload.receive([b'test'])

    formdata.data = {
        '0': 'xxx',
        '1': '1',
        '1_display': '1',
        '1_structured': {
            'id': '1',
            'text': 'foo',
            'more': 'XXX',
        },
        '2': '10',
        '2_display': 'bar',
        '3': time.strptime('1970-01-01', '%Y-%m-%d'),
        '4': upload,
        '5': '1.5;2.25',
        'bo1': 'backoffice field',
    }
    formdata.just_created()
    formdata.evolution[-1].status = 'wf-new'
    formdata.store()

    def export_payload():
        # round-trip through JSON so the payload only holds plain JSON types
        serialized = json.dumps(formdata.get_json_export_dict(), cls=qommon.misc.JSONEncoder)
        return json.loads(serialized)

    def submit(payload):
        # sign a fresh URL for every call, post the payload and load the result
        signed_url = sign_url('http://example.net/api/formdefs/test/submit?orig=coucou', '1234')
        resp = get_app(pub).post_json(signed_url[len('http://example.net'):], payload)
        assert resp.json['err'] == 0
        return formdef.data_class().get(resp.json['data']['id'])

    payload = export_payload()
    new_formdata = submit(payload)
    for key in ('0', '1', '1_display', '1_structured', '2', '2_display', '3'):
        assert new_formdata.data[key] == formdata.data[key]
    # uploads are compared by content
    assert new_formdata.data['4'].get_content() == formdata.data['4'].get_content()
    for key in ('5', 'bo1'):
        assert new_formdata.data[key] == formdata.data[key]
    assert not new_formdata.data.get('6')
    assert new_formdata.user_id is None

    # add an extra attribute
    payload['extra'] = {'foobar6': 'YYY'}
    new_formdata = submit(payload)
    assert new_formdata.data['0'] == formdata.data['0']
    assert new_formdata.data['6'] == 'YYY'

    # add user
    formdata.user_id = local_user.id
    formdata.store()
    new_formdata = submit(export_payload())
    assert str(new_formdata.user_id) == str(local_user.id)

    # test missing map data
    del formdata.data['5']
    new_formdata = submit(export_payload())
    assert new_formdata.data.get('5') is None
def test_categories(pub):
    """Check the categories listing served by the API and by ``/categories``.

    The category is not listed while it holds no form; once two forms are
    attached, both endpoints must return the same JSON.
    """
    FormDef.wipe()
    Category.wipe()
    category = Category()
    category.name = 'Category'
    category.description = 'hello world'
    category.store()

    empty_listing = get_app(pub).get('/api/categories/', headers={'Accept': 'application/json'})
    assert empty_listing.json['data'] == []  # no advertised forms

    # two forms in the category, each bringing its own keywords
    for form_name, form_keywords in (('test', 'mobile, test'), ('test 2', 'foobar')):
        formdef = FormDef()
        formdef.name = form_name
        formdef.category_id = category.id
        formdef.fields = []
        formdef.keywords = form_keywords
        formdef.store()
        formdef.data_class().wipe()

    resp = get_app(pub).get('/api/categories/')
    resp2 = get_app(pub).get('/categories', headers={'Accept': 'application/json'})
    assert resp.json == resp2.json
    listed = resp.json['data'][0]
    assert listed['title'] == 'Category'
    assert listed['url'] == 'http://example.net/category/'
    assert listed['description'] == '<p>hello world</p>'
    assert set(listed['keywords']) == {'foobar', 'mobile', 'test'}
    assert 'forms' not in listed

    # check HTML description
    category.description = '<p><strong>hello world</strong></p>'
    category.store()
    resp = get_app(pub).get('/api/categories/')
    assert resp.json['data'][0]['description'] == category.description
def test_categories_private(pub, local_user):
    """Check category visibility when its only form is restricted to a role."""
    FormDef.wipe()
    Category.wipe()
    category = Category()
    category.name = 'Category'
    category.description = 'hello world'
    category.store()

    formdef = FormDef()
    formdef.name = 'test'
    formdef.category_id = category.id
    formdef.fields = []
    formdef.store()
    formdef.data_class().wipe()

    def category_count(uri):
        # number of categories returned for the given (possibly signed) URI
        return len(get_app(pub).get(uri).json['data'])

    # open form
    assert category_count('/api/categories/') == 1

    # private form, the category doesn't appear anymore
    formdef.roles = ['plop']
    formdef.store()
    assert category_count('/api/categories/') == 0

    # not even for a signed request specifying an user
    assert category_count(sign_uri('http://example.net/api/categories/', local_user)) == 0

    # but it appears if this is a signed request without user
    assert category_count(sign_uri('http://example.net/api/categories/')) == 1

    # or signed with an authorised user
    local_user.roles = ['plop']
    local_user.store()
    assert category_count(sign_uri('http://example.net/api/categories/', local_user)) == 1
def test_categories_formdefs(pub, local_user):
    """Check the listing of the forms of a category.

    Covers: unsigned access being refused, parity between the API endpoint
    and ``/category/json``, the ``include-count``, ``include-disabled`` and
    ``backoffice-submission`` query parameters, and an unknown category.
    """
    FormDef.wipe()
    Category.wipe()
    category = Category()
    category.name = 'Category'
    category.description = 'hello world'
    category.store()

    def make_formdef(name, category_id, **attributes):
        # create and store an empty form, then clear its data
        new_formdef = FormDef()
        new_formdef.name = name
        new_formdef.category_id = category_id
        new_formdef.fields = []
        for attribute, value in attributes.items():
            setattr(new_formdef, attribute, value)
        new_formdef.store()
        new_formdef.data_class().wipe()
        return new_formdef

    make_formdef('test', category.id, keywords='mobile, test')
    # "test 2" is the form given backoffice submission roles further down
    formdef = make_formdef('test 2', category.id, keywords='foobar')
    make_formdef('other test', None)
    make_formdef('test disabled', category.id, disabled=True)

    # unsigned calls are refused on both endpoints
    get_app(pub).get('/api/categories/category/formdefs/', status=403)
    get_app(pub).get('/category/json', status=403)

    resp = get_app(pub).get(sign_uri('/api/categories/category/formdefs/'))
    resp2 = get_app(pub).get(sign_uri('/category/json'))
    assert resp.json == resp2.json
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 2
    first = resp.json['data'][0]
    assert first['title'] == 'test'
    assert first['url'] == 'http://example.net/test/'
    # identity check: a falsy non-boolean value must not pass
    assert first['redirection'] is False
    assert first['category'] == 'Category'
    assert first['category_slug'] == 'category'
    assert 'count' not in first

    resp = get_app(pub).get(sign_uri('/api/categories/category/formdefs/?include-count=on'))
    assert resp.json['data'][0]['title'] == 'test'
    assert resp.json['data'][0]['url'] == 'http://example.net/test/'
    assert resp.json['data'][0]['count'] == 0

    resp = get_app(pub).get(sign_uri('/api/categories/category/formdefs/?include-disabled=on'))
    assert len(resp.json['data']) == 3
    assert resp.json['data'][2]['title'] == 'test disabled'

    get_app(pub).get('/api/categories/XXX/formdefs/', status=404)

    resp = get_app(pub).get(sign_uri('/api/categories/category/formdefs/?backoffice-submission=on'))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 0

    Role.wipe()
    role = Role(name='test')
    role.store()
    local_user.roles = []
    local_user.store()

    # check it's not advertised ...
    formdef.backoffice_submission_roles = [role.id]
    formdef.store()
    resp = get_app(pub).get(sign_uri('/api/categories/category/formdefs/?backoffice-submission=on'))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 0
    resp = get_app(pub).get(sign_uri(
        '/api/categories/category/formdefs/?backoffice-submission=on&NameID=%s' %
        local_user.name_identifiers[0]))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 0

    # ... unless user has correct roles
    local_user.roles = [role.id]
    local_user.store()
    resp = get_app(pub).get(sign_uri(
        '/api/categories/category/formdefs/?backoffice-submission=on&NameID=%s' %
        local_user.name_identifiers[0]))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 1
def test_categories_full(pub):
    """Check that ``?full=on`` embeds the forms of each category."""
    # reuse the category and the two forms created by test_categories
    test_categories(pub)
    resp = get_app(pub).get('/api/categories/?full=on')
    forms = resp.json['data'][0]['forms']
    assert len(forms) == 2
    assert forms[0]['title'] == 'test'
    assert forms[1]['title'] == 'test 2'
def test_formdata(pub, local_user):
    """Check the JSON export of a single formdata through the signed API.

    The objects created here (workflow with id '2', form "test" and its
    single formdata) are left in place when the function returns.
    """
    NamedDataSource.wipe()
    data_source = NamedDataSource(name='foobar')
    data_source.data_source = {
        'type': 'formula',
        'value': repr([
            {'id': '1', 'text': 'foo', 'more': 'XXX'},
            {'id': '2', 'text': 'bar', 'more': 'YYY'},
        ]),
    }
    data_source.store()

    Role.wipe()
    role = Role(name='test')
    role.id = '123'
    role.store()
    another_role = Role(name='another')
    another_role.id = '321'
    another_role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.geolocations = {'base': 'blah'}
    formdef.name = 'test'
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2'),
        fields.DateField(id='2', label='foobar3', varname='date'),
        fields.FileField(id='3', label='foobar4', varname='file'),
        fields.ItemField(id='4', label='foobar5', varname='item', data_source={'type': 'foobar'}),
    ]
    workflow = Workflow.get_default_workflow()
    workflow.roles['_foobar'] = 'Foobar'
    workflow.id = '2'
    workflow.store()
    formdef.workflow_id = workflow.id
    formdef.workflow_roles = {
        '_receiver': role.id,
        '_foobar': another_role.id,
    }
    formdef.store()
    item_field = formdef.fields[4]

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    date = time.strptime('2014-01-20', '%Y-%m-%d')
    upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
    upload.receive([b'base64me'])
    formdata.data = {
        '0': 'foo@localhost',
        '1': 'xxx',
        '2': date,
        '3': upload,
        '4': '1',
    }
    # let the item field compute the derived display and structured values
    formdata.data['4_display'] = item_field.store_display_value(formdata.data, item_field.id)
    formdata.data['4_structured'] = item_field.store_structured_value(formdata.data, item_field.id)
    formdata.user_id = local_user.id
    formdata.just_created()
    formdata.status = 'wf-new'
    formdata.evolution[-1].status = 'wf-new'
    formdata.geolocations = {'base': {'lon': 10, 'lat': -12}}
    formdata.store()

    # refused as long as the user does not hold the receiver role
    get_app(pub).get(
        sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user),
        status=403)

    local_user.roles = [role.id]
    local_user.store()
    resp = get_app(pub).get(
        sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user),
        status=200)
    exported = resp.json

    assert datetime.datetime.strptime(exported['last_update_time'], '%Y-%m-%dT%H:%M:%S')
    assert datetime.datetime.strptime(exported['receipt_time'], '%Y-%m-%dT%H:%M:%S')
    exported_fields = exported['fields']
    assert len(exported_fields) == 6
    assert 'foobar' in exported_fields
    assert 'foobar2' not in exported_fields  # foobar2 has no varname, not in json
    assert exported['user']['name'] == local_user.name
    assert exported_fields['foobar'] == 'foo@localhost'
    assert exported_fields['date'] == '2014-01-20'
    assert exported_fields['file']['content'] == 'YmFzZTY0bWU='  # base64('base64me')
    assert exported_fields['file']['filename'] == 'test.txt'
    assert exported_fields['file']['content_type'] == 'text/plain'
    assert exported_fields['item'] == 'foo'
    assert exported_fields['item_raw'] == '1'
    assert exported_fields['item_structured'] == {'id': '1', 'text': 'foo', 'more': 'XXX'}
    assert exported['workflow']['status']['name'] == 'New'
    assert exported['submission']['channel'] == 'web'
    assert exported['geolocations']['base']['lon'] == 10
    assert exported['geolocations']['base']['lat'] == -12

    def role_ids(key):
        # ids of the roles exported under the given key
        return [x.get('id') for x in exported['roles'][key]]

    assert role_ids('_receiver') == [str(role.id)]
    assert role_ids('_foobar') == [str(another_role.id)]
    assert set(role_ids('concerned')) == {str(role.id), str(another_role.id)}
    assert role_ids('actions') == [str(role.id)]

    # check the ?format=json endpoint returns 403
    get_app(pub).get('/test/%s/?format=json' % formdata.id, status=403)
    get_app(pub).get(sign_uri('/test/%s/' % formdata.id, user=local_user), status=403)
def test_formdata_backoffice_fields(pub, local_user):
    """Check that backoffice field values are exported under ``workflow.fields``."""
    # start from the form, workflow and formdata built by test_formdata
    test_formdata(pub, local_user)

    workflow = Workflow.get(2)
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    backoffice_field = fields.StringField(
        id='bo1', label='1st backoffice field', type='string', varname='backoffice_blah')
    workflow.backoffice_fields_formdef.fields = [backoffice_field]
    workflow.store()

    formdef = FormDef.select()[0]
    formdata = formdef.data_class().select()[0]
    formdata.data['bo1'] = 'Hello world'
    formdata.store()

    signed_uri = sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user)
    exported = get_app(pub).get(signed_uri).json
    assert exported['workflow']['fields']['backoffice_blah'] == 'Hello world'
def test_formdata_duplicated_varnames(pub, local_user):
    """Check the JSON export when two fields share the same varname.

    Whichever of the two fields holds 'foo', the export is expected to carry
    a single ``foobar`` entry equal to 'foo'.
    """
    Role.wipe()
    role = Role(name='test')
    role.id = '123'
    role.store()
    another_role = Role(name='another')
    another_role.id = '321'
    another_role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.geolocations = {'base': 'blah'}
    formdef.name = 'test'
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2', varname='foobar'),
    ]
    workflow = Workflow.get_default_workflow()
    workflow.roles['_foobar'] = 'Foobar'
    workflow.id = '2'
    workflow.store()
    formdef.workflow_id = workflow.id
    formdef.workflow_roles = {
        '_receiver': role.id,
        '_foobar': another_role.id,
    }
    formdef.store()

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.data = {'0': 'foo', '1': 'bar'}
    formdata.user_id = local_user.id
    formdata.just_created()
    formdata.status = 'wf-new'
    formdata.evolution[-1].status = 'wf-new'
    formdata.store()

    local_user.roles = [role.id]
    local_user.store()

    def exported_fields():
        # fetch the formdata as local_user and return its "fields" mapping
        resp = get_app(pub).get(
            sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user),
            status=200)
        return resp.json['fields']

    assert exported_fields() == {'foobar': 'foo'}

    # same expectation when only one of the two fields holds a value
    for values in ({'0': 'foo', '1': ''}, {'0': '', '1': 'foo'}):
        formdata.data = values
        formdata.store()
        assert exported_fields() == {'foobar': 'foo'}
def test_formdata_edit(pub, local_user):
    """Check editing a formdata by posting JSON to its API URL.

    The edit is refused without a user, without an editable workflow action
    and when the action is not open to the user's role; it is accepted
    otherwise, and can also move the formdata to another status.
    """
    # start from the form, workflow and formdata built by test_formdata
    test_formdata(pub, local_user)
    formdef = FormDef.select()[0]
    formdata = formdef.data_class().select()[0]
    workflow = formdef.workflow
    uri = '/api/forms/test/%s/' % formdata.id

    def post_edit(signed_uri, value, status):
        # post a new value for field '0' and require the given HTTP status
        return get_app(pub).post_json(signed_uri, {'data': {'0': value}}, status=status)

    # not user
    post_edit(sign_uri(uri), 'bar@localhost', 403)

    # no editable action
    post_edit(sign_uri(uri, user=local_user), 'bar@localhost', 403)

    wfedit = EditableWorkflowStatusItem()
    wfedit.id = '_wfedit'
    wfedit.by = [local_user.roles[0]]
    workflow.possible_status[1].items.append(wfedit)
    wfedit.parent = workflow.possible_status[1]
    workflow.store()

    post_edit(sign_uri(uri, user=local_user), 'bar@localhost', 200)
    assert formdef.data_class().select()[0].data['0'] == 'bar@localhost'

    # not editable by user role
    wfedit.by = ['XX']
    workflow.store()
    post_edit(sign_uri(uri, user=local_user), 'bar@localhost', 403)

    # edit + jump
    wfedit.status = 'rejected'
    wfedit.by = [local_user.roles[0]]
    workflow.store()

    post_edit(sign_uri(uri, user=local_user), 'bar2@localhost', 200)
    assert formdef.data_class().select()[0].data['0'] == 'bar2@localhost'
    assert formdef.data_class().select()[0].status == 'wf-rejected'
def test_formdata_with_workflow_data(pub, local_user):
    """Check that workflow data, uploads included, is part of the JSON export."""
    Role.wipe()
    role = Role(name='test')
    role.id = '123'
    role.store()

    local_user.roles = [role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = []
    workflow = Workflow.get_default_workflow()
    workflow.id = '2'
    workflow.store()
    formdef.workflow_id = workflow.id
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.store()

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.status = 'wf-new'
    formdata.evolution[-1].status = 'wf-new'

    # PicklableUpload is already imported at module level; no local alias needed
    upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
    upload.receive([b'test'])
    upload2 = PicklableUpload('test.txt', 'text/plain', 'ascii')
    upload2.receive([b'test'])
    formdata.workflow_data = {'blah': upload, 'blah2': upload2, 'xxx': 23}
    formdata.store()

    resp = get_app(pub).get(
        sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user))
    workflow_data = resp.json['workflow']['data']
    assert workflow_data['xxx'] == 23
    assert workflow_data['blah']['filename'] == 'test.txt'
    assert workflow_data['blah']['content_type'] == 'text/plain'
    # file contents are compared after base64 decoding
    assert base64.decodebytes(force_bytes(workflow_data['blah']['content'])) == b'test'
    assert base64.decodebytes(force_bytes(workflow_data['blah2']['content'])) == b'test'
def test_user_by_nameid(pub, local_user):
    """Check looking a user up by name identifier on ``/api/users/<nameid>/``."""
    # unknown identifier: 404
    get_app(pub).get(sign_uri('/api/users/xyz/', user=local_user), status=404)

    local_user.name_identifiers = ['xyz']
    local_user.store()

    found = get_app(pub).get(sign_uri('/api/users/xyz/', user=local_user)).json
    assert str(found['id']) == str(local_user.id)
def test_user_roles(pub, local_user):
    """Check that the user export lists the roles of the user."""
    local_user.name_identifiers = ['xyz']
    local_user.store()

    role = Role(name='Foo bar')
    role.store()
    local_user.roles = [role.id]
    local_user.store()

    resp = get_app(pub).get(sign_uri('/api/users/xyz/', user=local_user))
    user_roles = resp.json['user_roles']
    assert len(user_roles) == 1
    assert user_roles[0]['name'] == 'Foo bar'
def test_user_forms(pub, local_user):
    """Check the listing of the forms of a user.

    Covers the equivalent endpoints, the ``full``, ``include-drafts`` and
    ``sort`` parameters, the digest, an unknown NameID and the effect of
    disabling the form.
    """
    Workflow.wipe()
    workflow = Workflow.get_default_workflow()
    workflow.id = '2'
    workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
    workflow.variables_formdef.fields.append(
        fields.DateField(label='Test', type='date', varname='option_date'))
    workflow.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2'),
        fields.DateField(id='2', label='date', type='date', varname='date'),
    ]
    formdef.keywords = 'hello, world'
    formdef.disabled = False
    formdef.enable_tracking_codes = True
    formdef.workflow = workflow
    formdef.workflow_options = {'option_date': datetime.date(2020, 1, 15).timetuple()}
    formdef.store()
    formdef.data_class().wipe()

    # nothing submitted yet
    resp = get_app(pub).get(sign_uri('/api/user/forms', user=local_user))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 0

    formdata = formdef.data_class()()
    formdata.data = {
        '0': 'foo@localhost',
        '1': 'xxx',
        '2': datetime.date(2020, 1, 15).timetuple(),
    }
    formdata.user_id = local_user.id
    formdata.just_created()
    formdata.jump_status('new')
    formdata.store()

    # the three endpoints must give the same answer
    resp = get_app(pub).get(sign_uri('/api/user/forms', user=local_user))
    resp2 = get_app(pub).get(sign_uri('/myspace/forms', user=local_user))
    resp3 = get_app(pub).get(sign_uri('/api/users/%s/forms' % local_user.id))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 1
    assert resp.json['data'][0]['form_name'] == 'test'
    assert resp.json['data'][0]['form_slug'] == 'test'
    assert resp.json['data'][0]['form_status'] == 'New'
    assert datetime.datetime.strptime(
        resp.json['data'][0]['form_receipt_datetime'], '%Y-%m-%dT%H:%M:%S')
    assert resp.json['data'][0]['keywords'] == ['hello', 'world']
    assert resp.json == resp2.json == resp3.json

    resp = get_app(pub).get(sign_uri('/api/user/forms?full=on', user=local_user))
    assert resp.json['err'] == 0
    assert resp.json['data'][0]['fields']['foobar'] == 'foo@localhost'
    assert resp.json['data'][0]['fields']['date'] == '2020-01-15'
    assert resp.json['data'][0]['keywords'] == ['hello', 'world']
    assert resp.json['data'][0]['form_option_option_date'] == '2020-01-15'
    # a stray "&" in the query string must not change the result
    resp2 = get_app(pub).get(sign_uri('/api/user/forms?&full=on', user=local_user))
    assert resp.json == resp2.json

    # the submitted form is still listed once the form is disabled
    formdef.disabled = True
    formdef.store()
    resp = get_app(pub).get(sign_uri('/api/user/forms', user=local_user))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 1

    # check digest is part of contents
    formdef.digest_template = 'XYZ'
    formdef.data_class().get(formdata.id).store()
    assert formdef.data_class().get(formdata.id).digest == 'XYZ'
    resp = get_app(pub).get(sign_uri('/api/user/forms', user=local_user))
    assert resp.json['data'][0]['form_digest'] == 'XYZ'

    resp = get_app(pub).get(sign_uri('/api/user/forms?NameID=xxx'))
    assert resp.json == {'err': 1, 'err_desc': 'unknown NameID', 'data': []}
    resp2 = get_app(pub).get(sign_uri('/api/user/forms?&NameID=xxx'))
    assert resp.json == resp2.json

    # add a draft for the same user
    formdata = formdef.data_class()()
    formdata.user_id = local_user.id
    formdata.status = 'draft'
    formdata.receipt_time = datetime.datetime(2015, 1, 1).timetuple()
    formdata.store()

    resp = get_app(pub).get(sign_uri('/api/user/forms', user=local_user))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 1

    # form is still disabled: the draft is not listed even when asked for
    resp = get_app(pub).get(sign_uri('/api/user/forms?include-drafts=true', user=local_user))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 1

    formdef.disabled = False
    formdef.store()

    resp = get_app(pub).get(sign_uri('/api/user/forms?include-drafts=true', user=local_user))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 2
    # explicit check that one of the entries is the draft
    assert [x for x in resp.json['data'] if x['status'] == 'Draft']

    # second submitted form, received one day later, to check ordering
    formdata = formdef.data_class()()
    formdata.data = {'0': 'foo@localhost', '1': 'xyy'}
    formdata.user_id = local_user.id
    formdata.just_created()
    formdata.receipt_time = (datetime.datetime.now() + datetime.timedelta(days=1)).timetuple()
    formdata.jump_status('new')
    formdata.store()

    resp = get_app(pub).get(sign_uri('/api/user/forms', user=local_user))
    assert len(resp.json['data']) == 2
    resp2 = get_app(pub).get(sign_uri('/api/user/forms?sort=desc', user=local_user))
    assert len(resp2.json['data']) == 2
    assert resp2.json['data'][0] == resp.json['data'][1]
    assert resp2.json['data'][1] == resp.json['data'][0]
def test_user_forms_limit_offset(pub, local_user):
    """Check ``limit``, ``offset`` and ``sort`` on the per-user forms listing.

    Skipped unless the publisher reports it is using PostgreSQL.
    """
    if not pub.is_using_postgresql():
        # pytest.skip() raises, so no explicit return is needed after it
        pytest.skip('this requires SQL')

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test limit offset'
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2'),
    ]
    formdef.keywords = 'hello, world'
    formdef.disabled = False
    formdef.enable_tracking_codes = False
    formdef.store()
    formdef.data_class().wipe()

    # 50 forms for the user, each received one day after the previous one
    for i in range(50):
        formdata = formdef.data_class()()
        formdata.data = {'0': 'foo@localhost', '1': str(i)}
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.receipt_time = (datetime.datetime.now() + datetime.timedelta(days=i)).timetuple()
        formdata.jump_status('new')
        formdata.store()

    forms_uri = '/api/users/%s/forms' % local_user.id

    def listed(query=''):
        # fetch the listing with the given query string and return its data
        resp = get_app(pub).get(sign_uri(forms_uri + query))
        assert resp.json['err'] == 0
        return resp.json['data']

    assert len(listed()) == 50

    page = listed('?limit=10')
    assert len(page) == 10
    assert [x['form_number_raw'] for x in page] == [str(x) for x in range(1, 11)]

    # an offset near the end returns fewer entries than the limit
    page = listed('?limit=10&offset=45')
    assert len(page) == 5
    assert [x['form_number_raw'] for x in page] == [str(x) for x in range(46, 51)]

    page = listed('?limit=10&sort=desc')
    assert len(page) == 10
    assert [x['form_number_raw'] for x in page] == [str(x) for x in range(50, 40, -1)]
def test_user_forms_from_agent(pub, local_user):
    """Check the listing of a user's forms when requested by another user."""
    Role.wipe()
    role = Role(name='Foo bar')
    role.store()

    agent_user = get_publisher().user_class()
    agent_user.name = 'Agent'
    agent_user.email = 'agent@example.com'
    agent_user.name_identifiers = ['ABCDE']
    agent_user.roles = [role.id]
    agent_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2'),
    ]
    formdef.store()
    formdef.data_class().wipe()

    formdata = formdef.data_class()()
    formdata.data = {'0': 'foo@localhost', '1': 'xxx'}
    formdata.user_id = local_user.id
    formdata.just_created()
    formdata.jump_status('new')
    formdata.store()

    forms_uri = '/api/users/%s/forms' % local_user.id

    resp = get_app(pub).get(sign_uri(forms_uri, user=agent_user))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 1
    entry = resp.json['data'][0]
    assert entry['form_name'] == 'test'
    assert entry['form_slug'] == 'test'
    assert entry['form_status'] == 'New'
    assert entry['readable'] is False

    # forms flagged skip_from_360_view are left out ...
    formdef.skip_from_360_view = True
    formdef.store()
    resp = get_app(pub).get(sign_uri(forms_uri, user=agent_user))
    assert len(resp.json['data']) == 0

    # ... until the agent's role is set as receiver of the form
    formdef.workflow_roles = {'_receiver': str(role.id)}
    formdef.store()
    formdef.data_class().rebuild_security()
    resp = get_app(pub).get(sign_uri(forms_uri, user=agent_user))
    assert len(resp.json['data']) == 1

    # an agent without any role is refused
    agent_user.roles = []
    agent_user.store()
    get_app(pub).get(sign_uri(forms_uri, user=agent_user), status=403)
def test_user_drafts(pub, local_user):
    """Check the listing of the drafts of a user."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2'),
        fields.FileField(id='2', label='foobar3', varname='file'),
    ]
    formdef.keywords = 'hello, world'
    formdef.disabled = False
    formdef.enable_tracking_codes = True
    formdef.store()
    formdef.data_class().wipe()

    # no draft yet
    resp = get_app(pub).get(sign_uri('/api/user/drafts', user=local_user))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 0

    formdata = formdef.data_class()()
    upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
    upload.receive([b'base64me'])
    formdata.data = {'0': 'foo@localhost', '1': 'xxx', '2': upload}
    formdata.user_id = local_user.id
    formdata.page_no = 1
    formdata.status = 'draft'
    formdata.receipt_time = datetime.datetime(2015, 1, 1).timetuple()
    formdata.store()

    resp = get_app(pub).get(sign_uri('/api/user/drafts', user=local_user))
    resp2 = get_app(pub).get(sign_uri('/myspace/drafts', user=local_user))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 1
    assert resp.json == resp2.json
    draft = resp.json['data'][0]
    assert 'fields' not in draft
    assert draft['keywords'] == ['hello', 'world']

    resp = get_app(pub).get(sign_uri('/api/user/drafts?full=on', user=local_user))
    assert resp.json['err'] == 0
    draft = resp.json['data'][0]
    assert 'fields' in draft
    assert draft['fields']['foobar'] == 'foo@localhost'
    assert 'url' in draft['fields']['file']
    assert 'content' not in draft['fields']['file']  # no file content in full lists
    assert draft['keywords'] == ['hello', 'world']

    # the draft is still listed when tracking codes are turned off
    formdef.enable_tracking_codes = False
    formdef.store()
    resp = get_app(pub).get(sign_uri('/api/user/drafts', user=local_user))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 1

    # but not once the form is disabled
    formdef.enable_tracking_codes = True
    formdef.disabled = True
    formdef.store()
    resp = get_app(pub).get(sign_uri('/api/user/drafts', user=local_user))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 0

    resp = get_app(pub).get(sign_uri('/api/user/drafts?NameID=xxx'))
    assert resp.json == {'err': 1, 'err_desc': 'unknown NameID', 'data': []}
    resp2 = get_app(pub).get(sign_uri('/api/user/drafts?&NameID=xxx'))
    assert resp.json == resp2.json
def test_api_list_formdata(pub, local_user):
    """Check the formdata listing API of a form.

    30 formdata are created with an item value cycling on ``i % 4``, a
    status cycling on ``i % 3`` and a mail/backoffice submission every 7th
    entry; the listing is then checked for access control, the ``full``
    export, field/status/modification-time filters, pagination and error
    handling.
    """
    Role.wipe()
    role = Role(name='test')
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar', type='string'),
        fields.ItemField(id='1', label='foobar3', varname='foobar3', type='item',
                         items=['foo', 'bar', 'baz']),
        fields.FileField(id='2', label='foobar4', varname='file', type='file'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for i in range(30):
        formdata = data_class()
        upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
        upload.receive([b'base64me'])
        formdata.data = {'0': 'FOO BAR %d' % i, '2': upload}
        formdata.user_id = local_user.id
        # item value: foo, bar, then baz for the two remaining residues
        item_value = {0: 'foo', 1: 'bar'}.get(i % 4, 'baz')
        formdata.data['1'] = item_value
        formdata.data['1_display'] = item_value

        formdata.just_created()
        formdata.jump_status({0: 'new', 1: 'just_submitted'}.get(i % 3, 'finished'))
        if i % 7 == 0:
            formdata.backoffice_submission = True
            formdata.submission_channel = 'mail'
        # spread the last evolution times one hour apart from a fixed date
        formdata.evolution[-1].time = (
            datetime.datetime(2020, 1, 2, 3, 4) + datetime.timedelta(hours=i)).timetuple()
        formdata.store()

    # check access is denied if the user has not the appropriate role
    get_app(pub).get(sign_uri('/api/forms/test/list', user=local_user), status=403)

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    # check it now gets the data
    resp = get_app(pub).get(sign_uri('/api/forms/test/list', user=local_user))
    assert len(resp.json) == 30
    assert datetime.datetime.strptime(resp.json[0]['receipt_time'], '%Y-%m-%dT%H:%M:%S')
    assert 'fields' not in resp.json[0]

    # check getting full formdata
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on', user=local_user))
    full_list = resp.json
    assert len(full_list) == 30
    first_entry = full_list[0]
    assert 'receipt_time' in first_entry
    assert 'fields' in first_entry
    assert 'url' in first_entry['fields']['file']
    assert 'content' not in first_entry['fields']['file']  # no file content in full lists
    assert 'user' in first_entry
    assert 'evolution' in first_entry
    assert len(first_entry['evolution']) == 2
    assert 'status' in first_entry['evolution'][0]
    assert 'who' in first_entry['evolution'][0]
    assert 'time' in first_entry['evolution'][0]
    assert first_entry['evolution'][0]['who']['id'] == local_user.id
    assert all('status' in x['workflow'] for x in full_list)

    # entry 0 was flagged as a mail/backoffice submission, entry 1 was not
    entry_0 = [x for x in full_list if x['fields']['foobar'] == 'FOO BAR 0'][0]
    entry_1 = [x for x in full_list if x['fields']['foobar'] == 'FOO BAR 1'][0]
    assert entry_0['submission']['backoffice'] is True
    assert entry_0['submission']['channel'] == 'mail'
    assert entry_1['submission']['backoffice'] is False
    assert entry_1['submission']['channel'] == 'web'

    def assert_counts(expectations):
        # each (query string, expected number of results) pair is checked in turn
        for query, expected in expectations:
            listing = get_app(pub).get(
                sign_uri('/api/forms/test/list?%s' % query, user=local_user))
            assert len(listing.json) == expected

    # check filtered results
    assert_counts([
        ('filter-foobar3=foo', 8),
        ('filter-foobar3=bar', 8),
        ('filter-foobar3=baz', 14),
        ('filter-foobar=FOO BAR 3', 1),
    ])

    # check filter on status
    assert_counts([
        ('filter=pending', 20),
        ('filter=done', 10),
        ('filter=all', 30),
    ])

    # check filter on last update time
    assert_counts([
        ('filter-start-mtime=on&filter-start-mtime-value=2020-01-03', 16),
        ('filter-start-mtime=on&filter-start-mtime-value=2020-01-03 10:00', 10),
        ('filter-end-mtime=on&filter-end-mtime-value=2020-01-03', 14),
        ('filter-end-mtime=on&filter-end-mtime-value=2020-01-03 10:00', 20),
    ])

    # check limit and offset
    resp_all = get_app(pub).get(sign_uri('/api/forms/test/list?filter=all', user=local_user))
    assert len(resp_all.json) == 30
    partial_resps = [
        get_app(pub).get(sign_uri(
            '/api/forms/test/list?filter=all&offset=%s&limit=12' % offset,
            user=local_user))
        for offset in range(0, 48, 12)
    ]
    assert len(partial_resps[0].json) == 12
    assert len(partial_resps[1].json) == 12
    assert len(partial_resps[2].json) == 6
    assert len(partial_resps[3].json) == 0
    # the pages put end to end must give the complete listing, in order
    resp_all_ids = [x.get('id') for x in resp_all.json]
    resp_partial_ids = []
    for partial in partial_resps:
        resp_partial_ids.extend([x.get('id') for x in partial.json])
    assert resp_all_ids == resp_partial_ids

    # check error handling
    get_app(pub).get(sign_uri('/api/forms/test/list?filter=all&offset=plop', user=local_user), status=400)
    get_app(pub).get(sign_uri('/api/forms/test/list?filter=all&limit=plop', user=local_user), status=400)
def test_api_anonymized_formdata(pub, local_user, admin_user):
    """Check the ``anonymise`` flag of the formdata list and detail API.

    Thirty formdata owned by ``local_user`` are stored; one in three is moved
    to the 'new' status, the others get an evolution attributed to
    ``admin_user``.  The anonymised exports are then requested with a signed
    URL, with and without a user, and their content is inspected.
    """
    Role.wipe()
    role = Role(name='test')
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.ItemField(id='1', label='foobar3', varname='foobar3', type='item',
                         items=['foo', 'bar', 'baz']),
        fields.FileField(id='2', label='foobar4', varname='file'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for i in range(30):
        formdata = data_class()
        upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
        upload.receive([b'base64me'])
        formdata.data = {'0': 'FOO BAR %d' % i, '2': upload}
        formdata.user_id = local_user.id
        if i % 4 == 0:
            formdata.data['1'] = 'foo'
            formdata.data['1_display'] = 'foo'
        elif i % 4 == 1:
            formdata.data['1'] = 'bar'
            formdata.data['1_display'] = 'bar'
        else:
            formdata.data['1'] = 'baz'
            formdata.data['1_display'] = 'baz'

        formdata.just_created()
        if i % 3 == 0:
            formdata.jump_status('new')
        else:
            # record an evolution attributed to the admin user
            evo = Evolution()
            evo.who = admin_user.id
            evo.time = time.localtime()
            evo.status = 'wf-%s' % 'finished'
            formdata.evolution.append(evo)
            formdata.status = evo.status
        formdata.store()

    def check_anonymised_export(item):
        # assertions shared by the list entries and the detail view
        assert 'receipt_time' in item
        assert 'fields' in item
        assert 'user' not in item
        assert 'file' not in item['fields']  # no file export
        assert 'foobar3' in item['fields']
        assert 'foobar' not in item['fields']
        assert 'evolution' in item
        assert len(item['evolution']) == 2
        assert 'status' in item['evolution'][0]
        assert 'who' not in item['evolution'][0]
        assert 'time' in item['evolution'][0]

    def check_agent_exported(evolution_part):
        # evolutions made by other than _submitter keep their author
        assert 'who' in evolution_part
        assert 'id' in evolution_part['who']
        assert 'email' in evolution_part['who']
        assert 'NameID' in evolution_part['who']
        assert 'name' in evolution_part['who']

    # check access is granted even if the user has not the appropriate role
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?anonymise&full=on', user=local_user))
    assert len(resp.json) == 30
    check_anonymised_export(resp.json[0])
    check_agent_exported(resp.json[1]['evolution'][1])

    # check access is granted even if there is no user
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?anonymise&full=on'))
    assert len(resp.json) == 30
    check_anonymised_export(resp.json[0])

    # check anonymise is enforced on detail view
    resp = get_app(pub).get(sign_uri('/api/forms/test/%s/?anonymise&full=on' % resp.json[1]['id']))
    check_anonymised_export(resp.json)
    check_agent_exported(resp.json['evolution'][1])
def test_api_geojson_formdata(pub, local_user):
    """Check the per-form GeoJSON export.

    Covers access control (role required, ``anonymise`` does not bypass it),
    the content of ``display_fields`` (including HTML escaping of values),
    the ``filter`` parameter, HTTP basic authentication configured through
    the ``api-http-auth-geojson`` section of site-options.cfg, and the 404
    returned once the formdef has no geolocation.
    """
    Role.wipe()
    role = Role(name='test')
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar', type='string'),
        fields.FileField(id='1', label='foobar1', type='file')
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    formdef.geolocations = {'base': 'Location'}
    formdef.store()

    # check access is denied if the user has not the appropriate role
    get_app(pub).get(sign_uri('/api/forms/test/geojson', user=local_user), status=403)
    # even if there's an anonymise parameter
    get_app(pub).get(sign_uri('/api/forms/test/geojson?anonymise', user=local_user), status=403)

    upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
    upload.receive([b'base64me'])

    # values containing markup, to check they are escaped in html_value
    foobar = '<font color="red">FOO BAR</font>'
    username = '<font color="red">Jean Darmette</font>'

    data = {'0': foobar, '1': upload}
    local_user.name = username
    local_user.store()
    for i in range(30):
        formdata = data_class()
        formdata.geolocations = {'base': {'lat': 48, 'lon': 2}}
        formdata.data = data
        formdata.user_id = local_user.id
        formdata.just_created()
        if i % 3 == 0:
            formdata.jump_status('new')
        else:
            formdata.jump_status('finished')
        formdata.store()

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    # check it gets the data
    resp = get_app(pub).get(sign_uri('/api/forms/test/geojson', user=local_user))
    assert 'features' in resp.json
    assert len(resp.json['features']) == 10
    display_fields = resp.json['features'][0]['properties']['display_fields']
    for field in display_fields:
        if field['label'] == 'Number':
            assert field['varname'] == 'id'
            assert field['html_value'] == '1-28'
            assert field['value'] == '1-28'
        if field['label'] == 'User Label':
            assert field['varname'] == 'user_label'
            assert field['value'] == username
            assert field['html_value'] == "&lt;font color=&quot;red&quot;&gt;Jean Darmette&lt;/font&gt;"
        if field['label'] == 'foobar':
            assert field['varname'] == 'foobar'
            assert field['value'] == foobar
            assert field['html_value'] == "&lt;font color=&quot;red&quot;&gt;FOO BAR&lt;/font&gt;"
        if field['label'] == 'foobar1':
            assert field['varname'] is None
            assert field['value'] == "test.txt"
            assert field['html_value'] == '<div class="file-field"><a download="test.txt" href="http://example.net/backoffice/management/test/28/download?f=1"><span>test.txt</span></a></div>'

    # check with a filter
    resp = get_app(pub).get(sign_uri('/api/forms/test/geojson?filter=done', user=local_user))
    assert 'features' in resp.json
    assert len(resp.json['features']) == 20

    # check with http basic auth
    app = get_app(pub)
    app.authorization = ('Basic', ('user', 'password'))
    app.get('/api/forms/test/geojson?email=%s' % local_user.email, status=401)

    # add authentication info
    pub.load_site_options()
    pub.site_options.add_section('api-http-auth-geojson')
    pub.site_options.set('api-http-auth-geojson', 'user', 'password')
    # close the file explicitly so the configuration is flushed to disk
    # before the next request is made
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
        pub.site_options.write(fd)

    resp = app.get('/api/forms/test/geojson?email=%s' % local_user.email)
    assert 'features' in resp.json
    assert len(resp.json['features']) == 10

    # check 404 if the formdef doesn't have geolocation support
    formdef.geolocations = {}
    formdef.store()
    get_app(pub).get(sign_uri('/api/forms/test/geojson', user=local_user), status=404)
def test_api_ods_formdata(pub, local_user):
    """Check the ODS export of a form listing.

    Access requires the receiver role (with or without ``anonymise``); once
    granted, the endpoint answers with an OpenDocument spreadsheet, also
    after 300 more formdata have been added.
    """
    Role.wipe()
    receiver_role = Role(name='test')
    receiver_role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': receiver_role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar', type='string'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    # without the role the export is forbidden...
    get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user), status=403)
    # ... and the anonymise parameter changes nothing to that
    get_app(pub).get(sign_uri('/api/forms/test/ods?anonymise', user=local_user), status=403)

    # the very same dictionary is attached to every formdata
    shared_data = {'0': 'foobar'}

    def add_formdata(status):
        # store one formdata owned by local_user, in the given status
        formdata = data_class()
        formdata.data = shared_data
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.jump_status(status)
        formdata.store()

    for index in range(30):
        add_formdata('finished' if index % 3 else 'new')

    # give the receiver role to the user
    local_user.roles = [receiver_role.id]
    local_user.store()

    # the export is now served
    response = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
    assert response.content_type == 'application/vnd.oasis.opendocument.spreadsheet'

    # it is still an ods file when there is more data
    for _ in range(300):
        add_formdata('new')

    response = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
    assert response.content_type == 'application/vnd.oasis.opendocument.spreadsheet'
    archive = zipfile.ZipFile(BytesIO(response.body))
    sheet = ET.parse(archive.open('content.xml'))
    row_count = len(sheet.findall('.//{%s}table-row' % ods.NS['table']))
    assert row_count == 311
def test_api_list_formdata_custom_view(pub, local_user):
    """Check the list API when it is addressed through a custom view.

    The plain listing returns 30 entries; the same listing requested through
    a custom view whose filter is "done" returns 20 entries under the
    ``data`` key.
    """
    Role.wipe()
    receiver_role = Role(name='test')
    receiver_role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': receiver_role.id}
    formdef.fields = [fields.StringField(id='0', label='foobar', varname='foobar')]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for index in range(30):
        formdata = data_class()
        formdata.data = {'0': 'FOO BAR %d' % index}
        formdata.user_id = local_user.id
        formdata.just_created()
        # one formdata in three stays 'new', the others are 'finished'
        formdata.jump_status('finished' if index % 3 else 'new')
        formdata.store()

    # give the receiver role to the user
    local_user.roles = [receiver_role.id]
    local_user.store()

    # plain listing
    listing = get_app(pub).get(sign_uri('/api/forms/test/list', user=local_user))
    assert len(listing.json) == 30

    # declare a custom view restricted to "done" formdata
    pub.custom_view_class.wipe()
    view = pub.custom_view_class()
    view.title = 'custom view'
    view.formdef = formdef
    view.columns = {'list': [{'id': '0'}]}
    view.filters = {"filter": "done", "filter-status": "on"}
    view.visibility = 'any'
    view.store()

    listing = get_app(pub).get(sign_uri('/api/forms/test/list/custom-view', user=local_user))
    assert len(listing.json['data']) == 20
def test_api_ods_formdata_custom_view(pub, local_user):
    """Check the ODS export when it is addressed through a custom view.

    The default export holds 11 table rows, the export through a custom view
    whose filter is "done" holds 21.
    """
    def count_table_rows(response):
        # open the spreadsheet archive and count <table-row> elements
        archive = zipfile.ZipFile(BytesIO(response.body))
        sheet = ET.parse(archive.open('content.xml'))
        return len(sheet.findall('.//{%s}table-row' % ods.NS['table']))

    Role.wipe()
    receiver_role = Role(name='test')
    receiver_role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': receiver_role.id}
    formdef.fields = [fields.StringField(id='0', label='foobar', varname='foobar')]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for index in range(30):
        formdata = data_class()
        formdata.data = {'0': 'FOO BAR %d' % index}
        formdata.user_id = local_user.id
        formdata.just_created()
        # one formdata in three stays 'new', the others are 'finished'
        formdata.jump_status('finished' if index % 3 else 'new')
        formdata.store()

    # give the receiver role to the user
    local_user.roles = [receiver_role.id]
    local_user.store()

    # default export
    export = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
    assert count_table_rows(export) == 11

    # declare a custom view restricted to "done" formdata
    pub.custom_view_class.wipe()
    view = pub.custom_view_class()
    view.title = 'custom view'
    view.formdef = formdef
    view.columns = {'list': [{'id': '0'}]}
    view.filters = {"filter": "done", "filter-status": "on"}
    view.visibility = 'any'
    view.store()

    export = get_app(pub).get(sign_uri('/api/forms/test/ods/custom-view', user=local_user))
    assert count_table_rows(export) == 21
def test_api_geojson_formdata_custom_view(pub, local_user):
    """Check the GeoJSON export when it is addressed through a custom view.

    The default export holds 10 features, the export through a custom view
    whose filter is "done" holds 20.
    """
    Role.wipe()
    receiver_role = Role(name='test')
    receiver_role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': receiver_role.id}
    formdef.fields = [fields.StringField(id='0', label='foobar', varname='foobar')]
    formdef.geolocations = {'base': 'Location'}
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for index in range(30):
        formdata = data_class()
        formdata.data = {'0': 'FOO BAR %d' % index}
        formdata.geolocations = {'base': {'lat': 48, 'lon': 2}}
        formdata.user_id = local_user.id
        formdata.just_created()
        # one formdata in three stays 'new', the others are 'finished'
        formdata.jump_status('finished' if index % 3 else 'new')
        formdata.store()

    # give the receiver role to the user
    local_user.roles = [receiver_role.id]
    local_user.store()

    # default export
    geojson = get_app(pub).get(sign_uri('/api/forms/test/geojson', user=local_user))
    assert len(geojson.json['features']) == 10

    # declare a custom view restricted to "done" formdata
    pub.custom_view_class.wipe()
    view = pub.custom_view_class()
    view.title = 'custom view'
    view.formdef = formdef
    view.columns = {'list': [{'id': '0'}]}
    view.filters = {"filter": "done", "filter-status": "on"}
    view.visibility = 'any'
    view.store()

    geojson = get_app(pub).get(sign_uri('/api/forms/test/geojson/custom-view', user=local_user))
    assert len(geojson.json['features']) == 20
def test_api_global_geojson(pub, local_user):
    """Check the global (all forms) GeoJSON endpoint.

    Without PostgreSQL the endpoint must answer 404 and the rest of the test
    is skipped.  With it, the export is empty until the user has the receiver
    role, then follows the ``status`` filter.
    """
    Role.wipe()
    role = Role(name='test')
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = []
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    formdef.geolocations = {'base': 'Location'}
    formdef.store()

    for i in range(30):
        formdata = data_class()
        formdata.geolocations = {'base': {'lat': 48, 'lon': 2}}
        formdata.user_id = local_user.id
        formdata.just_created()
        if i % 3 == 0:
            formdata.jump_status('new')
        else:
            formdata.jump_status('finished')
        formdata.store()

    if not pub.is_using_postgresql():
        get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
        # pytest.skip() raises, nothing after it is executed
        pytest.skip('this requires SQL')

    # check empty content if user doesn't have the appropriate role
    resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user))
    assert 'features' in resp.json
    assert len(resp.json['features']) == 0

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    # check it gets the data
    resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user))
    assert 'features' in resp.json
    assert len(resp.json['features']) == 10

    # check with a filter
    resp = get_app(pub).get(sign_uri('/api/forms/geojson?status=done', user=local_user))
    assert 'features' in resp.json
    assert len(resp.json['features']) == 20
def test_api_global_listing(pub, local_user):
    """Check the global (all forms) listing endpoint ``/api/forms/``.

    Skipped without PostgreSQL.  Covers the empty listing, role-based
    visibility, the ``status`` filter, ``limit``/``offset`` handling, the 400
    answers on invalid parameters and formdata whose status is missing from
    the workflow.
    """
    if not pub.is_using_postgresql():
        get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
        # pytest.skip() raises, nothing after it is executed
        pytest.skip('this requires SQL')

    Role.wipe()
    role = Role(name='test')
    role.store()

    # check there's no crash if there are no formdefs
    resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user))
    assert len(resp.json['data']) == 0

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    formdef.store()

    for i in range(30):
        formdata = data_class()
        formdata.data = {'0': 'FOO BAR'}
        formdata.user_id = local_user.id
        formdata.just_created()
        if i % 3 == 0:
            formdata.jump_status('new')
        else:
            formdata.jump_status('finished')
        formdata.store()

    # check empty content if user doesn't have the appropriate role
    resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user))
    assert len(resp.json['data']) == 0

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    # check it gets the data
    resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user))
    assert len(resp.json['data']) == 10

    # check with a filter
    resp = get_app(pub).get(sign_uri('/api/forms/?status=done', user=local_user))
    assert len(resp.json['data']) == 20

    # check limit/offset
    resp = get_app(pub).get(sign_uri('/api/forms/?status=done&limit=5', user=local_user))
    assert len(resp.json['data']) == 5
    resp = get_app(pub).get(sign_uri('/api/forms/?status=done&offset=5&limit=5', user=local_user))
    assert len(resp.json['data']) == 5
    resp = get_app(pub).get(sign_uri('/api/forms/?status=done&offset=18&limit=5', user=local_user))
    assert len(resp.json['data']) == 2

    # check error handling
    get_app(pub).get(sign_uri('/api/forms/?status=done&limit=plop', user=local_user), status=400)
    get_app(pub).get(sign_uri('/api/forms/?status=done&offset=plop', user=local_user), status=400)
    get_app(pub).get(sign_uri('/api/forms/?category_id=plop', user=local_user), status=400)

    # check when there are missing statuses
    for formdata in data_class.select():
        formdata.status = 'wf-missing'
        formdata.store()
    resp = get_app(pub).get(sign_uri('/api/forms/?status=all', user=local_user))
    assert resp.json['data'][0]['status'] is None
    assert 'unknown' in resp.json['data'][0]['title']
def test_api_global_listing_ignored_roles(pub, local_user):
    """Check the ``ignore-roles`` parameter of the global listing.

    Starts from the state left by ``test_api_global_listing`` (which also
    takes care of skipping when PostgreSQL is not used), then adds a second
    form whose receiver role the user does not have.
    """
    test_api_global_listing(pub, local_user)

    role = Role(name='test2')
    role.store()

    formdef = FormDef()
    formdef.name = 'test2'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for i in range(10):
        formdata = data_class()
        formdata.data = {'0': 'FOO BAR'}
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.jump_status('new')
        formdata.store()

    # considering roles
    resp = get_app(pub).get(sign_uri('/api/forms/?status=all&limit=100', user=local_user))
    assert len(resp.json['data']) == 30

    # ignore roles
    resp = get_app(pub).get(sign_uri('/api/forms/?status=all&limit=100&ignore-roles=on', user=local_user))
    assert len(resp.json['data']) == 40

    # check sensitive forms are not exposed
    formdef.skip_from_360_view = True
    formdef.store()
    resp = get_app(pub).get(sign_uri('/api/forms/?status=all&limit=100&ignore-roles=on', user=local_user))
    assert len(resp.json['data']) == 30
def test_api_include_anonymised(pub, local_user):
    """Check the ``include-anonymised`` parameter of the global listing.

    Skipped without PostgreSQL.  Ten formdata are stored and the last one is
    anonymised; it is listed by default and with ``include-anonymised=on``,
    and left out with ``include-anonymised=off``.
    """
    if not pub.is_using_postgresql():
        get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
        # pytest.skip() raises, nothing after it is executed
        pytest.skip('this requires SQL')

    Role.wipe()
    role = Role(name='test')
    role.store()

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for i in range(10):
        formdata = data_class()
        formdata.data = {'0': 'FOO BAR'}
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.jump_status('new')
        formdata.store()

    # anonymise the last one
    formdata.anonymise()

    resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user))
    assert len(resp.json['data']) == 10

    resp = get_app(pub).get(sign_uri('/api/forms/?include-anonymised=on', user=local_user))
    assert len(resp.json['data']) == 10

    resp = get_app(pub).get(sign_uri('/api/forms/?include-anonymised=off', user=local_user))
    assert len(resp.json['data']) == 9
@pytest.fixture
def ics_data(local_user):
    """Create the form and formdata used by the ICS export tests.

    The form (url name 'test') has two string fields, 'foobar' and 'foobar2'.
    Thirty formdata get date/time strings in both fields, one day apart from
    each other; one in three is moved to 'new', the others to 'finished'.
    A last formdata, in status 'new', only holds times of day.
    """
    Role.wipe()
    role = Role(name='test')
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.url_name = 'test'
    formdef.name = 'testé'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2', varname='foobar2'),
    ]
    formdef.digest_template = 'plöp {{ form_var_foobar }} plÔp'
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    date = datetime.datetime(2014, 1, 20, 12, 00)
    for i in range(30):
        formdata = data_class()
        formdata.data = {'0': (date + datetime.timedelta(days=i)).strftime('%Y-%m-%d %H:%M')}
        # the second field is i+1 minutes later than the first one
        formdata.data['1'] = (date + datetime.timedelta(days=i, minutes=i+1)).strftime('%Y-%m-%d %H:%M')
        formdata.user_id = local_user.id
        formdata.just_created()
        if i % 3 == 0:
            formdata.jump_status('new')
        else:
            formdata.jump_status('finished')
        formdata.store()

    # not a datetime: ignored
    formdata = data_class()
    formdata.data = {'0': '12:00'}
    formdata.data['1'] = '13:00'
    formdata.user_id = local_user.id
    formdata.just_created()
    formdata.jump_status('new')
    formdata.store()
def test_api_ics_formdata(pub, local_user, ics_data):
    """Check the ICS (calendar) export of a form listing.

    Covers access control, the equivalence of URLs with and without trailing
    slash, the content of the generated events, the ``filter`` parameter, the
    404 answers on invalid paths and the variant taking both a start and an
    end field.
    """
    role = Role.select()[0]

    # check access is denied if the user has not the appropriate role
    get_app(pub).get(sign_uri('/api/forms/test/ics/foobar', user=local_user), status=403)
    # even if there's an anonymise parameter
    get_app(pub).get(sign_uri('/api/forms/test/ics/foobar?anonymise', user=local_user), status=403)

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    def remove_dtstamp(body):
        # remove dtstamp as the precise timing may vary between two consecutive
        # calls and we shouldn't care.
        return re.sub('DTSTAMP:.*', 'DTSTAMP:--', body)

    # check 404 on incomplete ics url access
    assert get_app(pub).get(sign_uri('/api/forms/test/ics/', user=local_user), status=404)

    # check it gets the data
    resp = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar', user=local_user))
    resp2 = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar/', user=local_user))
    assert remove_dtstamp(resp.text) == remove_dtstamp(resp2.text)
    assert resp.headers['content-type'] == 'text/calendar; charset=utf-8'
    assert resp.text.count('BEGIN:VEVENT') == 10

    # check that description contains form name, display id, workflow status,
    # backoffice url and attached user
    # (raw strings: "\|" and "\d" are regex escapes, not string escapes)
    pattern = re.compile(r'DESCRIPTION:testé \| 1-\d+ \| New', re.MULTILINE)
    m = pattern.findall(resp.text)
    assert len(m) == 10
    assert resp.text.count('Jean Darmette') == 10
    assert resp.text.count('DTSTART') == 10

    # check formdata digest summary and description contains the formdata digest
    pattern = re.compile(
        r'SUMMARY:testé #1-\d+ - plöp \d{4}-\d{2}-\d{2} \d{2}:\d{2} plÔp',
        re.MULTILINE)
    m = pattern.findall(resp.text)
    assert len(m) == 10
    assert resp.text.count(r'plöp') == 20

    # check with a filter
    resp = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar?filter=done', user=local_user))
    assert resp.text.count('BEGIN:VEVENT') == 20
    pattern = re.compile(r'DESCRIPTION:testé \| 1-\d+ \| Finished', re.MULTILINE)
    m = pattern.findall(resp.text)
    assert len(m) == 20
    assert resp.text.count('Jean Darmette') == 20

    # check 404 on erroneous field var
    get_app(pub).get(sign_uri('/api/forms/test/ics/xxx', user=local_user), status=404)

    # check 404 on an erroneous field var for endtime
    get_app(pub).get(sign_uri('/api/forms/test/ics/foobar/xxx', user=local_user), status=404)

    # check 404 on too many path elements
    get_app(pub).get(sign_uri('/api/forms/test/ics/foobar/foobar2/xxx', user=local_user), status=404)

    # check ics data with start and end varnames
    resp = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar/foobar2', user=local_user))
    resp2 = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar/foobar2/', user=local_user))
    assert remove_dtstamp(resp.text) == remove_dtstamp(resp2.text)
    assert resp.text.count('DTSTART') == 10
    assert resp.text.count('DTEND') == 10
def test_api_ics_formdata_http_auth(pub, local_user, admin_user, ics_data):
    """Check access to the ICS export with HTTP basic authentication.

    Credentials are only accepted once declared in the ``api-http-auth-ics``
    section of site-options.cfg; the user designated by the ``email``
    parameter must still have the receiver role to get the data.
    """
    role = Role.select()[0]

    # check as admin
    app = login(get_app(pub))
    app.get('/api/forms/test/ics/foobar', status=200)

    # no access
    app = get_app(pub)
    resp = app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=401)
    assert resp.headers['Www-Authenticate']

    # auth but no access
    app = get_app(pub)
    app.authorization = ('Basic', ('user', 'password'))
    app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=401)

    # add authentication info
    pub.load_site_options()
    pub.site_options.add_section('api-http-auth-ics')
    pub.site_options.set('api-http-auth-ics', 'user', 'password')
    # close the file explicitly so the configuration is flushed to disk
    # before the next request is made
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
        pub.site_options.write(fd)

    # check access is denied if the user has not the appropriate role
    app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=403)

    # check access is denied if the user is not specified
    app.get('/api/forms/test/ics/foobar', status=403)

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    # check it gets the data
    resp = app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=200)
    assert resp.headers['content-type'] == 'text/calendar; charset=utf-8'
    assert resp.text.count('BEGIN:VEVENT') == 10

    # check it fails with a different password
    app.authorization = ('Basic', ('user', 'password2'))
    app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=401)
def test_api_ics_formdata_custom_view(pub, local_user, ics_data):
    """Check the ICS export when it is addressed through a custom view.

    The custom view has no filter; access still requires the receiver role,
    and the URLs with and without trailing slash give the same calendar once
    DTSTAMP lines are neutralised.
    """
    def fetch(path, **kwargs):
        # signed GET request made on behalf of local_user
        return get_app(pub).get(sign_uri(path, user=local_user), **kwargs)

    def without_dtstamp(text):
        # DTSTAMP may differ between two consecutive calls, it is not
        # relevant for the comparison
        return re.sub('DTSTAMP:.*', 'DTSTAMP:--', text)

    receiver_role = Role.select()[0]
    formdef = FormDef.get_by_urlname('test')

    pub.custom_view_class.wipe()
    view = pub.custom_view_class()
    view.title = 'custom view'
    view.formdef = formdef
    view.columns = {'list': [{'id': '0'}]}
    view.filters = {}
    view.visibility = 'any'
    view.store()

    # without the role the export is forbidden...
    fetch('/api/forms/test/custom-view/ics/foobar', status=403)
    # ... and the anonymise parameter changes nothing to that
    fetch('/api/forms/test/custom-view/ics/foobar?anonymise', status=403)

    # give the receiver role to the user
    local_user.roles = [receiver_role.id]
    local_user.store()

    # export with a start field only
    calendar = fetch('/api/forms/test/custom-view/ics/foobar')
    calendar_slash = fetch('/api/forms/test/custom-view/ics/foobar/')
    assert without_dtstamp(calendar.text) == without_dtstamp(calendar_slash.text)
    assert calendar.headers['content-type'] == 'text/calendar; charset=utf-8'
    assert calendar.text.count('BEGIN:VEVENT') == 10

    # export with start and end fields
    calendar = fetch('/api/forms/test/custom-view/ics/foobar/foobar2')
    calendar_slash = fetch('/api/forms/test/custom-view/ics/foobar/foobar2/')
    assert without_dtstamp(calendar.text) == without_dtstamp(calendar_slash.text)
    assert calendar.text.count('DTSTART') == 10
    assert calendar.text.count('DTEND') == 10
def test_roles(pub, local_user):
    """Check the roles listing, on ``/api/roles`` and on the legacy ``/roles``.

    An unsigned request on ``/api/roles`` is rejected with a 403; signed
    requests on both endpoints export the same role description.
    """
    Role.wipe()
    role = Role(name='Hello World')
    role.emails = ['toto@example.com', 'zozo@example.com']
    role.details = 'kouign amann'
    role.store()

    def check_role_export(resp):
        exported = resp.json['data'][0]
        assert exported['text'] == 'Hello World'
        assert exported['slug'] == 'hello-world'
        assert exported['emails'] == ['toto@example.com', 'zozo@example.com']
        # identity check: a real boolean is expected, not just a falsy value
        assert exported['emails_to_members'] is False
        assert exported['details'] == 'kouign amann'

    get_app(pub).get('/api/roles', status=403)

    check_role_export(get_app(pub).get(sign_uri('/api/roles')))

    # also check old endpoint, for compatibility
    check_role_export(
        get_app(pub).get(sign_uri('/roles'), headers={'Accept': 'application/json'}))
def test_users(pub, local_user):
    """Check the users listing API and its ``q`` search parameter.

    The search is exercised on the user name and on a custom user field
    added during the test.
    """
    def search(query):
        # signed search request, returns the list found under "data"
        return get_app(pub).get(sign_uri('/api/users/?q=%s' % query)).json['data']

    # an unsigned request is rejected
    get_app(pub).get('/api/users/', status=403)

    listing = get_app(pub).get(sign_uri('/api/users/'))
    first_user = listing.json['data'][0]
    assert first_user['user_display_name'] == local_user.name
    assert first_user['user_email'] == local_user.email
    assert first_user['user_id'] == local_user.id

    # roles of the user are part of the export
    foo_bar_role = Role(name='Foo bar')
    foo_bar_role.store()
    local_user.roles = [foo_bar_role.id]
    local_user.store()

    found = search('jean')
    assert found[0]['user_email'] == local_user.email
    assert len(found[0]['user_roles']) == 1
    assert found[0]['user_roles'][0]['name'] == 'Foo bar'

    assert len(search('foobar')) == 0

    # add a custom field to the user profile and search on its value
    from wcs.admin.settings import UserFieldsFormDef
    user_formdef = UserFieldsFormDef(pub)
    user_formdef.fields.append(fields.StringField(id='3', label='test', type='string'))
    user_formdef.store()

    local_user.form_data = {'3': 'HELLO'}
    local_user.set_attributes_from_formdata(local_user.form_data)
    local_user.store()

    assert len(search('HELLO')) == 1
    assert len(search('foobar')) == 0
def test_users_unaccent(pub, local_user):
    """Check the user search finds an accented name with or without accents."""
    local_user.name = 'Jean Sénisme'
    local_user.store()

    # all of those searches must find the user
    matching_urls = (
        '/api/users/?q=jean',
        '/api/users/?q=senisme',
        '/api/users/?q=sénisme',
    )
    for url in matching_urls:
        found = get_app(pub).get(sign_uri(url))
        assert found.json['data'][0]['user_email'] == local_user.email

    # while an unrelated search finds nobody
    found = get_app(pub).get(sign_uri('/api/users/?q=blah'))
    assert len(found.json['data']) == 0
def test_workflow_trigger(pub, local_user):
    """Check a workflow jump can be fired through its webservice trigger.

    The jump is first open to any signed request; once restricted to a role,
    the call is refused (403) without a user or with a user lacking the
    role, and accepted for a user having it.
    """
    workflow = Workflow(name='test')
    first_status = workflow.add_status('Status1', 'st1')
    jump = JumpWorkflowStatusItem()
    jump.trigger = 'XXX'
    jump.status = 'st2'
    first_status.items.append(jump)
    jump.parent = first_status
    workflow.add_status('Status2', 'st2')
    workflow.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    formdef.store()

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    def stored_status():
        # status of the formdata as reloaded from storage
        return formdef.data_class().get(formdata.id).status

    def trigger_url():
        return formdata.get_url() + 'jump/trigger/XXX'

    assert stored_status() == 'wf-st1'

    get_app(pub).post(sign_uri(trigger_url()), status=200)
    assert stored_status() == 'wf-st2'

    # restrict the jump to a role
    Role.wipe()
    allowed_role = Role(name='xxx')
    allowed_role.store()

    jump.by = [allowed_role.id]
    workflow.store()

    formdata.store()  # (will get back to wf-st1)
    # refused without a user
    get_app(pub).post(sign_uri(trigger_url()), status=403)
    # refused with a user that does not have the role
    get_app(pub).post(sign_uri(trigger_url(), user=local_user), status=403)

    local_user.roles = [allowed_role.id]
    local_user.store()
    # accepted once the user has the role
    get_app(pub).post(sign_uri(trigger_url(), user=local_user), status=200)
def test_workflow_trigger_with_condition(pub, local_user):
    """Check a conditional jump is only fired when its condition is met.

    With an unmet condition the trigger answers 403 (JSON error, or HTML when
    no format is requested) and the status is unchanged; once the field value
    satisfies the condition the call succeeds.
    """
    workflow = Workflow(name='test')
    first_status = workflow.add_status('Status1', 'st1')
    jump = JumpWorkflowStatusItem()
    jump.trigger = 'XXX'
    jump.condition = {'type': 'django', 'value': 'form_var_foo == "bar"'}
    jump.status = 'st2'
    first_status.items.append(jump)
    jump.parent = first_status
    workflow.add_status('Status2', 'st2')
    workflow.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [fields.StringField(id='0', label='foo', varname='foo')]
    formdef.workflow_id = workflow.id
    formdef.store()

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.data = {'0': 'foo'}
    formdata.just_created()
    formdata.store()

    def stored_status():
        # status of the formdata as reloaded from storage
        return formdef.data_class().get(formdata.id).status

    def trigger_url():
        return formdata.get_url() + 'jump/trigger/XXX'

    assert stored_status() == 'wf-st1'

    # the field value does not satisfy the condition
    refusal = get_app(pub).post(sign_uri(trigger_url()), status=403)
    assert refusal.json == {'err_desc': 'unmet condition', 'err': 1}
    assert stored_status() == 'wf-st1'

    # same call, without json
    refusal = get_app(pub).post(sign_uri(trigger_url(), format=None), status=403)
    assert refusal.content_type == 'text/html'

    # change the value so the condition is met
    formdata.data['0'] = 'bar'
    formdata.store()
    success = get_app(pub).post(sign_uri(trigger_url()))
    assert success.json == {'err': 0, 'url': None}
def test_workflow_trigger_jump_once(pub, local_user):
    """Each trigger call moves the form one status forward, not two.

    Both ``st1`` and ``st2`` hold a jump listening to the same ``XXX``
    trigger; the first call must stop in ``st2`` and the second one reach
    ``st3``.
    """
    workflow = Workflow(name='test')
    first = workflow.add_status('Status1', 'st1')
    second = workflow.add_status('Status2', 'st2')
    workflow.add_status('Status3', 'st3')

    # same trigger name on two consecutive statuses
    for holder, target in ((first, 'st2'), (second, 'st3')):
        jump = JumpWorkflowStatusItem()
        jump.trigger = 'XXX'
        jump.status = target
        holder.items.append(jump)
        jump.parent = holder
    workflow.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    formdef.store()

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    assert formdef.data_class().get(formdata.id).status == 'wf-st1'

    for expected_status in ('wf-st2', 'wf-st3'):
        response = get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX'))
        assert response.json == {'err': 0, 'url': None}
        assert formdef.data_class().get(formdata.id).status == expected_status
def test_workflow_global_webservice_trigger(pub, local_user):
    """Exercise the ``hooks/<identifier>/`` endpoint of a global action.

    Covers an undefined hook, anonymous calls, calls restricted to logged
    users and to a given role, and calls carrying a JSON payload.
    """
    workflow = Workflow(name='test')
    workflow.add_status('Status1', 'st1')
    action = workflow.add_global_action('Action', 'ac1')
    trigger = action.append_trigger('webservice')
    trigger.identifier = 'plop'

    journal_item = RegisterCommenterWorkflowStatusItem()
    journal_item.id = '_add_to_journal'
    journal_item.comment = 'HELLO WORLD'
    action.items.append(journal_item)
    journal_item.parent = action
    workflow.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    formdef.store()

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    def reloaded():
        return formdef.data_class().get(formdata.id)

    def last_journal_content():
        return reloaded().evolution[-1].parts[-1].content

    assert reloaded().status == 'wf-st1'

    # call to undefined hook
    get_app(pub).post(sign_uri(formdata.get_url() + 'hooks/XXX/'), status=404)
    get_app(pub).post(sign_uri(formdata.get_api_url() + 'hooks/XXX/'), status=404)

    # anonymous call
    get_app(pub).post(formdata.get_url() + 'hooks/plop/', status=200)
    assert last_journal_content() == 'HELLO WORLD'

    journal_item.comment = 'HELLO WORLD 2'
    workflow.store()
    get_app(pub).post(formdata.get_api_url() + 'hooks/plop/', status=200)
    assert last_journal_content() == 'HELLO WORLD 2'

    # call requiring user
    journal_item.comment = 'HELLO WORLD 3'
    trigger.roles = ['logged-users']
    workflow.store()
    get_app(pub).post(formdata.get_api_url() + 'hooks/plop/', status=403)
    get_app(pub).post(sign_uri(formdata.get_api_url() + 'hooks/plop/'), status=200)
    assert last_journal_content() == 'HELLO WORLD 3'

    # call requiring roles
    journal_item.comment = 'HELLO WORLD 4'
    trigger.roles = ['logged-users']
    workflow.store()
    Role.wipe()
    role = Role(name='xxx')
    role.store()
    trigger.roles = [role.id]
    workflow.store()
    get_app(pub).post(sign_uri(formdata.get_api_url() + 'hooks/plop/'), status=403)
    get_app(pub).post(
        sign_uri(formdata.get_api_url() + 'hooks/plop/', user=local_user),
        status=403)

    local_user.roles = [role.id]
    local_user.store()
    get_app(pub).post(
        sign_uri(formdata.get_api_url() + 'hooks/plop/', user=local_user),
        status=200)
    assert last_journal_content() == 'HELLO WORLD 4'

    # call adding data
    journal_item.comment = 'HELLO {{plop_test}}'
    workflow.store()
    get_app(pub).post_json(
        sign_uri(formdata.get_api_url() + 'hooks/plop/', user=local_user),
        {'test': 'foobar'},
        status=200)
    # (django templating make it turn into HTML)
    assert last_journal_content() == '<div>HELLO foobar</div>'

    # call adding data but with no actions
    action.items = []
    workflow.store()
    get_app(pub).post_json(
        sign_uri(formdata.get_api_url() + 'hooks/plop/', user=local_user),
        {'test': 'BAR'},
        status=200)
    assert reloaded().workflow_data == {'plop': {'test': 'BAR'}}
def test_tracking_code(pub):
    """Check ``/api/code/<code>`` lookups of tracking codes.

    An unsigned call gets a 403; signed calls get a 404 for an unknown
    code, when tracking codes are disabled on the form, or once the form
    data has been removed, and a 200 with the form URLs otherwise.
    """
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = []
    formdef.enable_tracking_codes = True
    formdef.store()

    data_class = formdef.data_class()
    formdata = data_class()
    formdata.store()

    code = get_publisher().tracking_code_class()
    code.formdata = formdata
    code.store()

    def signed_lookup(path, expected_status):
        return get_app(pub).get(sign_url(path, '1234'), status=expected_status)

    # missing signature
    get_app(pub).get('/api/code/foobar', status=403)

    # unknown code
    response = signed_lookup('/api/code/foobar?orig=coucou', 404)
    assert response.json['err'] == 1

    # known code
    response = signed_lookup('/api/code/%s?orig=coucou' % code.id, 200)
    assert response.json['err'] == 0
    assert response.json['url'] == 'http://example.net/test/%s/' % formdata.id
    assert response.json['load_url'] == 'http://example.net/code/%s/load' % code.id

    # tracking codes disabled on the form
    formdef.enable_tracking_codes = False
    formdef.store()
    signed_lookup('/api/code/%s?orig=coucou' % code.id, 404)

    # tracking codes enabled again but form data removed
    formdef.enable_tracking_codes = True
    formdef.store()
    formdata.remove_self()
    signed_lookup('/api/code/%s?orig=coucou' % code.id, 404)
def test_validate_expression(pub):
    """Check the ``klass``/``msg`` verdict of ``/api/validate-expression``."""

    def verdict(url):
        return get_app(pub).get(url).json

    no_complaint = {'klass': None, 'msg': ''}

    assert verdict('/api/validate-expression?expression=hello') == no_complaint
    assert verdict('/api/validate-expression?expression=[hello]') == no_complaint

    result = verdict('/api/validate-expression?expression==[hello')
    assert result['klass'] == 'error'
    assert result['msg'].startswith('syntax error')

    result = verdict('/api/validate-expression?expression==[hello]')
    assert result['klass'] == 'warning'
    assert result['msg'].startswith('Make sure you want a Python expression,')

    assert verdict('/api/validate-expression?expression==hello[0]') == no_complaint
    assert verdict('/api/validate-expression?expression==hello[\'plop\']') == no_complaint

    # django with unicode
    result = verdict('/api/validate-expression?expression={{hello+%C3%A9l%C3%A9phant}}')
    assert result['klass'] == 'error'
    assert result['msg'].startswith('syntax error in Django template: Could not parse the remainder')
def test_validate_condition(pub):
    """Check ``/api/validate-condition`` for python, django and unknown types."""

    def verdict(url):
        return get_app(pub).get(url).json

    no_complaint = {'klass': None, 'msg': ''}

    # python conditions
    assert verdict('/api/validate-condition?type=python&value_python=hello') == no_complaint
    assert verdict('/api/validate-condition?type=python&value_python=~2') == no_complaint

    result = verdict('/api/validate-condition?type=python&value_python=hello -')
    assert result['klass'] == 'error'
    assert result['msg'].startswith('syntax error')

    result = verdict('/api/validate-condition?type=python&value_python={{form_number}}==3')
    assert result['klass'] == 'error'
    assert 'Python condition cannot contain {{' in result['msg']

    # django conditions
    result = verdict('/api/validate-condition?type=django&value_django=un+%C3%A9l%C3%A9phant')
    assert result['klass'] == 'error'
    assert result['msg'].startswith(u"syntax error: Unused 'éléphant'")

    result = verdict('/api/validate-condition?type=django&value_django=~2')
    assert result['klass'] == 'error'
    assert result['msg'].startswith('syntax error')

    result = verdict('/api/validate-condition?type=django&value_django=%22...%22+inf')  # "..." + inf
    assert result['klass'] == 'error'
    assert result['msg'].startswith('syntax error')

    # unsupported condition type
    result = verdict('/api/validate-condition?type=unknown&value_unknown=2')
    assert result['klass'] == 'error'
    assert result['msg'] == 'unknown condition type'
@pytest.fixture(params=['sql', 'pickle'])
def no_request_pub(request):
    """Provide a temporary publisher configured without any HTTP request.

    The fixture is parametrized over the ``sql`` and ``pickle`` modes. The
    application directory is set by hand to ``example.net`` and a
    ``site-options.cfg`` file declaring a ``[wscall-secrets]`` entry for
    ``api.example.com`` is written into it.
    """
    pub = create_temporary_pub(sql_mode=(request.param == 'sql'))
    pub.app_dir = os.path.join(pub.APP_DIR, 'example.net')
    pub.set_config()
    # use a context manager so the file is closed (and flushed) right away
    # instead of relying on garbage collection of the file object
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
        fd.write('''
[wscall-secrets]
api.example.com = 1234
''')
    return pub
def test_get_secret_and_orig(no_request_pub):
    """``get_secret_and_orig`` gives the configured secret and the site as orig."""
    secret, orig = get_secret_and_orig('https://api.example.com/endpoint/')
    assert (secret, orig) == ('1234', 'example.net')
def test_reverse_geocoding(pub):
    """Check how ``/api/reverse-geocoding`` builds the upstream service URL.

    ``urlopen`` is mocked; the test inspects the URL it was called with for
    the default settings, then with a custom zoom level, an API key and a
    custom service URL set in the site options.
    """

    def save_site_options():
        # context manager: the handle is closed (and its content flushed)
        # deterministically instead of being left to garbage collection
        with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
            pub.site_options.write(fd)

    with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
        urlopen.side_effect = lambda *args: StringIO(json.dumps({'address': 'xxx'}))

        # coordinates are mandatory
        get_app(pub).get('/api/reverse-geocoding', status=400)

        resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
        assert resp.content_type == 'application/json'
        assert resp.text == json.dumps({'address': 'xxx'})
        assert urlopen.call_args[0][0] == 'https://nominatim.entrouvert.org/reverse?zoom=18&format=json&addressdetails=1&lat=0&lon=0&accept-language=en'

        # custom zoom level
        pub.site_options.add_section('options')
        pub.site_options.set('options', 'nominatim_reverse_zoom_level', '16')
        save_site_options()
        get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
        assert urlopen.call_args[0][0] == 'https://nominatim.entrouvert.org/reverse?zoom=16&format=json&addressdetails=1&lat=0&lon=0&accept-language=en'

        # API key
        pub.site_options.set('options', 'nominatim_key', 'KEY')
        save_site_options()
        get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
        assert urlopen.call_args[0][0] == 'https://nominatim.entrouvert.org/reverse?zoom=16&key=KEY&format=json&addressdetails=1&lat=0&lon=0&accept-language=en'

        # custom service URL
        pub.site_options.set('options', 'reverse_geocoding_service_url', 'http://reverse.example.net/?param=value')
        save_site_options()
        get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
        assert urlopen.call_args[0][0] == 'http://reverse.example.net/?param=value&format=json&addressdetails=1&lat=0&lon=0&accept-language=en'
def test_formdef_submit_structured(pub, local_user):
    """Submitting item values through the API stores display and structured data.

    The form has two item fields, one backed by a (mocked) JSON data source
    and one by a formula; for each the test checks the raw value, the
    ``_display`` label and the ``_structured`` entry of the stored form data.
    """
    Role.wipe()
    role = Role(name='test')
    role.store()
    local_user.roles = [role.id]
    local_user.store()

    FormDef.wipe()
    json_source = {
        'type': 'json',
        'value': 'http://datasource.com',
    }
    formula_source = {
        'type': 'formula',
        'value': '[dict(id=i, text=\'label %s\' % i, foo=i) for i in range(10)]',
    }
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        fields.ItemField(id='0', label='foobar', varname='foobar',
                         data_source=json_source),
        fields.ItemField(id='1', label='foobar1', varname='foobar1',
                         data_source=formula_source),
    ]
    formdef.store()
    data_class = formdef.data_class()

    def signed_submit_path():
        absolute = sign_url(
            'http://example.net/api/formdefs/test/submit'
            '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), '1234')
        # keep only the path and query string
        return absolute[len('http://example.net'):]

    submitted = {'data': {
        '0': '0',
        "1": '3',
    }}
    with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
        urlopen.side_effect = lambda *args: StringIO('''\
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
{"id": 1, "text": "uné", "foo": "bar1"}, \
{"id": 2, "text": "deux", "foo": "bar2"}]}''')
        response = get_app(pub).post_json(signed_submit_path(), submitted)

    stored = data_class.get(response.json['data']['id'])
    assert stored.status == 'wf-new'

    # field backed by the JSON data source
    assert stored.data['0'] == '0'
    assert stored.data['0_display'] == 'zéro'
    assert stored.data['0_structured'] == {
        'id': 0,
        'text': 'zéro',
        'foo': 'bar',
    }

    # field backed by the formula data source
    assert stored.data['1'] == '3'
    assert stored.data['1_display'] == 'label 3'
    assert stored.data['1_structured'] == {
        'id': 3,
        'text': 'label 3',
        'foo': 3,
    }

    data_class.wipe()
def test_geocoding(pub):
    """Check how ``/api/geocoding`` builds the upstream service URL.

    ``urlopen`` is mocked; the test inspects the URL it was called with for
    the default settings, then with an API key and a custom service URL set
    in the site options.
    """

    def save_site_options():
        # context manager: the handle is closed (and its content flushed)
        # deterministically instead of being left to garbage collection
        with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
            pub.site_options.write(fd)

    with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
        urlopen.side_effect = lambda *args: StringIO(json.dumps([{'lat': 0, 'lon': 0}]))

        # the query is mandatory
        get_app(pub).get('/api/geocoding', status=400)

        resp = get_app(pub).get('/api/geocoding?q=test')
        assert resp.content_type == 'application/json'
        assert resp.text == json.dumps([{'lat': 0, 'lon': 0}])
        assert urlopen.call_args[0][0] == 'https://nominatim.entrouvert.org/search?format=json&q=test&accept-language=en'

        # API key
        pub.site_options.add_section('options')
        pub.site_options.set('options', 'nominatim_key', 'KEY')
        save_site_options()
        get_app(pub).get('/api/geocoding?q=test')
        assert urlopen.call_args[0][0] == 'https://nominatim.entrouvert.org/search?key=KEY&format=json&q=test&accept-language=en'

        # custom service URL
        pub.site_options.set('options', 'geocoding_service_url', 'http://reverse.example.net/?param=value')
        save_site_options()
        get_app(pub).get('/api/geocoding?q=test')
        assert urlopen.call_args[0][0] == 'http://reverse.example.net/?param=value&format=json&q=test&accept-language=en'
def test_cards(pub, local_user):
    """Check the cards API: card model list, card data list and schema.

    Unsigned access to the model list and signed access to the data list
    without a user both get a 403; with the user's NameID the single card is
    listed, with its fields when ``full=on`` is given.
    """
    Role.wipe()
    role = Role(name='test')
    role.store()
    local_user.roles = [role.id]
    local_user.store()

    carddef = CardDef()
    carddef.name = 'test'
    carddef.fields = [fields.StringField(id='0', label='foobar', varname='foo')]
    carddef.workflow_roles = {'_viewer': role.id}
    carddef.digest_template = 'bla {{ form_var_foo }} xxx'
    carddef.store()

    card = carddef.data_class()()
    card.data = {'0': 'blah'}
    card.just_created()
    card.store()

    # list of card models
    get_app(pub).get('/api/cards/@list', status=403)
    response = get_app(pub).get(sign_uri('/api/cards/@list'))
    models = response.json['data']
    assert len(models) == 1
    assert models[0]['slug'] == 'test'

    # list of cards, on behalf of the user
    get_app(pub).get(sign_uri('/api/cards/test/list'), status=403)
    response = get_app(pub).get(sign_uri(
        '/api/cards/test/list?NameID=%s' %
        local_user.name_identifiers[0]))
    entries = response.json['data']
    assert len(entries) == 1
    entry = entries[0]
    assert entry['display_id'] == card.get_display_id()
    assert entry['display_name'] == card.get_display_name()
    assert entry['digest'] == card.digest
    assert entry['text'] == card.digest

    # same list, with field values
    response = get_app(pub).get(sign_uri(
        '/api/cards/test/list?NameID=%s&full=on' %
        local_user.name_identifiers[0]))
    entry = response.json['data'][0]
    assert entry['fields']['foo'] == 'blah'
    assert entry['digest'] == card.digest
    assert entry['text'] == card.digest

    # get schema
    response = get_app(pub).get(sign_uri('/api/cards/test/@schema'), status=200)
    schema_fields = response.json['fields']
    assert len(schema_fields) == 1
    assert schema_fields[0]['label'] == 'foobar'
    assert schema_fields[0]['varname'] == 'foo'
def test_api_invalid_http_basic_auth(pub, local_user, admin_user, ics_data):
app = get_app(pub)
app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email,
headers={'Authorization': 'Basic garbage'}, status=401)