passerelle/passerelle/apps/base_adresse/models.py

492 lines
20 KiB
Python

import bz2
import json
import urlparse
import unicodedata
from requests import RequestException
from django.db import connection, models
from django.db.models import Q
from django.utils.http import urlencode
from django.utils.translation import ugettext_lazy as _
from django.utils import timezone
from django.utils import six
from django.utils.six.moves.urllib.parse import urljoin
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
from passerelle.utils.conversion import simplify
from passerelle.utils.jsonresponse import APIError
class BaseAdresse(BaseResource):
    """Connector exposing geocoding and French administrative reference data.

    Geocoding (``search`` / ``reverse``) is proxied to the web service
    configured in ``service_url``.  Streets, cities, departments and regions
    are served from local tables (``StreetModel``, ``CityModel``,
    ``DepartmentModel``, ``RegionModel``) which the ``hourly`` and ``daily``
    jobs fill from the BANO dumps and from the service at ``api_geo_url``.
    """

    # Base URL of the geocoding web service used by `search` and `reverse`.
    service_url = models.CharField(
        max_length=128, blank=False,
        default='https://api-adresse.data.gouv.fr/',
        verbose_name=_('Service URL'),
        help_text=_('Base Adresse Web Service URL'))

    # Base URL of the service providing regions, departments and cities.
    api_geo_url = models.CharField(
        max_length=128, blank=False,
        default='https://geo.api.gouv.fr/',
        verbose_name=_('API Geo URL'),
        help_text=_('Base Adresse API Geo URL'))

    category = _('Geographic information system')

    api_description = _(
        'The geocoding endpoints are a partial view of '
        '<a href="https://wiki.openstreetmap.org/wiki/Nominatim">Nominatim</a> own API; '
        'it currently doesn\'t support all parameters and is limited to the JSON format. '
        'The cities, departments and regions endpoints source data from French '
        '<a href="https://api.gouv.fr/api/api-geo.html">API Geo</a>.'
    )

    # Comma separated list of postal code prefixes; see `get_zipcodes`.
    zipcode = models.CharField(
        max_length=600,
        blank=True,
        verbose_name=_('Postal codes or department number to get streets, separated with commas'))

    class Meta:
        verbose_name = _('Base Adresse Web Service')

    def _build_service_url(self, endpoint_path, query_args):
        """Return `service_url` with `endpoint_path` joined to its path.

        Any query string already present in `service_url` is replaced by the
        URL-encoded `query_args`.
        """
        parsed = urlparse.urlparse(self.service_url)
        return urlparse.urlunparse((
            parsed.scheme,
            parsed.netloc,
            urljoin(parsed.path, endpoint_path),
            parsed.params,
            urlencode(query_args),
            parsed.fragment,
        ))

    @endpoint(pattern='(?P<q>.+)?$',
              description=_('Geocoding'),
              parameters={
                  'q': {'description': _('Address'), 'example_value': '169 rue du chateau, paris'}
              })
    def search(self, request, q, zipcode='', lat=None, lon=None, **kwargs):
        """Geocode the free-text address `q` through the remote service.

        `zipcode` is forwarded as the ``postcode`` query argument, and
        `lat` / `lon` are forwarded only when both are given.

        Returns a list holding at most one dict with ``lon``, ``lat`` (both
        as strings) and ``display_name``; the list is empty when `q` is empty
        or when the response has no ``Point`` feature.

        Raises NotImplementedError when a ``format`` other than ``json`` is
        requested.
        """
        if kwargs.get('format', 'json') != 'json':
            raise NotImplementedError()

        if not q:
            return []

        query_args = {'q': q, 'limit': 1}
        if zipcode:
            query_args['postcode'] = zipcode
        if lat and lon:
            query_args['lat'] = lat
            query_args['lon'] = lon

        response = self.requests.get(self._build_service_url('search/', query_args))

        # a payload without a "features" key is treated as "no result"
        for feature in response.json().get('features') or []:
            if feature['geometry']['type'] != 'Point':
                continue  # skip unknown
            coordinates = feature['geometry']['coordinates']
            # only the first usable feature is reported
            return [{
                'lon': str(coordinates[0]),
                'lat': str(coordinates[1]),
                'display_name': feature['properties']['label'],
            }]
        return []

    @endpoint(description=_('Reverse geocoding'),
              parameters={
                  'lat': {'description': _('Latitude'), 'example_value': 48.833708},
                  'lon': {'description': _('Longitude'), 'example_value': 2.323349},
              })
    def reverse(self, request, lat, lon, **kwargs):
        """Look up the address found at the `lat` / `lon` coordinates.

        Returns a dict with ``lon``, ``lat`` (strings), ``address`` and, when
        the service provides a label, ``display_name``; returns None when the
        response has no ``Point`` feature.  If the response holds several
        ``Point`` features, the last one is the one returned.

        Raises NotImplementedError when a ``format`` other than ``json`` is
        requested.
        """
        if kwargs.get('format', 'json') != 'json':
            raise NotImplementedError()

        response = self.requests.get(
            self._build_service_url('reverse/', {'lat': lat, 'lon': lon}))

        result = None
        # a payload without a "features" key is treated as "no result"
        for feature in response.json().get('features') or []:
            if feature['geometry']['type'] != 'Point':
                continue  # skip unknown
            coordinates = feature['geometry']['coordinates']
            properties = feature['properties']

            address = {'country': 'France'}
            for key in ('city', 'postcode', 'citycode'):
                if key in properties:
                    address[key] = properties[key]
            if 'housenumber' in properties:
                address['house_number'] = properties['housenumber']
            if 'name' in properties:
                road = properties['name']
                house_number = properties.get('housenumber')
                # "name" may start with the house number: keep the road only
                if house_number and road.startswith(house_number):
                    road = road[len(house_number):].strip()
                address['road'] = road

            result = {
                'lon': str(coordinates[0]),
                'lat': str(coordinates[1]),
                'address': address,
            }
            if 'label' in properties:
                result['display_name'] = properties['label']
        return result

    @endpoint(description=_('Streets from zipcode'),
              parameters={
                  'id': {'description': _('Street identifier')},
                  'q': {'description': _("Street name")},
                  'zipcode': {'description': _('Zipcode')},
                  'page_limit': {'description': _('Maximum number of results to return'),
                                 'example_value': 30},
                  'distinct': {'description': _('Remove duplicate streets')},
              })
    def streets(self, request, zipcode=None, q=None, id=None, distinct=True, page_limit=None):
        """List locally stored streets, optionally filtered.

        `id` selects a single street (a non-integer id yields an empty
        list), `q` matches anywhere in the unaccented street name and
        `zipcode` matches the beginning of the postal code.  `page_limit`
        caps the number of rows returned.

        Returns ``{'data': [...]}`` with one dict per street.
        """
        if id is not None:
            try:
                id = int(id)
            except ValueError:
                return {'data': []}
            streets = StreetModel.objects.filter(id=id)
        else:
            streets = StreetModel.objects.all()
            if q:
                streets = streets.filter(unaccent_name__icontains=simplify(q))

        if zipcode:
            streets = streets.filter(zipcode__startswith=zipcode)

        if distinct and connection.vendor == 'postgresql':
            # this only works with postgresql
            streets = streets.order_by('name').distinct('name')

        if page_limit:
            streets = streets[:page_limit]

        return {'data': [
            {
                'id': street.id,
                'text': street.name,
                'type': street.type,
                'city': street.city,
                'citycode': street.citycode,
                'zipcode': street.zipcode,
            }
            for street in streets
        ]}

    @endpoint(description=_('Cities list'),
              parameters={
                  'id': {'description': _('Get exactly one city using its code and postal code '
                                          'separated with a dot'),
                         'example_value': '75056.75014'},
                  'q': {'description': _("Search text in name or postal code"),
                        'example_value': 'Paris'},
                  'code': {'description': _('INSEE code'), 'example_value': '75056'},
                  'region_code': {'description': _('Region code'), 'example_value': '11'},
                  'department_code': {'description': _('Department code'), 'example_value': '75'},
              })
    def cities(self, request, id=None, q=None, code=None, region_code=None,
               department_code=None):
        """List locally stored cities, optionally filtered.

        `id` has the form ``<code>.<zipcode>``; `q` matches the beginning of
        the unaccented name or of the postal code.

        Returns ``{'data': [...]}`` built from ``CityModel.to_json``.

        Raises APIError when `id` does not split into exactly two parts.
        """
        cities = CityModel.objects.all()

        if id is not None:
            try:
                code, zipcode = id.split('.')
            except ValueError:
                raise APIError('Invalid id')
            cities = cities.filter(code=code, zipcode=zipcode)
        if q:
            unaccented_q = simplify(q)
            cities = cities.filter(Q(unaccent_name__istartswith=unaccented_q) |
                                   Q(zipcode__istartswith=unaccented_q))
        if code:
            cities = cities.filter(code=code)
        if region_code:
            cities = cities.filter(region__code=region_code)
        if department_code:
            cities = cities.filter(department__code=department_code)

        # to_json() reads both relations: fetch them in the same query
        cities = cities.select_related('department', 'region')
        return {'data': [city.to_json() for city in cities]}

    @endpoint(description=_('Departments list'),
              parameters={
                  'id': {'description': _('Get exactly one department using its code'),
                         'example_value': '59'},
                  'q': {'description': _('Search text in name or code'), 'example_value': 'Nord'},
                  'region_code': {'description': _('Region code'), 'example_value': '32'},
              })
    def departments(self, request, id=None, q=None, region_code=None):
        """List locally stored departments, optionally filtered.

        `q` matches the beginning of the unaccented name or of the code.

        Returns ``{'data': [...]}`` built from ``DepartmentModel.to_json``.
        """
        departments = DepartmentModel.objects.all()

        if id is not None:
            departments = departments.filter(code=id)
        if q:
            unaccented_q = simplify(q)
            departments = departments.filter(Q(unaccent_name__istartswith=unaccented_q) |
                                             Q(code__istartswith=unaccented_q))
        if region_code:
            departments = departments.filter(region__code=region_code)

        # to_json() reads the region: fetch it in the same query
        departments = departments.select_related('region')
        return {'data': [department.to_json() for department in departments]}

    @endpoint(description=_('Regions list'),
              parameters={
                  'id': {'description': _('Get exactly one region using its code'),
                         'example_value': '32'},
                  'q': {'description': _('Search text in name or code'),
                        'example_value': 'Hauts-de-France'},
              })
    def regions(self, request, id=None, q=None):
        """List locally stored regions, optionally filtered.

        `q` matches the beginning of the unaccented name or of the code.

        Returns ``{'data': [...]}`` built from ``RegionModel.to_json``.
        """
        regions = RegionModel.objects.all()

        if id is not None:
            regions = regions.filter(code=id)
        if q:
            unaccented_q = simplify(q)
            regions = regions.filter(Q(unaccent_name__istartswith=unaccented_q) |
                                     Q(code__istartswith=unaccented_q))

        return {'data': [region.to_json() for region in regions]}

    def check_status(self):
        """Raise when the default geocoding service finds nothing for a known address.

        The probe is only run when `service_url` is the default URL.
        """
        if self.service_url == 'https://api-adresse.data.gouv.fr/':
            result = self.search(None, '169 rue du chateau, paris')
            if not result:
                raise Exception('no results')

    def get_zipcodes(self):
        """Return the configured postal code prefixes as a tuple of stripped, non-empty strings."""
        stripped = (part.strip() for part in self.zipcode.split(','))
        return tuple(part for part in stripped if part)

    def get_streets_queryset(self):
        """Return the streets whose postal code starts with a configured prefix.

        Returns an empty queryset when no prefix is configured.
        """
        zipcodes = self.get_zipcodes()
        if not zipcodes:
            return StreetModel.objects.none()
        criteria = Q(zipcode__startswith=zipcodes[0])
        for zipcode in zipcodes[1:]:
            criteria |= Q(zipcode__startswith=zipcode)
        return StreetModel.objects.filter(criteria)

    def cities_exist(self):
        """Tell whether at least one city is stored locally."""
        return CityModel.objects.exists()

    def update_streets_data(self):
        """Refresh the local streets from the BANO per-department dumps.

        Does nothing when no postal code prefix is configured.  Streets that
        were not seen during the run are deleted afterwards, but only if
        every department dump could be downloaded: otherwise a failed
        download would wipe the streets of that department.

        Raises Exception when a downloaded dump contains no line.
        """
        zipcodes = self.get_zipcodes()
        if not zipcodes:
            return

        start_update = timezone.now()

        # work out which department dumps cover the configured prefixes
        departments = set()
        for zipcode in zipcodes:
            if zipcode.startswith('97'):
                departments.add(zipcode[:3])
            elif zipcode.startswith('20'):
                departments.add('2A')
                departments.add('2B')
            else:
                departments.add(zipcode[:2])

        failed_departments = []
        for department in departments:
            ban_file = self.requests.get(
                'http://bano.openstreetmap.fr/BAN_odbl/BAN_odbl_{}-json.bz2'.format(department))
            if ban_file.status_code != 200:
                failed_departments.append(department)
                continue

            # the dump holds one JSON document per line
            lines = bz2.decompress(ban_file.content).splitlines()
            if not lines:
                raise Exception('bano file is empty')

            for line in lines:
                street_info = json.loads(line)
                if street_info['type'] != 'street':
                    continue
                # str.startswith accepts the tuple of prefixes directly
                if not street_info['postcode'].startswith(zipcodes):
                    continue

                # citycode and name are sometimes lists: keep the first item
                citycode = street_info['citycode']
                if isinstance(citycode, list):
                    citycode = six.text_type(citycode[0])
                name = street_info['name']
                if isinstance(name, list):
                    name = six.text_type(name[0])
                name = name[:150]

                # saving also refreshes last_update, which drives the purge below
                StreetModel.objects.update_or_create(
                    citycode=citycode,
                    name=name,
                    defaults={
                        'city': street_info['city'],
                        'zipcode': street_info['postcode'],
                        'type': street_info['type'],
                    })

        if failed_departments:
            self.logger.error(
                'failed to download bano file for departments %s, stale streets are kept',
                ', '.join(sorted(failed_departments)))
            return

        self.get_streets_queryset().filter(last_update__lt=start_update).delete()

    def get_api_geo_endpoint(self, endpoint):
        """Fetch and decode the JSON served at `endpoint` under `api_geo_url`.

        Returns the decoded payload, or None when `api_geo_url` is empty or
        when the request fails, answers with a non-200 status or returns
        invalid JSON (those failures are logged).

        Raises Exception when the decoded payload is empty.
        """
        if not self.api_geo_url:
            return None

        error = None
        result = None
        try:
            response = self.requests.get(urljoin(self.api_geo_url, endpoint))
        except RequestException as e:
            error = e
        else:
            if response.status_code != 200:
                error = 'bad status code (%s)' % response.status_code
            else:
                try:
                    result = response.json()
                except ValueError:
                    error = 'invalid json, got: %s' % response.text

        if error:
            self.logger.error('failed to update api geo data for endpoint %s: %s',
                              endpoint, error)
            return None
        if not result:
            raise Exception('api geo returns empty json')
        return result

    @staticmethod
    def _get_by_code(model, code, cache):
        """Return the `model` row with this `code`, querying each code only once."""
        if code not in cache:
            cache[code] = model.objects.get(code=code)
        return cache[code]

    def update_api_geo_data(self):
        """Refresh the local regions, departments and cities.

        Nothing is changed unless the three lists could all be fetched.  For
        each table, rows that were not refreshed during the run are deleted.
        """
        regions_json = self.get_api_geo_endpoint('regions')
        departments_json = self.get_api_geo_endpoint('departements')
        cities_json = self.get_api_geo_endpoint('communes')
        if not (regions_json and departments_json and cities_json):
            return

        start_update = timezone.now()

        for data in regions_json:
            RegionModel.objects.update_or_create(
                code=data['code'], defaults={'name': data['nom']})
        RegionModel.objects.filter(last_update__lt=start_update).delete()

        # many rows share a region or a department: look each one up once
        regions_by_code = {}
        departments_by_code = {}

        for data in departments_json:
            defaults = {
                'name': data['nom'],
                'region': self._get_by_code(RegionModel, data['codeRegion'], regions_by_code),
            }
            DepartmentModel.objects.update_or_create(code=data['code'], defaults=defaults)
        DepartmentModel.objects.filter(last_update__lt=start_update).delete()

        for data in cities_json:
            zipcodes = data['codesPostaux']
            if not zipcodes:
                continue
            # the same values apply to every postal code of the city
            defaults = {
                'name': data['nom'],
                'population': data.get('population', 0),
            }
            if data.get('codeDepartement'):
                defaults['department'] = self._get_by_code(
                    DepartmentModel, data['codeDepartement'], departments_by_code)
            if data.get('codeRegion'):
                defaults['region'] = self._get_by_code(
                    RegionModel, data['codeRegion'], regions_by_code)
            # one row per (INSEE code, postal code) pair
            for zipcode in zipcodes:
                CityModel.objects.update_or_create(
                    code=data['code'], zipcode=zipcode, defaults=dict(defaults))
        CityModel.objects.filter(last_update__lt=start_update).delete()

    def hourly(self):
        """Run the parent hourly job, then load reference data that is still missing."""
        super(BaseAdresse, self).hourly()
        # don't wait for daily job to grab data
        if self.get_zipcodes() and not self.get_streets_queryset().exists():
            self.update_streets_data()
        if not self.cities_exist():
            self.update_api_geo_data()

    def daily(self):
        """Run the parent daily job, then refresh all the reference data."""
        super(BaseAdresse, self).daily()
        self.update_streets_data()
        self.update_api_geo_data()
