2811 lines
106 KiB
Python
2811 lines
106 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import pytest
|
|
import json
|
|
import shutil
|
|
import os
|
|
import hmac
|
|
import base64
|
|
import hashlib
|
|
import mock
|
|
import re
|
|
import datetime
|
|
import time
|
|
import json
|
|
import sys
|
|
|
|
from django.utils.encoding import force_bytes, force_text
|
|
from django.utils.six import StringIO
|
|
from django.utils.six.moves.urllib import parse as urllib
|
|
from django.utils.six.moves.urllib import parse as urlparse
|
|
|
|
from quixote import cleanup, get_publisher
|
|
from wcs.qommon.http_request import HTTPRequest
|
|
from wcs.qommon.form import PicklableUpload
|
|
from wcs.users import User
|
|
from wcs.roles import Role
|
|
from wcs.carddef import CardDef
|
|
from wcs.formdef import FormDef
|
|
from wcs.formdata import Evolution
|
|
from wcs.categories import Category
|
|
from wcs.data_sources import NamedDataSource
|
|
from wcs.workflows import Workflow, EditableWorkflowStatusItem, WorkflowBackofficeFieldsFormDef, WorkflowVariablesFieldsFormDef
|
|
from wcs.wf.jump import JumpWorkflowStatusItem
|
|
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
|
|
from wcs import fields, qommon
|
|
from wcs.api_utils import sign_url, get_secret_and_orig, is_url_signed, DEFAULT_DURATION
|
|
from wcs.qommon.errors import AccessForbiddenError
|
|
|
|
from utilities import get_app, create_temporary_pub, clean_temporary_pub
|
|
|
|
|
|
def pytest_generate_tests(metafunc):
    """Parametrize the ``pub`` fixture with the 'pickle' and 'sql' values.

    Tests that do not request ``pub`` are left untouched.
    """
    if 'pub' not in metafunc.fixturenames:
        return
    # indirect=True hands each value to the fixture as ``request.param``
    metafunc.parametrize('pub', ['pickle', 'sql'], indirect=True)
|
|
|
|
|
|
@pytest.fixture
def pub(request, emails):
    """Create and configure the temporary publisher used by the tests.

    ``request.param`` selects the storage backend: SQL mode is enabled when
    it equals ``'sql'``.  The publisher is configured with password
    identification and the ``en`` language, and a ``site-options.cfg`` file
    declaring the ``coucou`` API origin with the secret ``1234`` is written
    in its application directory.
    """
    pub = create_temporary_pub(sql_mode=(request.param == 'sql'))

    req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
    pub.set_app_dir(req)
    pub.cfg['identification'] = {'methods': ['password']}
    pub.cfg['language'] = {'language': 'en'}
    pub.write_cfg()

    # a context manager guarantees the file is flushed and closed before the
    # tests start reading it
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
        fd.write('''\
[api-secrets]
coucou = 1234
''')

    return pub
|
|
|
|
|
|
def teardown_module(module):
    """Module-level teardown hook: clean the temporary publisher."""
    clean_temporary_pub()
|
|
|
|
|
|
@pytest.fixture
def local_user():
    """Wipe the stored users, then store and return a regular user."""
    get_publisher().user_class.wipe()
    account = get_publisher().user_class()
    account.name = 'Jean Darmette'
    account.email = 'jean.darmette@triffouilis.fr'
    account.name_identifiers = ['0123456789']
    account.store()
    return account
|
|
|
|
|
|
@pytest.fixture
def admin_user():
    """Wipe the stored users, then store and return a user flagged as admin."""
    get_publisher().user_class.wipe()
    account = get_publisher().user_class()
    account.name = 'John Doe Admin'
    account.email = 'john.doe@example.com'
    account.name_identifiers = ['0123456789']
    account.is_admin = True
    account.store()
    return account
|
|
|
|
|
|
def sign_uri(uri, user=None, format='json'):
    """Return ``uri`` with a signed query string.

    The existing query is extended with ``format`` (when truthy),
    ``orig=coucou``, ``algo=sha256``, the current UTC ``timestamp`` and, when
    ``user`` is given, its URL-quoted ``email``.  The base64-encoded
    HMAC-SHA256 of that query, keyed with ``1234``, is then appended as the
    ``signature`` parameter.
    """
    timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri)
    if query:
        query += '&'
    if format:
        query += 'format=%s&' % format
    # NOTE: the parameter must be spelled "&timestamp="; decoding "&times" as
    # an HTML entity turns it into "×tamp" and drops the timestamp field
    query += 'orig=coucou&algo=sha256&timestamp=' + timestamp
    if user:
        query += '&email=' + urllib.quote(user.email)
    # the signature covers everything that precedes it in the query string
    digest = hmac.new(b'1234', force_bytes(query), hashlib.sha256).digest()
    query += '&signature=%s' % urllib.quote(base64.b64encode(digest))
    return urlparse.urlunparse((scheme, netloc, path, params, query, fragment))
|
|
|
|
|
|
def test_user_page_redirect(pub):
    """GET /user answers with a Location header targeting /myspace/."""
    resp = get_app(pub).get('/user')
    assert resp.headers.get('location') == 'http://example.net/myspace/'
|
|
|
|
|
|
def test_user_page_error(pub):
    """GET /api/user/ without any user answers 403 with a JSON error body."""
    # errors have to be serialized as JSON
    resp = get_app(pub).get('/api/user/', status=403)
    assert resp.json['err_desc'] == 'no user specified'
|
|
|
|
|
|
def test_user_page_error_when_json_and_no_user(pub):
    """Same 403 "no user specified" error when ``format=json`` is explicit."""
    resp = get_app(pub).get('/api/user/?format=json', status=403)
    assert resp.json['err_desc'] == 'no user specified'
|
|
|
|
|
|
def test_get_user_from_api_query_string_error_missing_orig(pub):
    """A signature without an ``orig`` parameter is rejected with a 403."""
    resp = get_app(pub).get('/api/user/?format=json&signature=xxx', status=403)
    assert resp.json['err_desc'] == 'missing/multiple orig field'
|
|
|
|
|
|
def test_get_user_from_api_query_string_error_invalid_orig(pub):
    """An ``orig`` value other than the configured one is rejected (403)."""
    resp = get_app(pub).get('/api/user/?format=json&orig=coin&signature=xxx', status=403)
    assert resp.json['err_desc'] == 'invalid orig'
|
|
|
|
|
|
def test_get_user_from_api_query_string_error_missing_algo(pub):
    """A signature without an ``algo`` parameter is rejected with a 403."""
    resp = get_app(pub).get('/api/user/?format=json&orig=coucou&signature=xxx', status=403)
    assert resp.json['err_desc'] == 'missing/multiple algo field'
|
|
|
|
|
|
def test_get_user_from_api_query_string_error_invalid_algo(pub):
    """The ``coin`` value for ``algo`` is rejected with a 403."""
    resp = get_app(pub).get('/api/user/?format=json&orig=coucou&signature=xxx&algo=coin', status=403)
    assert resp.json['err_desc'] == 'invalid algo'
|
|
|
|
|
|
def test_get_user_from_api_query_string_error_invalid_signature(pub):
    """A bogus ``signature`` value is rejected with a 403."""
    resp = get_app(pub).get('/api/user/?format=json&orig=coucou&signature=xxx&algo=sha1', status=403)
    assert resp.json['err_desc'] == 'invalid signature'
|
|
|
|
|
|
def test_get_user_from_api_query_string_error_missing_timestamp(pub):
    """A correctly signed query lacking a timestamp is rejected with a 403."""
    digest = hmac.new(b'1234', b'format=json&orig=coucou&algo=sha1', hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))
    resp = get_app(pub).get('/api/user/?format=json&orig=coucou&algo=sha1&signature=%s' % signature, status=403)
    assert resp.json['err_desc'] == 'missing/multiple timestamp field'
|
|
|
|
|
|
def test_get_user_from_api_query_string_error_missing_email(pub):
    """A validly signed query naming no user gets a 403 "no user specified"."""
    timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
    # the parameter must be spelled "&timestamp=" (not the entity-decoded
    # "×tamp="), otherwise the query carries no timestamp field at all
    query = 'format=json&orig=coucou&algo=sha1&timestamp=' + timestamp
    digest = hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))
    resp = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403)
    assert resp.json['err_desc'] == 'no user specified'
|
|
|
|
|
|
def test_get_user_from_api_query_string_error_unknown_nameid(pub):
    """A validly signed query with the NameID ``xxx`` gets a 403 "unknown NameID"."""
    timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
    # the parameter must be spelled "&timestamp=" (not the entity-decoded
    # "×tamp="), otherwise the query carries no timestamp field at all
    query = 'format=json&orig=coucou&algo=sha1&NameID=xxx&timestamp=' + timestamp
    digest = hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))
    resp = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403)
    assert resp.json['err_desc'] == 'unknown NameID'
|
|
|
|
|
|
def test_get_user_from_api_query_string_error_missing_email_valid_endpoint(pub):
    """Signed URLs without a user are accepted by ``/categories`` and ``/json``."""
    # check it's ok to sign a URL without specifying a user if the endpoint
    # works fine without user.
    timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
    # the parameter must be spelled "&timestamp=" (not the entity-decoded
    # "×tamp="), otherwise the query carries no timestamp field at all
    query = 'format=json&orig=coucou&algo=sha1&timestamp=' + timestamp
    digest = hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))
    resp = get_app(pub).get('/categories?%s&signature=%s' % (query, signature))
    assert resp.json == {'data': []}
    resp = get_app(pub).get('/json?%s&signature=%s' % (query, signature))
    assert resp.json == {'err': 0, 'data': []}
|
|
|
|
|
|
def test_get_user_from_api_query_string_error_unknown_nameid_valid_endpoint(pub):
    """Signed URLs with an unknown NameID are accepted by ``/categories`` and ``/json``."""
    # check the categories and forms endpoints accept an unknown NameID
    timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
    # the parameter must be spelled "&timestamp=" (not the entity-decoded
    # "×tamp="), otherwise the query carries no timestamp field at all
    query = 'format=json&NameID=xxx&orig=coucou&algo=sha1&timestamp=' + timestamp
    digest = hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))
    resp = get_app(pub).get('/categories?%s&signature=%s' % (query, signature))
    assert resp.json == {'data': []}
    resp = get_app(pub).get('/json?%s&signature=%s' % (query, signature))
    assert resp.json == {'err': 0, 'data': []}
|
|
|
|
|
|
def test_get_user_from_api_query_string_error_success_sha1(pub, local_user):
    """A query signed with HMAC-SHA1 and naming the user by email is accepted."""
    timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
    # the parameter must be spelled "&timestamp=" (not the entity-decoded
    # "×tamp="), otherwise the query carries no timestamp field at all
    query = 'format=json&orig=coucou&algo=sha1&email=' + urllib.quote(local_user.email) + '&timestamp=' + timestamp
    digest = hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))
    resp = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature))
    assert resp.json['user_display_name'] == u'Jean Darmette'
|
|
|
|
|
|
def test_get_user_from_api_query_string_error_invalid_signature_algo_mismatch(pub, local_user):
    """A query announcing ``sha256`` but signed with SHA1 is rejected (403)."""
    timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
    # the parameter must be spelled "&timestamp=" (not the entity-decoded
    # "×tamp="), otherwise the query carries no timestamp field at all
    query = 'format=json&orig=coucou&algo=sha256&email=' + urllib.quote(local_user.email) + '&timestamp=' + timestamp
    # deliberately sign with sha1 although the query announces sha256
    digest = hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()
    signature = urllib.quote(base64.b64encode(digest))
    resp = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403)
    assert resp.json['err_desc'] == 'invalid signature'
|
|
|
|
|
|
def test_get_user_from_api_query_string_error_success_sha256(pub, local_user):
    """A query signed with HMAC-SHA256 and naming the user by email is accepted."""
    timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
    # the parameter must be spelled "&timestamp=" (not the entity-decoded
    # "×tamp="), otherwise the query carries no timestamp field at all
    query = 'format=json&orig=coucou&algo=sha256&email=' + urllib.quote(local_user.email) + '&timestamp=' + timestamp
    digest = hmac.new(b'1234', force_bytes(query), hashlib.sha256).digest()
    signature = urllib.quote(base64.b64encode(digest))
    resp = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature))
    assert resp.json['user_display_name'] == u'Jean Darmette'
|
|
|
|
|
|
def test_sign_url(pub, local_user):
    """Check URLs produced by ``sign_url`` against the ``/api/user/`` endpoint."""
    host = 'http://example.net'
    unsigned = 'http://example.net/api/user/?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email)

    # signed with the key configured for the "coucou" origin
    path = sign_url(unsigned, '1234')[len(host):]
    resp = get_app(pub).get(path)
    assert resp.json['user_display_name'] == u'Jean Darmette'

    # try to add something after signed url
    get_app(pub).get('%s&foo=bar' % path, status=403)

    # signed with a different key
    path = sign_url(unsigned, '12345')[len(host):]
    get_app(pub).get(path, status=403)
|
|
|
|
|
|
def test_get_user(pub, local_user):
    """Check ``/api/user/`` exposes the display name and roles of the user."""
    Role.wipe()
    role = Role(name='Foo bar')
    role.store()
    local_user.roles = [role.id]
    local_user.store()

    unsigned = 'http://example.net/api/user/?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email)
    path = sign_url(unsigned, '1234')[len('http://example.net'):]
    resp = get_app(pub).get(path)
    assert resp.json['user_display_name'] == u'Jean Darmette'
    assert [x['name'] for x in resp.json['user_roles']] == ['Foo bar']
    assert [x['slug'] for x in resp.json['user_roles']] == ['foo-bar']
