wcs/tests/api/test_formdata.py

1286 lines
46 KiB
Python

# -*- coding: utf-8 -*-
import base64
import datetime
import os
import re
import time
import xml.etree.ElementTree as ET
import zipfile
import pytest
from django.utils.encoding import force_bytes
from django.utils.six import BytesIO
from quixote import get_publisher
from utilities import clean_temporary_pub, create_temporary_pub, get_app, login
from wcs import fields
from wcs.blocks import BlockDef
from wcs.data_sources import NamedDataSource
from wcs.formdata import Evolution
from wcs.formdef import FormDef
from wcs.qommon import ods
from wcs.qommon.form import PicklableUpload
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.ident.password_accounts import PasswordAccount
from wcs.roles import Role
from wcs.workflows import EditableWorkflowStatusItem, Workflow, WorkflowBackofficeFieldsFormDef
from .utils import sign_uri
def pytest_generate_tests(metafunc):
    """Parametrize the ``pub`` fixture so each test using it runs for both backends.

    Tests that do not request ``pub`` are left alone.
    """
    if 'pub' not in metafunc.fixturenames:
        return
    metafunc.parametrize('pub', ['pickle', 'sql'], indirect=True)
@pytest.fixture
def pub(request, emails):
    """Create and configure a temporary publisher for the API tests.

    ``request.param`` selects the storage backend: ``'sql'`` enables
    ``sql_mode`` on the temporary publisher, any other value disables it.
    The ``emails`` fixture is requested but not used directly in this body.

    Returns the configured publisher.
    """
    publisher = create_temporary_pub(sql_mode=(request.param == 'sql'))

    req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
    publisher.set_app_dir(req)
    publisher.cfg['identification'] = {'methods': ['password']}
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()

    # Write the options file through a context manager so the handle is
    # flushed and closed deterministically instead of being left to the
    # garbage collector.
    # NOTE(review): the [api-secrets] entry is presumably what sign_uri()
    # signs requests with -- confirm against tests/api/utils.py.
    site_options_path = os.path.join(publisher.app_dir, 'site-options.cfg')
    with open(site_options_path, 'w') as fd:
        fd.write('[api-secrets]\ncoucou = 1234\n')

    return publisher
def teardown_module(module):
    """Module-level teardown hook: clean up the temporary publisher."""
    clean_temporary_pub()
@pytest.fixture
def local_user():
    """Wipe all users, then store and return a regular test user."""
    user_class = get_publisher().user_class
    user_class.wipe()

    jean = user_class()
    jean.name = 'Jean Darmette'
    jean.email = 'jean.darmette@triffouilis.fr'
    jean.name_identifiers = ['0123456789']
    jean.store()
    return jean
@pytest.fixture
def admin_user():
    """Wipe all users, then store and return an admin user with a password account.

    The account uses ``admin`` as both its identifier and its password.
    """
    user_class = get_publisher().user_class
    user_class.wipe()

    admin = user_class()
    admin.name = 'John Doe Admin'
    admin.email = 'john.doe@example.com'
    admin.name_identifiers = ['0123456789']
    admin.is_admin = True
    admin.store()

    # attach password credentials to the freshly stored user
    credentials = PasswordAccount(id='admin')
    credentials.set_password('admin')
    credentials.user_id = admin.id
    credentials.store()
    return admin
@pytest.fixture
def ics_data(local_user):
    """Populate a ``test`` form with 31 formdata for date-based tests.

    Thirty formdata carry ``%Y-%m-%d %H:%M`` strings in both fields, one day
    apart starting from 2014-01-20 12:00 (the second field is shifted by
    ``i + 1`` extra minutes). Every third one is left in ``new``, the others
    are moved to ``finished``. A final formdata holds bare ``HH:MM`` strings
    instead of full datetimes.

    The fixture returns nothing; it only prepares stored objects.
    """
    Role.wipe()
    role = Role(name='test')
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.url_name = 'test'
    formdef.name = 'testé'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2', varname='foobar2'),
    ]
    formdef.digest_template = 'plöp {{ form_var_foobar }} plÔp'
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    date_format = '%Y-%m-%d %H:%M'
    start = datetime.datetime(2014, 1, 20, 12, 0)
    for i in range(30):
        formdata = data_class()
        formdata.data = {'0': (start + datetime.timedelta(days=i)).strftime(date_format)}
        formdata.data['1'] = (start + datetime.timedelta(days=i, minutes=i + 1)).strftime(date_format)
        formdata.user_id = local_user.id
        formdata.just_created()
        if i % 3 == 0:
            formdata.jump_status('new')
        else:
            formdata.jump_status('finished')
        formdata.store()

    # not a datetime: ignored
    # (the original also rebound an unused ``date`` local here; it was never
    # read afterwards, so it has been dropped)
    formdata = data_class()
    formdata.data = {'0': '12:00'}
    formdata.data['1'] = '13:00'
    formdata.user_id = local_user.id
    formdata.just_created()
    formdata.jump_status('new')
    formdata.store()
