ban: chargeur communes/rues pour la ban
This commit is contained in:
parent
044ba3704f
commit
0d20cd3eb0
|
@ -1,3 +1,5 @@
|
|||
.tox
|
||||
ban/adresses-france.csv.gz
|
||||
cd06/config.py
|
||||
cd06/Makefile.config
|
||||
minimal-django/app.log
|
||||
|
|
|
@ -0,0 +1,139 @@
|
|||
|
||||
import asyncio
|
||||
import csv
|
||||
import io
|
||||
import subprocess
|
||||
|
||||
import asyncpg
|
||||
import async_lru
|
||||
|
||||
# Connection string placeholder, currently empty.
# NOTE(review): presumably meant to be handed to asyncpg.connect() -- confirm
# where it is consumed before relying on it.
dsn = ''
|
||||
|
||||
|
||||
async def _create_schema(conn):
    """Create the ``communes`` and ``rues`` tables if they do not exist yet.

    ``IF NOT EXISTS`` makes the call safe on every run, including when the
    ``ban`` database already existed but had never been populated.
    """
    await conn.execute('''\
CREATE UNLOGGED TABLE IF NOT EXISTS communes(
    id SERIAL PRIMARY KEY,
    nom_commune TEXT NOT NULL,
    code_postal VARCHAR(5),
    code_insee VARCHAR(5),
    CONSTRAINT unique_triple UNIQUE (nom_commune, code_postal, code_insee)
)''')
    await conn.execute('''\
CREATE UNLOGGED TABLE IF NOT EXISTS rues(
    id SERIAL PRIMARY KEY,
    cle VARCHAR(25) UNIQUE NOT NULL,
    nom_voie TEXT NOT NULL,
    commune_id INTEGER NOT NULL REFERENCES communes(id)
)''')


async def _inject_batch(conn, get_commune_id, batch):
    """Upsert the streets found in one batch of address rows into ``rues``.

    ``batch`` maps an address id to its CSV row.  Rows are reduced to one
    record per street key (the first two ``_``-separated parts of the address
    id), keeping the longest ``nom_voie`` seen for that key.  The records are
    bulk-copied into a temporary table and merged into ``rues``; an existing
    street only has its name replaced by a strictly longer one.

    ``get_commune_id`` is an awaitable callable
    ``(nom_commune, code_insee, code_postal) -> id``.
    """
    streets = {}
    for row in batch.values():
        cle = '_'.join(row['id'].split('_')[:2])  # unique id for street
        nom_voie = row['nom_voie']
        commune_id = await get_commune_id(
            row['nom_commune'], row['code_insee'], row['code_postal'])
        if cle not in streets or len(streets[cle][1]) < len(nom_voie):
            streets[cle] = (cle, nom_voie, commune_id)

    await conn.execute('''\
CREATE TEMPORARY TABLE _data(
    cle VARCHAR(25) UNIQUE NOT NULL,
    nom_voie TEXT NOT NULL,
    commune_id INTEGER NOT NULL
)''')
    await conn.copy_records_to_table('_data', records=list(streets.values()))
    await conn.execute('''\
INSERT INTO rues (cle, nom_voie, commune_id)
SELECT * FROM _data
ON CONFLICT (cle) DO UPDATE
SET nom_voie = excluded.nom_voie WHERE CHAR_LENGTH(rues.nom_voie) < CHAR_LENGTH(excluded.nom_voie)
''')
    await conn.execute('DROP TABLE _data')