|
|
|
|
|
|
def test_is_url_signed_check_nonce(pub, local_user):
    """Check a signed URL cannot be replayed and that nonces get cleaned.

    ``is_url_signed()`` must succeed once, then raise ``AccessForbiddenError``
    ("nonce already used") for the same request; ``clean_nonces()`` must only
    empty the nonces directory once ``now`` is far enough in the future.
    """
    ORIG = 'xxx'
    KEY = 'xxx'

    pub.site_options.add_section('api-secrets')
    pub.site_options.set('api-secrets', ORIG, KEY)
    # close the file explicitly so the options are flushed to disk
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
        pub.site_options.write(fd)
    # test clean_nonces do not bark when nonces directory is empty
    if os.path.exists(os.path.join(pub.app_dir, 'nonces')):
        shutil.rmtree(os.path.join(pub.app_dir, 'nonces'))
    pub.clean_nonces()
    signed_url = sign_url('?format=json&orig=%s&email=%s'
                          % (ORIG, urllib.quote(local_user.email)), KEY)
    req = HTTPRequest(None, {'SCRIPT_NAME': '/',
                             'SERVER_NAME': 'example.net',
                             'QUERY_STRING': signed_url[1:]})
    req.process_inputs()
    pub.set_app_dir(req)
    pub._set_request(req)

    assert is_url_signed()
    with pytest.raises(AccessForbiddenError) as exc_info:
        # reset the flag set on the request so the check is run again
        req.signed = False
        is_url_signed()
    assert exc_info.value.public_msg == 'nonce already used'
    # test that clean nonces works
    assert os.listdir(os.path.join(pub.app_dir, 'nonces'))
    pub.clean_nonces(delta=0)
    assert os.listdir(os.path.join(pub.app_dir, 'nonces'))
    pub.clean_nonces(delta=0, now=time.time() + 2 * DEFAULT_DURATION)
    assert not os.listdir(os.path.join(pub.app_dir, 'nonces'))
|
|
|
|
|
|
def test_get_user_compat_endpoint(pub, local_user):
    """Check the ``/user`` endpoint also serves signed JSON requests."""
    unsigned = 'http://example.net/user?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email)
    path = sign_url(unsigned, '1234')[len('http://example.net'):]
    resp = get_app(pub).get(path)
    assert resp.json['user_display_name'] == u'Jean Darmette'
|
|
|
|
|
|
def test_formdef_list(pub):
    """Check the JSON listing of formdefs and the attributes it exposes.

    The same payload is expected from ``/json``, from ``/`` with a JSON
    ``Accept`` header and from ``/api/formdefs/``.
    """
    Role.wipe()
    role = Role(name='Foo bar')
    role.id = '14'
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.description = 'plop'
    formdef.keywords = 'mobile, test'
    formdef.workflow_roles = {'_receiver': str(role.id)}
    formdef.fields = []
    formdef.store()

    resp1 = get_app(pub).get('/json')
    resp2 = get_app(pub).get('/', headers={'Accept': 'application/json'})
    resp3 = get_app(pub).get('/api/formdefs/')
    assert resp1.json == resp2.json == resp3.json
    entry = resp1.json['data'][0]
    assert entry['title'] == 'test'
    assert entry['url'] == 'http://example.net/test/'
    # identity check: the value must be a real JSON boolean, not merely falsy
    assert entry['redirection'] is False
    assert entry['description'] == 'plop'
    assert entry['keywords'] == ['mobile', 'test']
    assert list(entry['functions'].keys()) == ['_receiver']
    assert entry['functions']['_receiver']['label'] == 'Recipient'
    assert entry['functions']['_receiver']['role']['slug'] == role.slug
    assert entry['functions']['_receiver']['role']['name'] == role.name
    assert 'count' not in entry

    # backoffice_submission formdef : none
    resp1 = get_app(pub).get('/api/formdefs/?backoffice-submission=on')
    assert resp1.json['err'] == 0
    assert len(resp1.json['data']) == 0
|
|
|
|
|
|
def test_limited_formdef_list(pub, local_user):
    """Check how a role-restricted formdef shows up in ``/api/formdefs/``."""
    Role.wipe()
    role = Role(name='Foo bar')
    role.id = '14'
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.description = 'plop'
    formdef.workflow_roles = {'_receiver': str(role.id)}
    formdef.fields = []
    formdef.store()

    resp = get_app(pub).get('/api/formdefs/')
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 1
    assert resp.json['data'][0]['authentication_required'] is False
    # not present in backoffice-submission formdefs
    resp = get_app(pub).get('/api/formdefs/?backoffice-submission=on')
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 0

    # check it's not advertised
    formdef.roles = [role.id]
    formdef.store()
    anonymous = get_app(pub).get('/api/formdefs/')
    empty_nameid = get_app(pub).get(sign_uri('/api/formdefs/?NameID='))
    unknown_nameid = get_app(pub).get(sign_uri('/api/formdefs/?NameID=XXX'))
    known_nameid = get_app(pub).get(sign_uri('/api/formdefs/?NameID=%s' % local_user.name_identifiers[0]))
    assert anonymous.json['err'] == 0
    assert len(anonymous.json['data']) == 0
    assert anonymous.json == empty_nameid.json == unknown_nameid.json == known_nameid.json
    # still not present in backoffice-submission formdefs
    resp = get_app(pub).get('/api/formdefs/?backoffice-submission=on')
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 0

    # unless user has correct roles
    local_user.roles = [role.id]
    local_user.store()
    resp = get_app(pub).get(sign_uri('/api/formdefs/?NameID=%s' % local_user.name_identifiers[0]))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 1

    local_user.roles = []
    local_user.store()

    # check it's also included in anonymous/signed calls, but marked for
    # authentication
    resp = get_app(pub).get(sign_uri('/api/formdefs/'))
    assert resp.json['data'][0]
    assert resp.json['data'][0]['authentication_required'] is True

    # check it's advertised
    formdef.always_advertise = True
    formdef.store()
    anonymous = get_app(pub).get('/api/formdefs/')
    empty_nameid = get_app(pub).get(sign_uri('/api/formdefs/?NameID='))
    unknown_nameid = get_app(pub).get(sign_uri('/api/formdefs/?NameID=XXX'))
    known_nameid = get_app(pub).get(sign_uri('/api/formdefs/?NameID=%s' % local_user.name_identifiers[0]))
    assert anonymous.json['err'] == 0
    assert len(anonymous.json['data']) == 1
    assert anonymous.json['data'][0]['authentication_required']
    assert anonymous.json == empty_nameid.json == unknown_nameid.json == known_nameid.json

    formdef.required_authentication_contexts = ['fedict']
    formdef.store()
    resp = get_app(pub).get('/api/formdefs/')
    assert resp.json['data'][0]['required_authentication_contexts'] == ['fedict']
