223 lines
5.9 KiB
Python
223 lines
5.9 KiB
Python
import hashlib
|
|
import csv
|
|
import logging
|
|
import re
|
|
import datetime
|
|
import uuid
|
|
|
|
# Dedicated 'eudonet' logger writing to its own stream handler
# (logging.StreamHandler() defaults to stderr). Propagation is disabled so
# records are not emitted a second time by handlers of ancestor loggers.
# NOTE(review): the logger's own level is never set in this block, so its
# effective level is inherited from its ancestors; confirm that INFO and
# DEBUG records are meant to be enabled by the caller.
logger = logging.getLogger('eudonet')
logger.propagate = False

handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
handler.setFormatter(
    logging.Formatter('%(asctime)-15s %(levelname)s %(message)s')
)
logger.addHandler(handler)
|
|
|
|
|
|
# Column names of the eudonet CSV file (the same names are listed in
# expected_keys below):
#
# Accepte_Doc
# Adr_Personnelle
# Adresse_Principale
# Annee_Inscription
# Annotations_particulières2
# Archivé
# Autorise_Photos
# Batiment_Residence
# CP
# Certificat_Medical
# Civilite
# Date_Naissance
# Email
# Habite_Pas_Adresse_Indiquee
# ID_Conjoint
# Lien_relationnel
# Lieu_Inscription
# Lieu_Naissance
# NO_A_Contacter
# NO_Voie
# Nom
# Nom_Conjoint
# Nom_JF
# Personne_A_Contacter
# Prenom
# Prenom_Conjoint
# Profil_Contact
# Tel_Portable
# Téléphone_fixe
# URL_Historique_actvites
# Ville
# ppid
|
|
|
|
# Exact set of column names every CSV row must carry; load() compares the
# keys of each row against this set before normalizing anything.
expected_keys = {
    # identity
    'ppid',
    'Nom',
    'Nom_JF',
    'Prenom',
    'Civilite',
    'Date_Naissance',
    'Lieu_Naissance',
    # spouse
    'ID_Conjoint',
    'Nom_Conjoint',
    'Prenom_Conjoint',
    # registration and contact preferences
    'Tel_Portable',
    'Autorise_Photos',
    'Annee_Inscription',
    'Lieu_Inscription',
    'Accepte_Doc',
    'Certificat_Medical',
    'Personne_A_Contacter',
    'NO_A_Contacter',
    'Lien_relationnel',
    'Annotations_particulières2',
    'Profil_Contact',
    'Archivé',
    # address and other contact details
    'Téléphone_fixe',
    'NO_Voie',
    'Batiment_Residence',
    'CP',
    'Ville',
    'Email',
    'Adr_Personnelle',
    'Adresse_Principale',
    'Habite_Pas_Adresse_Indiquee',
    'URL_Historique_actvites',
}
|
|
|
|
# Constant bytes prepended to a row's ppid before it is hashed into the
# row's guid (see normalize()).
salt = b'eudonet'

# guids generated so far by normalize(); used to detect collisions.
uuids = set()
|
|
|
|
|
|
def telephone(row, key):
    """Return the phone number stored in ``row[key]`` as 10 digits.

    Whitespace, dots and dashes are removed. A 9-digit number gets a
    leading ``'0'``; a 10-digit number is returned as is.

    Args:
        row: mapping holding the value under ``key``; ``row['line']`` and
            ``row['ppid']`` are only read to build the warning message.
        key: name of the column to read.

    Returns:
        The cleaned number, ``''`` when the value is ``'NULL'`` or is not
        a 9/10-digit ASCII number (a warning is logged in the latter
        case), or the original value unchanged when it is falsy
        (``None`` or ``''``).
    """
    raw = row[key]
    if not raw:
        # Nothing to clean: hand back None / '' exactly as received.
        return raw

    number = raw.strip()
    if number == 'NULL':
        return ''

    number = re.sub(r'[\s.-]', '', number).strip()
    well_formed = (
        number.isascii() and number.isdigit() and len(number) in (9, 10)
    )
    if not well_formed:
        logger.warning(f'line {row["line"]} ppid {row["ppid"]} : invalid {key} {row[key]}')
        return ''

    if len(number) == 9:
        # 9 digits: the leading zero is missing, put it back.
        return '0' + number
    return number
