misc-bdauvergne/ban/ban.py

import asyncio
import csv
import io
import subprocess
import asyncpg
import async_lru
dsn = ''


async def go():
    # Connect to the default database first so that the 'ban' database can be
    # created if it does not exist yet.
    conn = await asyncpg.connect(dsn=dsn)
    new = True
    try:
        await conn.execute('CREATE DATABASE ban')
    except Exception:
        # The database already exists: skip schema creation below.
        new = False
    await conn.close()
    conn = await asyncpg.connect(database='ban', timeout=600)
    if new:
        await conn.execute('''\
            CREATE UNLOGGED TABLE communes(
                id SERIAL PRIMARY KEY,
                nom_commune TEXT NOT NULL,
                code_postal VARCHAR(5),
                code_insee VARCHAR(5),
                CONSTRAINT unique_triple UNIQUE (nom_commune, code_postal, code_insee)
            )''')
        await conn.execute('''\
            CREATE UNLOGGED TABLE rues(
                id SERIAL PRIMARY KEY,
                cle VARCHAR(25) UNIQUE NOT NULL,
                nom_voie TEXT NOT NULL,
                commune_id INTEGER NOT NULL REFERENCES communes(id)
            )''')
    # Exploration pass used to size the columns above, kept for reference:
    # with subprocess.Popen(['gunzip', '-dc', 'adresses-france.csv.gz'], stdout=subprocess.PIPE) as proc:
    #     fd = io.TextIOWrapper(proc.stdout)
    #     reader = csv.DictReader(fd, delimiter=';')
    #     maxes = {}
    #     for row in reader:
    #         assert row['id']
    #         assert len(row['id']) <= 24
    #         assert row['numero']
    #         assert int(row['numero'])
    #         assert len(row['rep']) <= 6
    #         assert len(row['code_postal']) == 5
    #         assert len(row['code_insee']) == 5
    #         assert row['nom_commune']
    #         for key in row:
    #             maxes[key] = max(len(row[key]), maxes.get(key, 0))
    #     import pprint
    #     pprint.pprint(maxes)
    print('Loading...')

    @async_lru.alru_cache(maxsize=4096)
    async def get_commune_id(nom_commune, code_insee, code_postal):
        # Upsert the commune and return its id; the no-op UPDATE is only there
        # so that RETURNING also yields the id when the row already exists.
        return (await conn.fetchrow(
            'INSERT INTO communes (nom_commune, code_insee, code_postal) VALUES ($1, $2, $3) '
            'ON CONFLICT ON CONSTRAINT unique_triple '
            'DO UPDATE SET nom_commune = communes.nom_commune RETURNING (id)',
            nom_commune, code_insee, code_postal))['id']

    batch_size = 200000
    with subprocess.Popen(['gunzip', '-dc', 'adresses-france.csv.gz'], stdout=subprocess.PIPE) as proc:
        fd = io.TextIOWrapper(proc.stdout)
        reader = csv.DictReader(fd, delimiter=';')
        count = 0
        batch = {}

        async def inject(batch):
            nonlocal count
            values = {}
            for row in batch.values():
                cle = '_'.join(row['id'].split('_')[:2])  # unique id for street
                nom_voie = row['nom_voie']
                commune_id = await get_commune_id(row['nom_commune'], row['code_insee'], row['code_postal'])
                # Keep the longest spelling of the street name for each street key.
                if cle not in values or len(values[cle][1]) < len(nom_voie):
                    values[cle] = (cle, nom_voie, commune_id)
            values = list(values.values())
            await conn.execute('''\
                CREATE TEMPORARY TABLE _data(
                    cle VARCHAR(25) UNIQUE NOT NULL,
                    nom_voie TEXT NOT NULL,
                    commune_id INTEGER NOT NULL
                )''')
            await conn.copy_records_to_table('_data', records=values)
            await conn.execute('''\
                INSERT INTO rues (cle, nom_voie, commune_id)
                SELECT * FROM _data
                ON CONFLICT (cle) DO UPDATE
                SET nom_voie = excluded.nom_voie WHERE CHAR_LENGTH(rues.nom_voie) < CHAR_LENGTH(excluded.nom_voie)
            ''')
            await conn.execute('DROP TABLE _data')
            count += len(batch)
            print('Count %010d\r' % count, end='', flush=True)
        for row in reader:
            # Keep only the row with the longest street name for each address id.
            if row['id'] not in batch or len(batch[row['id']]['nom_voie']) < len(row['nom_voie']):
                batch[row['id']] = row
            if len(batch) != batch_size:
                continue
            await inject(batch)
            batch = {}
        if batch:
            await inject(batch)
        # ON CONFLICT DO NOTHING

    print('Build indexes...')
    await conn.execute('CREATE EXTENSION IF NOT EXISTS pg_trgm')
    await conn.execute('CREATE EXTENSION IF NOT EXISTS btree_gist')
    await conn.execute('CREATE EXTENSION IF NOT EXISTS btree_gin')
    # Rebuild the trigram indexes from scratch.
    await conn.execute('DROP INDEX IF EXISTS communes_nom_commune_idx')
    await conn.execute('DROP INDEX IF EXISTS rues_nom_voie_idx')
    await conn.execute('CREATE INDEX communes_nom_commune_idx ON communes USING gin (nom_commune gin_trgm_ops, id)')
    await conn.execute('CREATE INDEX rues_nom_voie_idx ON rues USING gist (commune_id, nom_voie gist_trgm_ops, cle)')
    # try:
    #     await conn.execute('''\
    #         CREATE MATERIALIZED VIEW communes AS (
    #             SELECT DISTINCT nom_commune, code_postal, code_insee
    #             FROM ban ORDER BY nom_commune
    #         )''')
    #     await conn.execute('''\
    #         CREATE INDEX voie_by_commune_idx ON ban
    #         USING gin (code_insee, code_postal, nom_voie gin_trgm_ops)''')
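

# Illustrative sketch, not part of the original loader: one way the trigram
# indexes built in go() could be used for fuzzy lookups. The function name,
# parameters and queries below are assumptions added for the example.
async def search_street(conn, nom_commune, nom_voie, limit=10):
    # '%' is the pg_trgm similarity operator, which can use the trigram
    # indexes created above; similarity() then ranks the candidates.
    commune = await conn.fetchrow(
        'SELECT id, nom_commune FROM communes WHERE nom_commune % $1 '
        'ORDER BY similarity(nom_commune, $1) DESC LIMIT 1',
        nom_commune)
    if commune is None:
        return []
    return await conn.fetch(
        'SELECT cle, nom_voie FROM rues '
        'WHERE commune_id = $1 AND nom_voie % $2 '
        'ORDER BY similarity(nom_voie, $2) DESC LIMIT $3',
        commune['id'], nom_voie, limit)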


def test_aiopg():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(go())
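

# Minimal entry point, assuming the script is meant to be run directly.
if __name__ == '__main__':
    test_aiopg()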