|
|
|
|
|
|
def test_formdef_list_redirection(pub):
    """Check a disabled formdef with a redirection is listed as a redirection."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.disabled = True
    formdef.disabled_redirection = 'http://example.net'
    formdef.fields = []
    formdef.store()

    resp1 = get_app(pub).get('/json')
    assert resp1.json['err'] == 0
    entry = resp1.json['data'][0]
    assert entry['title'] == 'test'
    assert entry['url'] == 'http://example.net/test/'
    # identity check: the value must be a real JSON boolean, not merely truthy
    assert entry['redirection'] is True
    assert 'count' not in entry
|
|
|
|
|
|
def test_backoffice_submission_formdef_list(pub, local_user):
    """Check which formdefs ``/api/formdefs/?backoffice-submission=on`` lists."""
    listing_url = '/api/formdefs/?backoffice-submission=on'

    Role.wipe()
    role = Role(name='Foo bar')
    role.id = '14'
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.description = 'plop'
    formdef.workflow_roles = {'_receiver': str(role.id)}
    formdef.fields = []
    formdef.store()

    other_formdef = FormDef()
    other_formdef.name = 'ignore me'
    other_formdef.fields = []
    other_formdef.store()

    resp = get_app(pub).get(listing_url)
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 0

    # check it's not advertised ...
    formdef.backoffice_submission_roles = [role.id]
    formdef.store()
    resp = get_app(pub).get(listing_url)
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 0

    # even if it's advertised on frontoffice
    formdef.always_advertise = True
    formdef.store()
    resp = get_app(pub).get(listing_url)
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 0

    # even if user is admin
    local_user.is_admin = True
    local_user.store()
    resp = get_app(pub).get(sign_uri(
        '/api/formdefs/?backoffice-submission=on&NameID=%s' % local_user.name_identifiers[0]))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 0
    local_user.is_admin = False
    local_user.store()

    # ... unless user has correct roles
    local_user.roles = [role.id]
    local_user.store()
    resp = get_app(pub).get(sign_uri(
        '/api/formdefs/?backoffice-submission=on&NameID=%s' % local_user.name_identifiers[0]))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 1
    assert 'backoffice_submission_url' in resp.json['data'][0]

    # but not advertised if it's a redirection
    formdef.disabled = True
    formdef.disabled_redirection = 'http://example.net'
    formdef.store()
    resp = get_app(pub).get(listing_url)
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 0
|
|
|
|
|
|
def test_formdef_schema(pub):
    """Check the content of the formdef schema endpoints.

    Covers the top-level keys, the exported fields (structured items being
    expected only in signed calls), the workflow statuses and the workflow
    backoffice fields; an unknown formdef must answer 404.
    """
    Workflow.wipe()
    workflow = Workflow(name='test')
    st1 = workflow.add_status('Status1', 'st1')
    jump = JumpWorkflowStatusItem()
    jump.status = 'st2'
    jump.timeout = 100
    st1.items.append(jump)
    st2 = workflow.add_status('Status2', 'st2')
    jump = JumpWorkflowStatusItem()
    jump.status = 'st3'
    st2.items.append(jump)
    # the third status gets no item, its return value is not needed
    workflow.add_status('Status3', 'st3')
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    workflow.backoffice_fields_formdef.fields = [
        fields.StringField(id='bo1', label='1st backoffice field',
                           type='string', varname='backoffice_blah'),
    ]
    workflow.store()
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        fields.StringField(id='0', label='foobar'),
        fields.ItemField(id='1', label='foobar1', varname='foobar1',
                         data_source={
                             'type': 'json',
                             'value': 'http://datasource.com',
                         }),
        fields.ItemsField(id='2', label='foobar2', varname='foobar2',
                          data_source={
                              'type': 'formula',
                              'value': '[dict(id=i, text=\'label %s\' % i, foo=i) for i in range(10)]',
                          }),
    ]

    formdef.workflow_id = workflow.id
    formdef.store()

    with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
        urlopen.side_effect = lambda *args: StringIO('''\
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
{"id": 1, "text": "uné", "foo": "bar1"}, \
{"id": 2, "text": "deux", "foo": "bar2"}]}''')
        resp = get_app(pub).get('/api/formdefs/test/schema')
        resp2 = get_app(pub).get('/test/schema')
        resp3 = get_app(pub).get(sign_url('/api/formdefs/test/schema?orig=coucou', '1234'))
    # NOTE(review): issued outside of the mock on purpose, the assertions on
    # resp4 below expect the JSON data source to yield no values
    resp4 = get_app(pub).get(sign_url('/api/formdefs/test/schema?orig=coucou', '1234'))

    # check schema
    assert resp.json == resp2.json
    assert set(resp.json.keys()) >= {
        'enable_tracking_codes', 'url_name', 'description',
        'workflow', 'expiration_date', 'discussion',
        'last_modification_time', 'has_captcha',
        'always_advertise', 'name', 'disabled', 'only_allow_one',
        'fields', 'keywords',
        'publication_date', 'detailed_emails',
        'disabled_redirection',
    }
    assert resp.json['name'] == 'test'

    # fields checks
    assert resp.json['fields'][0]['label'] == 'foobar'
    assert resp.json['fields'][0]['type'] == 'string'

    assert resp.json['fields'][1]['label'] == 'foobar1'
    assert resp.json['fields'][1]['type'] == 'item'

    # check structured items are only exported for authenticated callers
    assert resp.json['fields'][1]['items'] == []
    assert resp.json['fields'][2]['items'] == []
    assert 'structured_items' not in resp.json['fields'][1]
    assert 'structured_items' not in resp.json['fields'][2]

    assert len(resp3.json['fields'][1]['structured_items']) == 3
    assert resp3.json['fields'][1]['structured_items'][0]['id'] == 0
    assert resp3.json['fields'][1]['structured_items'][0]['text'] == u'zéro'
    assert resp3.json['fields'][1]['structured_items'][0]['foo'] == 'bar'
    assert resp3.json['fields'][1]['items'][0] == u'zéro'

    assert resp3.json['fields'][2]['label'] == 'foobar2'
    assert resp3.json['fields'][2]['type'] == 'items'
    assert len(resp3.json['fields'][2]['structured_items']) == 10
    assert resp3.json['fields'][2]['structured_items'][0]['id'] == 0
    assert resp3.json['fields'][2]['structured_items'][0]['text'] == 'label 0'
    assert resp3.json['fields'][2]['structured_items'][0]['foo'] == 0
    assert resp3.json['fields'][2]['items'][0] == 'label 0'

    # if structured_items fails no values
    assert 'structured_items' not in resp4.json['fields'][1]
    assert resp4.json['fields'][1]['items'] == []

    # workflow checks
    statuses = resp.json['workflow']['statuses']
    assert len(statuses) == 3
    assert statuses[0]['id'] == 'st1'
    assert statuses[0]['endpoint'] is False
    assert statuses[0]['waitpoint'] is True
    assert statuses[1]['id'] == 'st2'
    assert statuses[1]['endpoint'] is False
    assert statuses[1]['waitpoint'] is False
    assert statuses[2]['id'] == 'st3'
    assert statuses[2]['endpoint'] is True
    assert statuses[2]['waitpoint'] is True
    assert len(resp.json['workflow']['fields']) == 1

    assert resp.json['workflow']['fields'][0]['label'] == '1st backoffice field'

    get_app(pub).get('/api/formdefs/xxx/schema', status=404)
|
|
|
|
|
|
def test_post_invalid_json(pub, local_user):
    """Posting a non-JSON body with a JSON content type answers a 400."""
    app = get_app(pub)
    resp = app.post(
        '/api/formdefs/test/submit',
        params='not a json payload',
        content_type='application/json',
        status=400,
    )
    assert resp.json['err'] == 1
    assert resp.json['err_class'] == 'Invalid request'
|
|
|
|
|
|
def test_formdef_submit(pub, local_user):
    """Exercise form submission through ``/api/formdefs/test/submit``.

    Covers unsigned calls, content-type handling, disabled forms, backoffice
    submissions, tracking codes, drafts and the submission context.
    """
    Role.wipe()
    role = Role(name='test')
    role.store()
    local_user.roles = [role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [fields.StringField(id='0', label='foobar')]
    formdef.store()
    data_class = formdef.data_class()

    resp = get_app(pub).post_json('/api/formdefs/test/submit', {'data': {}}, status=403)
    assert resp.json['err'] == 1
    assert resp.json['err_desc'] == 'unsigned API call'

    def url():
        # sign_url() is called anew for each request
        unsigned = ('http://example.net/api/formdefs/test/submit?format=json&orig=coucou&email=%s'
                    % urllib.quote(local_user.email))
        return sign_url(unsigned, '1234')[len('http://example.net'):]

    resp = get_app(pub).post_json(url(), {'data': {}})
    assert resp.json['err'] == 0
    formdata_id = resp.json['data']['id']
    assert resp.json['data']['url'] == ('http://example.net/test/%s/' % formdata_id)
    assert resp.json['data']['backoffice_url'] == ('http://example.net/backoffice/management/test/%s/' % formdata_id)
    assert resp.json['data']['api_url'] == ('http://example.net/api/forms/test/%s/' % formdata_id)
    formdata = data_class.get(formdata_id)
    assert formdata.status == 'wf-new'
    assert formdata.user_id == str(local_user.id)
    assert formdata.tracking_code is None

    # missing Content-Type: application/json header
    resp = get_app(pub).post(url(), json.dumps({'data': {}}), status=400)
    assert resp.json['err_desc'] == 'expected JSON but missing appropriate content-type'

    # check qualified content type are recognized
    resp = get_app(pub).post(
        url(),
        json.dumps({'data': {}}),
        content_type='application/json; charset=utf-8')
    assert resp.json['data']['url']

    formdef.disabled = True
    formdef.store()
    resp = get_app(pub).post_json(url(), {'data': {}}, status=403)
    assert resp.json['err'] == 1
    assert resp.json['err_desc'] == 'disabled form'

    formdef.disabled = False
    formdef.store()
    backoffice_payload = {'meta': {'backoffice-submission': True}, 'data': {}}
    get_app(pub).post_json(url(), backoffice_payload, status=403)
    formdef.backoffice_submission_roles = ['xx']
    formdef.store()
    get_app(pub).post_json(url(), backoffice_payload, status=403)
    formdef.backoffice_submission_roles = [role.id]
    formdef.store()
    resp = get_app(pub).post_json(url(), backoffice_payload)
    formdata = data_class.get(resp.json['data']['id'])
    assert formdata.status == 'wf-new'
    assert formdata.backoffice_submission is True
    assert formdata.user_id is None
    assert formdata.submission_context.get('agent_id') == local_user.id

    formdef.enable_tracking_codes = True
    formdef.store()
    resp = get_app(pub).post_json(url(), {'data': {}})
    assert data_class.get(resp.json['data']['id']).tracking_code

    resp = get_app(pub).post_json(url(), {'meta': {'draft': True}, 'data': {}})
    assert data_class.get(resp.json['data']['id']).status == 'draft'

    resp = get_app(pub).post_json(url(), {
        'meta': {'backoffice-submission': True},
        'data': {},
        'context': {'channel': 'mail', 'comments': 'blah'},
    })
    formdata = data_class.get(resp.json['data']['id'])
    assert formdata.status == 'wf-new'
    assert formdata.backoffice_submission is True
    assert formdata.user_id is None
    assert formdata.submission_context == {
        'comments': 'blah', 'agent_id': local_user.id}
    assert formdata.submission_channel == 'mail'

    data_class.wipe()
|
|
|
|
|
|
def test_formdef_submit_only_one(pub, local_user):
    """Check ``only_allow_one`` refuses a second formdata from the same user."""
    Role.wipe()
    role = Role(name='test')
    role.store()
    local_user.roles = [role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.only_allow_one = True
    formdef.fields = [fields.StringField(id='0', label='foobar')]
    formdef.store()
    data_class = formdef.data_class()

    def url():
        # sign_url() is called anew for each request
        unsigned = ('http://example.net/api/formdefs/test/submit?format=json&orig=coucou&email=%s'
                    % urllib.quote(local_user.email))
        return sign_url(unsigned, '1234')[len('http://example.net'):]

    resp = get_app(pub).post_json(url(), {'data': {}})
    assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)

    assert data_class.count() == 1

    resp = get_app(pub).post_json(url(), {'data': {}}, status=403)
    assert resp.json['err'] == 1
    assert resp.json['err_desc'] == 'only one formdata by user is allowed'

    existing = data_class.select()[0]
    existing.user_id = '1000'  # change owner
    existing.store()

    resp = get_app(pub).post_json(url(), {'data': {}}, status=200)
    assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
    assert data_class.count() == 2
|
|
|
|
|
|
def test_formdef_submit_with_varname(pub, local_user):
    """Submit a formdata through the API with field varnames as payload keys.

    The stored data is checked field by field, then each value is converted
    back with ``get_json_value`` and compared to the submitted payload.
    """
    NamedDataSource.wipe()
    named_source = NamedDataSource(name='foobar')
    source = [{'id': '1', 'text': 'foo', 'more': 'XXX'},
              {'id': '2', 'text': 'bar', 'more': 'YYY'}]
    named_source.data_source = {'type': 'formula', 'value': repr(source)}
    named_source.store()

    jsonp_source = NamedDataSource(name='foobar_jsonp')
    jsonp_source.data_source = {'type': 'formula', 'value': 'http://example.com/jsonp'}
    jsonp_source.store()

    Role.wipe()
    role = Role(name='test')
    role.store()
    local_user.roles = [role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        fields.StringField(id='0', label='foobar0', varname='foobar0'),
        fields.ItemField(id='1', label='foobar1', varname='foobar1',
                         data_source={'type': 'foobar'}),
        fields.ItemField(id='2', label='foobar2', varname='foobar2',
                         data_source={'type': 'foobar_jsonp'}),
        fields.DateField(id='3', label='foobar3', varname='date'),
        fields.FileField(id='4', label='foobar4', varname='file'),
        fields.MapField(id='5', label='foobar5', varname='map'),
        fields.StringField(id='6', label='foobar6', varname='foobar6'),
    ]
    formdef.store()
    data_class = formdef.data_class()

    resp = get_app(pub).post_json('/api/formdefs/test/submit', {'data': {}}, status=403)
    assert resp.json['err'] == 1
    assert resp.json['err_desc'] == 'unsigned API call'

    unsigned = ('http://example.net/api/formdefs/test/submit?format=json&orig=coucou&email=%s'
                % urllib.quote(local_user.email))
    url = sign_url(unsigned, '1234')[len('http://example.net'):]
    payload = {
        'data': {
            'foobar0': 'xxx',
            'foobar1': '1',
            'foobar1_structured': {
                'id': '1',
                'text': 'foo',
                'more': 'XXX',
            },
            'foobar2': 'bar',
            'foobar2_raw': '10',
            'date': '1970-01-01',
            'file': {
                'filename': 'test.txt',
                'content': force_text(base64.b64encode(b'test')),
            },
            'map': {
                'lat': 1.5,
                'lon': 2.25,
            },
        }
    }
    resp = get_app(pub).post_json(url, payload)
    assert resp.json['err'] == 0
    formdata = data_class.get(resp.json['data']['id'])
    assert formdata.status == 'wf-new'
    assert formdata.user_id == str(local_user.id)
    assert formdata.tracking_code is None
    assert formdata.data['0'] == 'xxx'
    assert formdata.data['1'] == '1'
    assert formdata.data['1_structured'] == source[0]
    assert formdata.data['2'] == '10'
    assert formdata.data['2_display'] == 'bar'
    assert formdata.data['3'] == time.struct_time(
        (1970, 1, 1, 0, 0, 0, 3, 1, -1))

    assert formdata.data['4'].orig_filename == 'test.txt'
    assert formdata.data['4'].get_content() == b'test'
    assert formdata.data['5'] == '1.5;2.25'
    # test bijectivity
    assert formdef.fields[3].get_json_value(formdata.data['3']) == payload['data']['date']
    for key in payload['data']['file']:
        stored_file = data_class.get(resp.json['data']['id']).data['4']
        assert formdef.fields[4].get_json_value(stored_file)[key] == payload['data']['file'][key]
    assert formdef.fields[5].get_json_value(formdata.data['5']) == payload['data']['map']

    data_class.wipe()
|
|
|
|
|
|
def test_formdef_submit_from_wscall(pub, local_user):
    """Post JSON exports of a formdata to the signed submit API.

    The dictionary returned by get_json_export_dict() is sent to
    /api/formdefs/test/submit and the formdata created by the call is
    compared with the exported one.  Variations cover an "extra"
    attribute, an associated user and a missing map value.
    """
    test_formdef_submit_with_varname(pub, local_user)
    formdef = FormDef.select()[0]

    # attach a workflow carrying a single backoffice field
    workflow = Workflow.get_default_workflow()
    workflow.id = '2'
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    workflow.backoffice_fields_formdef.fields = [
        fields.StringField(
            id='bo1', label='1st backoffice field',
            type='string', varname='backoffice_blah'),
    ]
    workflow.store()
    formdef.workflow = workflow
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()

    attachment = PicklableUpload('test.txt', 'text/plain', 'ascii')
    attachment.receive([b'test'])

    formdata.data = {
        '0': 'xxx',
        '1': '1',
        '1_display': '1',
        '1_structured': {
            'id': '1',
            'text': 'foo',
            'more': 'XXX',
        },
        '2': '10',
        '2_display': 'bar',
        '3': time.strptime('1970-01-01', '%Y-%m-%d'),
        '4': attachment,
        '5': '1.5;2.25',
        'bo1': 'backoffice field',
    }
    formdata.just_created()
    formdata.evolution[-1].status = 'wf-new'
    formdata.store()

    def export_payload():
        # serialize with the project JSON encoder and parse back to plain types
        serialized = json.dumps(formdata.get_json_export_dict(),
                                cls=qommon.misc.JSONEncoder)
        return json.loads(serialized)

    def submit(body):
        # sign a new URL for each call, post the body and load the created formdata
        signed = sign_url('http://example.net/api/formdefs/test/submit?orig=coucou', '1234')
        answer = get_app(pub).post_json(signed[len('http://example.net'):], body)
        assert answer.json['err'] == 0
        return formdef.data_class().get(answer.json['data']['id'])

    payload = export_payload()
    created = submit(payload)
    for key in ('0', '1', '1_display', '1_structured', '2', '2_display',
                '3', '5', 'bo1'):
        assert created.data[key] == formdata.data[key]
    # uploads are compared by content
    assert created.data['4'].get_content() == formdata.data['4'].get_content()
    assert not created.data.get('6')
    assert created.user_id is None

    # add an extra attribute
    payload['extra'] = {'foobar6': 'YYY'}
    created = submit(payload)
    assert created.data['0'] == formdata.data['0']
    assert created.data['6'] == 'YYY'

    # add user
    formdata.user_id = local_user.id
    formdata.store()

    created = submit(export_payload())
    assert str(created.user_id) == str(local_user.id)

    # test missing map data
    del formdata.data['5']

    created = submit(export_payload())
    assert created.data.get('5') is None
|
|
|
|
|
|
def test_categories(pub):
    """Check the content of the categories listing API.

    A category is only listed once it has forms; its entry exposes title,
    url, description and the keywords of its forms, but no "forms" key.
    """
    FormDef.wipe()
    Category.wipe()
    category = Category()
    category.name = 'Category'
    category.description = 'hello world'
    category.store()

    empty_listing = get_app(pub).get(
        '/api/categories/', headers={'Accept': 'application/json'})
    assert empty_listing.json['data'] == []  # no advertised forms

    # two forms in the category, with different keywords
    for form_name, form_keywords in (('test', 'mobile, test'),
                                     ('test 2', 'foobar')):
        formdef = FormDef()
        formdef.name = form_name
        formdef.category_id = category.id
        formdef.fields = []
        formdef.keywords = form_keywords
        formdef.store()
        formdef.data_class().wipe()

    api_resp = get_app(pub).get('/api/categories/')
    site_resp = get_app(pub).get(
        '/categories', headers={'Accept': 'application/json'})
    assert api_resp.json == site_resp.json

    entry = api_resp.json['data'][0]
    assert entry['title'] == 'Category'
    assert entry['url'] == 'http://example.net/category/'
    assert entry['description'] == '<p>hello world</p>'
    assert set(entry['keywords']) == {'foobar', 'mobile', 'test'}
    assert 'forms' not in entry

    # check HTML description
    category.description = '<p><strong>hello world</strong></p>'
    category.store()
    html_resp = get_app(pub).get('/api/categories/')
    assert html_resp.json['data'][0]['description'] == category.description
|
|
|
|
|
|
def test_categories_private(pub, local_user):
    """Check when a category holding a role-restricted form is listed."""
    FormDef.wipe()
    Category.wipe()
    category = Category()
    category.name = 'Category'
    category.description = 'hello world'
    category.store()

    formdef = FormDef()
    formdef.name = 'test'
    formdef.category_id = category.id
    formdef.fields = []
    formdef.store()
    formdef.data_class().wipe()

    absolute_url = 'http://example.net/api/categories/'

    def listed_categories(uri):
        # categories returned for the given (possibly signed) URI
        return get_app(pub).get(uri).json['data']

    # open form
    assert len(listed_categories('/api/categories/')) == 1

    # private form, the category doesn't appear anymore
    formdef.roles = ['plop']
    formdef.store()
    assert len(listed_categories('/api/categories/')) == 0

    # not even for a signed request specifying an user
    assert len(listed_categories(sign_uri(absolute_url, local_user))) == 0

    # but it appears if this is a signed request without user
    assert len(listed_categories(sign_uri(absolute_url))) == 1

    # or signed with an authorised user
    local_user.roles = ['plop']
    local_user.store()
    assert len(listed_categories(sign_uri(absolute_url, local_user))) == 1
|
|
|
|
|
|
def test_categories_formdefs(pub, local_user):
    """Check the API listing the forms of a category.

    Covers the default listing, the include-count option, an unknown
    category (404) and the backoffice-submission filter, which only
    returns forms for a signed request made for a user holding one of
    the form's backoffice submission roles.
    """
    FormDef.wipe()
    Category.wipe()
    category = Category()
    category.name = 'Category'
    category.description = 'hello world'
    category.store()

    formdef = FormDef()
    formdef.name = 'test'
    formdef.category_id = category.id
    formdef.fields = []
    formdef.keywords = 'mobile, test'
    formdef.store()
    formdef.data_class().wipe()

    # from here on "formdef" is the second form of the category
    formdef = FormDef()
    formdef.name = 'test 2'
    formdef.category_id = category.id
    formdef.fields = []
    formdef.keywords = 'foobar'
    formdef.store()
    formdef.data_class().wipe()

    # a form outside of any category
    formdef2 = FormDef()
    formdef2.name = 'other test'
    formdef2.category_id = None
    formdef2.fields = []
    formdef2.store()
    formdef2.data_class().wipe()

    resp = get_app(pub).get('/api/categories/category/formdefs/')
    resp2 = get_app(pub).get('/category/json')
    assert resp.json == resp2.json
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 2
    assert resp.json['data'][0]['title'] == 'test'
    assert resp.json['data'][0]['url'] == 'http://example.net/test/'
    # identity check: the value must be the boolean, not merely falsy
    assert resp.json['data'][0]['redirection'] is False
    assert resp.json['data'][0]['category'] == 'Category'
    assert resp.json['data'][0]['category_slug'] == 'category'
    assert 'count' not in resp.json['data'][0]

    resp = get_app(pub).get('/api/categories/category/formdefs/?include-count=on')
    assert resp.json['data'][0]['title'] == 'test'
    assert resp.json['data'][0]['url'] == 'http://example.net/test/'
    assert resp.json['data'][0]['count'] == 0

    get_app(pub).get('/api/categories/XXX/formdefs/', status=404)

    resp = get_app(pub).get('/api/categories/category/formdefs/?backoffice-submission=on')
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 0

    Role.wipe()
    role = Role(name='test')
    role.store()
    local_user.roles = []
    local_user.store()
    # check it's not advertised ...
    formdef.backoffice_submission_roles = [role.id]
    formdef.store()
    resp = get_app(pub).get('/api/categories/category/formdefs/?backoffice-submission=on')
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 0
    resp = get_app(pub).get(sign_uri(
        '/api/categories/category/formdefs/?backoffice-submission=on&NameID=%s' %
        local_user.name_identifiers[0]))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 0
    # ... unless user has correct roles
    local_user.roles = [role.id]
    local_user.store()
    resp = get_app(pub).get(sign_uri(
        '/api/categories/category/formdefs/?backoffice-submission=on&NameID=%s' %
        local_user.name_identifiers[0]))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 1
|
|
|
|
|
|
def test_categories_full(pub):
    """Check the forms are included in the categories API with ?full=on."""
    # reuse the category and the two forms created by test_categories
    test_categories(pub)
    listing = get_app(pub).get('/api/categories/?full=on').json
    forms = listing['data'][0]['forms']
    assert len(forms) == 2
    assert forms[0]['title'] == 'test'
    assert forms[1]['title'] == 'test 2'
|
|
|
|
|
|
def test_formdata(pub, local_user):
    """Check the JSON export of a single formdata through the signed API.

    Access requires the user to hold the receiver role; the export is then
    checked for fields, user, workflow status, submission channel,
    geolocation and roles.  Other tests call this function to get the
    same objects created.
    """
    NamedDataSource.wipe()
    data_source = NamedDataSource(name='foobar')
    source_items = [{'id': '1', 'text': 'foo', 'more': 'XXX'},
                    {'id': '2', 'text': 'bar', 'more': 'YYY'}]
    data_source.data_source = {'type': 'formula', 'value': repr(source_items)}
    data_source.store()

    Role.wipe()
    role = Role(name='test')
    role.id = '123'
    role.store()
    another_role = Role(name='another')
    another_role.id = '321'
    another_role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.geolocations = {'base': 'blah'}
    formdef.name = 'test'
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2'),
        fields.DateField(id='2', label='foobar3', varname='date'),
        fields.FileField(id='3', label='foobar4', varname='file'),
        fields.ItemField(id='4', label='foobar5', varname='item',
                         data_source={'type': 'foobar'}),
    ]
    workflow = Workflow.get_default_workflow()
    workflow.roles['_foobar'] = 'Foobar'
    workflow.id = '2'
    workflow.store()
    formdef.workflow_id = workflow.id
    formdef.workflow_roles = {'_receiver': role.id,
                              '_foobar': another_role.id}
    formdef.store()
    item_field = formdef.fields[4]

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    receipt_date = time.strptime('2014-01-20', '%Y-%m-%d')
    attachment = PicklableUpload('test.txt', 'text/plain', 'ascii')
    attachment.receive([b'base64me'])
    formdata.data = {
        '0': 'foo@localhost',
        '1': 'xxx',
        '2': receipt_date,
        '3': attachment,
        '4': '1',
    }
    # derived values of the item field, computed by the field itself
    formdata.data['4_display'] = item_field.store_display_value(
        formdata.data, item_field.id)
    formdata.data['4_structured'] = item_field.store_structured_value(
        formdata.data, item_field.id)
    formdata.user_id = local_user.id
    formdata.just_created()
    formdata.status = 'wf-new'
    formdata.evolution[-1].status = 'wf-new'
    formdata.geolocations = {'base': {'lon': 10, 'lat': -12}}
    formdata.store()

    api_url = '/api/forms/test/%s/' % formdata.id

    # the user has no role yet, access is refused
    get_app(pub).get(sign_uri(api_url, user=local_user), status=403)

    local_user.roles = [role.id]
    local_user.store()
    resp = get_app(pub).get(sign_uri(api_url, user=local_user), status=200)
    export = resp.json

    timestamp_format = '%Y-%m-%dT%H:%M:%S'
    assert datetime.datetime.strptime(export['last_update_time'], timestamp_format)
    assert datetime.datetime.strptime(export['receipt_time'], timestamp_format)

    exported_fields = export['fields']
    assert len(exported_fields) == 6
    assert 'foobar' in exported_fields
    assert 'foobar2' not in exported_fields  # foobar2 has no varname, not in json
    assert export['user']['name'] == local_user.name
    assert exported_fields['foobar'] == 'foo@localhost'
    assert exported_fields['date'] == '2014-01-20'
    assert exported_fields['file']['content'] == 'YmFzZTY0bWU='  # base64('base64me')
    assert exported_fields['file']['filename'] == 'test.txt'
    assert exported_fields['file']['content_type'] == 'text/plain'
    assert exported_fields['item'] == 'foo'
    assert exported_fields['item_raw'] == '1'
    assert exported_fields['item_structured'] == {'id': '1', 'text': 'foo', 'more': 'XXX'}
    assert export['workflow']['status']['name'] == 'New'
    assert export['submission']['channel'] == 'web'
    assert export['geolocations']['base']['lon'] == 10
    assert export['geolocations']['base']['lat'] == -12

    def role_ids(function):
        # ids of the roles exported under the given function key
        return [entry.get('id') for entry in export['roles'][function]]

    assert role_ids('_receiver') == [str(role.id)]
    assert role_ids('_foobar') == [str(another_role.id)]
    assert set(role_ids('concerned')) == {str(role.id), str(another_role.id)}
    assert role_ids('actions') == [str(role.id)]

    # check the ?format=json endpoint returns 403
    get_app(pub).get('/test/%s/?format=json' % formdata.id, status=403)
    get_app(pub).get(sign_uri('/test/%s/' % formdata.id, user=local_user),
                     status=403)
|
|
|
|
|
|
def test_formdata_backoffice_fields(pub, local_user):
    """Check backoffice field values are exported under workflow/fields."""
    # create the form, workflow and formdata used below
    test_formdata(pub, local_user)

    workflow = Workflow.get(2)
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    workflow.backoffice_fields_formdef.fields = [
        fields.StringField(
            id='bo1', label='1st backoffice field',
            type='string', varname='backoffice_blah'),
    ]
    workflow.store()

    formdef = FormDef.select()[0]
    formdata = formdef.data_class().select()[0]
    formdata.data['bo1'] = 'Hello world'
    formdata.store()

    signed = sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user)
    workflow_export = get_app(pub).get(signed).json['workflow']
    assert workflow_export['fields']['backoffice_blah'] == 'Hello world'
|
|
|
|
|
|
def test_formdata_edit(pub, local_user):
    """Check editing a formdata by posting JSON to its signed API URL.

    The edit is refused (403) without user, without an editable workflow
    action or when the action is not open to the user's role; it is
    accepted otherwise and can make the formdata jump to another status.
    """
    test_formdata(pub, local_user)
    formdef = FormDef.select()[0]
    formdata = formdef.data_class().select()[0]
    workflow = formdef.workflow

    api_url = '/api/forms/test/%s/' % formdata.id

    def post_edit(value, status, **sign_kwargs):
        # post a new value for field '0', expecting the given HTTP status
        get_app(pub).post_json(
            sign_uri(api_url, **sign_kwargs),
            {'data': {'0': value}},
            status=status)

    def stored_formdata():
        return formdef.data_class().select()[0]

    # not user
    post_edit('bar@localhost', 403)

    # no editable action
    post_edit('bar@localhost', 403, user=local_user)

    edit_action = EditableWorkflowStatusItem()
    edit_action.id = '_wfedit'
    edit_action.by = [local_user.roles[0]]
    workflow.possible_status[1].items.append(edit_action)
    edit_action.parent = workflow.possible_status[1]
    workflow.store()

    post_edit('bar@localhost', 200, user=local_user)
    assert stored_formdata().data['0'] == 'bar@localhost'

    # not editable by user role
    edit_action.by = ['XX']
    workflow.store()
    post_edit('bar@localhost', 403, user=local_user)

    # edit + jump
    edit_action.status = 'rejected'
    edit_action.by = [local_user.roles[0]]
    workflow.store()

    post_edit('bar2@localhost', 200, user=local_user)
    assert stored_formdata().data['0'] == 'bar2@localhost'
    assert stored_formdata().status == 'wf-rejected'
|
|
|
|
|
|
def test_formdata_with_workflow_data(pub, local_user):
    """Check workflow data, uploads included, is part of the JSON export.

    Uploads stored in workflow_data are expected as dictionaries with
    filename, content_type and base64-encoded content.
    """
    Role.wipe()
    role = Role(name='test')
    role.id = '123'
    role.store()

    local_user.roles = [role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = []
    workflow = Workflow.get_default_workflow()
    workflow.id = '2'
    workflow.store()
    formdef.workflow_id = workflow.id
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.store()

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.status = 'wf-new'
    formdata.evolution[-1].status = 'wf-new'

    # PicklableUpload is already imported at module level
    upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
    upload.receive([b'test'])
    upload2 = PicklableUpload('test.txt', 'text/plain', 'ascii')
    upload2.receive([b'test'])
    formdata.workflow_data = {'blah': upload, 'blah2': upload2, 'xxx': 23}
    formdata.store()

    resp = get_app(pub).get(
        sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user))
    workflow_data = resp.json['workflow']['data']
    assert workflow_data['xxx'] == 23
    assert workflow_data['blah']['filename'] == 'test.txt'
    assert workflow_data['blah']['content_type'] == 'text/plain'
    # base64.decodestring() was removed in Python 3.9; b64decode() works
    # on both Python 2 and 3
    assert base64.b64decode(force_bytes(workflow_data['blah']['content'])) == b'test'
    assert base64.b64decode(force_bytes(workflow_data['blah2']['content'])) == b'test'
|
|
|
|
|
|
def test_user_by_nameid(pub, local_user):
    """Check a user can be fetched from the API by name identifier."""
    # the identifier is not known yet
    get_app(pub).get(sign_uri('/api/users/xyz/', user=local_user), status=404)

    local_user.name_identifiers = ['xyz']
    local_user.store()
    user_resp = get_app(pub).get(sign_uri('/api/users/xyz/', user=local_user))
    assert str(user_resp.json['id']) == str(local_user.id)
|
|
|
|
|
|
def test_user_roles(pub, local_user):
    """Check the roles of a user are part of the user API export."""
    local_user.name_identifiers = ['xyz']
    local_user.store()

    role = Role(name='Foo bar')
    role.store()
    local_user.roles = [role.id]
    local_user.store()

    user_resp = get_app(pub).get(sign_uri('/api/users/xyz/', user=local_user))
    exported_roles = user_resp.json['user_roles']
    assert len(exported_roles) == 1
    assert exported_roles[0]['name'] == 'Foo bar'
|
|
|
|
|
|
def test_user_forms(pub, local_user):
    """Check the API listing the forms of the user of a signed request.

    Covers the basic listing and its equivalent URLs, the full=on option,
    disabled forms, the form digest, an unknown NameID, drafts and the
    sort=desc option.
    """
    Workflow.wipe()
    workflow = Workflow.get_default_workflow()
    workflow.id = '2'
    workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
    workflow.variables_formdef.fields.append(
        fields.DateField(label='Test', type='date', varname='option_date'))
    workflow.store()

    option_day = datetime.date(2020, 1, 15)

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2'),
        fields.DateField(id='2', label='date', type='date', varname='date'),
    ]
    formdef.keywords = 'hello, world'
    formdef.disabled = False
    formdef.enable_tracking_codes = True
    formdef.workflow = workflow
    formdef.workflow_options = {'option_date': option_day.timetuple()}
    formdef.store()
    formdef.data_class().wipe()

    def user_forms(query=''):
        # JSON answer of the listing, signed for local_user
        signed = sign_uri('/api/user/forms' + query, user=local_user)
        return get_app(pub).get(signed).json

    listing = user_forms()
    assert listing['err'] == 0
    assert len(listing['data']) == 0

    formdata = formdef.data_class()()
    formdata.data = {
        '0': 'foo@localhost',
        '1': 'xxx',
        '2': option_day.timetuple(),
    }
    formdata.user_id = local_user.id
    formdata.just_created()
    formdata.jump_status('new')
    formdata.store()

    listing = user_forms()
    myspace_listing = get_app(pub).get(
        sign_uri('/myspace/forms', user=local_user)).json
    by_id_listing = get_app(pub).get(
        sign_uri('/api/users/%s/forms' % local_user.id)).json
    assert listing['err'] == 0
    assert len(listing['data']) == 1
    entry = listing['data'][0]
    assert entry['form_name'] == 'test'
    assert entry['form_slug'] == 'test'
    assert entry['form_status'] == 'New'
    assert datetime.datetime.strptime(entry['form_receipt_datetime'],
                                      '%Y-%m-%dT%H:%M:%S')
    assert entry['keywords'] == ['hello', 'world']
    assert listing == myspace_listing == by_id_listing

    full_listing = user_forms('?full=on')
    assert full_listing['err'] == 0
    full_entry = full_listing['data'][0]
    assert full_entry['fields']['foobar'] == 'foo@localhost'
    assert full_entry['fields']['date'] == '2020-01-15'
    assert full_entry['keywords'] == ['hello', 'world']
    assert full_entry['form_option_option_date'] == '2020-01-15'
    # an empty first query parameter gives the same answer
    assert full_listing == user_forms('?&full=on')

    # the formdata is still listed once the form is disabled
    formdef.disabled = True
    formdef.store()
    listing = user_forms()
    assert listing['err'] == 0
    assert len(listing['data']) == 1

    # check digest is part of contents
    formdef.digest_template = 'XYZ'
    formdef.data_class().get(formdata.id).store()
    assert formdef.data_class().get(formdata.id).digest == 'XYZ'
    listing = user_forms()
    assert listing['data'][0]['form_digest'] == 'XYZ'

    unknown_user = get_app(pub).get(sign_uri('/api/user/forms?NameID=xxx')).json
    assert unknown_user == {'err': 1, 'err_desc': 'unknown NameID', 'data': []}
    unknown_user2 = get_app(pub).get(sign_uri('/api/user/forms?&NameID=xxx')).json
    assert unknown_user == unknown_user2

    draft = formdef.data_class()()
    draft.user_id = local_user.id
    draft.status = 'draft'
    draft.receipt_time = datetime.datetime(2015, 1, 1).timetuple()
    draft.store()

    # drafts are not part of the default listing
    listing = user_forms()
    assert listing['err'] == 0
    assert len(listing['data']) == 1

    # the form is still disabled, its draft is not listed either
    listing = user_forms('?include-drafts=true')
    assert listing['err'] == 0
    assert len(listing['data']) == 1

    formdef.disabled = False
    formdef.store()

    listing = user_forms('?include-drafts=true')
    assert listing['err'] == 0
    assert len(listing['data']) == 2

    draft_entry = [x for x in listing['data'] if x['status'] == 'Draft'][0]
    assert draft_entry.get('url')[-1] != '/'

    # a second formdata, received one day in the future
    later = formdef.data_class()()
    later.data = {'0': 'foo@localhost', '1': 'xyy'}
    later.user_id = local_user.id
    later.just_created()
    later.receipt_time = (
        datetime.datetime.now() + datetime.timedelta(days=1)).timetuple()
    later.jump_status('new')
    later.store()

    ascending = user_forms()['data']
    assert len(ascending) == 2
    descending = user_forms('?sort=desc')['data']
    assert len(descending) == 2
    assert descending[0] == ascending[1]
    assert descending[1] == ascending[0]
|
|
|
|
|
|
def test_user_forms_limit_offset(pub, local_user):
    """Check the limit, offset and sort parameters of the user forms API.

    Fifty formdata are created with increasing receipt times and slices
    of the listing are compared on their form_number_raw values.  The
    test is skipped when the publisher is not using PostgreSQL.
    """
    if not pub.is_using_postgresql():
        # pytest.skip() raises, so no explicit return is needed after it
        pytest.skip('this requires SQL')

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test limit offset'
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2'),
    ]
    formdef.keywords = 'hello, world'
    formdef.disabled = False
    formdef.enable_tracking_codes = False
    formdef.store()
    formdef.data_class().wipe()

    for i in range(50):
        formdata = formdef.data_class()()
        formdata.data = {'0': 'foo@localhost', '1': str(i)}
        formdata.user_id = local_user.id
        formdata.just_created()
        # one more day for each formdata, so receipt times are all different
        formdata.receipt_time = (datetime.datetime.now() + datetime.timedelta(days=i)).timetuple()
        formdata.jump_status('new')
        formdata.store()

    resp = get_app(pub).get(sign_uri('/api/users/%s/forms' % local_user.id))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 50

    resp = get_app(pub).get(sign_uri('/api/users/%s/forms?limit=10' % local_user.id))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 10
    assert [x['form_number_raw'] for x in resp.json['data']] == [str(x) for x in range(1, 11)]

    resp = get_app(pub).get(sign_uri('/api/users/%s/forms?limit=10&offset=45' % local_user.id))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 5
    assert [x['form_number_raw'] for x in resp.json['data']] == [str(x) for x in range(46, 51)]

    resp = get_app(pub).get(sign_uri('/api/users/%s/forms?limit=10&sort=desc' % local_user.id))
    assert resp.json['err'] == 0
    assert len(resp.json['data']) == 10
    assert [x['form_number_raw'] for x in resp.json['data']] == [str(x) for x in range(50, 40, -1)]
|
|
|
|
|
|
def test_user_forms_from_agent(pub, local_user):
    """Check the forms of a user as listed for another (agent) user.

    Covers the "readable" flag, forms flagged skip_from_360_view and the
    403 answer once the agent has no role left.
    """
    Role.wipe()
    role = Role(name='Foo bar')
    role.store()

    agent_user = get_publisher().user_class()
    agent_user.name = 'Agent'
    agent_user.email = 'agent@example.com'
    agent_user.name_identifiers = ['ABCDE']
    agent_user.roles = [role.id]
    agent_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2'),
    ]
    formdef.store()
    formdef.data_class().wipe()

    formdata = formdef.data_class()()
    formdata.data = {'0': 'foo@localhost', '1': 'xxx'}
    formdata.user_id = local_user.id
    formdata.just_created()
    formdata.jump_status('new')
    formdata.store()

    forms_url = '/api/users/%s/forms' % local_user.id

    def forms_seen_by_agent(**get_kwargs):
        # listing of local_user forms, requested as agent_user
        return get_app(pub).get(sign_uri(forms_url, user=agent_user), **get_kwargs)

    listing = forms_seen_by_agent().json
    assert listing['err'] == 0
    assert len(listing['data']) == 1
    entry = listing['data'][0]
    assert entry['form_name'] == 'test'
    assert entry['form_slug'] == 'test'
    assert entry['form_status'] == 'New'
    assert entry['readable'] is False

    formdef.skip_from_360_view = True
    formdef.store()

    assert len(forms_seen_by_agent().json['data']) == 0

    # listed again once the agent role is receiver of the form
    formdef.workflow_roles = {'_receiver': str(role.id)}
    formdef.store()
    formdef.data_class().rebuild_security()
    assert len(forms_seen_by_agent().json['data']) == 1

    agent_user.roles = []
    agent_user.store()
    forms_seen_by_agent(status=403)
|
|
|
|
|
|
def test_user_drafts(pub, local_user):
    """Check the API listing the drafts of the user of a signed request.

    Covers the basic listing and its /myspace/drafts equivalent, the
    full=on option, the tracking code and disabled settings of the form,
    and an unknown NameID.
    """
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2'),
        fields.FileField(id='2', label='foobar3', varname='file'),
    ]
    formdef.keywords = 'hello, world'
    formdef.disabled = False
    formdef.enable_tracking_codes = True
    formdef.store()

    formdef.data_class().wipe()

    def user_drafts(query=''):
        # JSON answer of the drafts listing, signed for local_user
        signed = sign_uri('/api/user/drafts' + query, user=local_user)
        return get_app(pub).get(signed).json

    listing = user_drafts()
    assert listing['err'] == 0
    assert len(listing['data']) == 0

    draft = formdef.data_class()()
    attachment = PicklableUpload('test.txt', 'text/plain', 'ascii')
    attachment.receive([b'base64me'])
    draft.data = {'0': 'foo@localhost', '1': 'xxx', '2': attachment}
    draft.user_id = local_user.id
    draft.page_no = 1
    draft.status = 'draft'
    draft.receipt_time = datetime.datetime(2015, 1, 1).timetuple()
    draft.store()

    listing = user_drafts()
    myspace_listing = get_app(pub).get(
        sign_uri('/myspace/drafts', user=local_user)).json
    assert listing['err'] == 0
    assert len(listing['data']) == 1
    assert listing == myspace_listing
    assert 'fields' not in listing['data'][0]
    assert listing['data'][0]['keywords'] == ['hello', 'world']

    full_listing = user_drafts('?full=on')
    assert full_listing['err'] == 0
    full_entry = full_listing['data'][0]
    assert 'fields' in full_entry
    assert full_entry['fields']['foobar'] == 'foo@localhost'
    assert 'file' not in full_entry['fields']  # no file export in full lists
    assert full_entry['keywords'] == ['hello', 'world']

    # the draft is still listed without tracking codes
    formdef.enable_tracking_codes = False
    formdef.store()
    listing = user_drafts()
    assert listing['err'] == 0
    assert len(listing['data']) == 1

    # but not when the form is disabled
    formdef.enable_tracking_codes = True
    formdef.disabled = True
    formdef.store()
    listing = user_drafts()
    assert listing['err'] == 0
    assert len(listing['data']) == 0

    unknown_user = get_app(pub).get(sign_uri('/api/user/drafts?NameID=xxx')).json
    assert unknown_user == {'err': 1, 'err_desc': 'unknown NameID', 'data': []}
    unknown_user2 = get_app(pub).get(sign_uri('/api/user/drafts?&NameID=xxx')).json
    assert unknown_user == unknown_user2
|
|
|
|
|
|
def test_api_list_formdata(pub, local_user):
    """Check the API listing the formdata of a form.

    Thirty formdata are created with item values and statuses spread on
    the loop index; the listing is then checked for access control, the
    full=on option, filters on a field and on the status, limit/offset
    pagination and invalid pagination values (400).
    """
    Role.wipe()
    role = Role(name='test')
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.ItemField(id='1', label='foobar3', varname='foobar3', type='item',
                         items=['foo', 'bar', 'baz']),
        fields.FileField(id='2', label='foobar4', varname='file'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for i in range(30):
        formdata = data_class()
        upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
        upload.receive([b'base64me'])
        formdata.data = {'0': 'FOO BAR %d' % i, '2': upload}
        formdata.user_id = local_user.id
        # item value: 8 "foo", 8 "bar" and 14 "baz" over the 30 formdata
        if i % 4 == 0:
            formdata.data['1'] = 'foo'
            formdata.data['1_display'] = 'foo'
        elif i % 4 == 1:
            formdata.data['1'] = 'bar'
            formdata.data['1_display'] = 'bar'
        else:
            formdata.data['1'] = 'baz'
            formdata.data['1_display'] = 'baz'

        formdata.just_created()
        # status: a third of the formdata in each of three statuses
        if i % 3 == 0:
            formdata.jump_status('new')
        elif i % 3 == 1:
            formdata.jump_status('just_submitted')
        else:
            formdata.jump_status('finished')
        if i % 7 == 0:
            formdata.backoffice_submission = True
            formdata.submission_channel = 'mail'
        formdata.store()

    # check access is denied if the user has not the appropriate role
    get_app(pub).get(sign_uri('/api/forms/test/list', user=local_user), status=403)

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    # check it now gets the data
    resp = get_app(pub).get(sign_uri('/api/forms/test/list', user=local_user))
    assert len(resp.json) == 30
    assert datetime.datetime.strptime(resp.json[0]['receipt_time'], '%Y-%m-%dT%H:%M:%S')
    assert 'fields' not in resp.json[0]

    # check getting full formdata
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on', user=local_user))
    assert len(resp.json) == 30
    assert 'receipt_time' in resp.json[0]
    assert 'fields' in resp.json[0]
    assert 'file' not in resp.json[0]['fields']  # no file export in full lists
    assert 'user' in resp.json[0]
    assert 'evolution' in resp.json[0]
    assert len(resp.json[0]['evolution']) == 2
    assert 'status' in resp.json[0]['evolution'][0]
    assert 'who' in resp.json[0]['evolution'][0]
    assert 'time' in resp.json[0]['evolution'][0]
    assert resp.json[0]['evolution'][0]['who']['id'] == local_user.id

    assert all('status' in x['workflow'] for x in resp.json)
    assert [x for x in resp.json if x['fields']['foobar'] == 'FOO BAR 0'][0]['submission']['backoffice'] is True
    assert [x for x in resp.json if x['fields']['foobar'] == 'FOO BAR 0'][0]['submission']['channel'] == 'mail'
    assert [x for x in resp.json if x['fields']['foobar'] == 'FOO BAR 1'][0]['submission']['backoffice'] is False
    assert [x for x in resp.json if x['fields']['foobar'] == 'FOO BAR 1'][0]['submission']['channel'] == 'web'

    # check filtered results
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-foobar3=foo', user=local_user))
    assert len(resp.json) == 8
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-foobar3=bar', user=local_user))
    assert len(resp.json) == 8
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-foobar3=baz', user=local_user))
    assert len(resp.json) == 14

    # check filter on status
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter=pending', user=local_user))
    assert len(resp.json) == 20
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter=done', user=local_user))
    assert len(resp.json) == 10
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter=all', user=local_user))
    assert len(resp.json) == 30

    # check limit and offset
    resp_all = get_app(pub).get(sign_uri('/api/forms/test/list?filter=all', user=local_user))
    assert len(resp_all.json) == 30
    partial_resps = []
    for i in range(0, 48, 12):
        partial_resps.append(get_app(pub).get(
            sign_uri(
                '/api/forms/test/list?filter=all&offset=%s&limit=12' % i,
                user=local_user)))
    assert len(partial_resps[0].json) == 12
    assert len(partial_resps[1].json) == 12
    assert len(partial_resps[2].json) == 6
    assert len(partial_resps[3].json) == 0
    # the pages put together give the same ids, in the same order, as the full listing
    resp_all_ids = [x.get('id') for x in resp_all.json]
    resp_partial_ids = []
    for resp in partial_resps:
        resp_partial_ids.extend([x.get('id') for x in resp.json])
    assert resp_all_ids == resp_partial_ids

    # check error handling
    get_app(pub).get(sign_uri('/api/forms/test/list?filter=all&offset=plop', user=local_user), status=400)
    get_app(pub).get(sign_uri('/api/forms/test/list?filter=all&limit=plop', user=local_user), status=400)
|
|
|
|
|
|
def _assert_anonymised_export(entry):
    """Check the common expectations on one anonymised formdata export.

    ``entry`` is a decoded JSON formdata, taken either from the list
    endpoint or from the detail endpoint called with ``anonymise``.
    """
    assert 'receipt_time' in entry
    assert 'fields' in entry
    assert 'user' not in entry
    assert 'file' not in entry['fields']  # no file export
    assert 'foobar3' in entry['fields']
    assert 'foobar' not in entry['fields']
    assert 'evolution' in entry
    assert len(entry['evolution']) == 2
    # the first evolution entry must not expose its author
    first_evolution = entry['evolution'][0]
    assert 'status' in first_evolution
    assert 'who' not in first_evolution
    assert 'time' in first_evolution


def _assert_agent_evolution_exported(entry):
    """Check the second evolution of ``entry`` exports its author details."""
    second_evolution = entry['evolution'][1]
    assert 'who' in second_evolution
    for key in ('id', 'email', 'NameID', 'name'):
        assert key in second_evolution['who']


def test_api_anonymized_formdata(pub, local_user, admin_user):
    """Check the ``anonymise`` parameter of the formdata list and detail API.

    Thirty formdatas are created for ``local_user``; one out of three is
    moved to the ``new`` status, the others get an evolution authored by
    ``admin_user``.  Anonymised exports must then be served to a user without
    the receiver role and to a signed request without any user.
    """
    Role.wipe()
    role = Role(name='test')
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.ItemField(id='1', label='foobar3', varname='foobar3', type='item',
                         items=['foo', 'bar', 'baz']),
        fields.FileField(id='2', label='foobar4', varname='file'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for i in range(30):
        formdata = data_class()
        upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
        upload.receive([b'base64me'])
        formdata.data = {'0': 'FOO BAR %d' % i, '2': upload}
        formdata.user_id = local_user.id
        if i % 4 == 0:
            choice = 'foo'
        elif i % 4 == 1:
            choice = 'bar'
        else:
            choice = 'baz'
        formdata.data['1'] = choice
        formdata.data['1_display'] = choice

        formdata.just_created()
        if i % 3 == 0:
            formdata.jump_status('new')
        else:
            # record an evolution authored by the admin user
            evo = Evolution()
            evo.who = admin_user.id
            evo.time = time.localtime()
            evo.status = 'wf-finished'
            formdata.evolution.append(evo)
            formdata.status = evo.status
        formdata.store()

    # check access is granted even if the user has not the appropriate role
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?anonymise&full=on', user=local_user))
    assert len(resp.json) == 30
    _assert_anonymised_export(resp.json[0])
    # check evolution made by other than _submitter are exported
    _assert_agent_evolution_exported(resp.json[1])

    # check access is granted even if there is no user
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?anonymise&full=on'))
    assert len(resp.json) == 30
    _assert_anonymised_export(resp.json[0])

    # check anonymise is enforced on detail view
    resp = get_app(pub).get(sign_uri('/api/forms/test/%s/?anonymise&full=on' % resp.json[1]['id']))
    _assert_anonymised_export(resp.json)
    # check evolution made by other than _submitter are exported
    _assert_agent_evolution_exported(resp.json)
|
|
|
|
def test_api_geojson_formdata(pub, local_user):
    """Check the per-form ``geojson`` API endpoint.

    Covers access control (403 without the receiver role, even with
    ``anonymise``), the content of ``display_fields`` (raw ``value`` versus
    HTML-escaped ``html_value``), status filtering, and the 404 returned once
    the formdef has no geolocation configured.
    """
    Role.wipe()
    role = Role(name='test')
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar', type='string'),
        fields.FileField(id='1', label='foobar1', type='file'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    formdef.geolocations = {'base': 'Location'}
    formdef.store()

    # check access is denied if the user has not the appropriate role
    get_app(pub).get(sign_uri('/api/forms/test/geojson', user=local_user), status=403)
    # even if there's an anonymise parameter
    get_app(pub).get(sign_uri('/api/forms/test/geojson?anonymise', user=local_user), status=403)

    upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
    upload.receive([b'base64me'])

    # values containing markup, to check html_value gets escaped
    foobar = '<font color="red">FOO BAR</font>'
    username = '<font color="red">Jean Darmette</font>'

    data = {'0': foobar, '1': upload}
    local_user.name = username
    local_user.store()
    for i in range(30):
        formdata = data_class()
        formdata.geolocations = {'base': {'lat': 48, 'lon': 2}}
        formdata.data = data
        formdata.user_id = local_user.id
        formdata.just_created()
        if i % 3 == 0:
            formdata.jump_status('new')
        else:
            formdata.jump_status('finished')
        formdata.store()

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    # check it gets the data
    resp = get_app(pub).get(sign_uri('/api/forms/test/geojson', user=local_user))
    assert 'features' in resp.json
    assert len(resp.json['features']) == 10
    display_fields = resp.json['features'][0]['properties']['display_fields']
    for field in display_fields:
        if field['label'] == 'Number':
            assert field['varname'] == 'id'
            assert field['html_value'] == '1-28'
            assert field['value'] == '1-28'
        if field['label'] == 'User Label':
            assert field['varname'] == 'user_label'
            assert field['value'] == username
            assert field['html_value'] == '&lt;font color=&quot;red&quot;&gt;Jean Darmette&lt;/font&gt;'
        if field['label'] == 'foobar':
            assert field['varname'] == 'foobar'
            assert field['value'] == foobar
            assert field['html_value'] == '&lt;font color=&quot;red&quot;&gt;FOO BAR&lt;/font&gt;'
        if field['label'] == 'foobar1':
            assert field['varname'] is None
            assert field['value'] == "test.txt"
            assert field['html_value'] == '<div class="file-field"><a download="test.txt" href="http://example.net/backoffice/management/test/28/download?f=1"><span>test.txt</span></a></div>'

    # check with a filter
    resp = get_app(pub).get(sign_uri('/api/forms/test/geojson?filter=done', user=local_user))
    assert 'features' in resp.json
    assert len(resp.json['features']) == 20

    # check 404 if the formdef doesn't have geolocation support
    formdef.geolocations = {}
    formdef.store()
    get_app(pub).get(sign_uri('/api/forms/test/geojson', user=local_user), status=404)
|
|
|
|
def test_api_global_geojson(pub, local_user):
    """Check the global ``/api/forms/geojson`` endpoint.

    Without PostgreSQL a 404 is expected and the test is skipped.  Otherwise
    the endpoint must return no feature to a user without the receiver role,
    and the matching formdatas (optionally filtered by status) once the role
    is granted.
    """
    Role.wipe()
    role = Role(name='test')
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = []
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    formdef.geolocations = {'base': 'Location'}
    formdef.store()

    for i in range(30):
        formdata = data_class()
        formdata.geolocations = {'base': {'lat': 48, 'lon': 2}}
        formdata.user_id = local_user.id
        formdata.just_created()
        if i % 3 == 0:
            formdata.jump_status('new')
        else:
            formdata.jump_status('finished')
        formdata.store()

    if not pub.is_using_postgresql():
        get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
        # pytest.skip() raises, so nothing below runs in this mode
        pytest.skip('this requires SQL')

    # check empty content if user doesn't have the appropriate role
    resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user))
    assert 'features' in resp.json
    assert len(resp.json['features']) == 0

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    # check it gets the data
    resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user))
    assert 'features' in resp.json
    assert len(resp.json['features']) == 10

    # check with a filter
    resp = get_app(pub).get(sign_uri('/api/forms/geojson?status=done', user=local_user))
    assert 'features' in resp.json
    assert len(resp.json['features']) == 20
|
|
|
|
def test_api_global_listing(pub, local_user):
    """Check the global ``/api/forms/`` listing (SQL storage only).

    Covers the empty listing, role-based visibility, the ``status`` filter,
    ``limit``/``offset`` pagination and the 400 returned on non-numeric
    pagination parameters.
    """
    if not pub.is_using_postgresql():
        # NOTE(review): this probes the geojson URL, not /api/forms/ --
        # possibly copied from the geojson test; confirm it is intended.
        get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
        # pytest.skip() raises, so nothing below runs in this mode
        pytest.skip('this requires SQL')

    Role.wipe()
    role = Role(name='test')
    role.store()

    # check there's no crash if there are no formdefs
    resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user))
    assert len(resp.json['data']) == 0

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    formdef.store()

    for i in range(30):
        formdata = data_class()
        formdata.data = {'0': 'FOO BAR'}
        formdata.user_id = local_user.id
        formdata.just_created()
        if i % 3 == 0:
            formdata.jump_status('new')
        else:
            formdata.jump_status('finished')
        formdata.store()

    # check empty content if user doesn't have the appropriate role
    resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user))
    assert len(resp.json['data']) == 0

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    # check it gets the data
    resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user))
    assert len(resp.json['data']) == 10

    # check with a filter
    resp = get_app(pub).get(sign_uri('/api/forms/?status=done', user=local_user))
    assert len(resp.json['data']) == 20

    # check limit/offset
    resp = get_app(pub).get(sign_uri('/api/forms/?status=done&limit=5', user=local_user))
    assert len(resp.json['data']) == 5
    resp = get_app(pub).get(sign_uri('/api/forms/?status=done&offset=5&limit=5', user=local_user))
    assert len(resp.json['data']) == 5
    resp = get_app(pub).get(sign_uri('/api/forms/?status=done&offset=18&limit=5', user=local_user))
    assert len(resp.json['data']) == 2

    # check error handling
    get_app(pub).get(sign_uri('/api/forms/?status=done&limit=plop', user=local_user), status=400)
    get_app(pub).get(sign_uri('/api/forms/?status=done&offset=plop', user=local_user), status=400)
|
|
|
|
def test_api_global_listing_ignored_roles(pub, local_user):
    """Check the ``ignore-roles`` parameter of the global ``/api/forms/`` listing.

    Runs ``test_api_global_listing`` first to get its 30 formdatas (and its
    skip when SQL is not available), then adds 10 formdatas on a second form
    whose receiver role the user does not have.
    """
    test_api_global_listing(pub, local_user)

    role = Role(name='test2')
    role.store()

    formdef = FormDef()
    formdef.name = 'test2'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for _ in range(10):
        formdata = data_class()
        formdata.data = {'0': 'FOO BAR'}
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.jump_status('new')
        formdata.store()

    # considering roles
    resp = get_app(pub).get(sign_uri('/api/forms/?status=all&limit=100', user=local_user))
    assert len(resp.json['data']) == 30

    # ignore roles
    resp = get_app(pub).get(sign_uri('/api/forms/?status=all&limit=100&ignore-roles=on', user=local_user))
    assert len(resp.json['data']) == 40

    # check sensitive forms are not exposed
    formdef.skip_from_360_view = True
    formdef.store()
    resp = get_app(pub).get(sign_uri('/api/forms/?status=all&limit=100&ignore-roles=on', user=local_user))
    assert len(resp.json['data']) == 30
|
|
|
|
def test_api_include_anonymised(pub, local_user):
    """Check the ``include-anonymised`` parameter of the global listing.

    Ten formdatas are created and the last one is anonymised; it must be
    listed by default and with ``include-anonymised=on``, and left out with
    ``include-anonymised=off``.
    """
    if not pub.is_using_postgresql():
        # NOTE(review): this probes the geojson URL, not /api/forms/ --
        # possibly copied from the geojson test; confirm it is intended.
        get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
        # pytest.skip() raises, so nothing below runs in this mode
        pytest.skip('this requires SQL')

    Role.wipe()
    role = Role(name='test')
    role.store()

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    formdata = None
    for _ in range(10):
        formdata = data_class()
        formdata.data = {'0': 'FOO BAR'}
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.jump_status('new')
        formdata.store()

    # anonymise the last one
    formdata.anonymise()

    resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user))
    assert len(resp.json['data']) == 10

    resp = get_app(pub).get(sign_uri('/api/forms/?include-anonymised=on', user=local_user))
    assert len(resp.json['data']) == 10

    resp = get_app(pub).get(sign_uri('/api/forms/?include-anonymised=off', user=local_user))
    assert len(resp.json['data']) == 9
|
|
|
|
@pytest.fixture
def ics_data(local_user):
    """Create a ``test`` form with 30 formdatas holding date strings.

    Field ``foobar`` gets one date per day starting 2014-01-20 12:00 and
    field ``foobar2`` the same day shifted by a few minutes; one formdata
    out of three is in the ``new`` status, the others are ``finished``.
    """
    Role.wipe()
    receiver = Role(name='test')
    receiver.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.url_name = 'test'
    formdef.name = 'testé'
    formdef.workflow_roles = {'_receiver': receiver.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2', varname='foobar2'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    first_day = datetime.datetime(2014, 1, 20, 12, 0)
    date_format = '%Y-%m-%d %H:%M'
    for offset in range(30):
        start = first_day + datetime.timedelta(days=offset)
        end = first_day + datetime.timedelta(days=offset, minutes=offset + 1)

        formdata = data_class()
        formdata.data = {
            '0': start.strftime(date_format),
            '1': end.strftime(date_format),
        }
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.jump_status('new' if offset % 3 == 0 else 'finished')
        formdata.store()
|
|
|
|
def test_api_ics_formdata(pub, local_user, ics_data):
    """Check the ``/api/forms/test/ics/<start>[/<end>]`` calendar export.

    Covers access control, the content type, the number and description of
    exported events, status filtering, and the 404 returned on unknown
    variable names or on too many path elements.
    """
    role = Role.select()[0]

    # check access is denied if the user has not the appropriate role
    get_app(pub).get(sign_uri('/api/forms/test/ics/foobar', user=local_user), status=403)
    # even if there's an anonymise parameter
    get_app(pub).get(sign_uri('/api/forms/test/ics/foobar?anonymise', user=local_user), status=403)

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    def remove_dtstamp(body):
        # remove dtstamp as the precise timing may vary between two consecutive
        # calls and we shouldn't care.
        return re.sub('DTSTAMP:.*', 'DTSTAMP:--', body)

    # check it gets the data
    resp = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar', user=local_user))
    resp2 = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar/', user=local_user))
    assert remove_dtstamp(resp.text) == remove_dtstamp(resp2.text)
    assert resp.headers['content-type'] == 'text/calendar; charset=utf-8'
    assert resp.text.count('BEGIN:VEVENT') == 10
    # check that description contains form name, display id, workflow status,
    # backoffice url and attached user
    # (backslashes are doubled: "\|" and "\d" are not valid string escapes)
    pattern = re.compile(u'DESCRIPTION:testé \\| 1-\\d+ \\| New', re.MULTILINE)
    m = pattern.findall(resp.text)
    assert len(m) == 10
    assert resp.text.count('Jean Darmette') == 10
    assert resp.text.count('DTSTART') == 10

    # check with a filter
    resp = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar?filter=done', user=local_user))
    assert resp.text.count('BEGIN:VEVENT') == 20
    pattern = re.compile(u'DESCRIPTION:testé \\| 1-\\d+ \\| Finished', re.MULTILINE)
    m = pattern.findall(resp.text)
    assert len(m) == 20
    assert resp.text.count('Jean Darmette') == 20

    # check 404 on erroneous field var
    get_app(pub).get(sign_uri('/api/forms/test/ics/xxx', user=local_user), status=404)

    # check 404 on an erroneous field var for endtime
    get_app(pub).get(sign_uri('/api/forms/test/ics/foobar/xxx', user=local_user), status=404)

    # check 404 on too many path elements
    get_app(pub).get(sign_uri('/api/forms/test/ics/foobar/foobar2/xxx', user=local_user), status=404)

    # check ics data with start and end varnames
    resp = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar/foobar2', user=local_user))
    resp2 = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar/foobar2/', user=local_user))
    assert remove_dtstamp(resp.text) == remove_dtstamp(resp2.text)
    assert resp.text.count('DTSTART') == 10
    assert resp.text.count('DTEND') == 10
|
|
|
|
def test_api_ics_formdata_http_auth(pub, local_user, ics_data):
    """Check HTTP basic authentication on the ics export.

    Access must be refused until credentials are declared in the
    ``api-http-auth-ics`` section of ``site-options.cfg`` *and* the user
    designated by the ``email`` parameter has the receiver role; a wrong
    password must be refused again.
    """
    role = Role.select()[0]

    # no access
    app = get_app(pub)
    app.authorization = ('Basic', ('user', 'password'))
    app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=403)

    # add authentication info
    pub.load_site_options()
    pub.site_options.add_section('api-http-auth-ics')
    pub.site_options.set('api-http-auth-ics', 'user', 'password')
    # context manager: the file is flushed and closed before the next request
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
        pub.site_options.write(fd)

    # check access is denied if the user has not the appropriate role
    app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=403)

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    # check it gets the data
    resp = app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=200)
    assert resp.headers['content-type'] == 'text/calendar; charset=utf-8'
    assert resp.text.count('BEGIN:VEVENT') == 10

    # check it fails with a different password
    app.authorization = ('Basic', ('user', 'password2'))
    app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=403)
|
|
|
|
def _assert_hello_world_role(role_data):
    """Check the JSON export of the 'Hello World' role created by test_roles."""
    assert role_data['text'] == 'Hello World'
    assert role_data['slug'] == 'hello-world'
    assert role_data['emails'] == ['toto@example.com', 'zozo@example.com']
    # identity check: the exported value must be the boolean false, not a
    # merely falsy value
    assert role_data['emails_to_members'] is False
    assert role_data['details'] == 'kouign amann'


def test_roles(pub, local_user):
    """Check the roles API, on ``/api/roles`` and on the legacy ``/roles`` URL."""
    Role.wipe()
    role = Role(name='Hello World')
    role.emails = ['toto@example.com', 'zozo@example.com']
    role.details = 'kouign amann'
    role.store()

    # unsigned access is refused
    get_app(pub).get('/api/roles', status=403)

    resp = get_app(pub).get(sign_uri('/api/roles'))
    _assert_hello_world_role(resp.json['data'][0])

    # also check old endpoint, for compatibility
    resp = get_app(pub).get(sign_uri('/roles'), headers={'Accept': 'application/json'})
    _assert_hello_world_role(resp.json['data'][0])
|
|
|
|
def test_users(pub, local_user):
    """Check the users API: signature requirement, content and ``q`` search."""
    # an unsigned request is refused
    get_app(pub).get('/api/users/', status=403)

    listing = get_app(pub).get(sign_uri('/api/users/'))
    first_user = listing.json['data'][0]
    assert first_user['user_display_name'] == local_user.name
    assert first_user['user_email'] == local_user.email
    assert first_user['user_id'] == local_user.id

    foo_bar_role = Role(name='Foo bar')
    foo_bar_role.store()
    local_user.roles = [foo_bar_role.id]
    local_user.store()

    # search on the user name, roles are part of the result
    found = get_app(pub).get(sign_uri('/api/users/?q=jean')).json['data']
    assert found[0]['user_email'] == local_user.email
    assert len(found[0]['user_roles']) == 1
    assert found[0]['user_roles'][0]['name'] == 'Foo bar'

    not_found = get_app(pub).get(sign_uri('/api/users/?q=foobar')).json['data']
    assert len(not_found) == 0

    # add a custom user field and check it can be searched
    from wcs.admin.settings import UserFieldsFormDef
    user_formdef = UserFieldsFormDef(pub)
    user_formdef.fields.append(fields.StringField(id='3', label='test', type='string'))
    user_formdef.store()

    local_user.form_data = {'3': 'HELLO'}
    local_user.set_attributes_from_formdata(local_user.form_data)
    local_user.store()

    found = get_app(pub).get(sign_uri('/api/users/?q=HELLO')).json['data']
    assert len(found) == 1
    not_found = get_app(pub).get(sign_uri('/api/users/?q=foobar')).json['data']
    assert len(not_found) == 0
|
|
|
|
def test_users_unaccent(pub, local_user):
    """Check the users search matches names with or without accents."""
    local_user.name = 'Jean Sénisme'
    local_user.store()

    # first name, unaccented last name and accented last name all match
    matching_urls = (
        '/api/users/?q=jean',
        '/api/users/?q=senisme',
        '/api/users/?q=sénisme',
    )
    for url in matching_urls:
        result = get_app(pub).get(sign_uri(url))
        assert result.json['data'][0]['user_email'] == local_user.email

    result = get_app(pub).get(sign_uri('/api/users/?q=blah'))
    assert len(result.json['data']) == 0
|
|
|
|
def test_workflow_trigger(pub, local_user):
    """Check the ``jump/trigger/XXX`` endpoint, without then with a role restriction."""
    wf = Workflow(name='test')
    first_status = wf.add_status('Status1', 'st1')
    trigger_jump = JumpWorkflowStatusItem()
    trigger_jump.trigger = 'XXX'
    trigger_jump.status = 'st2'
    first_status.items.append(trigger_jump)
    trigger_jump.parent = first_status
    wf.add_status('Status2', 'st2')
    wf.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = []
    formdef.workflow_id = wf.id
    formdef.store()

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    assert formdef.data_class().get(formdata.id).status == 'wf-st1'

    def post_trigger(expected_status, **sign_kwargs):
        # sign and post a fresh trigger request, checking the HTTP status
        signed = sign_uri(formdata.get_url() + 'jump/trigger/XXX', **sign_kwargs)
        return get_app(pub).post(signed, status=expected_status)

    post_trigger(200)
    assert formdef.data_class().get(formdata.id).status == 'wf-st2'

    # restrict the trigger to a role
    Role.wipe()
    allowed_role = Role(name='xxx')
    allowed_role.store()

    trigger_jump.by = [allowed_role.id]
    wf.store()

    formdata.store()  # (will get back to wf-st1)
    # refused without user, and with a user lacking the role
    post_trigger(403)
    post_trigger(403, user=local_user)

    local_user.roles = [allowed_role.id]
    local_user.store()
    post_trigger(200, user=local_user)
|
|
|
|
def test_workflow_trigger_with_condition(pub, local_user):
    """Check a triggered jump is refused (403) while its condition is unmet."""
    wf = Workflow(name='test')
    first_status = wf.add_status('Status1', 'st1')
    conditional_jump = JumpWorkflowStatusItem()
    conditional_jump.trigger = 'XXX'
    conditional_jump.condition = {'type': 'django', 'value': 'form_var_foo == "bar"'}
    conditional_jump.status = 'st2'
    first_status.items.append(conditional_jump)
    conditional_jump.parent = first_status
    wf.add_status('Status2', 'st2')
    wf.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [fields.StringField(id='0', label='foo', varname='foo')]
    formdef.workflow_id = wf.id
    formdef.store()

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.data = {'0': 'foo'}
    formdata.just_created()
    formdata.store()
    assert formdef.data_class().get(formdata.id).status == 'wf-st1'

    # field value is "foo": the condition is not met
    refused = get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX'), status=403)
    assert refused.json == {'err_desc': 'unmet condition', 'err': 1}
    assert formdef.data_class().get(formdata.id).status == 'wf-st1'
    # check without json
    refused = get_app(pub).post(
        sign_uri(formdata.get_url() + 'jump/trigger/XXX', format=None),
        status=403)
    assert refused.content_type == 'text/html'

    # field value is "bar": the jump is accepted
    formdata.data['0'] = 'bar'
    formdata.store()
    accepted = get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX'))
    assert accepted.json == {'err': 0, 'url': None}
|
|
|
|
def test_workflow_trigger_jump_once(pub, local_user):
    """Check one trigger call performs a single jump.

    Statuses st1 and st2 both have a jump on trigger ``XXX``; the first call
    must stop at st2 and only the second call reaches st3.
    """
    wf = Workflow(name='test')
    status_1 = wf.add_status('Status1', 'st1')
    status_2 = wf.add_status('Status2', 'st2')
    wf.add_status('Status3', 'st3')

    # same trigger name on two consecutive statuses
    for source_status, target_id in ((status_1, 'st2'), (status_2, 'st3')):
        jump_item = JumpWorkflowStatusItem()
        jump_item.trigger = 'XXX'
        jump_item.status = target_id
        source_status.items.append(jump_item)
        jump_item.parent = source_status
    wf.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = []
    formdef.workflow_id = wf.id
    formdef.store()

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    assert formdef.data_class().get(formdata.id).status == 'wf-st1'

    for reached_status in ('wf-st2', 'wf-st3'):
        result = get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX'))
        assert result.json == {'err': 0, 'url': None}
        assert formdef.data_class().get(formdata.id).status == reached_status
|
|
|
|
def test_workflow_global_webservice_trigger(pub, local_user):
    """Check webservice triggers of global actions (``hooks/<identifier>/``).

    Covers unknown hooks, anonymous calls, calls restricted to logged users
    or to a role, and the storage of the posted JSON payload.
    """
    wf = Workflow(name='test')
    wf.add_status('Status1', 'st1')

    global_action = wf.add_global_action('Action', 'ac1')
    ws_trigger = global_action.append_trigger('webservice')
    ws_trigger.identifier = 'plop'

    journal_item = RegisterCommenterWorkflowStatusItem()
    journal_item.id = '_add_to_journal'
    journal_item.comment = 'HELLO WORLD'
    global_action.items.append(journal_item)
    journal_item.parent = global_action

    wf.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = []
    formdef.workflow_id = wf.id
    formdef.store()

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    assert formdef.data_class().get(formdata.id).status == 'wf-st1'

    def last_journal_content():
        # content of the latest part of the latest evolution, reloaded from storage
        return formdef.data_class().get(formdata.id).evolution[-1].parts[-1].content

    # call to undefined hook
    get_app(pub).post(sign_uri(formdata.get_url() + 'hooks/XXX/'), status=404)
    get_app(pub).post(sign_uri(formdata.get_api_url() + 'hooks/XXX/'), status=404)

    # anonymous call
    get_app(pub).post(formdata.get_url() + 'hooks/plop/', status=200)
    assert last_journal_content() == 'HELLO WORLD'

    journal_item.comment = 'HELLO WORLD 2'
    wf.store()
    get_app(pub).post(formdata.get_api_url() + 'hooks/plop/', status=200)
    assert last_journal_content() == 'HELLO WORLD 2'

    # call requiring user
    journal_item.comment = 'HELLO WORLD 3'
    ws_trigger.roles = ['logged-users']
    wf.store()
    get_app(pub).post(formdata.get_api_url() + 'hooks/plop/', status=403)
    get_app(pub).post(sign_uri(formdata.get_api_url() + 'hooks/plop/'), status=200)
    assert last_journal_content() == 'HELLO WORLD 3'

    # call requiring roles
    journal_item.comment = 'HELLO WORLD 4'
    ws_trigger.roles = ['logged-users']
    wf.store()
    Role.wipe()
    required_role = Role(name='xxx')
    required_role.store()
    ws_trigger.roles = [required_role.id]
    wf.store()
    get_app(pub).post(sign_uri(formdata.get_api_url() + 'hooks/plop/'), status=403)
    get_app(pub).post(
        sign_uri(formdata.get_api_url() + 'hooks/plop/', user=local_user),
        status=403)

    local_user.roles = [required_role.id]
    local_user.store()
    get_app(pub).post(
        sign_uri(formdata.get_api_url() + 'hooks/plop/', user=local_user),
        status=200)
    assert last_journal_content() == 'HELLO WORLD 4'

    # call adding data
    journal_item.comment = 'HELLO {{plop_test}}'
    wf.store()
    get_app(pub).post_json(
        sign_uri(formdata.get_api_url() + 'hooks/plop/', user=local_user),
        {'test': 'foobar'},
        status=200)
    # (django templating make it turn into HTML)
    assert last_journal_content() == '<div>HELLO foobar</div>'

    # call adding data but with no actions
    global_action.items = []
    wf.store()
    get_app(pub).post_json(
        sign_uri(formdata.get_api_url() + 'hooks/plop/', user=local_user),
        {'test': 'BAR'},
        status=200)
    assert formdef.data_class().get(formdata.id).workflow_data == {'plop': {'test': 'BAR'}}
|
|
|
|
def test_tracking_code(pub):
    """Check the ``/api/code/<code>`` tracking code lookup endpoint."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = []
    formdef.enable_tracking_codes = True
    formdef.store()

    formdata = formdef.data_class()()
    formdata.store()

    tracking_code = get_publisher().tracking_code_class()
    tracking_code.formdata = formdata
    tracking_code.store()

    def lookup(code_id, expected_status):
        # signed lookup of a tracking code, checking the HTTP status
        signed = sign_url('/api/code/%s?orig=coucou' % code_id, '1234')
        return get_app(pub).get(signed, status=expected_status)

    # missing signature
    get_app(pub).get('/api/code/foobar', status=403)

    # unknown code
    answer = lookup('foobar', 404)
    assert answer.json['err'] == 1

    # known code
    answer = lookup(tracking_code.id, 200)
    assert answer.json['err'] == 0
    assert answer.json['url'] == 'http://example.net/test/%s' % formdata.id
    assert answer.json['load_url'] == 'http://example.net/code/%s/load' % tracking_code.id

    # tracking codes disabled on the form
    formdef.enable_tracking_codes = False
    formdef.store()
    lookup(tracking_code.id, 404)

    # tracking codes enabled again but the formdata is gone
    formdef.enable_tracking_codes = True
    formdef.store()
    formdata.remove_self()
    lookup(tracking_code.id, 404)
