api: add data and geojson views covering all formdatas (#14260)

This commit is contained in:
Frédéric Péters 2017-05-28 00:17:55 +02:00
parent 8e7543ed64
commit 5e594d4cb8
4 changed files with 251 additions and 25 deletions

View File

@ -358,6 +358,60 @@ n'est pas nécessaire de préciser l'identifiant d'un utilisateur.
</section>
<section id="global-data">
<title>Données de l'ensemble des formulaires</title>
<p>
De manière similaire à l'API de récupération de la liste des demandes d'un
formulaire, il est possible de récupérer l'ensemble des demandes de la
plateforme, peu importe leurs types.
</p>
<screen>
<output style="prompt">$ </output><input>curl -H "Accept: application/json" \
https://www.example.net/api/forms/</input>
</screen>
<code mime="application/json">
[
{
url: "https://www.example.net/inscriptions/1/",
last_update_time: "2015-03-26T23:08:45",
receipt_time: "2015-03-26T23:08:44",
id: 1
},
{
url: "https://www.example.net/inscriptions/3/",
last_update_time: "2015-03-27T12:11:21",
receipt_time: "2015-03-27T12:45:19",
id: 3
},
{
url: "https://www.example.net/signalement/1/",
last_update_time: "2015-03-25T14:14:21",
receipt_time: "2015-03-25T14:48:20",
id: 1
}
]
</code>
<p>
Des paramètres peuvent être envoyés dans la requête pour filtrer les résultats.
Il s'agit des mêmes paramètres que ceux du tableau global en backoffice.
Par exemple, pour avoir une liste limitée aux demandes terminées :
</p>
<screen>
<output style="prompt">$ </output><input>curl -H "Accept: application/json" \
https://www.example.net/api/forms/?status=done</input>
</screen>
<note><p>
Le paramètre <code>full</code> n'est pas pris en charge dans cette API; le
paramètre <code>anonymise</code> non plus, les données l'étant déjà.
</p></note>
</section>
<section id="geolocation">
<title>Données géolocalisées</title>
@ -400,6 +454,16 @@ De manière identique aux appels précédents, des filtres peuvent être passés
Les URL retournées pour les demandes pointent vers l'interface de gestion de celles-ci.
</p></note>
<p>
Il est également possible d'obtenir les informations géographiques de
l'ensemble des demandes :
</p>
<screen>
<output style="prompt">$ </output><input>curl -H "Accept: application/json" \
https://www.example.net/api/forms/geojson</input>
</screen>
</section>
<section id="tracking-code">

View File

@ -1445,6 +1445,112 @@ def test_api_geojson_formdata(pub, local_user):
formdef.store()
resp = get_app(pub).get(sign_uri('/api/forms/test/geojson', user=local_user), status=404)
def test_api_global_geojson(pub, local_user):
Role.wipe()
role = Role(name='test')
role.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.workflow_roles = {'_receiver': role.id}
formdef.fields = []
formdef.store()
data_class = formdef.data_class()
data_class.wipe()
formdef.geolocations = {'base': 'Location'}
formdef.store()
for i in range(30):
formdata = data_class()
date = time.strptime('2014-01-20', '%Y-%m-%d')
formdata.geolocations = {'base': {'lat': 48, 'lon': 2}}
formdata.user_id = local_user.id
formdata.just_created()
if i%3 == 0:
formdata.jump_status('new')
else:
formdata.jump_status('finished')
formdata.store()
if not pub.is_using_postgresql():
resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
pytest.skip('this requires SQL')
return
# check empty content if user doesn't have the appropriate role
resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user))
assert 'features' in resp.json
assert len(resp.json['features']) == 0
# add proper role to user
local_user.roles = [role.id]
local_user.store()
# check it gets the data
resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user))
assert 'features' in resp.json
assert len(resp.json['features']) == 10
# check with a filter
resp = get_app(pub).get(sign_uri('/api/forms/geojson?status=done', user=local_user))
assert 'features' in resp.json
assert len(resp.json['features']) == 20
def test_api_global_listing(pub, local_user):
Role.wipe()
role = Role(name='test')
role.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.workflow_roles = {'_receiver': role.id}
formdef.fields = [
fields.StringField(id='0', label='foobar', varname='foobar'),
]
formdef.store()
data_class = formdef.data_class()
data_class.wipe()
formdef.store()
for i in range(30):
formdata = data_class()
date = time.strptime('2014-01-20', '%Y-%m-%d')
formdata.data = {'0': 'FOO BAR'}
formdata.user_id = local_user.id
formdata.just_created()
if i%3 == 0:
formdata.jump_status('new')
else:
formdata.jump_status('finished')
formdata.store()
if not pub.is_using_postgresql():
resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
pytest.skip('this requires SQL')
return
# check empty content if user doesn't have the appropriate role
resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user))
assert len(resp.json['data']) == 0
# add proper role to user
local_user.roles = [role.id]
local_user.store()
# check it gets the data
resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user))
assert len(resp.json['data']) == 10
# check with a filter
resp = get_app(pub).get(sign_uri('/api/forms/?status=done', user=local_user))
assert len(resp.json['data']) == 20
@pytest.fixture
def ics_data(local_user):
Role.wipe()