class UnaccentNameMixin(object):
    """Mixin filling ``unaccent_name`` from ``name`` each time the object is saved."""

    def save(self, *args, **kwargs):
        """Store a lower-case ASCII version of ``name``, then save as usual.

        Characters are decomposed (NFKD) and whatever has no ASCII form is
        dropped, which removes the accents.
        """
        ascii_name = unicodedata.normalize('NFKD', self.name).encode('ascii', 'ignore')
        # decode back to text: encode() returns bytes on Python 3, and bytes
        # must not be stored in a text field
        self.unaccent_name = ascii_name.decode('ascii').lower()
        super(UnaccentNameMixin, self).save(*args, **kwargs)
@six.python_2_unicode_compatible
class StreetModel(UnaccentNameMixin, models.Model):
    """Street stored locally; rows are written by ``BaseAdresse.update_streets_data``."""

    city = models.CharField(_('City'), max_length=150)
    name = models.CharField(_('Street name'), max_length=150)
    # filled from `name` by UnaccentNameMixin.save()
    unaccent_name = models.CharField(_('Street name ascii char'), max_length=150, null=True)
    zipcode = models.CharField(_('Postal code'), max_length=5)
    type = models.CharField(_('Street type'), max_length=30)
    citycode = models.CharField(_('City Code'), max_length=5)
    # refreshed on every save; used to purge rows not seen during an update
    last_update = models.DateTimeField(_('Last update'), null=True, auto_now=True)

    class Meta:
        ordering = ['unaccent_name', 'name']

    def __str__(self):
        # the class decorator derives the Python 2 __unicode__ from this
        # method, like the other models of this module
        return self.name
