passerelle/passerelle/apps/base_adresse/models.py

763 lines
29 KiB
Python

import datetime
import gzip
import itertools
import json
from io import StringIO
from urllib import parse as urlparse
from django.contrib.postgres.fields import JSONField
from django.db import connection, models
from django.db.models import Q
from django.utils import timezone
from django.utils.http import urlencode
from django.utils.translation import ugettext_lazy as _
from requests import RequestException
from passerelle.address.models import AddressResource
from passerelle.utils.api import endpoint
from passerelle.utils.conversion import simplify
from passerelle.utils.jsonresponse import APIError
class BaseAdresse(AddressResource):
# Connector exposing address search, reverse geocoding, street lists and
# French administrative divisions (cities, departments, regions); see
# api_description below for the user-facing summary.
#
# Base URL of the address web service; the 'search/' and 'reverse/' paths
# are joined onto it by the addresses() and reverse() endpoints.
service_url = models.CharField(
max_length=128,
blank=False,
default='https://api-adresse.data.gouv.fr/',
verbose_name=_('Service URL'),
help_text=_('Base Adresse Web Service URL'),
)
# Base URL of API Geo; get_api_geo_endpoint() joins endpoint names onto it.
api_geo_url = models.CharField(
max_length=128,
blank=False,
default='https://geo.api.gouv.fr/',
verbose_name=_('API Geo URL'),
help_text=_('Base Adresse API Geo URL'),
)
# Description text for the connector's API (contains HTML links).
api_description = _(
'The geocoding endpoints are a partial view of '
'<a href="https://wiki.openstreetmap.org/wiki/Nominatim">Nominatim</a> own API; '
'it currently doesn\'t support all parameters and is limited to the JSON format. '
'The cities, departments and regions endpoints source data from French '
'<a href="https://api.gouv.fr/api/api-geo.html">API Geo</a>.'
)
# Comma-separated postal code prefixes; parsed by get_zipcodes() and used
# to select which streets are imported and kept.
zipcode = models.CharField(
max_length=600,
blank=True,
verbose_name=_('Postal codes or department number to get streets, separated with commas'),
)
# Optional default coordinates sent as a geographic hint by addresses().
latitude = models.FloatField(
null=True,
blank=True,
verbose_name=_('Latitude'),
help_text=_('Geographic priority for /addresses/ endpoint.'),
)
longitude = models.FloatField(
null=True,
blank=True,
verbose_name=_('Longitude'),
help_text=_('Geographic priority for /addresses/ endpoint.'),
)
class Meta:
verbose_name = _('Base Adresse Web Service')
# NOTE(review): the two settings below are not read anywhere in this file;
# presumably they configure the HTTP client behind self.requests (timeout
# in seconds and retry policy) -- confirm against the parent resource class.
requests_timeout = 1
requests_max_retries = {
'total': 5,
'backoff_factor': 1,
}
def sectorize(self, address):
    """Set the street identifier on *address*, then delegate to the parent class.

    When ``address['ban_id']`` is a string containing an underscore, its first
    two underscore-separated components are joined back with ``_`` and stored
    as ``address['address']['street_id']``. The parent ``sectorize`` is always
    called afterwards.
    """
    identifier = address.get('ban_id')
    if isinstance(identifier, str) and '_' in identifier:
        leading_parts = identifier.split('_', 2)[0:2]
        address['address']['street_id'] = '_'.join(leading_parts)
    super().sectorize(address)
def format_address_data(self, data):
    """Convert one GeoJSON feature into the address dictionary served by this connector.

    ``data`` must provide ``geometry.coordinates`` (longitude first, latitude
    second) and a ``properties`` mapping. Known properties are mapped onto
    fixed keys; any other property is kept under ``extra``. The final ``id`` is
    ``<ban id>~<lat>~<lon>~<label>``, so the ``id`` and ``label`` properties
    must both be present (a ``KeyError`` is raised otherwise). The result is
    passed to ``self.sectorize`` before being returned.
    """
    coordinates = data['geometry']['coordinates']
    formatted = {
        'lon': str(coordinates[0]),
        'lat': str(coordinates[1]),
        'address': {'country': 'France'},
    }
    properties = data['properties']
    for key, value in properties.items():
        if key == 'id':
            formatted['ban_id'] = value
            formatted['id'] = '%s~%s~%s' % (value, formatted['lat'], formatted['lon'])
        elif key == 'label':
            formatted['text'] = formatted['display_name'] = value
        elif key == 'housenumber':
            formatted['address']['house_number'] = value
        elif key == 'name':
            # The street name is served without its leading house number.
            road = value
            number = properties.get('housenumber')
            if number and road.startswith(number):
                road = road[len(number):].strip()
            formatted['address']['road'] = road
        elif key in ('city', 'postcode', 'citycode'):
            formatted['address'][key] = value
        else:
            formatted.setdefault('extra', {})[key] = value
    # The label is appended last so the identifier carries everything
    # get_by_id() needs to replay a search.
    formatted['id'] = '%s~%s' % (formatted['id'], formatted['text'])
    self.sectorize(formatted)
    return formatted
