API: filter formdefs, forms and drafts by category slugs (#53371)

This commit is contained in:
Lauréline Guérin 2021-04-26 14:54:43 +02:00
parent c79e88fd02
commit 4529a44961
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
4 changed files with 261 additions and 2 deletions

View File

@ -16,6 +16,7 @@ from quixote import get_publisher
from wcs import fields
from wcs.api_access import ApiAccess
from wcs.blocks import BlockDef
from wcs.categories import Category
from wcs.data_sources import NamedDataSource
from wcs.formdata import Evolution
from wcs.formdef import FormDef
@ -1100,6 +1101,78 @@ def test_api_global_listing(pub, local_user):
assert 'unknown' in resp.json['data'][0]['title']
def test_api_global_listing_categories_filter(pub, local_user):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
Category.wipe()
category1 = Category()
category1.name = 'Category 1'
category1.store()
category2 = Category()
category2.name = 'Category 2'
category2.store()
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
FormDef.wipe()
formdef1 = FormDef()
formdef1.name = 'test 1'
formdef1.workflow_roles = {'_receiver': role.id}
formdef1.fields = [
fields.StringField(id='0', label='foobar', varname='foobar'),
]
formdef1.category = category1
formdef1.store()
formdef2 = FormDef()
formdef2.name = 'test 2'
formdef2.workflow_roles = {'_receiver': role.id}
formdef2.fields = [
fields.StringField(id='0', label='foobar', varname='foobar'),
]
formdef2.category = category2
formdef2.store()
data_class1 = formdef1.data_class()
data_class1.wipe()
data_class2 = formdef2.data_class()
data_class2.wipe()
for _ in range(2):
formdata = data_class1()
formdata.data = {'0': 'FOO BAR'}
formdata.user_id = local_user.id
formdata.just_created()
formdata.jump_status('new')
formdata.store()
for _ in range(3):
formdata = data_class2()
formdata.data = {'0': 'FOO BAZ'}
formdata.user_id = local_user.id
formdata.just_created()
formdata.jump_status('new')
formdata.store()
resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user))
assert len(resp.json['data']) == 5
resp = get_app(pub).get(sign_uri('/api/forms/?category_slugs=category-1', user=local_user))
assert len(resp.json['data']) == 2
resp = get_app(pub).get(sign_uri('/api/forms/?category_slugs=category-2', user=local_user))
assert len(resp.json['data']) == 3
resp = get_app(pub).get(sign_uri('/api/forms/?category_slugs=unknown', user=local_user))
assert len(resp.json['data']) == 0
resp = get_app(pub).get(sign_uri('/api/forms/?category_slugs=category-1,unknown', user=local_user))
assert len(resp.json['data']) == 2
resp = get_app(pub).get(sign_uri('/api/forms/?category_slugs=category-1,category-2', user=local_user))
assert len(resp.json['data']) == 5
def test_api_global_listing_ignored_roles(pub, local_user):
test_api_global_listing(pub, local_user)

View File

@ -15,6 +15,7 @@ from quixote import get_publisher
from wcs import fields, qommon
from wcs.api_utils import sign_url
from wcs.categories import Category
from wcs.data_sources import NamedDataSource
from wcs.formdef import FormDef
from wcs.qommon.http_request import HTTPRequest
@ -147,6 +148,46 @@ def test_formdef_list(pub):
assert resp.json['data'][0]['count'] == 17
def test_formdef_list_categories_filter(pub):
Category.wipe()
category1 = Category()
category1.name = 'Category 1'
category1.store()
category2 = Category()
category2.name = 'Category 2'
category2.store()
FormDef.wipe()
formdef1 = FormDef()
formdef1.name = 'test 1'
formdef1.category_id = category1.id
formdef1.store()
formdef2 = FormDef()
formdef2.name = 'test 2'
formdef2.category_id = category2.id
formdef2.store()
resp = get_app(pub).get(sign_uri('/api/formdefs/'))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 2
resp = get_app(pub).get(sign_uri('/api/formdefs/?category_slugs=unknown'))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 0
resp = get_app(pub).get(sign_uri('/api/formdefs/?category_slugs=category-1'))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 1
resp = get_app(pub).get(sign_uri('/api/formdefs/?category_slugs=category-1,unknown'))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 1
resp = get_app(pub).get(sign_uri('/api/formdefs/?category_slugs=category-1,category-2'))
assert resp.json['err'] == 0
assert len(resp.json['data']) == 2
def test_limited_formdef_list(pub, local_user):
pub.role_class.wipe()
role = pub.role_class(name='Foo bar')

