api: add include-accessible flag to /api/user/forms (#56518)
parent
522d605701
commit
917e961519
|
@ -550,6 +550,85 @@ def test_user_forms_from_agent(pub, local_user):
|
|||
get_app(pub).get(sign_uri('/api/users/%s/forms' % local_user.id, user=agent_user), status=403)
|
||||
|
||||
|
||||
def test_user_forms_include_accessible(pub, local_user):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
pub.role_class.wipe()
|
||||
role = pub.role_class(name='Foo bar')
|
||||
role.store()
|
||||
|
||||
another_user = get_publisher().user_class()
|
||||
another_user.name = 'Another user'
|
||||
another_user.email = 'another@example.com'
|
||||
another_user.name_identifiers = ['AZERTY']
|
||||
another_user.store()
|
||||
|
||||
agent_user = get_publisher().user_class()
|
||||
agent_user.name = 'Agent'
|
||||
agent_user.email = 'agent@example.com'
|
||||
agent_user.name_identifiers = ['ABCDE']
|
||||
agent_user.roles = [role.id]
|
||||
agent_user.store()
|
||||
|
||||
FormDef.wipe()
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test'
|
||||
formdef.fields = [
|
||||
fields.StringField(id='0', label='foobar', varname='foobar'),
|
||||
fields.StringField(id='1', label='foobar2'),
|
||||
]
|
||||
formdef.store()
|
||||
formdef.data_class().wipe()
|
||||
|
||||
formdata1 = formdef.data_class()()
|
||||
formdata1.data = {'0': 'foo@localhost', '1': 'xxx'}
|
||||
formdata1.user_id = local_user.id
|
||||
formdata1.just_created()
|
||||
formdata1.jump_status('new')
|
||||
formdata1.store()
|
||||
|
||||
formdata2 = formdef.data_class()()
|
||||
formdata2.data = {'0': 'foo@localhost', '1': 'xxx'}
|
||||
formdata2.user_id = another_user.id
|
||||
formdata2.just_created()
|
||||
formdata2.jump_status('new')
|
||||
formdata2.store()
|
||||
|
||||
formdata3 = formdef.data_class()()
|
||||
formdata3.data = {'0': 'foo@localhost', '1': 'xxx'}
|
||||
formdata3.user_id = another_user.id
|
||||
formdata3.just_created()
|
||||
formdata3.jump_status('new')
|
||||
formdata3.workflow_roles = {'_receiver': ['_user:%s' % local_user.id]}
|
||||
formdata3.store()
|
||||
|
||||
formdata4 = formdef.data_class()()
|
||||
formdata4.data = {'0': 'foo@localhost', '1': 'xxx'}
|
||||
formdata4.user_id = agent_user.id
|
||||
formdata4.just_created()
|
||||
formdata4.jump_status('new')
|
||||
formdata4.store()
|
||||
|
||||
def get_ids(url):
|
||||
resp = get_app(pub).get(url)
|
||||
return {int(x['form_number_raw']) for x in resp.json['data']}
|
||||
|
||||
resp = get_ids(sign_uri('/api/user/forms', user=local_user))
|
||||
assert resp == {formdata1.id}
|
||||
|
||||
resp = get_ids(sign_uri('/api/user/forms?include-accessible=on', user=local_user))
|
||||
assert resp == {formdata1.id, formdata3.id}
|
||||
|
||||
# an agent gets the same results
|
||||
resp = get_ids(sign_uri('/api/users/%s/forms' % local_user.id, user=agent_user))
|
||||
assert resp == {formdata1.id}
|
||||
|
||||
resp = get_ids(sign_uri('/api/users/%s/forms?include-accessible=on' % local_user.id, user=agent_user))
|
||||
assert resp == {formdata1.id, formdata3.id}
|
||||
|
||||
|
||||
def test_user_drafts(pub, local_user):
|
||||
FormDef.wipe()
|
||||
formdef = FormDef()
|
||||
|
|
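Review bot: In `test_user_forms_include_accessible` the role given to `agent_user` ('Foo bar') is never attached to any form's workflow functions, so the two agent assertions at the end cannot tell whether the API filters on the target user's roles or on the caller's. Consider adding a formdata with `workflow_roles = {'_receiver': [role.id]}` and asserting it is absent from `'/api/users/%s/forms?include-accessible=on' % local_user.id` when signed as the agent. Separately, the `return` after `pytest.skip('this requires SQL')` is unreachable, because skip raises.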
26
wcs/api.py
26
wcs/api.py
|
@ -894,7 +894,18 @@ class ApiUserDirectory(Directory):
|
|||
order_by = 'receipt_time'
|
||||
if get_request().form.get('sort') == 'desc':
|
||||
order_by = '-receipt_time'
|
||||
criterias = [Equal('user_id', str(user.id))]
|
||||
if get_query_flag('include-accessible'):
|
||||
user_roles = user.get_roles()
|
||||
criterias = [
|
||||
Or(
|
||||
[
|
||||
Intersects('concerned_roles_array', user_roles),
|
||||
Equal('user_id', str(user.id)),
|
||||
]
|
||||
)
|
||||
]
|
||||
else:
|
||||
criterias = [Equal('user_id', str(user.id))]
|
||||
if category_slugs:
|
||||
criterias.append(Contains('category_id', [c.id for c in categories]))
|
||||
user_forms = sql.AnyFormData.select(
|
||||
|
@ -919,7 +930,13 @@ class ApiUserDirectory(Directory):
|
|||
# and put them back in order
|
||||
sorted_user_forms_tuples = [(x.formdef_id, x.id) for x in user_forms]
|
||||
user_forms = [formdef_user_forms.get(x) for x in sorted_user_forms_tuples]
|
||||
else:
|
||||
# prefetch evolutions to avoid individual loads when computing
|
||||
# formdata.get_visible_status().
|
||||
sql.AnyFormData.load_all_evolutions(user_forms)
|
||||
else:
|
||||
if get_query_flag('include-accessible'):
|
||||
return HttpResponseBadRequest('not supported')
|
||||
formdefs = FormDef.select()
|
||||
user_forms = []
|
||||
for formdef in formdefs:
|
||||
|
@ -971,13 +988,6 @@ class ApiUserDirectory(Directory):
|
|||
# ignore confidential forms
|
||||
forms = [x for x in forms if x.readable or not x.formdef.skip_from_360_view]
|
||||
|
||||
if get_publisher().is_using_postgresql() and not get_request().form.get('full') == 'on':
|
||||
# prefetch evolutions to avoid individual loads when computing
|
||||
# formdata.get_visible_status().
|
||||
from wcs import sql
|
||||
|
||||
sql.AnyFormData.load_all_evolutions(forms)
|
||||
|
||||
include_drafts = include_drafts or get_query_flag('include-drafts')
|
||||
result = []
|
||||
for form in forms:
|
||||
|
|
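Review bot: With `include-accessible`, the first hunk widens the query from `Equal('user_id', str(user.id))` to an `Or` with `Intersects('concerned_roles_array', user_roles)`, so the listing can now contain forms owned by other users, and nothing in the visible lines states what limits the fields serialized for those rows. The later `x.readable or not x.formdef.skip_from_360_view` filter is the only gate in view and `readable` is set outside these hunks, so it is worth confirming that it is evaluated against the target user for these rows, in particular when `full=on` is passed as well. I would add a comment above the branch such as `# include-accessible: also lists forms owned by others where the user holds a workflow function; not limited to user_id`, and a test that combines the flag with `full=on`. The enclosing method starts and ends outside the hunks.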