@endpoint(
    pattern='(?P<q>.+)?$',
    description=_('Addresses list'),
    parameters={
        'id': {'description': _('Address identifier')},
        'q': {'description': _('Address'), 'example_value': '169 rue du chateau, paris'},
        'page_limit': {'description': _('Maximum number of results to return. Must be lower than 20.')},
        'zipcode': {'description': _('Zipcode'), 'example_value': '75014'},
        'citycode': {'description': _('INSEE City code')},
        'lat': {
            'description': _(
                'Prioritize results according to coordinates. "lon" parameter must also be present.'
            )
        },
        'lon': {
            'description': _(
                'Prioritize results according to coordinates. "lat" parameter must also be present.'
            )
        },
    },
)
def addresses(
    self, request, id=None, q=None, zipcode='', citycode=None, lat=None, lon=None, page_limit=5
):
    """Search addresses on the remote service and cache every result.

    With ``id`` the lookup is delegated to ``get_by_id``. Without ``id`` and
    without ``q`` an empty list is returned. Otherwise the ``search/`` path of
    ``service_url`` is queried, each ``Point`` feature is formatted with
    ``format_address_data`` and stored in the address cache (an existing
    entry only gets its timestamp refreshed).

    ``page_limit`` is capped at 20 and falls back to 5 when it cannot be read
    as an integer. Coordinates are sent as a pair: the request's
    ``lat``/``lon`` when both are given, otherwise the connector's configured
    ``latitude``/``longitude`` when both are set.

    Returns ``{'data': [...]}``. Raises ``APIError`` when the HTTP request fails.
    """
    if id is not None:
        return self.get_by_id(request, id=id, citycode=citycode)
    if not q:
        return {'data': []}

    try:
        if int(page_limit) > 20:
            page_limit = 20
    except (TypeError, ValueError):
        # TypeError covers page_limit=None, which int() rejects.
        page_limit = 5

    scheme, netloc, path, params, query, fragment = urlparse.urlparse(self.service_url)
    path = urlparse.urljoin(path, 'search/')
    query_args = {'q': q, 'limit': page_limit}
    if zipcode:
        query_args['postcode'] = zipcode
    if citycode:
        query_args['citycode'] = citycode
    # Never mix one request coordinate with one configured coordinate: the
    # hint is only meaningful as a complete pair.
    if lat and lon:
        query_args['lat'] = lat
        query_args['lon'] = lon
    elif self.latitude and self.longitude:
        query_args['lat'] = self.latitude
        query_args['lon'] = self.longitude
    query = urlencode(query_args)
    url = urlparse.urlunparse((scheme, netloc, path, params, query, fragment))

    try:
        result_response = self.requests.get(url, timeout=5)
        result_response.raise_for_status()
    except RequestException as e:
        raise APIError('failed to get %s: %s' % (url, e))

    result = []
    # A payload without "features" (or with a null value) yields no result
    # instead of a TypeError.
    for feature in result_response.json().get('features') or []:
        if feature['geometry']['type'] != 'Point':
            continue  # skip unknown
        data = self.format_address_data(feature)
        result.append(data)
        address, created = self.addresscachemodel_set.get_or_create(
            api_id=data['id'][:30], defaults={'data': data}
        )
        if not created:
            address.update_timestamp()
    return {'data': result}