View File

@ -8,6 +8,7 @@ from quixote import get_publisher
from wcs import fields
from wcs.admin.settings import UserFieldsFormDef
from wcs.categories import Category
from wcs.formdef import FormDef
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.ident.password_accounts import PasswordAccount
@ -382,6 +383,68 @@ def test_user_forms_limit_offset(pub, local_user):
assert [x['form_number_raw'] for x in resp.json['data']] == [str(x) for x in range(50, 40, -1)]
def test_user_forms_categories_filter(pub, local_user):
Category.wipe()
category1 = Category()
category1.name = 'Category 1'
category1.store()
category2 = Category()
category2.name = 'Category 2'
category2.store()
FormDef.wipe()
formdef1 = FormDef()
formdef1.name = 'test 1'
formdef1.fields = [
fields.StringField(id='0', label='foobar', varname='foobar'),
]
formdef1.category = category1
formdef1.store()
formdef2 = FormDef()
formdef2.name = 'test 2'
formdef2.fields = [
fields.StringField(id='0', label='foobar', varname='foobar'),
]
formdef2.category = category2
formdef2.store()
data_class1 = formdef1.data_class()
data_class1.wipe()
data_class2 = formdef2.data_class()
data_class2.wipe()
for _ in range(2):
formdata = data_class1()
formdata.data = {'0': 'FOO BAR'}
formdata.user_id = local_user.id
formdata.just_created()
formdata.jump_status('new')
formdata.store()
for _ in range(3):
formdata = data_class2()
formdata.data = {'0': 'FOO BAZ'}
formdata.user_id = local_user.id
formdata.just_created()
formdata.jump_status('new')
formdata.store()
resp = get_app(pub).get(sign_uri('/api/users/%s/forms' % local_user.id))
assert len(resp.json['data']) == 5
resp = get_app(pub).get(sign_uri('/api/users/%s/forms?category_slugs=category-1' % local_user.id))
assert len(resp.json['data']) == 2
resp = get_app(pub).get(sign_uri('/api/users/%s/forms?category_slugs=category-2' % local_user.id))
assert len(resp.json['data']) == 3
resp = get_app(pub).get(sign_uri('/api/users/%s/forms?category_slugs=unknown' % local_user.id))
assert len(resp.json['data']) == 0
resp = get_app(pub).get(sign_uri('/api/users/%s/forms?category_slugs=category-1,unknown' % local_user.id))
assert len(resp.json['data']) == 2
resp = get_app(pub).get(
sign_uri('/api/users/%s/forms?category_slugs=category-1,category-2' % local_user.id)
)
assert len(resp.json['data']) == 5
def test_user_forms_from_agent(pub, local_user):
pub.role_class.wipe()
role = pub.role_class(name='Foo bar')
@ -499,3 +562,65 @@ def test_user_drafts(pub, local_user):
assert resp.json == {'err': 1, 'err_desc': 'unknown NameID', 'data': []}
resp2 = get_app(pub).get(sign_uri('/api/user/drafts?&NameID=xxx'))
assert resp.json == resp2.json
def test_user_drafts_categories_filter(pub, local_user):
Category.wipe()
category1 = Category()
category1.name = 'Category 1'
category1.store()
category2 = Category()
category2.name = 'Category 2'
category2.store()
FormDef.wipe()
formdef1 = FormDef()
formdef1.name = 'test 1'
formdef1.fields = [
fields.StringField(id='0', label='foobar', varname='foobar'),
]
formdef1.category = category1
formdef1.store()
formdef2 = FormDef()
formdef2.name = 'test 2'
formdef2.fields = [
fields.StringField(id='0', label='foobar', varname='foobar'),
]
formdef2.category = category2
formdef2.store()
data_class1 = formdef1.data_class()
data_class1.wipe()
data_class2 = formdef2.data_class()
data_class2.wipe()
for _ in range(2):
formdata = data_class1()
formdata.data = {'0': 'FOO BAR'}
formdata.user_id = local_user.id
formdata.status = 'draft'
formdata.store()
for _ in range(3):
formdata = data_class2()
formdata.data = {'0': 'FOO BAZ'}
formdata.user_id = local_user.id
formdata.status = 'draft'
formdata.store()
resp = get_app(pub).get(sign_uri('/api/users/%s/drafts' % local_user.id))
assert len(resp.json['data']) == 5
resp = get_app(pub).get(sign_uri('/api/users/%s/drafts?category_slugs=category-1' % local_user.id))
assert len(resp.json['data']) == 2
resp = get_app(pub).get(sign_uri('/api/users/%s/drafts?category_slugs=category-2' % local_user.id))
assert len(resp.json['data']) == 3
resp = get_app(pub).get(sign_uri('/api/users/%s/drafts?category_slugs=unknown' % local_user.id))
assert len(resp.json['data']) == 0
resp = get_app(pub).get(
sign_uri('/api/users/%s/drafts?category_slugs=category-1,unknown' % local_user.id)
)
assert len(resp.json['data']) == 2
resp = get_app(pub).get(
sign_uri('/api/users/%s/drafts?category_slugs=category-1,category-2' % local_user.id)
)
assert len(resp.json['data']) == 5