|
|
|
|
def test_validate_expression(pub):
    """Check ``/api/validate-expression`` on plain, Python and Django expressions."""
    nothing_to_report = {'klass': None, 'msg': ''}

    def validate(url):
        # every check uses a fresh test application
        return get_app(pub).get(url).json

    assert validate('/api/validate-expression?expression=hello') == nothing_to_report
    assert validate('/api/validate-expression?expression=[hello]') == nothing_to_report

    report = validate('/api/validate-expression?expression==[hello')
    assert report['klass'] == 'error'
    assert report['msg'].startswith('syntax error')

    report = validate('/api/validate-expression?expression==[hello]')
    assert report['klass'] == 'warning'
    assert report['msg'].startswith('Make sure you want a Python expression,')

    assert validate('/api/validate-expression?expression==hello[0]') == nothing_to_report
    assert validate("/api/validate-expression?expression==hello['plop']") == nothing_to_report

    # django with unicode
    report = validate('/api/validate-expression?expression={{hello+%C3%A9l%C3%A9phant}}')
    assert report['klass'] == 'error'
    assert report['msg'].startswith('syntax error in Django template: Could not parse the remainder')
|
|
|
|
def test_validate_condition(pub):
    """Check the answers of /api/validate-condition for each condition type."""

    def validation_result(url):
        # every request goes through a freshly created test application
        return get_app(pub).get(url).json

    nothing_to_report = {'klass': None, 'msg': ''}

    # python conditions
    assert validation_result('/api/validate-condition?type=python&value_python=hello') == nothing_to_report
    assert validation_result('/api/validate-condition?type=python&value_python=~2') == nothing_to_report

    result = validation_result('/api/validate-condition?type=python&value_python=hello -')
    assert result['klass'] == 'error'
    assert result['msg'].startswith('syntax error')

    result = validation_result('/api/validate-condition?type=python&value_python={{form_number}}==3')
    assert result['klass'] == 'error'
    assert 'Python condition cannot contain {{' in result['msg']

    # django conditions
    result = validation_result('/api/validate-condition?type=django&value_django=un+%C3%A9l%C3%A9phant')
    assert result['klass'] == 'error'
    assert result['msg'].startswith(u"syntax error: Unused 'éléphant'")

    result = validation_result('/api/validate-condition?type=django&value_django=~2')
    assert result['klass'] == 'error'
    assert result['msg'].startswith('syntax error')

    # a condition type that is neither python nor django
    result = validation_result('/api/validate-condition?type=unknown&value_unknown=2')
    assert result['klass'] == 'error'
    assert result['msg'] == 'unknown condition type'