def test_formdata(pub, local_user):
    """Check the signed JSON detail view of a single formdata.

    A form with string, date, file, item and block fields is created along
    with one formdata. The test verifies that the API answers 403 until the
    user holds the receiver role, that exported fields, workflow, submission,
    geolocation and role information match what was stored, and that after a
    jump to a status with restricted visibility ``status`` still reports the
    previous status while ``real_status`` reports the new one.
    """
    # data source used by the form item field and by the block item field
    NamedDataSource.wipe()
    source_items = [{'id': '1', 'text': 'foo', 'more': 'XXX'}, {'id': '2', 'text': 'bar', 'more': 'YYY'}]
    data_source = NamedDataSource(name='foobar')
    data_source.data_source = {'type': 'formula', 'value': repr(source_items)}
    data_source.store()

    BlockDef.wipe()
    block = BlockDef()
    block.name = 'foobar'
    block.fields = [
        fields.StringField(id='abc', label='Foo', varname='foo'),
        fields.ItemField(id='xyz', label='Test', type='item', data_source={'type': 'foobar'}, varname='bar'),
    ]
    block.store()

    Role.wipe()
    receiver_role = Role(name='test')
    receiver_role.id = '123'
    receiver_role.store()
    other_role = Role(name='another')
    other_role.id = '321'
    other_role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.geolocations = {'base': 'blah'}
    formdef.name = 'test'
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2'),
        fields.DateField(id='2', label='foobar3', varname='date'),
        fields.FileField(id='3', label='foobar4', varname='file'),
        fields.ItemField(id='4', label='foobar5', varname='item', data_source={'type': 'foobar'}),
        fields.BlockField(id='5', label='test', varname='blockdata', type='block:foobar', max_items=3),
    ]

    # workflow: copy of the default statuses plus an extra '_foobar' function
    Workflow.wipe()
    workflow = Workflow(name='foo')
    workflow.possible_status = Workflow.get_default_workflow().possible_status[:]
    workflow.roles['_foobar'] = 'Foobar'
    workflow.store()
    formdef.workflow_id = workflow.id
    formdef.workflow_roles = {'_receiver': receiver_role.id, '_foobar': other_role.id}
    formdef.store()
    item_field = formdef.fields[4]

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    parsed_date = time.strptime('2014-01-20', '%Y-%m-%d')
    attachment = PicklableUpload('test.txt', 'text/plain', 'ascii')
    attachment.receive([b'base64me'])
    formdata.data = {
        '0': 'foo@localhost',
        '1': 'xxx',
        '2': parsed_date,
        '3': attachment,
        '4': '1',
        '5': {
            'data': [
                {'abc': 'plop', 'xyz': '1', 'xyz_display': 'foo', 'xyz_structured': 'XXX'},
            ],
            'schema': {},  # not important here
        },
        '5_display': 'hello',
    }
    formdata.data['4_display'] = item_field.store_display_value(formdata.data, item_field.id)
    formdata.data['4_structured'] = item_field.store_structured_value(formdata.data, item_field.id)
    formdata.user_id = local_user.id
    formdata.just_created()
    formdata.status = 'wf-new'
    formdata.evolution[-1].status = 'wf-new'
    formdata.geolocations = {'base': {'lon': 10, 'lat': -12}}
    formdata.store()

    detail_url = '/api/forms/test/%s/' % formdata.id

    # without the receiver role the detail view is forbidden
    get_app(pub).get(sign_uri(detail_url, user=local_user), status=403)

    local_user.roles = [receiver_role.id]
    local_user.store()
    resp = get_app(pub).get(sign_uri(detail_url, user=local_user), status=200)
    payload = resp.json

    assert datetime.datetime.strptime(payload['last_update_time'], '%Y-%m-%dT%H:%M:%S')
    assert datetime.datetime.strptime(payload['receipt_time'], '%Y-%m-%dT%H:%M:%S')
    exported = payload['fields']
    assert len(exported) == 8
    assert 'foobar' in exported
    assert 'foobar2' not in exported  # foobar2 has no varname, not in json
    assert payload['user']['name'] == local_user.name
    assert exported['foobar'] == 'foo@localhost'
    assert exported['date'] == '2014-01-20'
    assert exported['file']['content'] == 'YmFzZTY0bWU='  # base64('base64me')
    assert exported['file']['filename'] == 'test.txt'
    assert exported['file']['content_type'] == 'text/plain'
    assert exported['item'] == 'foo'
    assert exported['item_raw'] == '1'
    assert exported['item_structured'] == {'id': '1', 'text': 'foo', 'more': 'XXX'}
    assert exported['blockdata'] == 'hello'
    assert exported['blockdata_raw'] == [
        {'foo': 'plop', 'bar': 'foo', 'bar_raw': '1', 'bar_structured': 'XXX'}
    ]
    assert payload['workflow']['status']['name'] == 'New'
    assert payload['workflow']['real_status']['name'] == 'New'
    assert payload['submission']['channel'] == 'web'
    assert payload['geolocations']['base']['lon'] == 10
    assert payload['geolocations']['base']['lat'] == -12

    roles_info = payload['roles']
    assert [x.get('id') for x in roles_info['_receiver']] == [str(receiver_role.id)]
    assert [x.get('id') for x in roles_info['_foobar']] == [str(other_role.id)]
    assert {x.get('id') for x in roles_info['concerned']} == {str(receiver_role.id), str(other_role.id)}
    assert [x.get('id') for x in roles_info['actions']] == [str(receiver_role.id)]

    # check the ?format=json endpoint returns 403
    get_app(pub).get('/test/%s/?format=json' % formdata.id, status=403)
    get_app(pub).get(sign_uri('/test/%s/' % formdata.id, user=local_user), status=403)

    # check status visibility
    workflow.add_status('Status1', 'st1')
    workflow.possible_status[-1].visibility = ['unknown']
    workflow.store()
    formdata.jump_status('st1')
    assert formdata.status == 'wf-st1'

    resp = get_app(pub).get(sign_uri(detail_url, user=local_user), status=200)
    workflow_info = resp.json['workflow']
    assert workflow_info['status'] == {'id': 'new', 'name': 'New'}
    assert workflow_info['real_status'] == {'id': 'st1', 'name': 'Status1'}
def test_formdata_backoffice_fields(pub, local_user):
    """Check that a backoffice field value is exported under ``workflow.fields``.

    Reuses the objects created by ``test_formdata`` and replaces the stored
    workflows with one declaring a single backoffice string field.
    """
    test_formdata(pub, local_user)

    Workflow.wipe()
    workflow = Workflow(name='foo')
    backoffice_formdef = workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    backoffice_formdef.fields = [
        fields.StringField(id='bo1', label='1st backoffice field', type='string', varname='backoffice_blah'),
    ]
    workflow.store()

    # pick up the form and formdata left behind by test_formdata
    stored_formdef = FormDef.select()[0]
    stored_formdata = stored_formdef.data_class().select()[0]
    stored_formdata.data['bo1'] = 'Hello world'
    stored_formdata.store()

    detail_url = '/api/forms/test/%s/' % stored_formdata.id
    resp = get_app(pub).get(sign_uri(detail_url, user=local_user))
    assert resp.json['workflow']['fields']['backoffice_blah'] == 'Hello world'
