ban: chargeur communes/rues pour la ban

This commit is contained in:
Benjamin Dauvergne 2020-11-22 11:44:13 +01:00
parent 044ba3704f
commit 0d20cd3eb0
3 changed files with 155 additions and 0 deletions

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
.tox
ban/adresses-france.csv.gz
cd06/config.py
cd06/Makefile.config
minimal-django/app.log

139
ban/ban.py Normal file
View File

@ -0,0 +1,139 @@
import asyncio
import csv
import io
import subprocess
import asyncpg
import async_lru
# PostgreSQL connection string placeholder, currently empty.
# NOTE(review): not referenced anywhere in the visible code -- go() passes a
# literal '' to asyncpg.connect(); confirm whether this was meant to be used there.
dsn = ''
async def _create_schema(conn):
    """Create the ``communes`` and ``rues`` tables on a freshly created database.

    Both tables are UNLOGGED: they are bulk-loaded from the CSV and can be
    rebuilt from it, so write-ahead logging is skipped.
    """
    # An earlier exploratory pass over the CSV (since removed) asserted that
    # row ids are at most 24 characters long and that code_postal and
    # code_insee are exactly 5 characters long.
    await conn.execute('''\
CREATE UNLOGGED TABLE communes(
    id SERIAL PRIMARY KEY,
    nom_commune TEXT NOT NULL,
    code_postal VARCHAR(5),
    code_insee VARCHAR(5),
    CONSTRAINT unique_triple UNIQUE (nom_commune, code_postal, code_insee)
)''')
    await conn.execute('''\
CREATE UNLOGGED TABLE rues(
    id SERIAL PRIMARY KEY,
    cle VARCHAR(25) UNIQUE NOT NULL,
    nom_voie TEXT NOT NULL,
    commune_id INTEGER NOT NULL REFERENCES communes(id)
)''')


async def _load_streets(conn, csv_path='adresses-france.csv.gz', batch_size=200000):
    """Stream the gzipped CSV at *csv_path* and upsert communes and streets.

    Rows are accumulated by address id until *batch_size* distinct ids have
    been seen, then each batch is reduced to one record per street and merged
    into ``rues`` through a temporary table.
    """

    @async_lru.alru_cache(maxsize=4096)
    async def get_commune_id(nom_commune, code_insee, code_postal):
        # The no-op DO UPDATE makes RETURNING yield the id even when the
        # commune already exists.
        record = await conn.fetchrow(
            'INSERT INTO communes (nom_commune, code_insee, code_postal) VALUES ($1, $2, $3) '
            'ON CONFLICT ON CONSTRAINT unique_triple '
            'DO UPDATE SET nom_commune = communes.nom_commune RETURNING (id)',
            nom_commune, code_insee, code_postal)
        return record['id']

    count = 0

    async def inject(batch):
        """Upsert one batch of address rows into ``rues``."""
        nonlocal count
        streets = {}
        for row in batch.values():
            # The first two '_'-separated components of the id form the
            # unique key of the street.
            cle = '_'.join(row['id'].split('_')[:2])
            nom_voie = row['nom_voie']
            commune_id = await get_commune_id(
                row['nom_commune'], row['code_insee'], row['code_postal'])
            # Keep the longest street name seen for a given key.
            if cle not in streets or len(streets[cle][1]) < len(nom_voie):
                streets[cle] = (cle, nom_voie, commune_id)
        await conn.execute('''\
CREATE TEMPORARY TABLE _data(
    cle VARCHAR(25) UNIQUE NOT NULL,
    nom_voie TEXT NOT NULL,
    commune_id INTEGER NOT NULL
)''')
        await conn.copy_records_to_table('_data', records=list(streets.values()))
        await conn.execute('''\
INSERT INTO rues (cle, nom_voie, commune_id)
SELECT * FROM _data
ON CONFLICT (cle) DO UPDATE
SET nom_voie = excluded.nom_voie WHERE CHAR_LENGTH(rues.nom_voie) < CHAR_LENGTH(excluded.nom_voie)
''')
        await conn.execute('DROP TABLE _data')
        count += len(batch)
        print('Count %010d\r' % count, end='', flush=True)

    with subprocess.Popen(['gunzip', '-dc', csv_path], stdout=subprocess.PIPE) as proc:
        # Decode explicitly instead of relying on the locale (the export is
        # expected to be UTF-8); newline='' is what the csv module requires.
        fd = io.TextIOWrapper(proc.stdout, encoding='utf-8', newline='')
        reader = csv.DictReader(fd, delimiter=';')
        batch = {}
        for row in reader:
            previous = batch.get(row['id'])
            # Only replace an already batched row when the new one carries a
            # longer street name.
            if previous is None or len(previous['nom_voie']) < len(row['nom_voie']):
                batch[row['id']] = row
            if len(batch) != batch_size:
                continue
            await inject(batch)
            batch = {}
        if batch:
            await inject(batch)