|
|
|
|
# ppid values already accepted by normalize(); used to reject duplicates.
ppids = set()
|
|
|
|
|
|
def normalize(row, ppid_index):
    """Normalize one CSV row in place and add the derived fields.

    The row is cleaned (values stripped, ``'NULL'`` turned into ``None``),
    validated, and extended with the keys ``source``, ``import``,
    ``email``, ``guid``, ``tel_mobile``, ``tel_fixe``, ``tel_a_contacter``,
    ``conjoint_index``, ``birthdate``, ``canaux_contact``,
    ``non_envoie_brochure`` and ``adresse_erreur``.

    The module-level sets ``ppids`` and ``uuids`` are updated so that
    duplicates are detected across calls.

    Args:
        row: dict for one CSV row; must already contain a ``'line'`` entry
            (the row number used in messages) besides the CSV columns.
        ppid_index: mapping ``ppid -> position of the row``, used to
            resolve ``ID_Conjoint``.

    Raises:
        AssertionError: missing ``ppid``/``Prenom``/``Nom``, duplicate
            ``ppid`` or guid, or unexpected ``Civilite``.
        ValueError: unknown ``Accepte_Doc`` value.
    """
    row['source'] = 'eudonet'

    # Strip every string value and turn the literal 'NULL' into None.
    # Non-string values (the integer 'line', or None for cells missing
    # from a short CSV row) are left alone instead of crashing on .strip().
    # Only existing keys are reassigned, so iterating the dict is safe.
    for key in row:
        value = row[key]
        if isinstance(value, str):
            value = value.strip()
            row[key] = None if value == 'NULL' else value

    row['import'] = True
    line = row['line']

    # NOTE: these checks are asserts, so they vanish under ``python -O``.
    assert row['ppid'], 'no ppid line %s' % line
    assert row['Prenom'], 'no Prenom line %s' % line
    assert row['Nom'], 'no Nom line %s' % line
    assert row['ppid'] not in ppids, 'duplicate ppid line %s' % line
    ppids.add(row['ppid'])
    assert row['Civilite'] in (None, 'Monsieur', 'Madame'), (
        'invalid Civilite %r line %s' % (row['Civilite'], line)
    )

    # email: keep it only when it looks like an address.
    email = row['Email']
    if email and not re.match(r'^[a-zA-Z0-9_.-]+@[a-zA-Z0-9.-]+$', email):
        logger.warning(f'line {line} ppid {row["ppid"]} : invalid Email {row["Email"]}')
        email = None
    row['email'] = email

    # Archivé: archived rows are normalized but flagged as not importable.
    if row['Archivé'] == 'OUI':
        logger.warning(f'line {line} ppid {row["ppid"]} : Archivé==OUI')
        row['import'] = False

    # UUID: deterministic identifier derived from the salted ppid.
    digest = hashlib.md5(salt + row['ppid'].encode()).hexdigest()
    guid = uuid.UUID(digest).hex
    assert guid not in uuids, f'uuid duplicate {guid}'
    uuids.add(guid)
    row['guid'] = guid
    logger.debug('uuid %s', guid)

    # Phone numbers
    row['tel_mobile'] = telephone(row, 'Tel_Portable')
    row['tel_fixe'] = telephone(row, 'Téléphone_fixe')
    # NO_A_Contacter cannot be fixed: kept as typed, None when empty.
    row['tel_a_contacter'] = (row['NO_A_Contacter'] or '').strip() or None

    # ID_Conjoint: replace the spouse's ppid by its position in the data.
    # The result is '' when no spouse is given, None when the ppid is
    # unknown. The lookup result is compared with ``is None`` because
    # position 0 is a valid (falsy) answer.
    id_conjoint = (row['ID_Conjoint'] or '').strip()
    if id_conjoint:
        id_conjoint = ppid_index.get(id_conjoint)
        if id_conjoint is None:
            logger.warning(f'line {line} ppid {row["ppid"]} : unknown ID_Conjoint {row["ID_Conjoint"]}')
    row['conjoint_index'] = id_conjoint

    # Date_Naissance: accepts dd/mm/YYYY or YYYY-mm-dd. An unparsable value
    # is discarded (None), like an invalid email, rather than leaking the
    # raw string into a field that otherwise holds a date.
    birthdate = row['Date_Naissance']
    if birthdate:
        raw_birthdate = birthdate.strip()
        for date_format in ('%d/%m/%Y', '%Y-%m-%d'):
            try:
                birthdate = datetime.datetime.strptime(raw_birthdate, date_format).date()
            except ValueError:
                continue
            break
        else:
            logger.warning(f'line {row["line"]} ppid {row["ppid"]} : invalid Date_Naissance {row["Date_Naissance"]}')
            birthdate = None
    row['birthdate'] = birthdate

    # Convert Accepte_Doc to three fields.
    accepte_doc = row['Accepte_Doc']
    row['canaux_contact'] = []
    row['non_envoie_brochure'] = False
    row['adresse_erreur'] = False
    if accepte_doc in (None, 'NON'):
        pass
    elif accepte_doc == 'Brochure au conjoint':
        row['canaux_contact'] = ['courrier']
        row['non_envoie_brochure'] = True
    elif accepte_doc == 'Par courrier':
        row['canaux_contact'] = ['courrier']
    elif accepte_doc == 'Par email':
        row['canaux_contact'] = ['email']
    elif accepte_doc == 'Erreur adresse':
        row['canaux_contact'] = ['courrier']
        # This is the only value describing an address error, so it is
        # the one that must raise the flag.
        row['adresse_erreur'] = True
    else:
        raise ValueError('invalid Accepte_Doc %r' % accepte_doc)