def test_formdata_duplicated_varnames(pub, local_user):
    """Check the JSON export when two fields share the same varname.

    Whichever of the two fields holds ``'foo'`` (the other holding ``'bar'``
    or an empty string), the export is expected to contain the single entry
    ``{'foobar': 'foo'}``.
    """
    Role.wipe()
    receiver_role = Role(name='test')
    receiver_role.id = '123'
    receiver_role.store()
    other_role = Role(name='another')
    other_role.id = '321'
    other_role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.geolocations = {'base': 'blah'}
    formdef.name = 'test'
    # both fields deliberately use the same varname
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.StringField(id='1', label='foobar2', varname='foobar'),
    ]
    workflow = Workflow.get_default_workflow()
    workflow.roles['_foobar'] = 'Foobar'
    workflow.id = '2'
    workflow.store()
    formdef.workflow_id = workflow.id
    formdef.workflow_roles = {'_receiver': receiver_role.id, '_foobar': other_role.id}
    formdef.store()

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.data = {'0': 'foo', '1': 'bar'}
    formdata.user_id = local_user.id
    formdata.just_created()
    formdata.status = 'wf-new'
    formdata.evolution[-1].status = 'wf-new'
    formdata.store()

    local_user.roles = [receiver_role.id]
    local_user.store()

    detail_url = '/api/forms/test/%s/' % formdata.id
    resp = get_app(pub).get(sign_uri(detail_url, user=local_user), status=200)
    assert resp.json['fields'] == {'foobar': 'foo'}

    # same expectation when one of the two fields is empty
    for first_value, second_value in (('foo', ''), ('', 'foo')):
        formdata.data = {'0': first_value, '1': second_value}
        formdata.store()
        resp = get_app(pub).get(sign_uri(detail_url, user=local_user), status=200)
        assert resp.json['fields'] == {'foobar': 'foo'}
def test_formdata_edit(pub, local_user):
    """Check editing a formdata through a signed POST on its API URL.

    The POST is expected to be refused (403) without a user, without an
    editable action, or when the editable action targets another role; it is
    accepted (200) when the action is open to the user's role, and the
    formdata then also changes status when the action defines one.
    """
    Role.wipe()
    receiver_role = Role(name='test')
    receiver_role.id = '123'
    receiver_role.store()
    other_role = Role(name='another')
    other_role.id = '321'
    other_role.store()
    local_user.roles = [receiver_role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
    ]
    Workflow.wipe()
    workflow = Workflow(name='foo')
    workflow.possible_status = Workflow.get_default_workflow().possible_status[:]
    workflow.roles['_foobar'] = 'Foobar'
    workflow.store()
    formdef.workflow_id = workflow.id
    formdef.workflow_roles = {'_receiver': receiver_role.id, '_foobar': other_role.id}
    formdef.store()

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.data = {'0': 'foo@localhost'}
    formdata.user_id = local_user.id
    formdata.just_created()
    formdata.status = 'wf-new'
    formdata.evolution[-1].status = 'wf-new'
    formdata.store()

    def post_as_user(value, expected_status):
        # signed POST made on behalf of local_user
        return get_app(pub).post_json(
            sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user),
            {'data': {'0': value}},
            status=expected_status,
        )

    def stored_formdata():
        return formdef.data_class().select()[0]

    # not user
    get_app(pub).post_json(
        sign_uri('/api/forms/test/%s/' % formdata.id), {'data': {'0': 'bar@localhost'}}, status=403
    )

    # no editable action
    post_as_user('bar@localhost', 403)

    wfedit = EditableWorkflowStatusItem()
    wfedit.id = '_wfedit'
    wfedit.by = [local_user.roles[0]]
    target_status = workflow.possible_status[1]
    target_status.items.append(wfedit)
    wfedit.parent = target_status
    workflow.store()

    post_as_user('bar@localhost', 200)
    assert stored_formdata().data['0'] == 'bar@localhost'

    # not editable by user role
    wfedit.by = ['XX']
    workflow.store()
    post_as_user('bar@localhost', 403)

    # edit + jump
    wfedit.status = 'rejected'
    wfedit.by = [local_user.roles[0]]
    workflow.store()

    post_as_user('bar2@localhost', 200)
    assert stored_formdata().data['0'] == 'bar2@localhost'
    assert stored_formdata().status == 'wf-rejected'
def test_formdata_with_workflow_data(pub, local_user):
    """Check that ``workflow_data`` is exported in the JSON detail view.

    Plain values are expected as-is under ``workflow.data``; uploads are
    expected as dictionaries carrying the filename, the content type and the
    base64-encoded content.
    """
    Role.wipe()
    role = Role(name='test')
    role.id = '123'
    role.store()

    local_user.roles = [role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.fields = []
    workflow = Workflow.get_default_workflow()
    workflow.id = '2'
    workflow.store()
    formdef.workflow_id = workflow.id
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.store()

    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.status = 'wf-new'
    formdata.evolution[-1].status = 'wf-new'

    # Two distinct upload objects with identical content. The module-level
    # PicklableUpload import is used; the former function-local alias import
    # of the very same class was redundant.
    upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
    upload.receive([b'test'])
    upload2 = PicklableUpload('test.txt', 'text/plain', 'ascii')
    upload2.receive([b'test'])

    formdata.workflow_data = {'blah': upload, 'blah2': upload2, 'xxx': 23}
    formdata.store()

    resp = get_app(pub).get(sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user))
    workflow_data = resp.json['workflow']['data']
    assert workflow_data['xxx'] == 23
    assert workflow_data['blah']['filename'] == 'test.txt'
    assert workflow_data['blah']['content_type'] == 'text/plain'
    assert base64.decodebytes(force_bytes(workflow_data['blah']['content'])) == b'test'
    assert base64.decodebytes(force_bytes(workflow_data['blah2']['content'])) == b'test'