async def _load_streets(conn, path='adresses-france.csv.gz', batch_size=200000):
    """Stream the gzipped, ``;``-delimited CSV at ``path`` into the database.

    The archive is decompressed by an external ``gunzip`` process and read row
    by row, so it is never held in memory as a whole.  Rows are grouped by
    address id into batches of ``batch_size`` entries which are handed to
    ``_inject_batch``.  The columns read are ``id``, ``nom_voie``,
    ``nom_commune``, ``code_insee`` and ``code_postal``.

    Returns the number of address rows injected.
    Raises ``subprocess.CalledProcessError`` when ``gunzip`` reports an error.
    """

    @async_lru.alru_cache(maxsize=4096)
    async def get_commune_id(nom_commune, code_insee, code_postal):
        # The no-op DO UPDATE makes RETURNING yield the id for both a fresh
        # insert and an already existing commune.
        record = await conn.fetchrow(
            'INSERT INTO communes (nom_commune, code_insee, code_postal) VALUES ($1, $2, $3) '
            'ON CONFLICT ON CONSTRAINT unique_triple '
            'DO UPDATE SET nom_commune = communes.nom_commune RETURNING (id)',
            nom_commune, code_insee, code_postal)
        return record['id']

    count = 0
    batch = {}

    async def flush():
        nonlocal count
        await _inject_batch(conn, get_commune_id, batch)
        count += len(batch)
        print('Count %010d\r' % count, end='', flush=True)
        batch.clear()

    with subprocess.Popen(['gunzip', '-dc', path], stdout=subprocess.PIPE) as proc:
        # NOTE(review): the text is decoded with the locale's default
        # encoding -- confirm it matches the encoding of the archive.
        reader = csv.DictReader(io.TextIOWrapper(proc.stdout), delimiter=';')
        for row in reader:
            kept = batch.get(row['id'])
            # Keep the row carrying the longest street name for a given id;
            # a shorter duplicate must not overwrite it.
            if kept is None or len(kept['nom_voie']) < len(row['nom_voie']):
                batch[row['id']] = row
            if len(batch) >= batch_size:
                await flush()
        if batch:
            await flush()

    # Leaving the ``with`` block waits for gunzip.  gzip documents exit status
    # 2 as "warning only", so only other non-zero statuses are fatal.
    if proc.returncode not in (0, 2):
        raise subprocess.CalledProcessError(proc.returncode, proc.args)
    return count


async def _build_indexes(conn):
    """(Re)build the trigram search indexes on ``communes`` and ``rues``."""
    await conn.execute('CREATE EXTENSION IF NOT EXISTS pg_trgm')
    await conn.execute('CREATE EXTENSION IF NOT EXISTS btree_gist')
    await conn.execute('CREATE EXTENSION IF NOT EXISTS btree_gin')
    # IF EXISTS tolerates a first run without hiding any other failure.
    await conn.execute('DROP INDEX IF EXISTS communes_nom_commune_idx')
    await conn.execute('DROP INDEX IF EXISTS rues_nom_voie_idx')
    await conn.execute(
        'CREATE INDEX communes_nom_commune_idx ON communes '
        'USING gin (nom_commune gin_trgm_ops, id)')
    await conn.execute(
        'CREATE INDEX rues_nom_voie_idx ON rues '
        'USING gist (commune_id, nom_voie gist_trgm_ops, cle)')


async def go():
    """Load communes and streets from ``adresses-france.csv.gz`` into PostgreSQL.

    Steps:

    1. Connect with the module-level ``dsn`` and try to create the ``ban``
       database.
    2. Connect to ``ban`` and make sure the ``communes``/``rues`` tables exist.
    3. Stream the CSV archive and upsert communes and streets in batches.
    4. Rebuild the trigram indexes.

    Both connections are closed on every exit path.  Database errors other
    than the best-effort ``CREATE DATABASE`` propagate to the caller, as does
    ``subprocess.CalledProcessError`` when the archive cannot be decompressed.
    """
    bootstrap = await asyncpg.connect(dsn=dsn)
    try:
        await bootstrap.execute('CREATE DATABASE ban')
    except asyncpg.PostgresError:
        # Best effort: the database may already exist, or the role may not be
        # allowed to create it.  The connection below fails loudly if ``ban``
        # is really unavailable.
        pass
    finally:
        await bootstrap.close()

    conn = await asyncpg.connect(database='ban', timeout=600)
    try:
        await _create_schema(conn)
        print('Loading...')
        await _load_streets(conn)
        print('Build indexes...')
        await _build_indexes(conn)
    finally:
        await conn.close()
|
||||
|
||||
|
||||
def test_aiopg():
    """Pytest entry point: run the ``go`` coroutine to completion.

    ``asyncio.run`` creates a fresh event loop and closes it afterwards.
    Calling ``asyncio.get_event_loop()`` with no running loop is deprecated
    and raises on recent Python versions.
    """
    asyncio.run(go())
|
|
@ -0,0 +1,14 @@
|
|||
[tox]
|
||||
envlist = py3
|
||||
skipsdist = True
|
||||
|
||||
[testenv]
|
||||
whitelist_externals=
|
||||
bash
|
||||
deps =
|
||||
asyncpg
|
||||
async_lru
|
||||
pytest
|
||||
commands =
|
||||
bash -ec "test -f adresses-france.csv.gz || wget https://adresse.data.gouv.fr/data/ban/adresses/latest/csv/adresses-france.csv.gz"
|
||||
pytest --pdb -s ban.py
|
Loading…
Reference in New Issue