api: formdata filtering by block field (#58451)

This commit is contained in:
Lauréline Guérin 2021-12-03 15:55:10 +01:00
parent 34ac123c43
commit 7331b81263
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
3 changed files with 239 additions and 36 deletions

View File

@ -793,6 +793,161 @@ def test_api_list_formdata_date_filter(pub, local_user):
assert len(resp.json) == 1
def test_api_list_formdata_block_field_filter(pub, local_user):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {
'type': 'formula',
'value': repr([{'id': '1', 'text': 'foo', 'more': 'XXX'}, {'id': '2', 'text': 'bar', 'more': 'YYY'}]),
}
data_source.store()
BlockDef.wipe()
block = BlockDef()
block.name = 'foobar'
block.fields = [
fields.StringField(id='1', label='String', type='string', varname='string'),
fields.ItemField(id='2', label='Item', type='item', data_source={'type': 'foobar'}, varname='item'),
fields.BoolField(id='3', label='Bool', type='bool', varname='bool'),
fields.DateField(id='4', label='Date', type='date', varname='date'),
fields.EmailField(id='5', label='Email', type='email', varname='email'),
]
block.store()
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.workflow_roles = {'_receiver': role.id}
formdef.fields = [
fields.BlockField(id='0', label='Block Data', varname='blockdata', type='block:foobar', max_items=3),
]
formdef.store()
data_class = formdef.data_class()
data_class.wipe()
for i in range(10):
formdata = data_class()
formdata.data = {
'0': {
'data': [
{
'1': 'plop%s' % i,
'2': '1' if i % 2 else '2',
'2_display': 'foo' if i % 2 else 'bar',
'2_structured': 'XXX' if i % 2 else 'YYY',
'3': bool(i % 2),
'4': '2021-06-%02d' % (i + 1),
'5': 'a@localhost' if i % 2 else 'b@localhost',
},
],
'schema': {}, # not important here
},
'0_display': 'hello',
}
if i == 0:
formdata.data['0']['data'].append(
{
'1': 'plop%s' % i,
'2': '1',
'2_display': 'foo',
'2_structured': 'XXX',
'3': True,
'4': '2021-06-02',
'5': 'a@localhost',
},
)
formdata.user_id = local_user.id
formdata.just_created()
formdata.jump_status('new')
formdata.store()
# string
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_string=plop0', user=local_user))
assert len(resp.json) == 1
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_string=plop2', user=local_user))
assert len(resp.json) == 1
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_string=plop10', user=local_user))
assert len(resp.json) == 0
# item
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_item=1', user=local_user))
assert len(resp.json) == 6
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_item=2', user=local_user))
assert len(resp.json) == 5
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_item=3', user=local_user))
assert len(resp.json) == 0
# bool
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_bool=true', user=local_user))
assert len(resp.json) == 6
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_bool=false', user=local_user))
assert len(resp.json) == 5
resp = get_app(pub).get(
sign_uri('/api/forms/test/list?filter-blockdata_bool=foobar', user=local_user), status=400
)
assert resp.json['err_desc'] == 'Invalid value "foobar" for "filter-blockdata_bool"'
# date
resp = get_app(pub).get(
sign_uri('/api/forms/test/list?filter-blockdata_date=2021-06-01', user=local_user)
)
assert len(resp.json) == 1
resp = get_app(pub).get(
sign_uri('/api/forms/test/list?filter-blockdata_date=2021-06-02', user=local_user)
)
assert len(resp.json) == 2
resp = get_app(pub).get(
sign_uri('/api/forms/test/list?filter-blockdata_date=02/06/2021', user=local_user)
)
assert len(resp.json) == 2
# email
resp = get_app(pub).get(
sign_uri('/api/forms/test/list?filter-blockdata_email=a@localhost', user=local_user)
)
assert len(resp.json) == 6
resp = get_app(pub).get(
sign_uri('/api/forms/test/list?filter-blockdata_email=b@localhost', user=local_user)
)
assert len(resp.json) == 5
resp = get_app(pub).get(
sign_uri('/api/forms/test/list?filter-blockdata_email=c@localhost', user=local_user)
)
assert len(resp.json) == 0
# mix
resp = get_app(pub).get(
sign_uri(
'/api/forms/test/list?filter-blockdata_item=1&filter-blockdata_string=plop1', user=local_user
)
)
assert len(resp.json) == 1
resp = get_app(pub).get(
sign_uri(
'/api/forms/test/list?filter-blockdata_item=2&filter-blockdata_string=plop1', user=local_user
)
)
assert len(resp.json) == 0
resp = get_app(pub).get(
sign_uri(
'/api/forms/test/list?filter-blockdata_item=1&filter-blockdata_string=plop0', user=local_user
)
)
assert len(resp.json) == 1
resp = get_app(pub).get(
sign_uri(
'/api/forms/test/list?filter-blockdata_item=2&filter-blockdata_string=plop0', user=local_user
)
)
assert len(resp.json) == 1
def test_api_anonymized_formdata(pub, local_user, admin_user):
pub.role_class.wipe()
role = pub.role_class(name='test')

View File

@ -28,6 +28,7 @@ import vobject
from django.utils.encoding import force_text
from quixote import get_publisher, get_request, get_response, get_session, redirect
from quixote.directory import Directory
from quixote.errors import RequestError
from quixote.html import TemplateIO, htmlescape, htmltext
from quixote.http_request import parse_query
@ -1492,25 +1493,36 @@ class FormPage(Directory):
field.has_relations = True
yield UserRelatedField(field)
for field in self.formdef.get_all_fields():
yield field
if not get_publisher().is_using_postgresql():
continue
if not (
field.type == 'item'
and field.data_source
and field.data_source.get('type', '').startswith('carddef:')
):
continue
try:
carddef = CardDef.get_by_urlname(field.data_source['type'][8:])
except KeyError:
continue
for card_field in carddef.get_all_fields():
if not hasattr(card_field, 'get_view_value'):
def iter_fields(fields, block_field=None):
for field in fields:
if block_field:
if field.key == 'items':
# not yet
continue
field.block_field = block_field
yield field
if not get_publisher().is_using_postgresql():
continue
field.has_relations = True
yield RelatedField(carddef, card_field, field)
if field.key == 'block':
yield from iter_fields(field.block.fields, block_field=field)
continue
if not (
field.type == 'item'
and field.data_source
and field.data_source.get('type', '').startswith('carddef:')
):
continue
try:
carddef = CardDef.get_by_urlname(field.data_source['type'][8:])
except KeyError:
continue
for card_field in carddef.get_all_fields():
if not hasattr(card_field, 'get_view_value'):
continue
field.has_relations = True
yield RelatedField(carddef, card_field, field)
yield from iter_fields(self.formdef.get_all_fields())
yield FakeField('status', 'status', _('Status'))
yield FakeField('anonymised', 'anonymised', _('Anonymised'))
@ -1570,6 +1582,8 @@ class FormPage(Directory):
return self.get_view_criterias(query_overrides, request=get_request())
def get_view_criterias(self, query_overrides=None, request=None):
from wcs import sql
fake_fields = [
FakeField('start', 'period-date', _('Start')),
FakeField('end', 'period-date', _('End')),
@ -1596,11 +1610,19 @@ class FormPage(Directory):
filter_field_key = None
field_varname = None
is_in_block_field = False
if filter_field.varname:
if getattr(filter_field, 'block_field', None) and filter_field.block_field.varname:
field_varname = '%s_%s' % (filter_field.block_field.varname, filter_field.varname)
is_in_block_field = True
else:
field_varname = filter_field.varname
if field_varname:
# if this is a field with a varname and filter-%(varname)s is
# present in the query string, enable this filter.
if filters_dict.get('filter-%s' % filter_field.varname):
filter_field_key = 'filter-%s' % filter_field.varname
if filters_dict.get('filter-%s' % field_varname):
filter_field_key = 'filter-%s' % field_varname
if filter_field.type == 'user-id':
# convert uuid based filter into local id filter
@ -1647,7 +1669,37 @@ class FormPage(Directory):
if not filter_field_value:
continue
if filter_field.type == 'period-date':
if filter_field_value is None and filter_field.type in [
'date',
'bool',
'item',
'items',
'string',
'email',
]:
continue
if filter_field.type == 'date':
try:
filter_field_value = misc.get_as_datetime(filter_field_value).date().strftime('%Y-%m-%d')
except ValueError:
continue
elif filter_field.type == 'bool':
if filter_field_value == 'true':
filter_field_value = True
elif filter_field_value == 'false':
filter_field_value = False
else:
raise RequestError('Invalid value "%s" for "%s"' % (filter_field_value, filter_field_key))
if is_in_block_field:
criterias.append(
sql.ArrayContains(
'f%s' % filter_field.block_field.id,
json.dumps([{filter_field.id: filter_field_value}]),
parent_field=filter_field.block_field,
)
)
elif filter_field.type == 'period-date':
try:
filter_date_value = misc.get_as_datetime(filter_field_value).timetuple()
except ValueError:
@ -1667,7 +1719,7 @@ class FormPage(Directory):
criterias.append(Equal('user_id', filter_field_value))
elif filter_field.type == 'submission-agent-id':
criterias.append(Equal('submission_agent_id', filter_field_value))
elif filter_field.type in ('item', 'items') and filter_field_value is not None:
elif filter_field.type in ('item', 'items'):
if filter_field.type == 'item':
criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
elif filter_field.type == 'items':
@ -1679,20 +1731,12 @@ class FormPage(Directory):
filter_field_value = option[1]
break
criterias[-1]._label = '%s: %s' % (filter_field.label, filter_field_value)
elif filter_field.type == 'bool' and filter_field_value is not None:
if filter_field_value == 'true':
criterias.append(Equal('f%s' % filter_field.id, True))
elif filter_field_value == 'false':
criterias.append(Equal('f%s' % filter_field.id, False))
elif filter_field.type in ('string', 'email') and filter_field_value is not None:
elif filter_field.type == 'bool':
criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
elif filter_field.type in ('string', 'email'):
criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
elif filter_field.type == 'date':
criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
elif filter_field.type == 'date' and filter_field_value is not None:
try:
filter_field_value = misc.get_as_datetime(filter_field_value).date()
except ValueError:
pass
else:
criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
return criterias

View File

@ -107,11 +107,15 @@ class Criteria(qommon.storage.Criteria):
self.attribute = attribute.replace('-', '_')
self.value = value
self.field = kwargs.get('field')
self.parent_field = kwargs.get('parent_field')
def as_sql(self):
attribute = self.attribute
if self.field and self.field.key == 'computed':
attribute = "%s->>'data'" % self.attribute
if self.parent_field and self.parent_field.key == 'block':
attribute = "%s->'data'" % self.attribute
return '%s %s %%(c%s)s' % (attribute, self.sql_op, id(self.value))
def as_sql_param(self):
@ -159,7 +163,7 @@ class Contains(Criteria):
def as_sql(self):
if not self.value:
return 'FALSE'
return '%s %s %%(c%s)s' % (self.attribute, self.sql_op, id(self.value))
return super().as_sql()
def as_sql_param(self):
return {'c%s' % id(self.value): tuple(self.value)}