def test_api_list_formdata(pub, local_user):
    """Check the per-form ``list`` API endpoint.

    Thirty formdata are stored with varying item values, statuses and
    submission attributes. The test then covers the role check, the short and
    ``full=on`` outputs, filtering on field values, status and last update
    time, ``limit``/``offset`` paging, and the 400 answers for invalid paging
    parameters.
    """
    Role.wipe()
    role = Role(name='test')
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar', type='string'),
        fields.ItemField(
            id='1', label='foobar3', varname='foobar3', type='item', items=['foo', 'bar', 'baz']
        ),
        fields.FileField(id='2', label='foobar4', varname='file', type='file'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    # value of the item field and target status, keyed by remainder
    item_by_remainder = {0: 'foo', 1: 'bar'}
    status_by_remainder = {0: 'new', 1: 'just_submitted'}
    base_time = datetime.datetime(2020, 1, 2, 3, 4)

    for i in range(30):
        formdata = data_class()
        upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
        upload.receive([b'base64me'])
        formdata.data = {'0': 'FOO BAR %d' % i, '2': upload}
        formdata.user_id = local_user.id

        item_value = item_by_remainder.get(i % 4, 'baz')
        formdata.data['1'] = item_value
        formdata.data['1_display'] = item_value

        formdata.just_created()
        formdata.jump_status(status_by_remainder.get(i % 3, 'finished'))
        if i % 7 == 0:
            formdata.backoffice_submission = True
            formdata.submission_channel = 'mail'
        # spread the last evolution times one hour apart
        formdata.evolution[-1].time = (base_time + datetime.timedelta(hours=i)).timetuple()
        formdata.store()

    # check access is denied if the user has not the appropriate role
    get_app(pub).get(sign_uri('/api/forms/test/list', user=local_user), status=403)

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    # check it now gets the data
    resp = get_app(pub).get(sign_uri('/api/forms/test/list', user=local_user))
    assert len(resp.json) == 30
    assert datetime.datetime.strptime(resp.json[0]['receipt_time'], '%Y-%m-%dT%H:%M:%S')
    assert 'fields' not in resp.json[0]

    # check getting full formdata
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on', user=local_user))
    assert len(resp.json) == 30
    first = resp.json[0]
    assert 'receipt_time' in first
    assert 'fields' in first
    assert 'url' in first['fields']['file']
    assert 'content' not in first['fields']['file']  # no file content in full lists
    assert 'user' in first
    assert 'evolution' in first
    assert len(first['evolution']) == 2
    first_evolution = first['evolution'][0]
    assert 'status' in first_evolution
    assert 'who' in first_evolution
    assert 'time' in first_evolution
    assert first_evolution['who']['id'] == local_user.id

    assert all('status' in x['workflow'] for x in resp.json)

    def submission_of(label):
        # submission details of the first entry whose 'foobar' field is `label`
        return [x for x in resp.json if x['fields']['foobar'] == label][0]['submission']

    assert submission_of('FOO BAR 0')['backoffice'] is True
    assert submission_of('FOO BAR 0')['channel'] == 'mail'
    assert submission_of('FOO BAR 1')['backoffice'] is False
    assert submission_of('FOO BAR 1')['channel'] == 'web'

    # check filtered results
    for url, expected_count in (
        ('/api/forms/test/list?filter-foobar3=foo', 8),
        ('/api/forms/test/list?filter-foobar3=bar', 8),
        ('/api/forms/test/list?filter-foobar3=baz', 14),
        ('/api/forms/test/list?filter-foobar=FOO BAR 3', 1),
    ):
        resp = get_app(pub).get(sign_uri(url, user=local_user))
        assert len(resp.json) == expected_count

    # check filter on status
    for url, expected_count in (
        ('/api/forms/test/list?filter=pending', 20),
        ('/api/forms/test/list?filter=done', 10),
        ('/api/forms/test/list?filter=all', 30),
    ):
        resp = get_app(pub).get(sign_uri(url, user=local_user))
        assert len(resp.json) == expected_count

    # check filter on last update time
    for url, expected_count in (
        ('/api/forms/test/list?filter-start-mtime=on&filter-start-mtime-value=2020-01-03', 16),
        ('/api/forms/test/list?filter-start-mtime=on&filter-start-mtime-value=2020-01-03 10:00', 10),
        ('/api/forms/test/list?filter-end-mtime=on&filter-end-mtime-value=2020-01-03', 14),
        ('/api/forms/test/list?filter-end-mtime=on&filter-end-mtime-value=2020-01-03 10:00', 20),
    ):
        resp = get_app(pub).get(sign_uri(url, user=local_user))
        assert len(resp.json) == expected_count

    # check limit and offset
    resp_all = get_app(pub).get(sign_uri('/api/forms/test/list?filter=all', user=local_user))
    assert len(resp_all.json) == 30
    partial_resps = [
        get_app(pub).get(
            sign_uri('/api/forms/test/list?filter=all&offset=%s&limit=12' % offset, user=local_user)
        )
        for offset in range(0, 48, 12)
    ]
    assert len(partial_resps[0].json) == 12
    assert len(partial_resps[1].json) == 12
    assert len(partial_resps[2].json) == 6
    assert len(partial_resps[3].json) == 0
    # the pages put end to end must give the same ids, in the same order
    resp_all_ids = [x.get('id') for x in resp_all.json]
    resp_partial_ids = [x.get('id') for partial in partial_resps for x in partial.json]
    assert resp_all_ids == resp_partial_ids

    # check error handling
    for bad_url in (
        '/api/forms/test/list?filter=all&offset=plop',
        '/api/forms/test/list?filter=all&limit=plop',
    ):
        get_app(pub).get(sign_uri(bad_url, user=local_user), status=400)
def test_api_anonymized_formdata(pub, local_user, admin_user):
    """Check the ``anonymise`` parameter on the list and detail API views.

    The anonymised output is expected to be reachable without the receiver
    role and even without a user, to omit the ``user`` key, the file field
    and the ``foobar`` field, and to omit ``who`` on the first evolution
    while keeping it on the evolution added on behalf of ``admin_user``.
    """
    Role.wipe()
    role = Role(name='test')
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
        fields.ItemField(
            id='1', label='foobar3', varname='foobar3', type='item', items=['foo', 'bar', 'baz']
        ),
        fields.FileField(id='2', label='foobar4', varname='file'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    item_by_remainder = {0: 'foo', 1: 'bar'}
    for i in range(30):
        formdata = data_class()
        upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
        upload.receive([b'base64me'])
        formdata.data = {'0': 'FOO BAR %d' % i, '2': upload}
        formdata.user_id = local_user.id

        item_value = item_by_remainder.get(i % 4, 'baz')
        formdata.data['1'] = item_value
        formdata.data['1_display'] = item_value

        formdata.just_created()
        if i % 3 != 0:
            # hand-made evolution attributed to the admin user
            evo = Evolution()
            evo.who = admin_user.id
            evo.time = time.localtime()
            evo.status = 'wf-%s' % 'finished'
            formdata.evolution.append(evo)
            formdata.status = evo.status
        else:
            formdata.jump_status('new')
        formdata.store()

    def assert_anonymised(entry):
        # common expectations on an anonymised formdata export
        assert 'receipt_time' in entry
        assert 'fields' in entry
        assert 'user' not in entry
        assert 'file' not in entry['fields']  # no file export
        assert 'foobar3' in entry['fields']
        assert 'foobar' not in entry['fields']
        assert 'evolution' in entry
        assert len(entry['evolution']) == 2
        assert 'status' in entry['evolution'][0]
        assert 'who' not in entry['evolution'][0]
        assert 'time' in entry['evolution'][0]

    def assert_agent_exported(entry):
        # check evolution made by other than _submitter are exported
        assert 'who' in entry['evolution'][1]
        who = entry['evolution'][1]['who']
        assert 'id' in who
        assert 'email' in who
        assert 'NameID' in who
        assert 'name' in who

    # check access is granted even if the user has not the appropriate role
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?anonymise&full=on', user=local_user))
    assert len(resp.json) == 30
    assert_anonymised(resp.json[0])
    assert_agent_exported(resp.json[1])

    # check access is granted even if there is no user
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?anonymise&full=on'))
    assert len(resp.json) == 30
    assert_anonymised(resp.json[0])

    # check anonymise is enforced on detail view
    resp = get_app(pub).get(sign_uri('/api/forms/test/%s/?anonymise&full=on' % resp.json[1]['id']))
    assert_anonymised(resp.json)
    assert_agent_exported(resp.json)
def test_api_geojson_formdata(pub, local_user):
    """Check the per-form ``geojson`` API endpoint.

    Covers the role check (with and without ``anonymise``), the content and
    HTML escaping of ``display_fields``, the ``full=on`` and ``filter``
    parameters, HTTP basic authentication configured through the
    ``api-http-auth-geojson`` site option section, and the 404 answer when
    the form has no geolocation.
    """
    Role.wipe()
    role = Role(name='test')
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar', type='string'),
        fields.FileField(id='1', label='foobar1', type='file'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    formdef.geolocations = {'base': 'Location'}
    formdef.store()

    # check access is denied if the user has not the appropriate role
    get_app(pub).get(sign_uri('/api/forms/test/geojson', user=local_user), status=403)
    # even if there's an anonymise parameter
    get_app(pub).get(sign_uri('/api/forms/test/geojson?anonymise', user=local_user), status=403)

    upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
    upload.receive([b'base64me'])

    # values containing markup, to check they are escaped in html_value
    foobar = '<font color="red">FOO BAR</font>'
    username = '<font color="red">Jean Darmette</font>'

    data = {'0': foobar, '1': upload}
    local_user.name = username
    local_user.store()
    for i in range(30):
        formdata = data_class()
        formdata.geolocations = {'base': {'lat': 48, 'lon': 2}}
        formdata.data = data
        formdata.user_id = local_user.id
        formdata.just_created()
        if i % 3 == 0:
            formdata.jump_status('new')
        else:
            formdata.jump_status('finished')
        formdata.store()

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    # check it gets the data
    resp = get_app(pub).get(sign_uri('/api/forms/test/geojson', user=local_user))
    assert 'features' in resp.json
    assert len(resp.json['features']) == 10
    display_fields = resp.json['features'][0]['properties']['display_fields']
    assert len(display_fields) == 5
    for field in display_fields:
        if field['label'] == 'Number':
            assert field['varname'] == 'id'
            assert field['html_value'] == '1-28'
            assert field['value'] == '1-28'
        if field['label'] == 'User Label':
            assert field['varname'] == 'user_label'
            assert field['value'] == username
            assert field['html_value'] == "&lt;font color=&quot;red&quot;&gt;Jean Darmette&lt;/font&gt;"
        if field['label'] == 'foobar':
            assert field['varname'] == 'foobar'
            assert field['value'] == foobar
            assert field['html_value'] == "&lt;font color=&quot;red&quot;&gt;FOO BAR&lt;/font&gt;"
        if field['label'] == 'foobar1':
            assert field['varname'] is None
            assert field['value'] == "test.txt"
            assert field['html_value'] == (
                '<div class="file-field"><a download="test.txt" href="http://example.net/backoffice/management/test/28/download?f=1">'
                '<span>test.txt</span></a></div>'
            )
    field_varnames = [f['varname'] for f in display_fields]
    assert 'foobar' not in field_varnames

    # check full=on
    resp = get_app(pub).get(sign_uri('/api/forms/test/geojson?full=on', user=local_user))
    assert len(resp.json['features']) == 10
    display_fields = resp.json['features'][0]['properties']['display_fields']
    assert len(display_fields) == 8
    field_varnames = [f['varname'] for f in display_fields]
    assert 'foobar' in field_varnames

    # check with a filter
    resp = get_app(pub).get(sign_uri('/api/forms/test/geojson?filter=done', user=local_user))
    assert 'features' in resp.json
    assert len(resp.json['features']) == 20

    # check with http basic auth
    app = get_app(pub)
    app.authorization = ('Basic', ('user', 'password'))
    resp = app.get('/api/forms/test/geojson?email=%s' % local_user.email, status=401)

    # add authentication info
    pub.load_site_options()
    pub.site_options.add_section('api-http-auth-geojson')
    pub.site_options.set('api-http-auth-geojson', 'user', 'password')
    # Close the file before issuing the next request so the options are
    # guaranteed to be flushed to disk; the original passed an anonymous
    # open() handle and relied on garbage collection to close it.
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
        pub.site_options.write(fd)

    resp = app.get('/api/forms/test/geojson?email=%s' % local_user.email)
    assert 'features' in resp.json
    assert len(resp.json['features']) == 10

    # check 404 if the formdef doesn't have geolocation support
    formdef.geolocations = {}
    formdef.store()
    resp = get_app(pub).get(sign_uri('/api/forms/test/geojson', user=local_user), status=404)
def test_api_ods_formdata(pub, local_user):
    """Check the per-form ``ods`` API endpoint.

    Access requires the receiver role (also with ``anonymise``); the answer
    is an OpenDocument spreadsheet, including once 300 more formdata have
    been added, and the final sheet is expected to hold 311 table rows.
    """
    ods_content_type = 'application/vnd.oasis.opendocument.spreadsheet'

    Role.wipe()
    role = Role(name='test')
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar', type='string'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    # check access is denied if the user has not the appropriate role
    get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user), status=403)
    # even if there's an anonymise parameter
    get_app(pub).get(sign_uri('/api/forms/test/ods?anonymise', user=local_user), status=403)

    data = {'0': 'foobar'}

    def store_formdata(status):
        # every formdata shares the same data dictionary, as before
        formdata = data_class()
        formdata.data = data
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.jump_status(status)
        formdata.store()

    for i in range(30):
        store_formdata('new' if i % 3 == 0 else 'finished')

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    # check it gets the data
    resp = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
    assert resp.content_type == ods_content_type

    # check it still gives a ods file when there is more data
    for _ in range(300):
        store_formdata('new')

    resp = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
    assert resp.content_type == ods_content_type
    archive = zipfile.ZipFile(BytesIO(resp.body))
    ods_sheet = ET.parse(archive.open('content.xml'))
    rows = ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])
    assert len(rows) == 311