|
|
|
|
|
|
@pytest.fixture(params=['sql', 'pickle'])
def no_request_pub(request):
    """Return a temporary publisher set up without any HTTP request.

    The fixture is parametrized so tests using it run once with
    ``sql_mode`` enabled ('sql') and once with it disabled ('pickle').
    The application directory is pointed at ``example.net`` by hand and a
    ``site-options.cfg`` declaring a ``[wscall-secrets]`` entry for
    ``api.example.com`` is written into it.
    """
    pub = create_temporary_pub(sql_mode=(request.param == 'sql'))
    pub.app_dir = os.path.join(pub.APP_DIR, 'example.net')
    pub.set_config()

    # a context manager guarantees the file is flushed and closed before the
    # publisher is handed over to the test, instead of relying on garbage
    # collection of an anonymous file object
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
        fd.write('''
[wscall-secrets]
api.example.com = 1234
''')
    return pub
|
|
|
|
|
|
def test_get_secret_and_orig(no_request_pub):
    """Check the secret and orig returned for a URL on api.example.com."""
    found_secret, found_orig = get_secret_and_orig('https://api.example.com/endpoint/')
    # secret is the value configured under [wscall-secrets] by the fixture
    assert found_secret == '1234'
    assert found_orig == 'example.net'
|
|
|
|
|
|
def test_reverse_geocoding(pub):
    """Check /api/reverse-geocoding and the URL it requests upstream.

    ``wcs.qommon.misc.urlopen`` is mocked, so no network access happens; the
    test inspects the first positional argument of the last mocked call to
    verify how site options (zoom level, key, custom service URL) alter the
    upstream URL.
    """

    def write_site_options():
        # write through a context manager so the configuration is flushed
        # and the handle closed before the next request is issued
        with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
            pub.site_options.write(fd)

    with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
        urlopen.side_effect = lambda *args: StringIO(json.dumps({'address': 'xxx'}))

        # coordinates are mandatory
        get_app(pub).get('/api/reverse-geocoding', status=400)

        # default settings: the mocked answer is passed through as JSON
        resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
        assert resp.content_type == 'application/json'
        assert resp.text == json.dumps({'address': 'xxx'})
        assert urlopen.call_args[0][0] == 'https://nominatim.entrouvert.org/reverse?zoom=18&format=json&addressdetails=1&lat=0&lon=0&accept-language=en'

        # custom zoom level
        pub.site_options.add_section('options')
        pub.site_options.set('options', 'nominatim_reverse_zoom_level', '16')
        write_site_options()
        get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
        assert urlopen.call_args[0][0] == 'https://nominatim.entrouvert.org/reverse?zoom=16&format=json&addressdetails=1&lat=0&lon=0&accept-language=en'

        # nominatim key added to the query string
        pub.site_options.set('options', 'nominatim_key', 'KEY')
        write_site_options()
        get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
        assert urlopen.call_args[0][0] == 'https://nominatim.entrouvert.org/reverse?zoom=16&key=KEY&format=json&addressdetails=1&lat=0&lon=0&accept-language=en'

        # custom service URL, its own query string is kept
        pub.site_options.set('options', 'reverse_geocoding_service_url', 'http://reverse.example.net/?param=value')
        write_site_options()
        get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
        assert urlopen.call_args[0][0] == 'http://reverse.example.net/?param=value&format=json&addressdetails=1&lat=0&lon=0&accept-language=en'