@six.python_2_unicode_compatible
class RegionModel(UnaccentNameMixin, models.Model):
    """Region stored locally; rows are written by ``BaseAdresse.update_api_geo_data``."""

    name = models.CharField(_('Region name'), max_length=150)
    # filled from `name` by UnaccentNameMixin.save()
    unaccent_name = models.CharField(_('Region name ascii char'), max_length=150, null=True)
    code = models.CharField(_('Region code'), max_length=2, unique=True)
    # refreshed on every save; used to purge rows not seen during an update
    last_update = models.DateTimeField(_('Last update'), null=True, auto_now=True)

    def to_json(self):
        """Return the region as a dict of plain values, ``id`` being its code."""
        return {
            # text_type() gives text on Python 2 as well, where str() would
            # return UTF-8 encoded bytes
            'text': six.text_type(self),
            'id': self.code,
            'code': self.code,
            'name': self.name,
        }

    class Meta:
        ordering = ['code']

    def __str__(self):
        return '%s %s' % (self.code, self.name)
@six.python_2_unicode_compatible
class DepartmentModel(UnaccentNameMixin, models.Model):
    """Department stored locally; rows are written by ``BaseAdresse.update_api_geo_data``."""

    name = models.CharField(_('Department name'), max_length=100)
    # filled from `name` by UnaccentNameMixin.save()
    unaccent_name = models.CharField(_('Department name ascii char'), max_length=150, null=True)
    code = models.CharField(_('Department code'), max_length=3, unique=True)
    region = models.ForeignKey(RegionModel, on_delete=models.CASCADE)
    # refreshed on every save; used to purge rows not seen during an update
    last_update = models.DateTimeField(_('Last update'), null=True, auto_now=True)

    def to_json(self):
        """Return the department and its region as a dict of plain values."""
        region = self.region
        return {
            # text_type() gives text on Python 2 as well, where str() would
            # return UTF-8 encoded bytes
            'text': six.text_type(self),
            'id': self.code,
            'code': self.code,
            'name': self.name,
            'region_code': region.code,
            'region_name': region.name,
        }

    class Meta:
        ordering = ['code']

    def __str__(self):
        return '%s %s' % (self.code, self.name)