def test_api_global_geojson(pub, local_user):
    """Check the global ``/api/forms/geojson`` endpoint.

    Without PostgreSQL the endpoint is expected to answer 404 and the test is
    then skipped. With it, the feature list is empty until the user holds the
    receiver role, and ``status=done`` selects the finished formdata.
    """
    Role.wipe()
    role = Role(name='test')
    role.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = []
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    formdef.geolocations = {'base': 'Location'}
    formdef.store()

    for i in range(30):
        formdata = data_class()
        formdata.geolocations = {'base': {'lat': 48, 'lon': 2}}
        formdata.user_id = local_user.id
        formdata.just_created()
        if i % 3 == 0:
            formdata.jump_status('new')
        else:
            formdata.jump_status('finished')
        formdata.store()

    if not pub.is_using_postgresql():
        get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
        # pytest.skip() raises, so nothing after it would run; the former
        # trailing ``return`` was unreachable and has been removed.
        pytest.skip('this requires SQL')

    # check empty content if user doesn't have the appropriate role
    resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user))
    assert 'features' in resp.json
    assert len(resp.json['features']) == 0

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    # check it gets the data
    resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user))
    assert 'features' in resp.json
    assert len(resp.json['features']) == 10

    # check with a filter
    resp = get_app(pub).get(sign_uri('/api/forms/geojson?status=done', user=local_user))
    assert 'features' in resp.json
    assert len(resp.json['features']) == 20
