import hashlib
import csv
import logging
import re
import datetime
import uuid
# Module-level logger named 'eudonet', used by every function in this file.
logger = logging.getLogger('eudonet')
# Records are not handed to the handlers of ancestor loggers.
logger.propagate = False
handler = logging.StreamHandler()
# The stream handler only emits records at WARNING level or above.
handler.level = logging.WARNING
# NOTE(review): the next line ends with an unmatched ')' -- it looks like the
# tail of a call (presumably handler.setFormatter(...)) whose opening line is
# not visible here; confirm against the complete file.
logging.Formatter('%(asctime)-15s %(levelname)s %(message)s'))
# Column names every CSV row must carry; load() compares each row's keys to it.
# NOTE(review): the set literal below is not closed and its members are not
# visible here -- confirm against the complete file.
expected_keys = set([
# Bytes prepended to a row's ppid before hashing it into a guid in normalize().
salt = b'eudonet'
# normalize() asserts that a freshly derived guid is not already in this set.
uuids = set()
def telephone(row, key):
    """Return the phone number stored in ``row[key]`` in 10-digit form.

    Whitespace, dots and dashes are removed from the value.  A 9-digit
    number gets a leading ``'0'``; a 10-digit number is kept unchanged.

    Args:
        row: mapping holding the raw value under ``key`` and the row number
            under ``'line'`` (only read when a warning is logged).
        key: name of the column to clean.

    Returns:
        The cleaned digit string, ``''`` when the value is blank, ``'NULL'``
        or invalid, or the original falsy value (``None`` / ``''``)
        untouched.
    """
    raw = row[key]
    if not raw:
        # None or '': nothing to clean, hand the value back as it came.
        return raw

    number = raw.strip()
    if not number or number == 'NULL':
        # A missing number is not an invalid one: return empty without
        # logging a misleading "invalid" warning.
        return ''

    number = re.sub(r'[\s.-]', '', number)
    if not number.isascii() or not number.isdigit() or len(number) not in (9, 10):
        logger.warning(f'line {row["line"]} : invalid {key} {row[key]}')
        return ''

    if len(number) == 9:
        # 9 digits means the leading zero was dropped in the export.
        number = '0' + number
    return number
# normalize() asserts that a row's ppid is not already in this set.
ppids = set()
def normalize(row, ppid_index):
    """Clean one CSV row in place and add the derived import fields.

    Every ``'NULL'`` cell becomes ``None`` and every other string cell is
    stripped (the ``'line'`` entry is left alone).  The following keys are
    then written to ``row``: ``source``, ``import``, ``email``, ``guid``,
    ``tel_mobile``, ``tel_fixe``, ``tel_a_contacter``, ``conjoint_index``,
    ``birthdate``, ``canaux_contact``, ``non_envoie_brochure`` and
    ``adresse_erreur``.

    Args:
        row: dict for one CSV row; must already carry its row number under
            ``'line'``.
        ppid_index: mapping from ppid to the position of that row in the
            loaded data, used to resolve ``ID_Conjoint``.

    Raises:
        AssertionError: when ppid, Prenom or Nom is missing, the ppid or the
            derived guid was already seen, or Civilite is not an accepted
            value.
        ValueError: when ``Accepte_Doc`` holds an unknown value.
    """
    row['source'] = 'eudonet'

    # convert NULL to None, strip every other textual cell
    for key in row:
        value = row[key]
        if value == 'NULL':
            row[key] = None
        elif key != 'line' and isinstance(value, str):
            row[key] = value.strip()

    row['import'] = True
    line = row['line']

    assert row['ppid'], 'no ppid line %s' % line
    assert row['Prenom'], 'no Prenom line %s' % line
    assert row['Nom'], 'no Nom line %s' % line
    assert row['ppid'] not in ppids, 'duplicate ppid line %s' % line
    assert row['Civilite'] in (None, 'Monsieur', 'Madame'), 'invalid Civilite line %s' % line
    # Remember the ppid, otherwise the duplicate check above can never fire.
    ppids.add(row['ppid'])

    # Email
    email = row['Email']
    if email and not re.match(r'^[a-zA-Z0-9_.-]+@[a-zA-Z0-9.-]+$', email):
        logger.warning(f'line {line} : invalid Email {row["Email"]}')
        email = None
    row['email'] = email

    # Archivé: archived rows are normalized but flagged as not importable
    if row['Archivé'] == 'OUI':
        logger.warning(f'line {line} Archivé==OUI')
        row['import'] = False

    # guid: salted md5 of the ppid, so the same ppid always gives the same guid
    guid = uuid.UUID(hashlib.md5(salt + row['ppid'].encode()).hexdigest()[:32]).hex
    assert guid not in uuids, f'uuid duplicate {guid}'
    uuids.add(guid)
    row['guid'] = guid
    logger.debug('uuid %s', guid)

    # Tel_Portable / Téléphone_fixe
    row['tel_mobile'] = telephone(row, 'Tel_Portable')
    row['tel_fixe'] = telephone(row, 'Téléphone_fixe')

    # NO_A_Contacter cannot be fixed
    row['tel_a_contacter'] = (row['NO_A_Contacter'] or '').strip() or None

    # ID_Conjoint
    id_conjoint = (row['ID_Conjoint'] or '').strip()
    if id_conjoint:
        id_conjoint = ppid_index.get(id_conjoint)
        # Compare with None: index 0 is a valid position but is falsy.
        if id_conjoint is None:
            logger.warning(f'line {line} : unknown ID_Conjoint {row["ID_Conjoint"]}')
    row['conjoint_index'] = id_conjoint

    # Date_Naissance: accept dd/mm/YYYY first, then YYYY-mm-dd
    birthdate = row['Date_Naissance']
    if birthdate:
        raw_birthdate = birthdate.strip()
        for date_format in ('%d/%m/%Y', '%Y-%m-%d'):
            try:
                birthdate = datetime.datetime.strptime(raw_birthdate, date_format).date()
            except ValueError:
                continue
            break
        else:
            logger.warning(f'line {row["line"]} : invalid Date_Naissance {row["Date_Naissance"]}')
            # Do not leave an unparsed string where a date is expected.
            birthdate = None
    row['birthdate'] = birthdate

    # convert Accepte_Doc to three fields
    accepte_doc = row['Accepte_Doc']
    row['canaux_contact'] = []
    row['non_envoie_brochure'] = False
    row['adresse_erreur'] = False
    if accepte_doc in (None, 'NON'):
        pass
    elif accepte_doc == 'Brochure au conjoint':
        row['canaux_contact'] = ['courrier']
        row['non_envoie_brochure'] = True
    elif accepte_doc == 'Par courrier':
        row['canaux_contact'] = ['courrier']
    elif accepte_doc == 'Par email':
        row['canaux_contact'] = ['email']
    elif accepte_doc == 'Erreur adresse':
        row['canaux_contact'] = ['courrier']
        row['adresse_erreur'] = True
    else:
        raise ValueError('invalid Accepte_Doc %r' % accepte_doc)
def load(filename):
    """Read the CSV file ``filename`` and normalize every row.

    Each row gets its 1-based position in the data (header excluded) under
    ``'line'`` and is then passed to ``normalize``.

    Args:
        filename: path of the CSV file to read.

    Returns:
        A ``(fieldnames, rows)`` tuple: the CSV header as read by
        ``csv.DictReader`` and the normalized rows whose ``'import'`` flag is
        true.

    Raises:
        AssertionError: when a row's columns differ from ``expected_keys``,
            when ``normalize`` raised ``ValueError`` for at least one row, or
            when an assertion inside ``normalize`` fails.
    """
    logger.info(f'Loading rows of {filename}')
    # newline='' lets the csv module handle line endings inside quoted fields.
    with open(filename, newline='') as fd:
        reader = csv.DictReader(fd)
        data = list(reader)
        # Read the header while the file is still open.
        fieldnames = reader.fieldnames

    # Validate the columns first so a missing column is reported by the
    # assertion below rather than by a KeyError while building the index.
    for i, row in enumerate(data):
        assert set(row.keys()) == expected_keys, (
            f'row {i+1} keys differ: {row.keys()} != {expected_keys}'
        )

    ppid_index = {(row['ppid'] or '').strip(): i for i, row in enumerate(data)}

    error = False
    for i, row in enumerate(data):
        row['line'] = i + 1
        logger.info(f'Loading row {i + 1:05d}')
        try:
            normalize(row, ppid_index)
        except ValueError as exc:
            # Keep going so every bad row is reported, but say which one failed.
            logger.error(f'line {i + 1} : {exc}')
            error = True
    assert not error, f'invalid rows found in {filename}'
    return fieldnames, [row for row in data if row['import']]