def get_by_id(self, request, id, citycode=None):
    """Return the address matching *id*, from the cache or from a replayed search.

    Identifiers built by ``format_address_data`` look like
    ``<ban id>~<lat>~<lon>~<label>``. An identifier with fewer parts is
    treated as a raw BAN id (older format). The cache is tried first, keyed
    on the first 30 characters of ``id``; on a miss, when the identifier
    carried a label and coordinates, a search is replayed and the result with
    the same BAN id is returned.

    Returns ``{'data': [address]}`` on success, or ``{'err': ...}`` after
    logging an error when nothing matches.
    """
    try:
        # maxsplit=3 keeps any '~' found inside the label in the last part;
        # a larger value would produce five parts and fail the unpacking.
        ban_id, lat, lon, q = id.split('~', 3)
    except ValueError:  # retrocompatibility with raw BAN id
        ban_id = id
        lat, lon, q = None, None, None

    # Try cache
    try:
        address = self.addresscachemodel_set.get(api_id=id[:30])
    except AddressCacheModel.DoesNotExist:
        pass
    else:
        self.sectorize(address.data)  # if sectors have been updated since caching
        address.update_timestamp()
        return {'data': [address.data]}

    # Use search with label as q and lat/lon as geographic hint
    if q and lat and lon:
        results = self.addresses(request, q=q, lat=lat, lon=lon, citycode=citycode)['data']
        for result in results:  # match by id if possible
            if result['ban_id'] == ban_id:
                return {'data': [result]}

    self.logger.error('get_by_id: id %s was not found', id)
    return {'err': _('Address ID not found')}
@endpoint(
    pattern='(?P<q>.+)?$',
    description=_('Geocoding (Nominatim API)'),
    parameters={
        'q': {'description': _('Address'), 'example_value': '169 rue du chateau, paris'},
        'zipcode': {'description': _('Zipcode')},
        'citycode': {'description': _('INSEE City code')},
        'lat': {
            'description': _(
                'Prioritize results according to coordinates. "lat" parameter must be present.'
            )
        },
        'lon': {
            'description': _(
                'Prioritize results according to coordinates. "lon" parameter must be present.'
            )
        },
    },
)
def search(self, request, q, zipcode='', citycode=None, lat=None, lon=None, **kwargs):
    """Geocode *q* and return the bare list holding at most one address.

    Thin wrapper over ``addresses`` with ``page_limit=1``. Only the JSON
    format is supported: any other ``format`` keyword raises
    ``NotImplementedError``.
    """
    requested_format = kwargs.get('format', 'json')
    if requested_format != 'json':
        raise NotImplementedError()
    found = self.addresses(
        request,
        q=q,
        zipcode=zipcode,
        citycode=citycode,
        lat=lat,
        lon=lon,
        page_limit=1,
    )
    return found['data']
@endpoint(
    description=_('Reverse geocoding'),
    parameters={
        'lat': {'description': _('Latitude'), 'example_value': 48.833708},
        'lon': {'description': _('Longitude'), 'example_value': 2.323349},
    },
)
def reverse(self, request, lat, lon, **kwargs):
    """Return the first address found at the given coordinates, or ``None``.

    Queries the ``reverse/`` path of ``service_url``, formats the first
    ``Point`` feature with ``format_address_data`` and stores it in the
    address cache (an existing entry only gets its timestamp refreshed).

    Raises ``NotImplementedError`` for a ``format`` keyword other than
    ``json`` and ``APIError`` when the HTTP request fails.
    """
    if kwargs.get('format', 'json') != 'json':
        raise NotImplementedError()

    scheme, netloc, path, params, query, fragment = urlparse.urlparse(self.service_url)
    path = urlparse.urljoin(path, 'reverse/')
    query = urlencode({'lat': lat, 'lon': lon})
    url = urlparse.urlunparse((scheme, netloc, path, params, query, fragment))

    try:
        result_response = self.requests.get(url, timeout=5)
        result_response.raise_for_status()
    except RequestException as e:
        raise APIError('failed to get %s: %s' % (url, e))

    result = None
    # A payload without "features" (or with a null value) means "nothing
    # found" rather than a TypeError.
    for feature in result_response.json().get('features') or []:
        if feature['geometry']['type'] != 'Point':
            continue  # skip unknown
        result = self.format_address_data(feature)
        address, created = self.addresscachemodel_set.get_or_create(
            api_id=result['id'][:30], defaults={'data': result}
        )
        if not created:
            address.update_timestamp()
        break  # only the first usable feature is served
    return result