|
|
|
|
|
|
def test_formdef_submit_structured(pub, local_user):
    """Submit a form via the API and check the stored structured item values.

    The form has two item fields: one backed by a JSON data source (served
    by a mocked ``urlopen``) and one backed by a formula data source.
    """
    Role.wipe()
    test_role = Role(name='test')
    test_role.store()
    local_user.roles = [test_role.id]
    local_user.store()

    json_source = {
        'type': 'json',
        'value': 'http://datasource.com',
    }
    formula_source = {
        'type': 'formula',
        'value': '[dict(id=i, text=\'label %s\' % i, foo=i) for i in range(10)]',
    }

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        fields.ItemField(id='0', label='foobar', varname='foobar', data_source=json_source),
        fields.ItemField(id='1', label='foobar1', varname='foobar1', data_source=formula_source),
    ]
    formdef.store()
    data_class = formdef.data_class()

    def signed_submit_path():
        # sign the absolute URL, then strip the scheme and host so the
        # result can be given to the test application
        query = '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email)
        absolute_url = sign_url('http://example.net/api/formdefs/test/submit' + query, '1234')
        return absolute_url[len('http://example.net'):]

    # content returned by the mocked remote JSON data source
    datasource_payload = '''\
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
{"id": 1, "text": "uné", "foo": "bar1"}, \
{"id": 2, "text": "deux", "foo": "bar2"}]}'''

    submission = {'data': {'0': '0', '1': '3'}}
    with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
        urlopen.side_effect = lambda *args: StringIO(datasource_payload)
        resp = get_app(pub).post_json(signed_submit_path(), submission)

    formdata = data_class.get(resp.json['data']['id'])
    stored = formdata.data
    assert formdata.status == 'wf-new'

    # field backed by the JSON data source
    assert stored['0'] == '0'
    assert stored['0_display'] == 'zéro'
    assert stored['0_structured'] == {'id': 0, 'text': 'zéro', 'foo': 'bar'}

    # field backed by the formula data source
    assert stored['1'] == '3'
    assert stored['1_display'] == 'label 3'
    assert stored['1_structured'] == {'id': 3, 'text': 'label 3', 'foo': 3}

    data_class.wipe()