View File

@ -53,7 +53,7 @@ from .qommon.errors import (
UnknownNameIdAccessForbiddenError,
)
from .qommon.form import ComputedExpressionWidget
from .qommon.storage import Equal, NotEqual
from .qommon.storage import Contains, Equal, NotEqual
from .qommon.template import Template, TemplateError
@ -406,6 +406,12 @@ class ApiFormsDirectory(Directory):
)
)
category_slugs = (get_request().form.get('category_slugs') or '').split(',')
category_slugs = [c.strip() for c in category_slugs if c.strip()]
if category_slugs:
categories = Category.select([Contains('url_name', category_slugs)])
criterias.append(Contains('category_id', [c.id for c in categories]))
formdatas = sql.AnyFormData.select(criterias, order_by=order_by, limit=limit, offset=offset)
if get_query_flag('ignore-roles'):
# When ignoring roles formdatas will be returned even if they are
@ -616,6 +622,8 @@ class ApiFormdefsDirectory(Directory):
formdefs = FormDef.select(order_by='name', ignore_errors=True, lightweight=True)
include_disabled = get_query_flag('include-disabled')
category_slugs = (get_request().form.get('category_slugs') or '').split(',')
category_slugs = [c.strip() for c in category_slugs if c.strip()]
if not include_disabled:
if backoffice_submission:
@ -625,6 +633,8 @@ class ApiFormdefsDirectory(Directory):
if self.category:
formdefs = [x for x in formdefs if str(x.category_id) == str(self.category.id)]
elif category_slugs:
formdefs = [x for x in formdefs if x.category and (x.category.url_name in category_slugs)]
charset = get_publisher().site_charset
@ -846,6 +856,11 @@ class ApiUserDirectory(Directory):
# early return, this avoids running a query against a missing SQL view.
return []
category_slugs = (get_request().form.get('category_slugs') or '').split(',')
category_slugs = [c.strip() for c in category_slugs if c.strip()]
if category_slugs:
categories = Category.select([Contains('url_name', category_slugs)])
if get_publisher().is_using_postgresql() and not get_request().form.get('full') == 'on':
from wcs import sql
@ -854,8 +869,11 @@ class ApiUserDirectory(Directory):
order_by = 'receipt_time'
if get_request().form.get('sort') == 'desc':
order_by = '-receipt_time'
criterias = [Equal('user_id', str(user.id))]
if category_slugs:
criterias.append(Contains('category_id', [c.id for c in categories]))
user_forms = sql.AnyFormData.select(
[Equal('user_id', str(user.id))],
criterias,
limit=misc.get_int_or_400(get_request().form.get('limit')),
offset=misc.get_int_or_400(get_request().form.get('offset')),
order_by=order_by,
@ -865,6 +883,8 @@ class ApiUserDirectory(Directory):
user_forms = []
for formdef in formdefs:
user_forms.extend(formdef.data_class().get_with_indexed_value('user_id', user.id))
if category_slugs:
user_forms = [f for f in user_forms if f.formdef.category_id in [c.id for c in categories]]
typed_none = time.gmtime(-(10 ** 10))
user_forms.sort(key=lambda x: x.receipt_time or typed_none)
if get_request().form.get('sort') == 'desc':