229 lines
8.5 KiB
Python
229 lines
8.5 KiB
Python
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2021 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import datetime
|
|
from requests import RequestException
|
|
|
|
from django.contrib.postgres.fields import JSONField
|
|
from django.db import models
|
|
from django.utils.http import urlencode
|
|
from django.utils.translation import ugettext_lazy as _
|
|
from django.utils import timezone
|
|
from django.utils.six.moves.urllib import parse as urlparse
|
|
|
|
from passerelle.base.models import BaseResource
|
|
from passerelle.utils.api import endpoint
|
|
from passerelle.utils.jsonresponse import APIError
|
|
|
|
|
|
class AddressCacheModel(models.Model):
    """Cached copy of one formatted address, keyed by its API identifier.

    ``timestamp`` is refreshed on every save (``auto_now=True``), so it records
    the last time the entry was stored or touched.
    """

    # Identifier of the address as returned by the remote API (unique per row).
    api_id = models.CharField(max_length=32, unique=True)
    # Formatted address payload, stored as JSON.
    data = JSONField()
    # Automatically set to the current time each time the row is saved.
    timestamp = models.DateTimeField(auto_now=True)

    def update_timestamp(self):
        """Refresh ``timestamp`` to the current time.

        For a row that already exists in the database only the ``timestamp``
        column is written, so the stored ``data`` is not overwritten with the
        in-memory copy. An instance that was never saved is saved in full, as
        a partial update cannot create a row.
        """
        if self._state.adding:
            self.save()
        else:
            # auto_now fields are recomputed when listed in update_fields.
            self.save(update_fields=['timestamp'])