@endpoint(
    description=_('Streets from zipcode'),
    parameters={
        'id': {'description': _('Street identifier')},
        'q': {'description': _("Street name")},
        'zipcode': {'description': _('Zipcode')},
        'citycode': {'description': _('INSEE City code')},
        'page_limit': {'description': _('Maximum number of results to return'), 'example_value': 30},
        'distinct': {'description': _('Remove duplicate streets')},
    },
)
def streets(self, request, zipcode=None, citycode=None, q=None, id=None, distinct=True, page_limit=None):
    """List the streets stored for this connector.

    ``id`` is matched against the BAN identifier first, then against the
    legacy numeric primary key; a non-numeric legacy id yields an empty
    list. Without ``id``, ``q`` filters on the unaccented street name.
    ``zipcode`` and ``citycode`` are prefix filters. Duplicate names are
    removed only on PostgreSQL. ``page_limit`` may be an integer or a numeric
    string; a value that is not a positive integer is ignored.

    Returns ``{'data': [...]}`` with one dictionary per street.
    """
    if id is not None:
        streets = self.streetmodel_set.filter(ban_id=id)
        if not streets.exists():  # fallback to legacy id
            try:
                id = int(id)
            except ValueError:
                return {'data': []}
            streets = self.streetmodel_set.filter(id=id)
    else:
        streets = self.streetmodel_set.all()
        if q:
            streets = streets.filter(unaccent_name__icontains=simplify(q))

    if zipcode:
        streets = streets.filter(zipcode__startswith=zipcode)

    if citycode:
        streets = streets.filter(citycode__startswith=citycode)

    if distinct and connection.vendor == 'postgresql':
        # this only works with postgresql
        streets = streets.order_by('name').distinct('name')

    if page_limit:
        # Query-string values arrive as text and queryset slicing needs a
        # non-negative integer, so coerce here as addresses() does.
        try:
            limit = int(page_limit)
        except (TypeError, ValueError):
            limit = None
        if limit is not None and limit > 0:
            streets = streets[:limit]

    return {
        'data': [
            {
                'id': street.ban_id or str(street.id),
                'text': street.name,
                'type': street.type,
                'city': street.city,
                'citycode': street.citycode,
                'zipcode': street.zipcode,
            }
            for street in streets
        ]
    }
@endpoint(
    description=_('Cities list'),
    parameters={
        'id': {
            'description': _('Get exactly one city using its code and postal code separated with a dot'),
            'example_value': '75056.75014',
        },
        'q': {'description': _("Search text in name or postal code"), 'example_value': 'Paris'},
        'code': {
            'description': _('INSEE code (or multiple codes separated with commas)'),
            'example_value': '75056',
        },
        'region_code': {'description': _('Region code'), 'example_value': '11'},
        'department_code': {'description': _('Department code'), 'example_value': '75'},
    },
)
def cities(self, request, id=None, q=None, code=None, region_code=None, department_code=None):
    """List the cities stored for this connector, narrowed by the given filters.

    ``id`` has the form ``<INSEE code>.<postal code>``; anything that does
    not split into exactly two parts raises ``APIError('Invalid id')``.
    ``q`` is a prefix match on the unaccented name or on the postal code.
    ``code`` accepts one INSEE code or several separated with commas.

    Returns ``{'data': [...]}`` built from each city's ``to_json()``.
    """
    queryset = self.citymodel_set.all()

    if id is not None:
        try:
            code, zipcode = id.split('.')
        except ValueError:
            raise APIError('Invalid id')
        queryset = queryset.filter(code=code, zipcode=zipcode)

    if q:
        needle = simplify(q)
        by_name = Q(unaccent_name__istartswith=needle)
        by_zipcode = Q(zipcode__istartswith=needle)
        queryset = queryset.filter(by_name | by_zipcode)

    if code:
        if ',' not in code:
            queryset = queryset.filter(code=code)
        else:
            wanted = [part.strip() for part in code.split(',')]
            queryset = queryset.filter(code__in=wanted)

    if region_code:
        queryset = queryset.filter(region__code=region_code)
    if department_code:
        queryset = queryset.filter(department__code=department_code)

    # to_json() reads the department and the region of every city.
    queryset = queryset.select_related('department', 'region')
    return {'data': [entry.to_json() for entry in queryset]}
@endpoint(
    description=_('Departments list'),
    parameters={
        'id': {'description': _('Get exactly one department using its code'), 'example_value': '59'},
        'q': {'description': _('Search text in name or code'), 'example_value': 'Nord'},
        'region_code': {'description': _('Region code'), 'example_value': '32'},
    },
)
def departments(self, request, id=None, q=None, region_code=None):
    """List the departments stored for this connector.

    ``id`` is an exact match on the department code, ``q`` a prefix match on
    the unaccented name or on the code, ``region_code`` an exact match on
    the code of the parent region.

    Returns ``{'data': [...]}`` built from each department's ``to_json()``.
    """
    queryset = self.departmentmodel_set.all()
    if id is not None:
        queryset = queryset.filter(code=id)
    if q:
        needle = simplify(q)
        by_name = Q(unaccent_name__istartswith=needle)
        by_code = Q(code__istartswith=needle)
        queryset = queryset.filter(by_name | by_code)
    if region_code:
        queryset = queryset.filter(region__code=region_code)
    # to_json() reads the region of every department.
    queryset = queryset.select_related('region')
    return {'data': [entry.to_json() for entry in queryset]}