async def _build_indexes(conn):
    """Drop and recreate the trigram indexes on commune and street names."""
    await conn.execute('CREATE EXTENSION IF NOT EXISTS pg_trgm')
    await conn.execute('CREATE EXTENSION IF NOT EXISTS btree_gist')
    await conn.execute('CREATE EXTENSION IF NOT EXISTS btree_gin')
    # IF EXISTS tolerates a missing index without hiding unrelated errors.
    await conn.execute('DROP INDEX IF EXISTS communes_nom_commune_idx')
    await conn.execute('DROP INDEX IF EXISTS rues_nom_voie_idx')
    await conn.execute(
        'CREATE INDEX communes_nom_commune_idx ON communes USING gin (nom_commune gin_trgm_ops, id)')
    await conn.execute(
        'CREATE INDEX rues_nom_voie_idx ON rues USING gist (commune_id, nom_voie gist_trgm_ops, cle)')


async def go():
    """Load communes and streets from the BAN CSV into the ``ban`` database.

    Steps:
      1. Try to create the ``ban`` database; if the server refuses (typically
         because it already exists) the tables are assumed to exist too.
      2. Create the ``communes`` and ``rues`` tables when the database is new.
      3. Stream ``adresses-france.csv.gz`` (read from the current directory
         through ``gunzip``) and upsert its rows in batches.
      4. Rebuild the trigram indexes.

    Both database connections are closed before returning, including on error.
    """
    admin_conn = await asyncpg.connect(dsn='')
    try:
        await admin_conn.execute('CREATE DATABASE ban')
    except asyncpg.PostgresError:
        # Best effort: any server-side refusal means "reuse what is there".
        new = False
    else:
        new = True
    finally:
        await admin_conn.close()

    conn = await asyncpg.connect(database='ban', timeout=600)
    try:
        if new:
            await _create_schema(conn)
        print('Loading...')
        await _load_streets(conn)
        print('Build indexes...')
        await _build_indexes(conn)
    finally:
        await conn.close()
# try:
# await conn.execute('''\
# CREATE MATERIALIZED VIEW communes AS (
# SELECT DISTINCT nom_commune, code_postal, code_insee
# FROM ban ORDER BY nom_commune
# )''')
# await conn.execute('''\
#CREATE INDEX voie_by_commune_idx ON ban
# USING gin (code_insee, code_postal, nom_voie gin_trgm_ops)''')
def test_aiopg():
    """Pytest entry point: run the whole import coroutine ``go()`` to completion.

    ``asyncio.run`` creates a fresh event loop and closes it afterwards,
    unlike ``asyncio.get_event_loop()``, which is deprecated when no loop is
    running and leaves the loop open.
    """
    asyncio.run(go())

14
ban/tox.ini Normal file
View File

@ -0,0 +1,14 @@
# tox configuration for the BAN loader: fetch the data file if needed, then run ban.py through pytest.
[tox]
envlist = py3
# Nothing to package here: skip building a source distribution.
skipsdist = True
[testenv]
# bash is an external command (not installed in the virtualenv) used by `commands` below.
whitelist_externals=
bash
deps =
asyncpg
async_lru
pytest
# First download the BAN archive unless adresses-france.csv.gz is already present,
# then run ban.py under pytest (-s: no output capture, --pdb: open the debugger on failure).
commands =
bash -ec "test -f adresses-france.csv.gz || wget https://adresse.data.gouv.fr/data/ban/adresses/latest/csv/adresses-france.csv.gz"
pytest --pdb -s ban.py