def test_api_global_listing(pub, local_user):
    """Check the global ``/api/forms/`` listing endpoint.

    Skipped when the publisher is not using PostgreSQL. Otherwise covers the
    empty answers (no formdef, user without role), the default and
    ``status=done`` listings, ``limit``/``offset`` paging, the 400 answers
    for invalid parameters, and formdata whose status is unknown to the
    workflow.
    """
    if not pub.is_using_postgresql():
        # NOTE(review): this probes the geojson URL rather than /api/forms/;
        # kept as-is, it may be a copy/paste from the geojson test.
        get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
        # pytest.skip() raises, so nothing after it would run; the former
        # trailing ``return`` was unreachable and has been removed.
        pytest.skip('this requires SQL')

    Role.wipe()
    role = Role(name='test')
    role.store()

    # check there's no crash if there are no formdefs
    resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user))
    assert len(resp.json['data']) == 0

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    formdef.store()

    for i in range(30):
        formdata = data_class()
        formdata.data = {'0': 'FOO BAR'}
        formdata.user_id = local_user.id
        formdata.just_created()
        if i % 3 == 0:
            formdata.jump_status('new')
        else:
            formdata.jump_status('finished')
        formdata.store()

    # check empty content if user doesn't have the appropriate role
    resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user))
    assert len(resp.json['data']) == 0

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    # check it gets the data
    resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user))
    assert len(resp.json['data']) == 10

    # check with a filter
    resp = get_app(pub).get(sign_uri('/api/forms/?status=done', user=local_user))
    assert len(resp.json['data']) == 20

    # check limit/offset
    resp = get_app(pub).get(sign_uri('/api/forms/?status=done&limit=5', user=local_user))
    assert len(resp.json['data']) == 5
    resp = get_app(pub).get(sign_uri('/api/forms/?status=done&offset=5&limit=5', user=local_user))
    assert len(resp.json['data']) == 5
    resp = get_app(pub).get(sign_uri('/api/forms/?status=done&offset=18&limit=5', user=local_user))
    assert len(resp.json['data']) == 2

    # check error handling
    get_app(pub).get(sign_uri('/api/forms/?status=done&limit=plop', user=local_user), status=400)
    get_app(pub).get(sign_uri('/api/forms/?status=done&offset=plop', user=local_user), status=400)
    get_app(pub).get(sign_uri('/api/forms/?category_id=plop', user=local_user), status=400)

    # check when there are missing statuses
    for formdata in data_class.select():
        formdata.status = 'wf-missing'
        formdata.store()
    resp = get_app(pub).get(sign_uri('/api/forms/?status=all', user=local_user))
    assert resp.json['data'][0]['status'] is None
    assert 'unknown' in resp.json['data'][0]['title']