@endpoint(
    description=_('Regions list'),
    parameters={
        'id': {'description': _('Get exactly one region using its code'), 'example_value': '32'},
        'q': {'description': _('Search text in name or code'), 'example_value': 'Hauts-de-France'},
    },
)
def regions(self, request, id=None, q=None):
    """List the regions stored for this connector.

    ``id`` is an exact match on the region code and ``q`` a prefix match on
    the unaccented name or on the code.

    Returns ``{'data': [...]}`` built from each region's ``to_json()``.
    """
    queryset = self.regionmodel_set.all()
    if id is not None:
        queryset = queryset.filter(code=id)
    if q:
        needle = simplify(q)
        by_name = Q(unaccent_name__istartswith=needle)
        by_code = Q(code__istartswith=needle)
        queryset = queryset.filter(by_name | by_code)
    return {'data': [entry.to_json() for entry in queryset]}
def check_status(self):
    """Check that the default public service still answers a known query.

    Nothing is checked when ``service_url`` differs from the default public
    URL. Raises ``Exception('no results')`` when the reference search
    returns an empty list.
    """
    if self.service_url != 'https://api-adresse.data.gouv.fr/':
        return
    found = self.search(None, '169 rue du chateau, paris')
    if len(found) == 0:
        raise Exception('no results')
def get_zipcodes(self):
    """Return the configured postal code prefixes as a tuple of stripped, non-empty strings."""
    stripped = (chunk.strip() for chunk in self.zipcode.split(','))
    return tuple(prefix for prefix in stripped if prefix)
def get_streets_queryset(self):
    """Return the streets whose postal code starts with one of the configured prefixes.

    When no prefix is configured an empty queryset is returned, since no
    street can match.
    """
    zipcodes = self.get_zipcodes()
    if not zipcodes:
        # Indexing zipcodes[0] below would raise IndexError.
        return self.streetmodel_set.none()
    criteria = Q(zipcode__startswith=zipcodes[0])
    for zipcode in zipcodes[1:]:
        criteria |= Q(zipcode__startswith=zipcode)
    return self.streetmodel_set.filter(criteria)
def cities_exist(self):
    """Tell whether at least one city is stored for this connector."""
    has_cities = self.citymodel_set.exists()
    return has_cities
def update_streets_data(self):
    """Import the streets of the configured postal codes from the BAN addok export.

    Does nothing when no postal code is configured. One gzipped NDJSON file
    is downloaded per department derived from the postal codes; a department
    whose download fails or does not answer 200 is skipped. Every ``street``
    entry whose postal code starts with a configured prefix is created or
    updated. Streets of the configured postal codes that were not touched by
    this run are deleted at the end.

    Raises ``Exception('bano file is empty')`` when a downloaded file
    contains no line.
    """
    zipcodes = self.get_zipcodes()
    if not zipcodes:
        return

    start_update = timezone.now()

    departments = set()
    for zipcode in zipcodes:
        if zipcode.startswith('97'):
            # Postal codes starting with 97 map to three-character department codes.
            departments.add(zipcode[:3])
        elif zipcode.startswith('20'):
            # Postal codes starting with 20 are looked up in both 2A and 2B.
            departments.add('2A')
            departments.add('2B')
        else:
            departments.add(zipcode[:2])

    for department in departments:
        try:
            response = self.requests.get(
                'https://adresse.data.gouv.fr/data/ban/adresses/latest/addok/adresses-addok-{}.ndjson.gz'.format(
                    department
                )
            )
        except RequestException:
            continue
        if response.status_code != 200:
            continue

        ban_file = StringIO(gzip.decompress(response.content).decode('utf-8'))
        seen_any_line = False
        for line in ban_file:
            seen_any_line = True
            street_info = json.loads(line)
            if street_info['type'] != 'street':
                continue

            ban_id = street_info.get('id')
            if not (ban_id and isinstance(ban_id, str) and '_' in ban_id):
                continue

            # Some fields may come as lists; only their first value is kept.
            for key in ('postcode', 'name', 'city'):
                if isinstance(street_info[key], list):
                    street_info[key] = str(street_info[key][0])

            postcode = street_info['postcode']
            if not postcode or not postcode.startswith(zipcodes):
                continue

            # The city code is the part of the BAN id before the first
            # underscore; it must agree with the declared city code(s).
            citycode = ban_id.split('_', 1)[0]
            declared_citycode = street_info['citycode']
            if isinstance(declared_citycode, list):
                if citycode not in declared_citycode:
                    continue
            elif citycode != declared_citycode:
                continue

            self.streetmodel_set.update_or_create(
                resource=self,
                citycode=citycode,
                name=street_info['name'][:150],
                defaults={
                    'ban_id': ban_id,
                    'city': street_info['city'],
                    'zipcode': street_info['postcode'],
                    'type': street_info['type'],
                },
            )

        if not seen_any_line:
            raise Exception('bano file is empty')

    # Streets not refreshed by this run are considered gone.
    self.get_streets_queryset().filter(last_update__lt=start_update).delete()
