passerelle/passerelle/apps/base_adresse/models.py

251 lines
9.9 KiB
Python

import bz2
import json
import os
import urlparse
import unicodedata
from django.db import connection, models
from django.db.models import Q
from django.utils.http import urlencode
from django.utils.translation import ugettext_lazy as _
from django.utils import timezone
from django.utils import six
from django.utils.six.moves.urllib.parse import urljoin
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
class BaseAdresse(BaseResource):
    """Connector for a "Base Adresse" geocoding web service.

    Provides geocoding (``search``) and reverse geocoding (``reverse``)
    through the remote service configured in ``service_url``, and a
    ``streets`` lookup served from the local ``StreetModel`` table, which is
    filled from BANO dumps for the configured ``zipcode`` prefixes.
    """
    service_url = models.CharField(
        max_length=128, blank=False,
        default='https://api-adresse.data.gouv.fr/',
        verbose_name=_('Service URL'),
        help_text=_('Base Adresse Web Service URL'))

    category = _('Geographic information system')

    api_description = _("The API is a partial view of OpenStreetMap's Nominatim "
                        "own API; it currently doesn't support all parameters and "
                        "is limited to the JSON format.")

    # Comma-separated postal code prefixes (or county numbers); parsed by
    # get_zipcodes() and used to select which streets are mirrored locally.
    zipcode = models.CharField(
        max_length=600,
        blank=True,
        verbose_name=_('Postal codes or county number to get streets, separated with commas'))

    class Meta:
        verbose_name = _('Base Adresse Web Service')

    def _service_endpoint_url(self, endpoint_path, query_args):
        """Return ``service_url`` with *endpoint_path* joined to its path.

        Any query string already present in ``service_url`` is replaced by
        the URL-encoded *query_args*.
        """
        parts = urlparse.urlparse(self.service_url)
        return urlparse.urlunparse((
            parts.scheme,
            parts.netloc,
            urljoin(parts.path, endpoint_path),
            parts.params,
            urlencode(query_args),
            parts.fragment,
        ))

    @staticmethod
    def _point_features(response):
        """Lazily yield the features of *response* whose geometry is a Point.

        A missing or null ``features`` member is treated as an empty list
        instead of failing with a ``TypeError``.
        """
        features = response.json().get('features') or []
        return (feature for feature in features
                if feature['geometry']['type'] == 'Point')

    @endpoint(pattern='(?P<q>.+)?$',
              description=_('Geocoding'),
              parameters={
                  'q': {'description': _('Address'), 'example_value': '169 rue du chateau, paris'}
              })
    def search(self, request, q, zipcode='', lat=None, lon=None, **kwargs):
        """Geocode the free-text address *q*.

        *zipcode* is forwarded as the ``postcode`` filter; *lat* and *lon*
        are forwarded only when both are given.

        Returns a list holding at most one dict with ``lon``, ``lat`` (as
        strings) and ``display_name``; the list is empty when *q* is empty
        or no Point feature is returned.

        Raises NotImplementedError when a ``format`` other than ``json`` is
        requested.
        """
        if kwargs.get('format', 'json') != 'json':
            raise NotImplementedError()

        if not q:
            return []

        query_args = {'q': q, 'limit': 1}
        if zipcode:
            query_args['postcode'] = zipcode
        if lat and lon:
            query_args['lat'] = lat
            query_args['lon'] = lon

        response = self.requests.get(self._service_endpoint_url('search/', query_args))

        # Only the first Point feature is reported; other geometry types
        # are skipped as unknown.
        for feature in self._point_features(response):
            longitude, latitude = feature['geometry']['coordinates'][:2]
            return [{
                'lon': str(longitude),
                'lat': str(latitude),
                'display_name': feature['properties']['label'],
            }]
        return []

    @endpoint(description=_('Reverse geocoding'),
              parameters={
                  'lat': {'description': _('Latitude'), 'example_value': 48.833708},
                  'lon': {'description': _('Longitude'), 'example_value': 2.323349},
              })
    def reverse(self, request, lat, lon, **kwargs):
        """Look up the address located at (*lat*, *lon*).

        Returns a dict with ``lon``, ``lat`` (as strings), an ``address``
        sub-dict and, when the service provides a label, ``display_name``.
        Returns None when the response holds no Point feature.

        Raises NotImplementedError when a ``format`` other than ``json`` is
        requested.
        """
        if kwargs.get('format', 'json') != 'json':
            raise NotImplementedError()

        response = self.requests.get(
            self._service_endpoint_url('reverse/', {'lat': lat, 'lon': lon}))

        result = None
        # No early exit: when several Point features are returned, the last
        # one is the one reported.
        for feature in self._point_features(response):
            properties = feature['properties']
            address = {'country': 'France'}

            for key in ('city', 'postcode', 'citycode'):
                if key in properties:
                    address[key] = properties[key]

            if 'housenumber' in properties:
                address['house_number'] = properties['housenumber']

            if 'name' in properties:
                road = properties['name']
                house_number = properties.get('housenumber')
                # "name" may start with the house number; keep the road only.
                if house_number and road.startswith(house_number):
                    road = road[len(house_number):].strip()
                address['road'] = road

            result = {
                'lon': str(feature['geometry']['coordinates'][0]),
                'lat': str(feature['geometry']['coordinates'][1]),
                'address': address,
            }
            if 'label' in properties:
                result['display_name'] = properties['label']
        return result

    @endpoint(description=_('Streets from zipcode'),
              parameters={
                  'id': {'description': _('Get exactly one street')},
                  'q': {'description': _("Street name")},
                  'zipcode': {'description': _('Zipcode')},
                  'page_limit': {'description': _('Maximum number of results to return'),
                                 'example_value': 30},
                  'distinct': {'description': _('Remove duplicate streets')},
              })
    def streets(self, request, zipcode=None, q=None, id=None, distinct=True, page_limit=None):
        """List locally stored streets.

        *id* selects a single street (a non-integer value yields no result);
        otherwise *q* filters on the unaccented street name.  *zipcode*
        restricts results to postal codes starting with it, *distinct*
        removes streets sharing a name (PostgreSQL only) and *page_limit*
        caps the number of results.

        Returns ``{'data': [...]}`` with one dict per street.
        """
        if id is not None:
            try:
                id = int(id)
            except ValueError:
                return {'data': []}
            streets = StreetModel.objects.filter(id=id)
        else:
            streets = StreetModel.objects.all()
            if q:
                # Strip accents the same way StreetModel.save() does; decode
                # back to text so the ORM is given text, not bytes.
                unaccented_q = unicodedata.normalize('NFKD', q).encode(
                    'ascii', 'ignore').decode('ascii').lower()
                streets = streets.filter(unaccent_name__icontains=unaccented_q)

        if zipcode:
            streets = streets.filter(zipcode__startswith=zipcode)

        if distinct and connection.vendor == 'postgresql':
            # this only works with postgresql
            streets = streets.order_by('name').distinct('name')

        if page_limit:
            streets = streets[:page_limit]

        return {'data': [{'id': street.id,
                          'text': street.name,
                          'type': street.type,
                          'city': street.city,
                          'citycode': street.citycode,
                          'zipcode': street.zipcode}
                         for street in streets]}

    def check_status(self):
        """Raise an Exception when the default service returns no result.

        The probe is only run when ``service_url`` is the default public
        URL; other URLs are not checked.
        """
        if self.service_url == 'https://api-adresse.data.gouv.fr/':
            result = self.search(None, '169 rue du chateau, paris')
            if not result:
                raise Exception('no results')

    def get_zipcodes(self):
        """Return the configured zipcode prefixes as a tuple of stripped,
        non-empty strings."""
        stripped = (part.strip() for part in self.zipcode.split(','))
        return tuple(part for part in stripped if part)

    @staticmethod
    def _streets_matching(zipcodes):
        """Return the streets whose zipcode starts with any of *zipcodes*.

        An empty *zipcodes* yields an empty queryset.
        """
        if not zipcodes:
            return StreetModel.objects.none()
        criteria = Q(zipcode__startswith=zipcodes[0])
        for zipcode in zipcodes[1:]:
            criteria |= Q(zipcode__startswith=zipcode)
        return StreetModel.objects.filter(criteria)

    def get_streets_queryset(self):
        """Return the streets matching the configured zipcode prefixes.

        Returns an empty queryset (instead of raising IndexError) when no
        zipcode is configured.
        """
        return self._streets_matching(self.get_zipcodes())

    @staticmethod
    def _departments_for_zipcode(zipcode):
        """Return the set of BANO department codes to download for *zipcode*."""
        if zipcode.startswith('97'):
            # three-character department codes
            return {zipcode[:3]}
        if zipcode.startswith('20'):
            # "20" zipcodes are covered by the two files 2A and 2B
            return {'2A', '2B'}
        return {zipcode[:2]}

    def update_streets_data(self):
        """Refresh the local street table from the BANO department dumps.

        For every department derived from the configured zipcodes, the dump
        is downloaded and each matching street is created or updated.
        Streets that were not seen during this run are then deleted, but
        only for zipcodes whose department files were all processed, so a
        failed download does not wipe previously stored streets.

        Does nothing when no zipcode is configured.  Raises Exception when a
        downloaded file contains no line at all.
        """
        zipcodes = self.get_zipcodes()
        if not zipcodes:
            return

        start_update = timezone.now()

        departments_by_zipcode = {
            zipcode: self._departments_for_zipcode(zipcode) for zipcode in zipcodes}
        departments = set().union(*departments_by_zipcode.values())
        refreshed_departments = set()

        for department in departments:
            ban_file = self.requests.get(
                'http://bano.openstreetmap.fr/BAN_odbl/BAN_odbl_{}-json.bz2'.format(department))
            if ban_file.status_code != 200:
                continue

            is_empty = True
            # the dump holds one JSON document per line
            for line in bz2.decompress(ban_file.content).splitlines():
                is_empty = False
                street_info = json.loads(line)
                if street_info['type'] != 'street':
                    continue
                if not street_info['postcode'].startswith(zipcodes):
                    continue

                # citycode and name may be lists; keep their first item
                if isinstance(street_info['citycode'], list):
                    street_info['citycode'] = six.text_type(street_info['citycode'][0])
                if isinstance(street_info['name'], list):
                    street_info['name'] = six.text_type(street_info['name'][0])

                name = street_info['name'][:150]
                street = StreetModel.objects.get_or_create(
                    citycode=street_info['citycode'], name=name)[0]
                street.city = street_info['city']
                street.name = name
                street.zipcode = street_info['postcode']
                street.type = street_info['type']
                street.citycode = street_info['citycode']
                # Always saved, even when unchanged: this refreshes
                # last_update, which the purge below relies on.
                street.save()

            if is_empty:
                raise Exception('bano file is empty')
            refreshed_departments.add(department)

        # Purge stale streets only where every needed file was processed.
        refreshed_zipcodes = tuple(
            zipcode for zipcode in zipcodes
            if departments_by_zipcode[zipcode] <= refreshed_departments)
        if refreshed_zipcodes:
            self._streets_matching(refreshed_zipcodes).filter(
                last_update__lt=start_update).delete()

    def hourly(self):
        """Run the parent hourly job, then load streets if none are stored yet."""
        super(BaseAdresse, self).hourly()
        if self.get_zipcodes() and not self.get_streets_queryset().exists():
            # don't wait for daily job to grab streets
            self.update_streets_data()

    def daily(self):
        """Run the parent daily job, then refresh the street table."""
        super(BaseAdresse, self).daily()
        self.update_streets_data()
class StreetModel(models.Model):
    """A street record, searchable by its accent-stripped name."""

    city = models.CharField(_('City'), max_length=100)
    name = models.CharField(_('Street name'), max_length=150)
    # ASCII-only copy of ``name``; recomputed on every save().
    unaccent_name = models.CharField(_('Street name ascii char'), max_length=150, null=True)
    zipcode = models.CharField(_('Postal code'), max_length=5)
    type = models.CharField(_('Street type'), max_length=30)
    citycode = models.CharField(_('City Code'), max_length=5)
    # auto_now: set to the current time each time the row is saved.
    last_update = models.DateTimeField(_('Last update'), null=True, auto_now=True)

    class Meta:
        ordering = ['unaccent_name', 'name']

    def __unicode__(self):
        return self.name

    def save(self, *args, **kwargs):
        """Recompute ``unaccent_name`` from ``name``, then save the row.

        The name is NFKD-normalized and every non-ASCII character is
        dropped.  The result is decoded back to text so the CharField is
        given a text value rather than a byte string.
        """
        ascii_name = unicodedata.normalize('NFKD', self.name).encode('ascii', 'ignore')
        self.unaccent_name = ascii_name.decode('ascii')
        super(StreetModel, self).save(*args, **kwargs)