def test_api_global_listing_ignored_roles(pub, local_user):
    """Check the ``ignore-roles`` parameter of the global ``/api/forms/`` listing.

    The scenario of ``test_api_global_listing`` is replayed first as setup,
    then a second form ("test2") is created and bound to a new role that is
    not given to ``local_user`` anywhere in this test.
    """
    test_api_global_listing(pub, local_user)

    def listing_size(url):
        # number of entries returned for a signed request made as local_user
        response = get_app(pub).get(sign_uri(url, user=local_user))
        return len(response.json['data'])

    second_role = Role(name='test2')
    second_role.store()

    second_formdef = FormDef()
    second_formdef.name = 'test2'
    second_formdef.workflow_roles = {'_receiver': second_role.id}
    second_formdef.fields = [fields.StringField(id='0', label='foobar', varname='foobar')]
    second_formdef.store()

    second_data_class = second_formdef.data_class()
    second_data_class.wipe()
    for _ in range(10):
        entry = second_data_class()
        entry.data = {'0': 'FOO BAR'}
        entry.user_id = local_user.id
        entry.just_created()
        entry.jump_status('new')
        entry.store()

    # roles are taken into account: the ten new entries are not listed
    assert listing_size('/api/forms/?status=all&limit=100') == 30

    # roles are ignored: the ten new entries are added to the listing
    assert listing_size('/api/forms/?status=all&limit=100&ignore-roles=on') == 40

    # sensitive forms (skip_from_360_view) must not be exposed, even when
    # roles are ignored
    second_formdef.skip_from_360_view = True
    second_formdef.store()
    assert listing_size('/api/forms/?status=all&limit=100&ignore-roles=on') == 30
def test_api_include_anonymised(pub, local_user):
    """Check the ``include-anonymised`` parameter of the ``/api/forms/`` listing.

    Ten formdata are created and the last one is anonymised; the listing must
    include it by default and with ``include-anonymised=on``, and leave it out
    with ``include-anonymised=off``.

    The test is skipped when the publisher is not backed by PostgreSQL.
    """
    if not pub.is_using_postgresql():
        get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
        # pytest.skip() raises, nothing after this line would run
        pytest.skip('this requires SQL')

    Role.wipe()
    role = Role(name='test')
    role.store()

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='foobar', varname='foobar'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for _ in range(10):
        formdata = data_class()
        formdata.data = {'0': 'FOO BAR'}
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.jump_status('new')
        formdata.store()

    # anonymise the last one (formdata still refers to the last created entry)
    formdata.anonymise()

    resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user))
    assert len(resp.json['data']) == 10

    resp = get_app(pub).get(sign_uri('/api/forms/?include-anonymised=on', user=local_user))
    assert len(resp.json['data']) == 10

    resp = get_app(pub).get(sign_uri('/api/forms/?include-anonymised=off', user=local_user))
    assert len(resp.json['data']) == 9
def test_api_ics_formdata(pub, local_user, ics_data):
    """Check the ICS export of formdata, using signed API requests.

    Covers access control, the content of the generated calendar (events,
    description, summary), status filtering, and the handling of invalid or
    extra path elements.

    NOTE(review): the "test" form and its formdata presumably come from the
    ``ics_data`` fixture, which is defined outside this section.
    """

    def fetch(url, **kwargs):
        # signed GET made on behalf of local_user
        return get_app(pub).get(sign_uri(url, user=local_user), **kwargs)

    def remove_dtstamp(body):
        # the DTSTAMP value may differ between two consecutive calls, it is
        # neutralised so calendars can be compared.
        return re.sub('DTSTAMP:.*', 'DTSTAMP:--', body)

    def matches(regex, text):
        # number of occurrences of regex in text
        return len(re.compile(regex, re.MULTILINE).findall(text))

    receiver_role = Role.select()[0]

    # access is denied as long as the user doesn't have the appropriate role
    fetch('/api/forms/test/ics/foobar', status=403)
    # and an anonymise parameter doesn't change that
    fetch('/api/forms/test/ics/foobar?anonymise', status=403)

    # give the proper role to the user
    local_user.roles = [receiver_role.id]
    local_user.store()

    # an incomplete ics url gives a 404
    assert get_app(pub).get(sign_uri('/api/forms/test/ics/', user=local_user), status=404)

    # data is returned, with or without a trailing slash
    calendar = fetch('/api/forms/test/ics/foobar')
    calendar_slash = fetch('/api/forms/test/ics/foobar/')
    assert remove_dtstamp(calendar.text) == remove_dtstamp(calendar_slash.text)
    assert calendar.headers['content-type'] == 'text/calendar; charset=utf-8'
    assert calendar.text.count('BEGIN:VEVENT') == 10

    # description contains form name, display id, workflow status,
    # backoffice url and attached user
    assert matches(r'DESCRIPTION:testé \| 1-\d+ \| New', calendar.text) == 10
    assert calendar.text.count('Jean Darmette') == 10
    assert calendar.text.count('DTSTART') == 10

    # the formdata digest is found in both the summary and the description
    assert (
        matches(r'SUMMARY:testé #1-\d+ - plöp \d{4}-\d{2}-\d{2} \d{2}:\d{2} plÔp', calendar.text) == 10
    )
    assert calendar.text.count(r'plöp') == 20

    # with a filter
    calendar = fetch('/api/forms/test/ics/foobar?filter=done')
    assert calendar.text.count('BEGIN:VEVENT') == 20
    assert matches(r'DESCRIPTION:testé \| 1-\d+ \| Finished', calendar.text) == 20
    assert calendar.text.count('Jean Darmette') == 20

    # 404 on erroneous field var
    fetch('/api/forms/test/ics/xxx', status=404)
    # 404 on an erroneous field var for endtime
    fetch('/api/forms/test/ics/foobar/xxx', status=404)
    # 404 on too many path elements
    fetch('/api/forms/test/ics/foobar/foobar2/xxx', status=404)

    # ics data with start and end varnames
    calendar = fetch('/api/forms/test/ics/foobar/foobar2')
    calendar_slash = fetch('/api/forms/test/ics/foobar/foobar2/')
    assert remove_dtstamp(calendar.text) == remove_dtstamp(calendar_slash.text)
    assert calendar.text.count('DTSTART') == 10
    assert calendar.text.count('DTEND') == 10