def get_api_geo_endpoint(self, endpoint):
    """Fetch *endpoint* from API Geo and return its decoded JSON payload.

    Returns ``None`` when ``api_geo_url`` is empty. Returns ``{}`` after
    logging an error when the request fails, the status code is not 200 or
    the body is not valid JSON. Raises ``Exception`` when the decoded
    payload is empty.
    """
    if not self.api_geo_url:
        return

    failure = None
    url = urlparse.urljoin(self.api_geo_url, endpoint)
    try:
        response = self.requests.get(url)
    except RequestException as e:
        failure = e
    else:
        if response.status_code == 200:
            try:
                result = response.json()
            except ValueError:
                failure = 'invalid json, got: %s' % response.text
        else:
            failure = 'bad status code (%s)' % response.status_code

    if failure:
        self.logger.error('failed to update api geo data for endpoint %s: %s', endpoint, failure)
        return {}
    if not result:
        raise Exception('api geo returns empty json')
    return result
def update_api_geo_data(self):
    """Synchronise the stored regions, departments and cities with API Geo.

    The three reference lists (``regions``, ``departements``, ``communes``)
    are fetched with ``get_api_geo_endpoint``. If any of them is missing or
    empty -- including when ``api_geo_url`` is not set, in which case the
    helper returns ``None`` -- nothing is changed. Otherwise every region,
    department and (city, postal code) pair is created or updated, and the
    rows that were not seen during this run are deleted.
    """
    regions_payload = self.get_api_geo_endpoint('regions')
    departements_payload = self.get_api_geo_endpoint('departements')
    cities_json = self.get_api_geo_endpoint('communes')
    # Checked before building the lookup tables: the helper may return None,
    # which cannot be iterated.
    if not regions_payload or not departements_payload or not cities_json:
        return

    regions_json = {region['code']: region for region in regions_payload}
    departements_json = {departement['code']: departement for departement in departements_payload}

    # Model instances already handled during this run, keyed by code.
    regions = {}
    departements = {}

    def get_region(code_region):
        """Return the up-to-date RegionModel for this code, or None if unknown."""
        if code_region not in regions:
            # Fall back to a dedicated request when the code is absent from
            # the bulk list.
            data = regions_json.get(code_region) or self.get_api_geo_endpoint(f'regions/{code_region}')
            if not data:
                return None
            region, created = self.regionmodel_set.get_or_create(
                code=data['code'], defaults={'name': data['nom']}
            )
            if not created and region.name != data['nom']:
                region.name = data['nom']
                region.save()
            regions[code_region] = region
        return regions[code_region]

    def get_departement(code_departement):
        """Return the up-to-date DepartmentModel for this code, or None if unknown."""
        if code_departement not in departements:
            data = departements_json.get(code_departement) or self.get_api_geo_endpoint(
                f'departements/{code_departement}'
            )
            if not data:
                return None
            region = get_region(data['codeRegion'])
            if not region:
                return None
            departement, created = self.departmentmodel_set.get_or_create(
                code=data['code'], defaults={'name': data['nom'], 'region': region}
            )
            # A freshly created row already holds these values; only an
            # existing row may need an update.
            if not created and (departement.name != data['nom'] or departement.region != region):
                departement.name = data['nom']
                departement.region = region
                departement.save()
            departements[code_departement] = departement
        return departements[code_departement]

    for code_region in regions_json:
        get_region(code_region)
    for code_departement in departements_json:
        get_departement(code_departement)

    def grouper(it, size):
        '''Split an iterable into consecutive tuples of at most `size` elements.'''
        it = iter(it)
        return iter(lambda: tuple(itertools.islice(it, size)), ())

    city_pks = set()
    for batch_data in grouper(cities_json, 1000):
        batch_data = list(batch_data)
        # One query per batch for the cities already stored.
        cities = {
            (city.code, city.zipcode): city
            for city in self.citymodel_set.filter(code__in=[x['code'] for x in batch_data])
        }
        for data in batch_data:
            # One row is stored per (INSEE code, postal code) pair.
            for zipcode in data['codesPostaux']:
                defaults = {
                    'name': data['nom'],
                    'population': data.get('population', 0),
                }
                if data.get('codeDepartement'):
                    departement = get_departement(data['codeDepartement'])
                    if not departement:
                        continue
                    defaults['department'] = departement
                if data.get('codeRegion'):
                    region = get_region(data['codeRegion'])
                    if not region:
                        continue
                    defaults['region'] = region

                if (data['code'], zipcode) in cities:
                    city, created = cities[(data['code'], zipcode)], False
                else:
                    city, created = self.citymodel_set.get_or_create(
                        code=data['code'], zipcode=zipcode, defaults=defaults
                    )
                if not created and any(
                    getattr(city, key) != defaults.get(key)
                    for key in ['name', 'population', 'department', 'region']
                ):
                    for key in ['name', 'population', 'department', 'region']:
                        setattr(city, key, defaults.get(key))
                    city.save()
                city_pks.add(city.pk)

    # Anything not seen during this run no longer exists upstream.
    self.regionmodel_set.exclude(code__in=regions.keys()).delete()
    self.departmentmodel_set.exclude(code__in=departements.keys()).delete()
    self.citymodel_set.exclude(pk__in=city_pks).delete()