|
|
|
|
|
|
class Photon(BaseResource):
    """Connector querying a Photon geocoding service.

    It exposes three endpoints: ``addresses`` (address search, with results
    cached in ``AddressCacheModel``), ``search`` (single-result geocoding) and
    ``reverse`` (reverse geocoding). Optional ``latitude``/``longitude``
    attributes are sent as a geographic priority for address searches.
    """

    service_url = models.CharField(
        max_length=128,
        blank=False,
        default='https://photon.komoot.io/',
        verbose_name=_('Service URL'),
        help_text=_('Base Adresse Web Service URL'),
    )
    latitude = models.FloatField(
        null=True,
        blank=True,
        verbose_name=_('Latitude'),
        help_text=_('Geographic priority for /addresses/ endpoint.'),
    )
    longitude = models.FloatField(
        null=True,
        blank=True,
        verbose_name=_('Longitude'),
        help_text=_('Geographic priority for /addresses/ endpoint.'),
    )

    category = _('Geographic information system')

    api_description = _(
        'photon is an open source geocoder built for OpenStreetMap data. '
        'It provides search-as-you-type and multilingual support.'
    )

    # Feature property name -> key used in the formatted 'address' dictionary.
    _ADDRESS_KEYS = {
        'country': 'country',
        'city': 'city',
        'postcode': 'postcode',
        'housenumber': 'house_number',
        'street': 'road',
    }

    class Meta:
        verbose_name = _('Photon')

    @staticmethod
    def format_address_data(data):
        """Convert one feature of the service reply into the served address dict.

        :param data: feature dictionary with a ``geometry`` entry (whose
            ``coordinates`` hold longitude then latitude) and a ``properties``
            entry (which must contain ``osm_id``).
        :returns: dictionary with ``lon`` and ``lat`` (as strings), ``address``
            (country, city, postcode, house_number, road when available),
            ``text`` and ``display_name`` (same one-line label) and ``id``.
        :raises KeyError: if ``geometry``, ``properties`` or ``osm_id`` is missing.
        """
        coordinates = data['geometry']['coordinates']
        properties = data['properties']

        # Iterate over the properties so the key order of the reply is kept.
        address = {
            Photon._ADDRESS_KEYS[prop]: value
            for prop, value in properties.items()
            if prop in Photon._ADDRESS_KEYS
        }

        # Join only the components that are present, so that a missing city or
        # street does not leave a dangling separator at the end of the label.
        text = ' '.join('%s' % address[key] for key in ('road', 'postcode', 'city') if address.get(key))
        house_number = address.get('house_number')
        if house_number:
            text = '%s, %s' % (house_number, text) if text else '%s' % house_number

        result = {}
        result['lon'] = str(coordinates[0])
        result['lat'] = str(coordinates[1])
        result['address'] = address
        result['text'] = text
        result['display_name'] = text
        result['id'] = properties['osm_id']
        return result

    def _get_point_features(self, api_path, query_args):
        """Query ``api_path`` of the service and return the 'Point' features of the reply.

        :param api_path: path joined to the path of ``service_url`` (e.g. ``'api/'``).
        :param query_args: dictionary of query string parameters.
        :returns: list of feature dictionaries whose geometry type is ``Point``;
            empty when the reply has no ``features`` entry.
        :raises APIError: when the service answers with an error status code.
        """
        scheme, netloc, path, params, _query, fragment = urlparse.urlparse(self.service_url)
        path = urlparse.urljoin(path, api_path)
        url = urlparse.urlunparse((scheme, netloc, path, params, urlencode(query_args), fragment))

        response = self.requests.get(url)
        try:
            response.raise_for_status()
        except RequestException as e:
            raise APIError('Bad response code from API: %s' % e)

        # A reply without 'features' is treated as an empty result set instead
        # of failing on iteration over None.
        features = response.json().get('features') or []
        # Other geometry types are unknown to the formatter and skipped.
        return [feature for feature in features if feature['geometry']['type'] == 'Point']

    @endpoint(
        pattern='(?P<q>.+)?$',
        description=_('Addresses list'),
        parameters={
            'id': {'description': _('Address identifier')},
            'q': {'description': _('Address'), 'example_value': '169 rue du chateau, paris'},
            'page_limit': {
                'description': _('Maximum number of results to return. Must be ' 'lower than 20.')
            },
            'zipcode': {'description': _('Zipcode'), 'example_value': '75014'},
            'lat': {'description': _('Prioritize results according to coordinates.')},
            'lon': {'description': _('Prioritize results according to coordinates.')},
        },
    )
    def addresses(
        self, request, id=None, q=None, zipcode=None, lat=None, lon=None, page_limit=5, lang='fr', bbox=None
    ):
        """Search addresses matching ``q``, or return one cached address by ``id``.

        :param id: identifier of a previously returned address; when given, the
            cache is used and the remote service is not queried.
        :param q: free text address query; an empty query yields no result.
        :param zipcode: when given, only results with this exact postcode are kept.
        :param lat: latitude used with ``lon`` to prioritize results.
        :param lon: longitude used with ``lat`` to prioritize results.
        :param page_limit: maximum number of results, capped at 20; an invalid
            value falls back to 5.
        :param lang: language sent to the service.
        :param bbox: optional bounding box sent to the service.
        :returns: ``{'data': [...]}`` with formatted addresses, or
            ``{'err': ...}`` when ``id`` is unknown.
        :raises APIError: when the service answers with an error status code.
        """
        if id is not None:
            try:
                address = AddressCacheModel.objects.get(api_id=id)
            except AddressCacheModel.DoesNotExist:
                return {'err': _('Address ID not found')}
            # Touch the entry so it survives the next cache cleanup.
            address.update_timestamp()
            return {'data': [address.data]}

        if not q:
            return {'data': []}

        try:
            page_limit = min(int(page_limit), 20)
        except (TypeError, ValueError):
            page_limit = 5

        query_args = {'q': q, 'limit': page_limit, 'lang': lang}
        # Request coordinates win over the connector's configured ones, and the
        # pair is never mixed: both values always come from the same source.
        if lat and lon:
            query_args['lat'] = lat
            query_args['lon'] = lon
        elif self.latitude is not None and self.longitude is not None:
            # Compare to None so that a configured coordinate of 0.0 is honoured.
            query_args['lat'] = self.latitude
            query_args['lon'] = self.longitude
        if bbox:
            query_args['bbox'] = bbox

        result = []
        for feature in self._get_point_features('api/', query_args):
            if zipcode and feature['properties'].get('postcode') != zipcode:
                continue
            data = self.format_address_data(feature)
            result.append(data)
            # Cache every served address so it can later be fetched by id.
            address, created = AddressCacheModel.objects.get_or_create(
                api_id=data['id'], defaults={'data': data}
            )
            if not created:
                address.update_timestamp()

        return {'data': result}

    @endpoint(
        description=_('Geocoding (Nominatim API)'),
        pattern='(?P<q>.+)?$',
        parameters={
            'q': {'description': _('Address'), 'example_value': '169 rue du chateau, paris'},
            'zipcode': {'description': _('Zipcode')},
            'lat': {'description': _('Prioritize results according to coordinates.')},
            'lon': {'description': _('Prioritize results according to coordinates.')},
        },
    )
    def search(self, request, q, zipcode=None, lat=None, lon=None, **kwargs):
        """Geocode ``q`` and return at most one formatted address, as a list.

        Extra keyword arguments understood: ``format`` (only ``json`` is
        supported), ``accept-language`` (defaults to ``fr``) and ``viewbox``
        (forwarded as bounding box).

        Note that a single result is requested from the service and the
        ``zipcode`` filter is applied afterwards, so the list may be empty even
        though further matches exist.

        :raises NotImplementedError: when a format other than ``json`` is asked.
        :raises APIError: when the service answers with an error status code.
        """
        if kwargs.get('format', 'json') != 'json':
            raise NotImplementedError()
        lang = kwargs.get('accept-language', 'fr')
        bbox = kwargs.get('viewbox')
        result = self.addresses(
            request, q=q, zipcode=zipcode, lat=lat, lon=lon, page_limit=1, lang=lang, bbox=bbox
        )
        return result['data']

    @endpoint(
        description=_('Reverse geocoding (Nominatim API)'),
        parameters={
            'lat': {'description': _('Latitude'), 'example_value': 48.833708},
            'lon': {'description': _('Longitude'), 'example_value': 2.323349},
        },
    )
    def reverse(self, request, lat, lon, **kwargs):
        """Return the formatted address found at the given coordinates.

        :param lat: latitude of the position to look up.
        :param lon: longitude of the position to look up.
        :returns: the first 'Point' feature of the reply, formatted, or ``None``
            when the reply contains none.
        :raises NotImplementedError: when a format other than ``json`` is asked.
        :raises APIError: when the service answers with an error status code.
        """
        if kwargs.get('format', 'json') != 'json':
            raise NotImplementedError()

        features = self._get_point_features('reverse/', {'lat': lat, 'lon': lon})
        if not features:
            return None
        return self.format_address_data(features[0])

    def check_status(self):
        """Run a dummy search; any failure propagates as an exception."""
        self.search(None, 'passerelle status check')

    def clean_addresses_cache(self):
        """Delete cached addresses that were not touched during the last hour."""
        old_addresses = AddressCacheModel.objects.filter(
            timestamp__lt=timezone.now() - datetime.timedelta(hours=1)
        )
        old_addresses.delete()

    def hourly(self):
        """Run the parent class hourly tasks, then purge stale cached addresses."""
        super().hourly()
        self.clean_addresses_cache()
|