View File

@ -37,6 +37,7 @@ import wcs.qommon.storage as st
from wcs.api_utils import is_url_signed, get_user_from_api_query_string
from backoffice.management import FormPage as BackofficeFormPage
from backoffice.management import ManagementDirectory
def posted_json_data_to_formdata_data(formdef, data):
# remap fields from varname to field id
@ -66,6 +67,34 @@ def posted_json_data_to_formdata_data(formdef, data):
return data
def get_formdata_dict(formdata, user, consider_status_visibility=True):
if consider_status_visibility:
status = formdata.get_visible_status(user=user)
if not status:
# skip hidden forms
return None
else:
status = formdata.get_status()
title = _('%(name)s #%(id)s (%(status)s)') % {
'name': formdata.formdef.name,
'id': formdata.get_display_id(),
'status': status.name,
}
d = {'title': title,
'name': formdata.formdef.name,
'url': formdata.get_url(),
'datetime': misc.strftime.strftime('%Y-%m-%d %H:%M:%S', formdata.receipt_time),
'status': status.name,
'status_css_class': status.extra_css_class,
'keywords': formdata.formdef.keywords_list,
}
d.update(formdata.get_substitution_variables(minimal=True))
if get_request().form.get('full') == 'on':
d.update(formdata.get_json_export_dict(include_files=False))
return d
class ApiFormdataPage(FormStatusPage):
_q_exports_orig = ['', 'download']
@ -146,6 +175,49 @@ class ApiFormPage(BackofficeFormPage):
class ApiFormsDirectory(Directory):
_q_exports = ['', 'geojson']
def check_access(self):
if not is_url_signed():
# grant access to admins, to ease debug
if not (get_request().user and get_request().user.is_admin):
raise AccessForbiddenError('user not authenticated')
def _q_index(self):
if not get_publisher().is_using_postgresql():
raise TraversalError()
self.check_access()
get_request().user = get_user_from_api_query_string() or get_request().user
from wcs import sql
management_directory = ManagementDirectory()
criterias = management_directory.get_global_listing_criterias()
limit = int(get_request().form.get('limit',
get_publisher().get_site_option('default-page-size') or 20))
offset = int(get_request().form.get('offset', 0))
order_by = get_request().form.get('order_by',
get_publisher().get_site_option('default-sort-order') or '-receipt_time')
output = [get_formdata_dict(x, user=get_request().user, consider_status_visibility=False)
for x in sql.AnyFormData.select(
criterias, order_by=order_by, limit=limit, offset=offset)]
get_response().set_content_type('application/json')
return json.dumps({'data': output},
cls=misc.JSONEncoder,
encoding=get_publisher().site_charset)
def geojson(self):
if not get_publisher().is_using_postgresql():
raise TraversalError()
self.check_access()
get_request().user = get_user_from_api_query_string() or get_request().user
return ManagementDirectory().geojson()
def _q_lookup(self, component):
return ApiFormPage(component)
@ -481,31 +553,11 @@ class ApiUserDirectory(Directory):
for form in self.get_user_forms(user):
if form.is_draft():
continue
visible_status = form.get_visible_status(user=user)
# skip hidden forms
if not visible_status:
formdata_dict = get_formdata_dict(form, user)
if not formdata_dict:
# skip hidden forms
continue
name = form.formdef.name
id = form.get_display_id()
status = visible_status.name
title = _('%(name)s #%(id)s (%(status)s)') % {
'name': name,
'id': id,
'status': status
}
url = form.get_url()
d = {'title': title,
'name': form.formdef.name,
'url': url,
'datetime': misc.strftime.strftime('%Y-%m-%d %H:%M:%S', form.receipt_time),
'status': status,
'status_css_class': visible_status.extra_css_class,
'keywords': form.formdef.keywords_list,
}
d.update(form.get_substitution_variables(minimal=True))
if get_request().form.get('full') == 'on':
d.update(form.get_json_export_dict(include_files=False))
forms.append(d)
forms.append(formdata_dict)
return json.dumps(forms,
cls=misc.JSONEncoder,

View File

@ -30,6 +30,7 @@ from qommon import _
from qommon.storage import StorableObject, Intersects, Contains
import qommon.misc
from qommon import ezt
from qommon.evalutils import make_datetime
from qommon.substitution import Substitutions
from roles import Role
@ -557,7 +558,10 @@ class FormData(StorableObject):
'form_criticality_level': self.criticality_level,
})
if self.receipt_time:
d['form_receipt_datetime'] = datetime.datetime(*self.receipt_time[:6])
# always get receipt time as a datetime object, this handles
# both normal formdata (where receipt_time is a time.struct_time)
# and sql.AnyFormData where it's already a datetime object.
d['form_receipt_datetime'] = make_datetime(self.receipt_time)
d['form_status'] = self.get_status_label()