def clean_addresses_cache(self):
    """Delete the cached addresses whose timestamp is more than one hour old."""
    cutoff = timezone.now() - datetime.timedelta(hours=1)
    self.addresscachemodel_set.filter(timestamp__lt=cutoff).delete()
def save(self, *args, **kwargs):
    """Save the connector and schedule the data imports it still needs.

    A connector saved for the first time gets an ``update_api_geo_data``
    job. An ``update_streets_data`` job is added whenever postal codes are
    configured and no matching street is stored yet.
    """
    is_new = bool(not self.pk)  # must be read before the row gets a primary key
    super().save(*args, **kwargs)
    if is_new:
        self.add_job('update_api_geo_data')
    needs_streets = self.get_zipcodes() and not self.get_streets_queryset().exists()
    if needs_streets:
        self.add_job('update_streets_data')
def hourly(self):
    """Run the parent's hourly tasks, then purge the old cached addresses."""
    super().hourly()
    self.clean_addresses_cache()
def daily(self):
    """Run the parent's daily tasks, then refresh the streets and the API Geo data."""
    super().daily()
    self.update_streets_data()
    self.update_api_geo_data()
class UnaccentNameMixin:
    """Mixin keeping ``unaccent_name`` in sync with ``name`` on every save."""

    def save(self, *args, **kwargs):
        """Store the simplified form of ``name`` in ``unaccent_name``, then save."""
        simplified = simplify(self.name)
        self.unaccent_name = simplified
        super().save(*args, **kwargs)
class StreetModel(UnaccentNameMixin, models.Model):
    """Street stored for one connector; ``ban_id`` may be empty on older rows."""

    ban_id = models.CharField(_('BAN Identifier'), max_length=32, null=True)
    city = models.CharField(_('City'), max_length=150)
    name = models.CharField(_('Street name'), max_length=150)
    # Filled by UnaccentNameMixin.save() from ``name``.
    unaccent_name = models.CharField(_('Street name ascii char'), max_length=150, null=True)
    zipcode = models.CharField(_('Postal code'), max_length=5)
    type = models.CharField(_('Street type'), max_length=30)
    citycode = models.CharField(_('City Code'), max_length=5)
    # Refreshed on every save; update_streets_data() uses it to find stale rows.
    last_update = models.DateTimeField(_('Last update'), null=True, auto_now=True)
    resource = models.ForeignKey(BaseAdresse, on_delete=models.CASCADE, verbose_name=_('BAN Connector'))

    class Meta:
        ordering = ['unaccent_name', 'name']

    def __str__(self):
        """Return the street name."""
        return self.name