def test_api_ics_formdata_http_auth(pub, local_user, admin_user, ics_data):
    """Check access to the ICS export with HTTP basic authentication.

    The export is requested successively as a logged-in admin, without
    credentials, with credentials that are not configured yet, with
    configured credentials (before and after the user gets the appropriate
    role) and finally with a wrong password.
    """
    role = Role.select()[0]

    # check as admin
    app = login(get_app(pub))
    app.get('/api/forms/test/ics/foobar', status=200)

    # no access
    app = get_app(pub)
    resp = app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=401)
    assert resp.headers['Www-Authenticate']

    # auth but no access
    app = get_app(pub)
    app.authorization = ('Basic', ('user', 'password'))
    app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=401)

    # add authentication info
    pub.load_site_options()
    pub.site_options.add_section('api-http-auth-ics')
    pub.site_options.set('api-http-auth-ics', 'user', 'password')
    # close the file explicitly so the new options are written out before the
    # following requests are made
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
        pub.site_options.write(fd)

    # check access is denied if the user has not the appropriate role
    app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=403)
    # check access is denied if the user is not specified
    app.get('/api/forms/test/ics/foobar', status=403)

    # add proper role to user
    local_user.roles = [role.id]
    local_user.store()

    # check it gets the data
    resp = app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=200)
    assert resp.headers['content-type'] == 'text/calendar; charset=utf-8'
    assert resp.text.count('BEGIN:VEVENT') == 10

    # check it fails with a different password
    app.authorization = ('Basic', ('user', 'password2'))
    app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=401)
def test_api_ics_formdata_custom_view(pub, local_user, ics_data):
    """Check the ICS export when it is reached through a custom view.

    A custom view titled "custom view" is created on the "test" form and the
    export is requested below its ``custom-view`` URL, first without and then
    with the appropriate role.
    """

    def fetch(url, **kwargs):
        # signed GET made on behalf of local_user
        return get_app(pub).get(sign_uri(url, user=local_user), **kwargs)

    def remove_dtstamp(body):
        # the DTSTAMP value may differ between two consecutive calls, it is
        # neutralised so calendars can be compared.
        return re.sub('DTSTAMP:.*', 'DTSTAMP:--', body)

    receiver_role = Role.select()[0]
    test_formdef = FormDef.get_by_urlname('test')

    pub.custom_view_class.wipe()
    view = pub.custom_view_class()
    view.title = 'custom view'
    view.formdef = test_formdef
    view.columns = {'list': [{'id': '0'}]}
    view.filters = {}
    view.visibility = 'any'
    view.store()

    # access is denied as long as the user doesn't have the appropriate role
    fetch('/api/forms/test/custom-view/ics/foobar', status=403)
    # and an anonymise parameter doesn't change that
    fetch('/api/forms/test/custom-view/ics/foobar?anonymise', status=403)

    # give the proper role to the user
    local_user.roles = [receiver_role.id]
    local_user.store()

    # data is returned, with or without a trailing slash
    calendar = fetch('/api/forms/test/custom-view/ics/foobar')
    calendar_slash = fetch('/api/forms/test/custom-view/ics/foobar/')
    assert remove_dtstamp(calendar.text) == remove_dtstamp(calendar_slash.text)
    assert calendar.headers['content-type'] == 'text/calendar; charset=utf-8'
    assert calendar.text.count('BEGIN:VEVENT') == 10

    # ics data with start and end varnames
    calendar = fetch('/api/forms/test/custom-view/ics/foobar/foobar2')
    calendar_slash = fetch('/api/forms/test/custom-view/ics/foobar/foobar2/')
    assert remove_dtstamp(calendar.text) == remove_dtstamp(calendar_slash.text)
    assert calendar.text.count('DTSTART') == 10
    assert calendar.text.count('DTEND') == 10
def test_api_invalid_http_basic_auth(pub, local_user, admin_user, ics_data):
    """Check a malformed ``Authorization: Basic`` header is answered with a 401."""
    ics_url = '/api/forms/test/ics/foobar?email=%s' % local_user.email
    # "garbage" is sent in place of the encoded credentials
    garbage_auth = {'Authorization': 'Basic garbage'}
    get_app(pub).get(ics_url, headers=garbage_auth, status=401)