@six.python_2_unicode_compatible
class CityModel(UnaccentNameMixin, models.Model):
    """City stored locally, one row per (INSEE code, postal code) pair.

    Rows are written by ``BaseAdresse.update_api_geo_data``.
    """

    name = models.CharField(_('City name'), max_length=150)
    # filled from `name` by UnaccentNameMixin.save()
    unaccent_name = models.CharField(_('City name ascii char'), max_length=150, null=True)
    code = models.CharField(_('INSEE code'), max_length=5)
    zipcode = models.CharField(_('Postal code'), max_length=5)
    population = models.PositiveIntegerField(_('Population'))
    department = models.ForeignKey(DepartmentModel, on_delete=models.CASCADE, blank=True, null=True)
    region = models.ForeignKey(RegionModel, on_delete=models.CASCADE, blank=True, null=True)
    # refreshed on every save; used to purge rows not seen during an update
    last_update = models.DateTimeField(_('Last update'), null=True, auto_now=True)

    def to_json(self):
        """Return the city as a dict of plain values.

        ``id`` is ``<code>.<zipcode>``; the department and region entries
        are None when the corresponding relation is not set.
        """
        department = self.department
        region = self.region
        return {
            # text_type() gives text on Python 2 as well, where str() would
            # return UTF-8 encoded bytes
            'text': six.text_type(self),
            'id': '%s.%s' % (self.code, self.zipcode),
            'code': self.code,
            'name': self.name,
            'zipcode': self.zipcode,
            'population': self.population,
            'department_code': department.code if department else None,
            'department_name': department.name if department else None,
            'region_code': region.code if region else None,
            'region_name': region.name if region else None,
        }

    class Meta:
        ordering = ['-population', 'zipcode', 'unaccent_name', 'name']
        unique_together = ('code', 'zipcode')

    def __str__(self):
        return '%s %s' % (self.zipcode, self.name)