class RegionModel(UnaccentNameMixin, models.Model):
    """Region stored for one connector; the code is unique per connector."""

    name = models.CharField(_('Region name'), max_length=150)
    # Filled by UnaccentNameMixin.save() from ``name``.
    unaccent_name = models.CharField(_('Region name ascii char'), max_length=150, null=True)
    code = models.CharField(_('Region code'), max_length=3)
    last_update = models.DateTimeField(_('Last update'), null=True, auto_now=True)
    resource = models.ForeignKey(BaseAdresse, on_delete=models.CASCADE, verbose_name=_('BAN Connector'))

    class Meta:
        ordering = ['code']
        unique_together = ('resource', 'code')

    def __str__(self):
        """Return the code followed by the name."""
        return '%s %s' % (self.code, self.name)

    def to_json(self):
        """Return the dictionary served by the regions endpoint."""
        return dict(
            text=str(self),
            id=self.code,
            code=self.code,
            name=self.name,
        )
class DepartmentModel(UnaccentNameMixin, models.Model):
    """Department stored for one connector; it always belongs to a region."""

    name = models.CharField(_('Department name'), max_length=100)
    # Filled by UnaccentNameMixin.save() from ``name``.
    unaccent_name = models.CharField(_('Department name ascii char'), max_length=150, null=True)
    code = models.CharField(_('Department code'), max_length=3)
    region = models.ForeignKey(RegionModel, on_delete=models.CASCADE)
    last_update = models.DateTimeField(_('Last update'), null=True, auto_now=True)
    resource = models.ForeignKey(BaseAdresse, on_delete=models.CASCADE, verbose_name=_('BAN Connector'))

    class Meta:
        ordering = ['code']
        unique_together = ('resource', 'code')

    def __str__(self):
        """Return the code followed by the name."""
        return '%s %s' % (self.code, self.name)

    def to_json(self):
        """Return the dictionary served by the departments endpoint."""
        parent_region = self.region
        return dict(
            text=str(self),
            id=self.code,
            code=self.code,
            name=self.name,
            region_code=parent_region.code,
            region_name=parent_region.name,
        )
class CityModel(UnaccentNameMixin, models.Model):
    """City stored for one connector, one row per (INSEE code, postal code) pair."""

    name = models.CharField(_('City name'), max_length=150)
    # Filled by UnaccentNameMixin.save() from ``name``.
    unaccent_name = models.CharField(_('City name ascii char'), max_length=150, null=True)
    code = models.CharField(_('INSEE code'), max_length=5)
    zipcode = models.CharField(_('Postal code'), max_length=5)
    population = models.PositiveIntegerField(_('Population'))
    department = models.ForeignKey(DepartmentModel, on_delete=models.CASCADE, blank=True, null=True)
    region = models.ForeignKey(RegionModel, on_delete=models.CASCADE, blank=True, null=True)
    last_update = models.DateTimeField(_('Last update'), null=True, auto_now=True)
    resource = models.ForeignKey(BaseAdresse, on_delete=models.CASCADE, verbose_name=_('BAN Connector'))

    class Meta:
        ordering = ['-population', 'zipcode', 'unaccent_name', 'name']
        unique_together = ('resource', 'code', 'zipcode')

    def __str__(self):
        """Return the postal code followed by the name."""
        return '%s %s' % (self.zipcode, self.name)

    def to_json(self):
        """Return the dictionary served by the cities endpoint.

        Department and region entries are ``None`` when the city has none.
        """
        payload = {
            'text': str(self),
            'id': '%s.%s' % (self.code, self.zipcode),
            'code': self.code,
            'name': self.name,
            'zipcode': self.zipcode,
            'population': self.population,
        }
        department = self.department
        payload['department_code'] = department.code if department else None
        payload['department_name'] = department.name if department else None
        region = self.region
        payload['region_code'] = region.code if region else None
        payload['region_name'] = region.name if region else None
        return payload
class AddressCacheModel(models.Model):
    """Cached formatted address, keyed per connector by the first 30 characters of its id."""

    api_id = models.CharField(max_length=30)
    data = JSONField()
    # Refreshed on every save; clean_addresses_cache() uses it to drop old entries.
    timestamp = models.DateTimeField(auto_now=True)
    resource = models.ForeignKey(BaseAdresse, on_delete=models.CASCADE, verbose_name=_('BAN Connector'))

    class Meta:
        unique_together = ('resource', 'api_id')

    def update_timestamp(self):
        """Save the entry again so that ``timestamp`` is set to the current time."""
        self.save()