|
|
|
|
|
|
def test_geocoding(pub):
    """Check /api/geocoding and the URL it requests upstream.

    ``wcs.qommon.misc.urlopen`` is mocked, so no network access happens; the
    test inspects the first positional argument of the last mocked call to
    verify how site options (key, custom service URL) alter the upstream URL.
    """

    def write_site_options():
        # write through a context manager so the configuration is flushed
        # and the handle closed before the next request is issued
        with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
            pub.site_options.write(fd)

    with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
        urlopen.side_effect = lambda *args: StringIO(json.dumps([{'lat': 0, 'lon': 0}]))

        # the query parameter is mandatory
        get_app(pub).get('/api/geocoding', status=400)

        # default settings: the mocked answer is passed through as JSON
        resp = get_app(pub).get('/api/geocoding?q=test')
        assert resp.content_type == 'application/json'
        assert resp.text == json.dumps([{'lat': 0, 'lon': 0}])
        assert urlopen.call_args[0][0] == 'https://nominatim.entrouvert.org/search?format=json&q=test&accept-language=en'

        # nominatim key added to the query string
        pub.site_options.add_section('options')
        pub.site_options.set('options', 'nominatim_key', 'KEY')
        write_site_options()
        get_app(pub).get('/api/geocoding?q=test')
        assert urlopen.call_args[0][0] == 'https://nominatim.entrouvert.org/search?key=KEY&format=json&q=test&accept-language=en'

        # custom service URL, its own query string is kept
        pub.site_options.set('options', 'geocoding_service_url', 'http://reverse.example.net/?param=value')
        write_site_options()
        get_app(pub).get('/api/geocoding?q=test')
        assert urlopen.call_args[0][0] == 'http://reverse.example.net/?param=value&format=json&q=test&accept-language=en'
|
|
|
|
|
|
def test_cards(pub, local_user):
    """Check access control and content of the /api/cards/test/list endpoint."""
    Role.wipe()
    viewer_role = Role(name='test')
    viewer_role.store()
    local_user.roles = [viewer_role.id]
    local_user.store()

    carddef = CardDef()
    carddef.name = 'test'
    carddef.fields = [fields.StringField(id='0', label='foobar', varname='foo')]
    carddef.workflow_roles = {'_viewer': viewer_role.id}
    carddef.store()

    card = carddef.data_class()()
    card.data = {'0': 'blah'}
    card.just_created()
    card.store()

    list_url = '/api/cards/test/list'

    # a signed request without any user is refused
    get_app(pub).get(sign_uri(list_url), status=403)

    # with the user's NameID the single card is listed
    listing = get_app(pub).get(sign_uri(list_url + '?NameID=%s' % local_user.name_identifiers[0]))
    assert len(listing.json) == 1

    # full=on adds the field values, keyed by varname
    full_listing = get_app(pub).get(
        sign_uri(list_url + '?NameID=%s&full=on' % local_user.name_identifiers[0]))
    assert full_listing.json[0]['fields']['foo'] == 'blah'
|