workflows: add action to geolocate a formdata (#10581)

This commit is contained in:
Frédéric Péters 2016-04-30 12:02:46 +02:00
parent fd4aa8de1b
commit e5826951f7
5 changed files with 418 additions and 1 deletions

70
help/fr/wf-geolocate.page Normal file
View File

@ -0,0 +1,70 @@
<page xmlns="http://projectmallard.org/1.0/"
type="topic" id="wf-geolocate" xml:lang="fr">
<info>
<link type="guide" xref="index#wf" />
<revision docversion="0.1" date="2016-05-01" status="draft"/>
<credit type="author">
<name>Frédéric Péters</name>
<email>fpeters@entrouvert.com</email>
</credit>
</info>
<title>Géolocalisation</title>
<p>
Une fois la géolocalisation activée pour un formulaire, le workflow associé
peut faire appel à l'action de géolocalisation pour attacher des coordonnées
géographiques à la demande.
</p>
<p>
Ces coordonnées peuvent être obtenues par géocodage à partir d'une adresse ou
en les extrayant d'un champ « Carte » ou des métadonnées attachées à une
photographie qui aurait été transférée via un champ de type « Fichier ».
</p>
<p>
Si les coordonnées peuvent être tirées de différentes sources, il est possible
de faire se succéder plusieurs appels à une action de géolocalisation, en les
paramétrant pour ne pas écraser des coordonnées précédemment acquises.
</p>
<section>
<title>Géocodage à partir d'une adresse</title>
<p>
Le paramétrage se fait en renseignant une chaîne de caractère produisant une
adresse, généralement en utilisant le mécanisme de substitution pour
concaténer plusieurs champs.
</p>
<example><code>[form_var_numero] [form_var_voie], [form_var_commune]</code></example>
</section>
<section>
<title>Extraction d'un champ « Carte »</title>
<p>
Le paramètre est une expression faisant référence à une variable tirée d'un
champ « Carte ».
</p>
<example><code>=form_var_carte</code></example>
</section>
<section>
<title>Extraction d'une photographie</title>
<p>
Le paramètre est une expression pointant une variable tirée d'un champ de
type « Fichier »; le fichier ainsi pointé doit être une image contenant des
métadonnées EXIF, renseignant la localisation de la prise de vue.
</p>
<example><code>=form_var_photo</code></example>
</section>
</page>

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.8 KiB

View File

@ -2,6 +2,9 @@ import datetime
import pytest
import shutil
import time
import urllib2
import mock
from quixote import cleanup, get_response
from wcs.qommon.http_request import HTTPRequest
@ -9,7 +12,7 @@ from qommon.form import *
from wcs.formdef import FormDef
from wcs import sessions
from wcs.fields import StringField, DateField
from wcs.fields import StringField, DateField, MapField, FileField
from wcs.roles import Role
from wcs.workflows import (Workflow, WorkflowStatusItem,
SendmailWorkflowStatusItem, SendSMSWorkflowStatusItem,
@ -26,6 +29,7 @@ from wcs.wf.remove import RemoveWorkflowStatusItem
from wcs.wf.roles import AddRoleWorkflowStatusItem, RemoveRoleWorkflowStatusItem
from wcs.wf.wscall import WebserviceCallStatusItem
from wcs.wf.export_to_model import transform_to_pdf
from wcs.wf.geolocate import GeolocateWorkflowStatusItem
from utilities import (create_temporary_pub, MockSubstitutionVariables, emails,
http_requests, clean_temporary_pub, sms_mocking)
@ -1073,6 +1077,163 @@ def test_criticality(pub):
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'green'
def test_geolocate_address(pub):
formdef = FormDef()
formdef.geolocations = {'base': 'bla'}
formdef.name = 'baz'
formdef.fields = [
StringField(id='1', label='String', type='string', varname='string'),
]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'1': '169 rue du chateau'}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = GeolocateWorkflowStatusItem()
item.method = 'address_string'
item.address_string = '[form_var_string], paris, france'
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 200,
json.dumps([{'lat':'48.8337085','lon':'2.3233693'}]), None)
item.perform(formdata)
assert urllib2.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0]
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 2
# check for invalid ezt
item.address_string = '[if-any], paris, france'
formdata.geolocations = None
item.perform(formdata)
assert formdata.geolocations == {}
# check for nominatim server error
formdata.geolocations = None
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 500,
json.dumps([{'lat':'48.8337085','lon':'2.3233693'}]), None)
item.perform(formdata)
assert formdata.geolocations == {}
# check for nominatim returning an empty result set
formdata.geolocations = None
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 200, json.dumps([]), None)
item.perform(formdata)
assert formdata.geolocations == {}
def test_geolocate_image(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.geolocations = {'base': 'bla'}
formdef.fields = [
FileField(id='3', label='File', type='file', varname='file'),
]
formdef.store()
upload = PicklableUpload('test.jpeg', 'image/jpeg')
upload.receive([open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg')).read()])
formdata = formdef.data_class()()
formdata.data = {'3': upload}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = GeolocateWorkflowStatusItem()
item.method = 'photo_variable'
item.photo_variable = '=form_var_file_raw'
item.perform(formdata)
assert int(formdata.geolocations['base']['lat']) == -1
assert int(formdata.geolocations['base']['lon']) == 6
# invalid expression
formdata.geolocations = None
item.photo_variable = '=1/0'
item.perform(formdata)
assert formdata.geolocations == {}
# invalid type
formdata.geolocations = None
item.photo_variable = '="bla"'
item.perform(formdata)
assert formdata.geolocations == {}
# invalid photo
upload = PicklableUpload('test.jpeg', 'image/jpeg')
upload.receive([open(os.path.join(os.path.dirname(__file__), 'template.odt')).read()])
formdata.data = {'3': upload}
formdata.geolocations = None
item.perform(formdata)
assert formdata.geolocations == {}
def test_geolocate_map(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.geolocations = {'base': 'bla'}
formdef.fields = [
MapField(id='2', label='Map', type='map', varname='map'),
]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'2': '48.8337085;2.3233693'}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = GeolocateWorkflowStatusItem()
item.method = 'map_variable'
item.map_variable = '=form_var_map'
item.perform(formdata)
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 2
# invalid data
formdata.geolocations = None
item.map_variable = '=form_var'
item.perform(formdata)
assert formdata.geolocations == {}
def test_geolocate_overwrite(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.geolocations = {'base': 'bla'}
formdef.fields = [
MapField(id='2', label='Map', type='map', varname='map'),
]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'2': '48.8337085;2.3233693'}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = GeolocateWorkflowStatusItem()
item.method = 'map_variable'
item.map_variable = '=form_var_map'
item.perform(formdata)
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 2
formdata.data = {'2': '48.8337085;3.3233693'}
item.perform(formdata)
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 3
formdata.data = {'2': '48.8337085;4.3233693'}
item.overwrite = False
item.perform(formdata)
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 3
@pytest.mark.skipif(transform_to_pdf is None, reason='libreoffice not found')
def test_transform_to_pdf():
instream = open(os.path.join(os.path.dirname(__file__), 'template.odt'))

185
wcs/wf/geolocate.py Normal file
View File

@ -0,0 +1,185 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2016 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import collections
import json
import urllib2
try:
from PIL import Image
from PIL.ExifTags import TAGS, GPSTAGS
except ImportError:
Image = None
from quixote import get_publisher
from qommon import get_logger
from qommon.form import RadiobuttonsWidget, StringWidget, CheckboxWidget
from qommon.misc import http_get_page
from wcs.workflows import (WorkflowStatusItem, register_item_class,
template_on_formdata)
class GeolocateWorkflowStatusItem(WorkflowStatusItem):
description = N_('Geolocate')
key = 'geolocate'
method = 'address_string'
address_string = None
map_variable = None
photo_variable = None
overwrite = True
def get_parameters(self):
return ('method', 'address_string', 'map_variable', 'photo_variable', 'overwrite')
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
methods = collections.OrderedDict(
[('address_string', _('Address String')),
('map_variable', _('Map Variable')),
('photo_variable', _('Photo Variable'))])
if Image is None:
del methods['photo_variable']
if 'method' in parameters:
form.add(RadiobuttonsWidget, '%smethod' % prefix,
title=_('Method'),
options=methods.items(),
value=self.method,
attrs={'data-dynamic-display-parent': 'true'})
if 'address_string' in parameters:
form.add(StringWidget, '%saddress_string' % prefix, size=50,
title=_('Address String'), value=self.address_string,
attrs={
'data-dynamic-display-child-of': '%smethod' % prefix,
'data-dynamic-display-value': methods.get('address_string'),
})
if 'map_variable' in parameters:
form.add(StringWidget, '%smap_variable' % prefix, size=50,
title=_('Map Variable'), value=self.map_variable,
attrs={
'data-dynamic-display-child-of': '%smethod' % prefix,
'data-dynamic-display-value': methods.get('map_variable'),
})
if 'photo_variable' in parameters:
form.add(StringWidget, '%sphoto_variable' % prefix, size=50,
title=_('Photo Variable'), value=self.photo_variable,
attrs={
'data-dynamic-display-child-of': '%smethod' % prefix,
'data-dynamic-display-value': methods.get('photo_variable'),
})
if 'overwrite' in parameters:
form.add(CheckboxWidget, '%soverwrite' % prefix,
title=_('Overwrite existing geolocation'),
value=self.overwrite)
def perform(self, formdata):
if not self.method:
return
if not formdata.formdef.geolocations:
return
geolocation_point = formdata.formdef.geolocations.keys()[0]
if not formdata.geolocations:
formdata.geolocations = {}
if formdata.geolocations.get(geolocation_point) and not self.overwrite:
return
location = getattr(self, 'geolocate_' + self.method)(formdata)
if location:
formdata.geolocations[geolocation_point] = location
formdata.store()
def geolocate_address_string(self, formdata):
nominatim_url = get_publisher().get_site_option('nominatim_url')
if not nominatim_url:
nominatim_url = 'http://nominatim.openstreetmap.org'
try:
address = template_on_formdata(formdata, self.address_string)
except Exception, e:
get_logger().error('error in template for address string [%r]', e)
return
url = '%s/search?q=%s&format=json' % (nominatim_url, urllib2.quote(address))
if get_publisher().get_site_option('nominatim_key'):
url += '&key=' + get_publisher().get_site_option('nominatim_key')
response, status, data, auth_header = http_get_page(url)
if status != 200:
get_logger().error('error calling geocoding service [%s]', status)
return
data = json.loads(data)
if len(data) == 0:
get_logger().error('error finding location')
return
coords = data[0]
return {'lon': float(coords['lon']), 'lat': float(coords['lat'])}
def geolocate_map_variable(self, formdata):
value = self.compute(self.map_variable)
if not value:
return
try:
lat, lon = map(float, value.split(';'))
except Exception, e:
get_logger().error('error geolocating from map variable [%r]', e)
return
return {'lon': lon, 'lat': lat}
def geolocate_photo_variable(self, formdata):
if Image is None:
get_logger().error('error geolocating from file (missing PIL)')
return
value = self.compute(self.photo_variable)
if not hasattr(value, 'get_file_pointer'):
get_logger().error('error geolocating from photo, invalid variable')
return
try:
image = Image.open(value.get_file_pointer())
except IOError:
get_logger().error('error geolocating from photo, invalid file')
return
exif_data = image._getexif()
if exif_data:
gps_info = exif_data.get(0x8825)
if gps_info:
# lat_ref will be N/S, lon_ref wil l be E/W
# lat and lon will be degrees/minutes/seconds (value, denominator),
# like ((33, 1), (51, 1), (2191, 100))
lat, lon = gps_info[2], gps_info[4]
try:
lat_ref = gps_info[1]
except KeyError:
lat_ref = 'N'
try:
lon_ref = gps_info[3]
except KeyError:
lon_ref = 'E'
lat = (1.0*lat[0][0]/lat[0][1] + 1.0*lat[1][0]/lat[1][1]/60 + 1.0*lat[2][0]/lat[2][1]/3600)
lon = (1.0*lon[0][0]/lon[0][1] + 1.0*lon[1][0]/lon[1][1]/60 + 1.0*lon[2][0]/lon[2][1]/3600)
if lat_ref == 'S':
lat = -lat
if lon_ref == 'W':
lon = -lon
return {'lon': lon, 'lat': lat}
return
register_item_class(GeolocateWorkflowStatusItem)

View File

@ -2225,6 +2225,7 @@ def load_extra():
import wf.remove
import wf.roles
import wf.dispatch
import wf.geolocate
import wf.wscall
import wf.form
import wf.register_comment