base_adresse: store BAN identifier in streets (#56035)

This commit is contained in:
Thomas NOËL 2021-08-06 21:13:09 +02:00 committed by Thomas NOEL
parent 74895168a8
commit 28576589a8
4 changed files with 72 additions and 28 deletions

View File

@ -0,0 +1,18 @@
# Generated by Django 2.2.19 on 2021-09-09 09:53
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('base_adresse', '0025_baseadresse_sectors'),
]
operations = [
migrations.AddField(
model_name='streetmodel',
name='ban_id',
field=models.CharField(blank=True, max_length=32, verbose_name='BAN Identifier'),
),
]

View File

@ -268,11 +268,13 @@ class BaseAdresse(AddressResource):
def streets(self, request, zipcode=None, citycode=None, q=None, id=None, distinct=True, page_limit=None):
result = []
if id is not None:
try:
id = int(id)
except ValueError:
return {'data': []}
streets = self.streetmodel_set.filter(id=id)
streets = self.streetmodel_set.filter(ban_id=id)
if not streets.exists(): # fallback to legacy id
try:
id = int(id)
except ValueError:
return {'data': []}
streets = self.streetmodel_set.filter(id=id)
else:
streets = self.streetmodel_set.all()
if q:
@ -294,7 +296,7 @@ class BaseAdresse(AddressResource):
for street in streets:
result.append(
{
'id': str(street.id),
'id': street.ban_id or str(street.id),
'text': street.name,
'type': street.type,
'city': street.city,
@ -450,22 +452,33 @@ class BaseAdresse(AddressResource):
line = _not_found = object()
for line in ban_file:
street_info = json_loads(line)
if isinstance(street_info['postcode'], list):
street_info['postcode'] = six.text_type(street_info['postcode'][0])
if street_info['type'] == 'street' and street_info['postcode'].startswith(zipcodes):
for key in ('citycode', 'name', 'city'):
if isinstance(street_info[key], list):
street_info[key] = six.text_type(street_info[key][0])
self.streetmodel_set.update_or_create(
resource=self,
citycode=street_info['citycode'],
name=street_info['name'][:150],
defaults={
'city': street_info['city'],
'zipcode': street_info['postcode'],
'type': street_info['type'],
},
)
if street_info['type'] != 'street':
continue
ban_id = street_info.get('id')
if not ban_id or not isinstance(ban_id, str) or '_' not in ban_id:
continue
for key in ('postcode', 'name', 'city'):
if isinstance(street_info[key], list):
street_info[key] = six.text_type(street_info[key][0])
if not street_info['postcode'].startswith(zipcodes):
continue
citycode = ban_id.split('_', 1)[0]
if isinstance(street_info['citycode'], list):
if citycode not in street_info['citycode']:
continue
elif citycode != street_info['citycode']:
continue
self.streetmodel_set.update_or_create(
resource=self,
citycode=citycode,
name=street_info['name'][:150],
defaults={
'ban_id': ban_id,
'city': street_info['city'],
'zipcode': street_info['postcode'],
'type': street_info['type'],
},
)
if line is _not_found:
raise Exception('bano file is empty')
@ -563,6 +576,7 @@ class UnaccentNameMixin(object):
@six.python_2_unicode_compatible
class StreetModel(UnaccentNameMixin, models.Model):
ban_id = models.CharField(_('BAN Identifier'), max_length=32, blank=True)
city = models.CharField(_('City'), max_length=150)
name = models.CharField(_('Street name'), max_length=150)
unaccent_name = models.CharField(_('Street name ascii char'), max_length=150, null=True)

Binary file not shown.

View File

@ -116,6 +116,7 @@ def base_adresse_coordinates(db):
@pytest.fixture
def street(db):
return StreetModel.objects.create(
ban_id='73001_0000',
city=u'Chambéry',
name=u'Une rüê très äccentuéè',
zipcode=u'73000',
@ -343,13 +344,14 @@ def test_base_adresse_streets_unaccent(app, base_adresse, street):
assert result['text'] == street.name
assert result['citycode'] == street.citycode
assert result['zipcode'] == street.zipcode
assert result['id'] == str(street.id)
assert result['id'] == str(street.ban_id)
def test_base_adresse_streets_get_by_id(app, base_adresse, street):
for i in range(10):
# create additional streets
StreetModel.objects.create(
other_street = StreetModel.objects.create(
ban_id='%d_000T' % (73001 + i),
city=u'Chambéry',
name=u'Une rue différente',
zipcode=str(73001 + i),
@ -361,14 +363,20 @@ def test_base_adresse_streets_get_by_id(app, base_adresse, street):
resp = app.get('/base-adresse/%s/streets?q=une rue tres acc' % base_adresse.slug)
assert 'data' in resp.json
result = resp.json['data'][0]
street_id = result['id']
assert result['id'] == '73001_0000' # it's the "street" fixture
resp = app.get('/base-adresse/%s/streets?id=%s' % (base_adresse.slug, street_id))
resp = app.get('/base-adresse/%s/streets?id=73001_0000' % base_adresse.slug)
assert len(resp.json['data']) == 1
result2 = resp.json['data'][0]
assert result2['text'] == result['text']
# non integer id.
# get by legacy id
resp = app.get('/base-adresse/%s/streets?id=%d' % (base_adresse.slug, other_street.id))
assert len(resp.json['data']) == 1
result3 = resp.json['data'][0]
assert result3['text'] == other_street.name
# non existing and non integer id
resp = app.get('/base-adresse/%s/streets?id=%s' % (base_adresse.slug, 'XXX'))
assert len(resp.json['data']) == 0
@ -431,12 +439,16 @@ def test_base_adresse_command_update(mocked_get, db, base_adresse):
)
streets = StreetModel.objects.all()
assert len(streets) == 3
street = StreetModel.objects.order_by('id').first()
streets = streets.filter(ban_id='73001_0004')
assert streets.count() == 1
street = streets.first()
assert street.name == 'Chemin de la Vie, LA GRANGE DU TRIEU'
assert street.zipcode == '73610'
assert street.type == 'street'
assert street.city == 'Aiguebelette-le-Lac'
assert street.citycode == '73001'
assert street.ban_id == '73001_0004'
# check a new call downloads again
call_command('cron', 'daily')
assert mocked_get.call_count == 2