api: make /api/forms/ return all accessible forms (drafts in option) (#55337)
This commit is contained in:
parent
b7eded458b
commit
8450bc61fa
|
@ -1489,6 +1489,108 @@ def test_api_include_anonymised(pub, local_user):
|
|||
assert len(resp.json['data']) == 9
|
||||
|
||||
|
||||
def test_api_submitter_and_include_drafts(pub, local_user):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
another_user = get_publisher().user_class()
|
||||
another_user.name = 'Antoher'
|
||||
another_user.store()
|
||||
|
||||
FormDef.wipe()
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test'
|
||||
formdef.fields = [
|
||||
fields.StringField(id='0', label='foobar', varname='foobar'),
|
||||
]
|
||||
formdef.store()
|
||||
|
||||
data_class = formdef.data_class()
|
||||
data_class.wipe()
|
||||
|
||||
# a submitted form
|
||||
formdata = data_class()
|
||||
formdata.data = {'0': 'FOO BAR'}
|
||||
formdata.user_id = local_user.id
|
||||
formdata.just_created()
|
||||
formdata.jump_status('new')
|
||||
formdata.store()
|
||||
|
||||
# a submitted form for another user
|
||||
formdata = data_class()
|
||||
formdata.data = {'0': 'FOO BAR'}
|
||||
formdata.user_id = another_user.id
|
||||
formdata.just_created()
|
||||
formdata.jump_status('new')
|
||||
formdata.store()
|
||||
|
||||
# a draft by user
|
||||
formdata = data_class()
|
||||
formdata.data = {'0': 'FOO BAR'}
|
||||
formdata.user_id = local_user.id
|
||||
formdata.status = 'draft'
|
||||
formdata.store()
|
||||
|
||||
# an anonymous draft
|
||||
formdata = data_class()
|
||||
formdata.data = {'0': 'FOO BAR'}
|
||||
formdata.user_id = None
|
||||
formdata.status = 'draft'
|
||||
formdata.store()
|
||||
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/?status=all', user=local_user))
|
||||
assert len(resp.json['data']) == 0
|
||||
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/?status=all&include-own=on', user=local_user))
|
||||
assert len(resp.json['data']) == 1
|
||||
|
||||
resp = get_app(pub).get(
|
||||
sign_uri('/api/forms/?status=all&include-own=on&include-drafts=off', user=local_user)
|
||||
)
|
||||
assert len(resp.json['data']) == 1
|
||||
|
||||
resp = get_app(pub).get(
|
||||
sign_uri('/api/forms/?status=all&include-own=on&include-drafts=on', user=local_user)
|
||||
)
|
||||
assert len(resp.json['data']) == 2
|
||||
|
||||
resp = get_app(pub).get(
|
||||
sign_uri('/api/forms/?status=done&include-own=on&include-drafts=on', user=local_user)
|
||||
)
|
||||
assert len(resp.json['data']) == 1
|
||||
|
||||
# make forms accessible by user, but add filter-user-uuid to ignore them
|
||||
pub.role_class.wipe()
|
||||
role = pub.role_class(name='test')
|
||||
role.store()
|
||||
|
||||
local_user.roles = [role.id]
|
||||
local_user.store()
|
||||
|
||||
formdef.workflow_roles = {'_receiver': role.id}
|
||||
formdef.store()
|
||||
formdef.data_class().rebuild_security()
|
||||
|
||||
resp = get_app(pub).get(
|
||||
sign_uri('/api/forms/?status=all&include-own=on&include-drafts=off', user=local_user)
|
||||
)
|
||||
assert len(resp.json['data']) == 2
|
||||
|
||||
resp = get_app(pub).get(
|
||||
sign_uri('/api/forms/?status=all&include-own=on&include-drafts=on', user=local_user)
|
||||
)
|
||||
assert len(resp.json['data']) == 3
|
||||
|
||||
resp = get_app(pub).get(
|
||||
sign_uri(
|
||||
'/api/forms/?status=all&include-own=on&include-drafts=on&filter-user-uuid=0123456789',
|
||||
user=local_user,
|
||||
)
|
||||
)
|
||||
assert len(resp.json['data']) == 2
|
||||
|
||||
|
||||
def test_api_ics_formdata(pub, local_user, ics_data):
|
||||
role = pub.role_class.select()[0]
|
||||
|
||||
|
|
49
wcs/api.py
49
wcs/api.py
|
@ -389,6 +389,7 @@ class ApiFormsDirectory(Directory):
|
|||
|
||||
self.check_access()
|
||||
get_request()._user = get_user_from_api_query_string() or get_request().user
|
||||
ignore_roles_flag = get_query_flag('ignore-roles')
|
||||
|
||||
if get_request().form.get('full') == 'on':
|
||||
raise RequestError('no such parameter "full"')
|
||||
|
@ -401,10 +402,23 @@ class ApiFormsDirectory(Directory):
|
|||
from wcs import sql
|
||||
|
||||
management_directory = ManagementDirectory()
|
||||
criterias = management_directory.get_global_listing_criterias()
|
||||
if get_query_flag('ignore-roles'):
|
||||
criterias = management_directory.get_global_listing_criterias(
|
||||
include_own=bool(
|
||||
get_request()._user and not get_request()._user.is_api_user and get_query_flag('include-own')
|
||||
),
|
||||
include_drafts=get_query_flag('include-drafts'),
|
||||
)
|
||||
if ignore_roles_flag:
|
||||
roles_criterias = criterias
|
||||
criterias = management_directory.get_global_listing_criterias(ignore_user_roles=True)
|
||||
criterias = management_directory.get_global_listing_criterias(
|
||||
ignore_user_roles=True,
|
||||
include_own=bool(
|
||||
get_request()._user
|
||||
and not get_request()._user.is_api_user
|
||||
and get_query_flag('include-own')
|
||||
),
|
||||
include_drafts=get_query_flag('include-drafts'),
|
||||
)
|
||||
|
||||
if not get_query_flag('include-anonymised', default=True):
|
||||
criterias.append(st.Null('anonymised'))
|
||||
|
@ -420,7 +434,7 @@ class ApiFormsDirectory(Directory):
|
|||
)
|
||||
|
||||
formdatas = sql.AnyFormData.select(criterias, order_by=order_by, limit=limit, offset=offset)
|
||||
if get_query_flag('ignore-roles'):
|
||||
if ignore_roles_flag:
|
||||
# When ignoring roles formdatas will be returned even if they are
|
||||
# not readable by the user, an additional attribute (readable) is
|
||||
# added to differentiate readable and non-readable formdatas.
|
||||
|
@ -433,21 +447,24 @@ class ApiFormsDirectory(Directory):
|
|||
roles_criterias, order_by=order_by, limit=limit, offset=offset
|
||||
)
|
||||
]
|
||||
output = []
|
||||
for formdata in formdatas:
|
||||
|
||||
output = []
|
||||
for formdata in formdatas:
|
||||
if ignore_roles_flag:
|
||||
readable = bool((formdata.formdef.id, formdata.id) in limited_formdatas)
|
||||
if not readable and formdata.formdef.skip_from_360_view:
|
||||
continue
|
||||
formdata_dict = get_formdata_dict(
|
||||
formdata, user=get_request().user, consider_status_visibility=False
|
||||
)
|
||||
formdata_dict['readable'] = readable
|
||||
output.append(formdata_dict)
|
||||
else:
|
||||
output = [
|
||||
get_formdata_dict(x, user=get_request().user, consider_status_visibility=False)
|
||||
for x in formdatas
|
||||
]
|
||||
else:
|
||||
# do not consider skip_from_360_view when not ignoring roles;
|
||||
# it could actually be useful when filtering on a specific user,
|
||||
# as it's available in /api/users/<id>/forms but this is
|
||||
# ignored for now.
|
||||
readable = True
|
||||
formdata_dict = get_formdata_dict(
|
||||
formdata, user=get_request().user, consider_status_visibility=False
|
||||
)
|
||||
formdata_dict['readable'] = readable
|
||||
output.append(formdata_dict)
|
||||
|
||||
get_response().set_content_type('application/json')
|
||||
return json.dumps({'data': output}, cls=misc.JSONEncoder)
|
||||
|
|
|
@ -68,6 +68,7 @@ from ..qommon.form import (
|
|||
)
|
||||
from ..qommon.misc import C_, ellipsize
|
||||
from ..qommon.storage import (
|
||||
And,
|
||||
Contains,
|
||||
Equal,
|
||||
FtsMatch,
|
||||
|
@ -849,14 +850,16 @@ class ManagementDirectory(Directory):
|
|||
r += htmltext('</ul>')
|
||||
return r.getvalue()
|
||||
|
||||
def get_global_listing_criterias(self, ignore_user_roles=False):
|
||||
def get_global_listing_criterias(self, ignore_user_roles=False, include_own=False, include_drafts=False):
|
||||
parsed_values = {}
|
||||
user_roles = [logged_users_role().id]
|
||||
if get_request().user:
|
||||
user_roles.extend(get_request().user.get_roles())
|
||||
criterias = get_global_criteria(get_request(), parsed_values)
|
||||
criterias = get_global_criteria(get_request(), parsed_values, include_drafts=include_drafts)
|
||||
|
||||
query_parameters = (get_request().form or {}).copy()
|
||||
query_parameters.pop('callback', None) # when using jsonp
|
||||
|
||||
status = query_parameters.get('status', 'waiting')
|
||||
if query_parameters.get('waiting') == 'yes':
|
||||
# compatibility with ?waiting=yes|no parameter, still used in
|
||||
|
@ -864,21 +867,57 @@ class ManagementDirectory(Directory):
|
|||
status = 'waiting'
|
||||
elif query_parameters.get('waiting') == 'no':
|
||||
status = 'open'
|
||||
|
||||
roles_array_column = 'concerned_roles_array'
|
||||
status_criterias = []
|
||||
if status == 'waiting':
|
||||
criterias.append(Equal('is_at_endpoint', False))
|
||||
if not ignore_user_roles:
|
||||
criterias.append(Intersects('actions_roles_array', user_roles))
|
||||
status_criterias.append(Equal('is_at_endpoint', False))
|
||||
roles_array_column = 'actions_roles_array'
|
||||
elif status == 'open':
|
||||
criterias.append(Equal('is_at_endpoint', False))
|
||||
if not ignore_user_roles:
|
||||
criterias.append(Intersects('concerned_roles_array', user_roles))
|
||||
status_criterias.append(Equal('is_at_endpoint', False))
|
||||
elif status == 'done':
|
||||
criterias.append(Equal('is_at_endpoint', True))
|
||||
if not ignore_user_roles:
|
||||
criterias.append(Intersects('concerned_roles_array', user_roles))
|
||||
elif status == 'all':
|
||||
if not ignore_user_roles:
|
||||
criterias.append(Intersects('concerned_roles_array', user_roles))
|
||||
status_criterias.append(Equal('is_at_endpoint', True))
|
||||
|
||||
if not ignore_user_roles:
|
||||
roles_criteria = Intersects(roles_array_column, user_roles)
|
||||
if include_own and get_request().user and not get_request().user.is_api_user:
|
||||
roles_criteria = Or(
|
||||
[
|
||||
roles_criteria,
|
||||
And(
|
||||
[
|
||||
Equal('user_id', str(get_request().user.id)),
|
||||
Intersects(roles_array_column, ['_submitter']),
|
||||
]
|
||||
),
|
||||
]
|
||||
)
|
||||
status_criterias.append(roles_criteria)
|
||||
|
||||
if include_drafts and get_request().user and not get_request().user.is_api_user:
|
||||
# include user drafts, without any endpoint / roles array criteria
|
||||
status_criterias = [
|
||||
Or(
|
||||
[
|
||||
And(status_criterias),
|
||||
And(
|
||||
[
|
||||
Equal('status', 'draft'),
|
||||
Equal('user_id', str(get_request().user.id)),
|
||||
]
|
||||
),
|
||||
]
|
||||
)
|
||||
]
|
||||
|
||||
criterias.extend(status_criterias)
|
||||
|
||||
name_id = query_parameters.get('filter-user-uuid')
|
||||
if name_id:
|
||||
nameid_users = get_publisher().user_class.get_users_with_name_identifier(name_id)
|
||||
if nameid_users:
|
||||
criterias.append(Equal('user_id', str(nameid_users[0].id)))
|
||||
|
||||
if get_request().form.get('submission_channel'):
|
||||
if get_request().form.get('submission_channel') == 'web':
|
||||
criterias.append(Null('submission_channel'))
|
||||
|
@ -3677,13 +3716,15 @@ $(document).ready(function(){
|
|||
return r.getvalue()
|
||||
|
||||
|
||||
def get_global_criteria(request, parsed_values=None):
|
||||
def get_global_criteria(request, parsed_values=None, include_drafts=False):
|
||||
"""
|
||||
Parses the request query string and returns a list of criterias suitable
|
||||
for select() usage. The parsed_values parameter can be given a dictionary,
|
||||
to be filled with the parsed values.
|
||||
"""
|
||||
criterias = [NotEqual('status', 'draft')]
|
||||
criterias = []
|
||||
if not include_drafts:
|
||||
criterias.append(NotEqual('status', 'draft'))
|
||||
try:
|
||||
period_start = misc.get_as_datetime(request.form.get('start')).timetuple()
|
||||
criterias.append(GreaterOrEqual('receipt_time', period_start))
|
||||
|
|
Loading…
Reference in New Issue