|
|
|
|
|
|
def load(filename, number_of_rows=None):
    """Load a CSV file, then validate and normalize every row.

    Args:
        filename: path of the CSV file; its first line holds the column
            names.
        number_of_rows: when truthy, only the first ``number_of_rows`` data
            rows are read; otherwise (``None`` or ``0``) the whole file is.

    Returns:
        A ``(fieldnames, rows)`` tuple: the column names read from the
        header, and the normalized rows whose ``'import'`` flag is true.

    Raises:
        AssertionError: a row does not have exactly the expected columns,
            a check inside ``normalize`` failed, or at least one row made
            ``normalize`` raise ``ValueError`` (each such error is logged).
    """
    logger.info(f'Loading rows of {filename}')

    # newline='' is how the csv module expects its files to be opened, so
    # that newlines embedded in quoted fields are handled by the reader.
    with open(filename, newline='') as fd:
        reader = csv.DictReader(fd)

        if number_of_rows:
            data = [row for _, row in zip(range(number_of_rows), reader)]
        else:
            data = list(reader)

        # Read through the public attribute while the file is still open.
        fieldnames = reader.fieldnames

    # Check the columns first, so a malformed file is reported by this
    # assertion and not by a KeyError while building the index below.
    for i, row in enumerate(data):
        assert set(row.keys()) == expected_keys, (
            f'row {i+1} keys differ: {row.keys()} != {expected_keys}'
        )

    # ppid -> position of the row, used to resolve ID_Conjoint references.
    # ``or ''`` guards against None, which DictReader uses for cells
    # missing from a short row.
    ppid_index = {(row['ppid'] or '').strip(): i for i, row in enumerate(data)}

    # Normalize every row before failing, so that all invalid rows are
    # reported in a single run.
    error = False
    for line, row in enumerate(data, start=1):
        row['line'] = line
        logger.debug(f'Loading row {line:05d}')
        try:
            normalize(row, ppid_index)
        except ValueError as exc:
            logger.error(f'line {line} : {exc}')
            error = True
    assert not error, f'invalid rows found in {filename}, see the log'

    return fieldnames, [row for row in data if row['import']]
|