From 0d20cd3eb0c780eb63850bb26273e0f00b43f856 Mon Sep 17 00:00:00 2001
From: Benjamin Dauvergne
Date: Sun, 22 Nov 2020 11:44:13 +0100
Subject: [PATCH] ban: chargeur communes/rues pour la ban

---
 .gitignore  |   2 +
 ban/ban.py  | 169 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 ban/tox.ini |  14 ++++++
 3 files changed, 185 insertions(+)
 create mode 100644 ban/ban.py
 create mode 100644 ban/tox.ini

diff --git a/.gitignore b/.gitignore
index 7ab14a6..2942f5d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,5 @@
+.tox
+ban/adresses-france.csv.gz
 cd06/config.py
 cd06/Makefile.config
 minimal-django/app.log
diff --git a/ban/ban.py b/ban/ban.py
new file mode 100644
index 0000000..b346c88
--- /dev/null
+++ b/ban/ban.py
@@ -0,0 +1,169 @@
+"""Load communes and streets from the BAN address CSV into PostgreSQL.
+
+Reads ``adresses-france.csv.gz`` from the current directory, fills the
+``communes`` and ``rues`` tables of a database named ``ban``, then builds
+trigram indexes on the commune and street names.
+"""
+
+import asyncio
+import csv
+import io
+import subprocess
+
+import asyncpg
+import async_lru
+
+dsn = ''
+
+CSV_PATH = 'adresses-france.csv.gz'
+BATCH_SIZE = 200000
+
+
+async def _connect():
+    """Return a connection to the ``ban`` database, creating it if needed."""
+    bootstrap = await asyncpg.connect(dsn=dsn)
+    try:
+        await bootstrap.execute('CREATE DATABASE ban')
+    except (asyncpg.exceptions.DuplicateDatabaseError,
+            asyncpg.exceptions.InsufficientPrivilegeError):
+        # The database may already exist (earlier run, or created by an
+        # administrator); the connection below fails loudly if it does not.
+        pass
+    finally:
+        # The bootstrap connection is only needed for CREATE DATABASE.
+        await bootstrap.close()
+    return await asyncpg.connect(dsn=dsn, database='ban', timeout=600)
+
+
+async def _create_schema(conn):
+    """Create the ``communes`` and ``rues`` tables unless they exist.
+
+    IF NOT EXISTS keeps the loader re-runnable, including after a run that
+    created the database but stopped before creating the tables.
+    """
+    await conn.execute('''\
+CREATE UNLOGGED TABLE IF NOT EXISTS communes(
+    id SERIAL PRIMARY KEY,
+    nom_commune TEXT NOT NULL,
+    code_postal VARCHAR(5),
+    code_insee VARCHAR(5),
+    CONSTRAINT unique_triple UNIQUE (nom_commune, code_postal, code_insee)
+)''')
+    await conn.execute('''\
+CREATE UNLOGGED TABLE IF NOT EXISTS rues(
+    id SERIAL PRIMARY KEY,
+    cle VARCHAR(25) UNIQUE NOT NULL,
+    nom_voie TEXT NOT NULL,
+    commune_id INTEGER NOT NULL REFERENCES communes(id)
+)''')
+
+
+async def _inject(conn, get_commune_id, batch, count):
+    """Upsert the streets of ``batch`` into ``rues`` and report progress.
+
+    ``batch`` maps address ids to CSV rows. Rows are grouped by street key
+    (the first two ``_``-separated fields of the address id); for each
+    street the longest ``nom_voie`` seen is kept. Returns ``count``
+    increased by the number of rows in ``batch``.
+    """
+    streets = {}
+    for row in batch.values():
+        cle = '_'.join(row['id'].split('_')[:2])  # unique id for street
+        nom_voie = row['nom_voie']
+        commune_id = await get_commune_id(
+            row['nom_commune'], row['code_insee'], row['code_postal'])
+        if cle not in streets or len(streets[cle][1]) < len(nom_voie):
+            streets[cle] = (cle, nom_voie, commune_id)
+    # Stage the batch in a temporary table so that a single
+    # INSERT ... SELECT can upsert it into rues.
+    await conn.execute('''\
+CREATE TEMPORARY TABLE _data(
+    cle VARCHAR(25) UNIQUE NOT NULL,
+    nom_voie TEXT NOT NULL,
+    commune_id INTEGER NOT NULL
+)''')
+    await conn.copy_records_to_table('_data', records=list(streets.values()))
+    await conn.execute('''\
+INSERT INTO rues (cle, nom_voie, commune_id)
+SELECT cle, nom_voie, commune_id FROM _data
+ON CONFLICT (cle) DO UPDATE
+SET nom_voie = excluded.nom_voie
+WHERE CHAR_LENGTH(rues.nom_voie) < CHAR_LENGTH(excluded.nom_voie)
+''')
+    await conn.execute('DROP TABLE _data')
+    count += len(batch)
+    print('Count %010d\r' % count, end='', flush=True)
+    return count
+
+
+async def _load_streets(conn, get_commune_id):
+    """Stream the gzipped CSV and inject its rows BATCH_SIZE at a time.
+
+    Raises RuntimeError if gunzip exits with a non-zero status, so that a
+    missing or corrupt archive is not mistaken for an empty one.
+    """
+    count = 0
+    batch = {}
+    with subprocess.Popen(['gunzip', '-dc', CSV_PATH], stdout=subprocess.PIPE) as proc:
+        # Assumes the export is UTF-8: decode explicitly rather than depend
+        # on the locale. newline='' is what the csv module expects.
+        fd = io.TextIOWrapper(proc.stdout, encoding='utf-8', newline='')
+        for row in csv.DictReader(fd, delimiter=';'):
+            known = batch.get(row['id'])
+            # For a repeated id keep the row with the longest street name.
+            if known is None or len(known['nom_voie']) < len(row['nom_voie']):
+                batch[row['id']] = row
+            if len(batch) >= BATCH_SIZE:
+                count = await _inject(conn, get_commune_id, batch, count)
+                batch = {}
+        if batch:
+            await _inject(conn, get_commune_id, batch, count)
+    if proc.returncode:
+        raise RuntimeError('gunzip exited with status %d' % proc.returncode)
+
+
+async def _build_indexes(conn):
+    """(Re)build the trigram indexes on commune and street names."""
+    await conn.execute('CREATE EXTENSION IF NOT EXISTS pg_trgm')
+    await conn.execute('CREATE EXTENSION IF NOT EXISTS btree_gist')
+    await conn.execute('CREATE EXTENSION IF NOT EXISTS btree_gin')
+    # IF EXISTS: a missing index is fine, any other error must surface.
+    await conn.execute('DROP INDEX IF EXISTS communes_nom_commune_idx')
+    await conn.execute('DROP INDEX IF EXISTS rues_nom_voie_idx')
+    await conn.execute(
+        'CREATE INDEX communes_nom_commune_idx ON communes '
+        'USING gin (nom_commune gin_trgm_ops, id)')
+    await conn.execute(
+        'CREATE INDEX rues_nom_voie_idx ON rues '
+        'USING gist (commune_id, nom_voie gist_trgm_ops, cle)')
+
+
+async def go():
+    """Create the schema, load the CSV and build the search indexes."""
+    conn = await _connect()
+    try:
+        await _create_schema(conn)
+        print('Loading...')
+
+        # The cache avoids one round trip per address for recent communes.
+        @async_lru.alru_cache(maxsize=4096)
+        async def get_commune_id(nom_commune, code_insee, code_postal):
+            # The no-op DO UPDATE makes RETURNING yield the id of an
+            # already existing commune as well as of a new one.
+            row = await conn.fetchrow(
+                'INSERT INTO communes (nom_commune, code_insee, code_postal) VALUES ($1, $2, $3) '
+                'ON CONFLICT ON CONSTRAINT unique_triple '
+                'DO UPDATE SET nom_commune = communes.nom_commune RETURNING (id)',
+                nom_commune, code_insee, code_postal)
+            return row['id']
+
+        await _load_streets(conn, get_commune_id)
+        print('Build indexes...')
+        await _build_indexes(conn)
+    finally:
+        await conn.close()
+
+
+def test_aiopg():
+    """Pytest entry point (see tox.ini): run the whole load once."""
+    asyncio.run(go())
diff --git a/ban/tox.ini b/ban/tox.ini
new file mode 100644
index 0000000..c99f2b9
--- /dev/null
+++ b/ban/tox.ini
@@ -0,0 +1,14 @@
+[tox]
+envlist = py3
+skipsdist = True
+
+[testenv]
+whitelist_externals=
+  bash
+deps =
+  asyncpg
+  async_lru
+  pytest
+commands =
+  bash -ec "test -f adresses-france.csv.gz || wget https://adresse.data.gouv.fr/data/ban/adresses/latest/csv/adresses-france.csv.gz"
+  pytest --pdb -s ban.py