cd06/senior: add script to load senior cards

This commit is contained in:
Benjamin Dauvergne 2020-09-17 16:33:29 +02:00
parent 37489d412e
commit 5d93fe6716
3 changed files with 247 additions and 12 deletions

View File

@ -32,6 +32,8 @@ logger = loader.logger
keys, data = loader.load(sys.argv[1])
logger.handlers[0].level = logging.INFO
with atomic():
senior_ou, created = OU.objects.update_or_create(slug='senior', defaults={'name': 'Sénior'})

231
cd06/senior/load-in-wcs.py Normal file
View File

@ -0,0 +1,231 @@
import sys
import loader
from quixote import get_publisher
from wcs.carddef import CardDef
import qommon.storage as st
logger = loader.logger
pub = get_publisher()
# work-around non initialization of substitutions by runscript
pub.substitutions.reset()
pub.substitutions.feed(pub)
for extra_source in pub.extra_sources:
pub.substitutions.feed(extra_source(pub, None))
User = pub.user_class
user_fields = {field.varname: field for field in User.get_fields()}
card_def = CardDef.get_by_urlname('senior')
SeniorData = card_def.data_class()
fields = {field.varname: field for field in card_def.fields if field.varname}
# adresse_erreur
# annee_cm
# archive
# canaux_contact
# ci
# cm
# conjoint_senior
# ea
# fiche_senior_signee
# lieu_inscription
# memo
# non_envoi_brochure
# okphoto
# particularites
# pc_lien
# pc_nom_prenom
# pc_telephone
# rib
# url_historique
keys, data = loader.load(sys.argv[1])
# guid index
guids = {row['guid']: row for row in data}
ppids = {row['ppid']: row for row in data}
# Create/find users
logger.info('Creating users...')
users = User.select([st.Intersects('name_identifiers', list(guids))])
users_by_guid = {user.name_identifiers[0]: user for user in users}
# Verify some invariants
assert len(users_by_guid) == len(users)
assert all(len(user.name_identifiers) == 1 for user in users)
logger.info('Found %d users.', len(users_by_guid))
created = 0
updated = 0
for row in data:
guid = row['guid']
user = users_by_guid.get(guid, User())
if not user.id:
user.form_data = {}
user.name_identifiers = [guid]
user.name = 'Senior %s' % guid
old_email = user.email
old_data = user.form_data.copy()
old_name = user.name
birthdate = row['birthdate']
if birthdate:
user.form_data[str(user_fields['birthdate'].id)] = row['birthdate'].timetuple()
user.form_data[str(user_fields['first_name'].id)] = row['Prenom']
user.form_data[str(user_fields['last_name'].id)] = row['Nom']
user.form_data[str(user_fields['phone'].id)] = row['tel_fixe'] or ''
user.form_data[str(user_fields['mobile'].id)] = row['tel_mobile'] or ''
user.email = row['email'] or ''
user.name = '%s %s' % (row['Prenom'], row['Nom'])
if not user.id:
user.store()
created += 1
elif old_data != user.form_data or old_name != user.name or old_email != user.email:
user.store()
updated += 1
logger.info('Created %d users.', created)
logger.info('Updated %d users.', updated)
users = User.select([st.Intersects('name_identifiers', list(guids))])
users_by_guid = {user.name_identifiers[0]: user for user in users}
users_by_id = {str(user.id): user for user in users}
assert len(users_by_guid) == len(data)
# Create fiches
logger.info('Creating fiches...')
seniors = SeniorData.select([st.Contains('user_id', [str(id) for id in users_by_id])])
senior_by_id = {str(senior.id): senior for senior in seniors}
senior_by_guid = {users_by_id[senior.user_id].name_identifiers[0]: senior for senior in seniors}
assert len(seniors) == len(senior_by_guid)
logger.info('Found %d seniors.', len(seniors))
lieux_d_accueil = fields['lieu_inscription'].get_extended_options()
lieux_d_accueil_by_text = {x['text'].replace('-', ' ').lower(): x for x in lieux_d_accueil}
created = 0
updated = 0
for row in data:
guid = row['guid']
senior = senior_by_guid.get(guid, SeniorData())
if not senior.id:
senior.user_id = str(users_by_guid[guid].id)
senior.data = {}
else:
old_data = senior.data.copy()
# Autorise_Photos -> okphoto
field = fields['okphoto']
photo = row['Autorise_Photos']
if photo == 'OUI':
photo = 'Oui'
else:
photo = 'Non'
senior.data[field.id] = photo
senior.data[field.id + '_display'] = photo
# Lieux d'accueil
field = fields['lieu_inscription']
lieu = row['Lieu_Inscription']
if lieu:
lieu = lieu.replace('-', ' ').lower().replace('msd', 'mds')
lieu = lieux_d_accueil_by_text[lieu]
senior.data[field.id] = lieu['id']
senior.data[field.id + '_display'] = lieu['text']
senior.data[field.id + '_structured'] = lieu
# Accepte_Doc
canaux_contact = fields['canaux_contact']
non_envoi_brochure = fields['non_envoi_brochure']
adresse_erreur = fields['adresse_erreur']
accepte_doc = row['Accepte_Doc']
if accepte_doc and accepte_doc.lower() != 'non':
if accepte_doc == 'Brochure au conjoint':
senior.data[canaux_contact.id] = ['sms']
senior.data[canaux_contact.id + '_display'] = 'SMS'
senior.data[non_envoi_brochure.id] = True
elif accepte_doc == 'Par email':
senior.data[canaux_contact.id] = ['email', 'sms']
senior.data[canaux_contact.id + '_display'] = 'Courriel'
elif accepte_doc == 'Erreur adresse':
senior.data[canaux_contact.id] = ['courrier', 'sms']
senior.data[canaux_contact.id + '_display'] = 'Courrier,SMS'
senior.data[adresse_erreur.id] = True
elif accepte_doc == 'Par courrier':
senior.data[canaux_contact.id] = ['courrier', 'sms']
senior.data[canaux_contact.id + '_display'] = 'Courrier,SMS'
else:
raise ValueError('unknown Accepte_Doc %s' % accepte_doc)
# Certificat_Medical
certificat_medical = row['Certificat_Medical']
if certificat_medical and certificat_medical.lower() != 'non':
assert certificat_medical.lower().startswith('oui'), certificat_medical
senior.data[fields['cm'].id] = True
if certificat_medical.lower() == 'oui 2018/2019':
senior.data[fields['annee_cm'].id] = '2018/2019'
senior.data[fields['annee_cm'].id + '_display'] = '2018/2019'
elif certificat_medical.lower() == 'oui 2017':
senior.data[fields['annee_cm'].id] = '2017/2018'
senior.data[fields['annee_cm'].id + '_display'] = '2017/2018'
elif certificat_medical.lower() == 'oui 2019/2020':
senior.data[fields['annee_cm'].id] = '2019/2020'
senior.data[fields['annee_cm'].id + '_display'] = '2019/2020'
else:
raise ValueError('unknown Certificat_Medical %s' % certificat_medical)
else:
senior.data[fields['cm'].id] = False
# Personne_A_Contacter
senior.data[fields['pc_nom_prenom'].id] = row['Personne_A_Contacter'] or ''
senior.data[fields['pc_telephone'].id] = row['tel_a_contacter'] or ''
senior.data[fields['pc_lien'].id] = row['Lien_relationnel'] or ''
senior.data[fields['memo'].id] = row['Annotations_particulières2'] or ''
senior.data[fields['url_historique'].id] = row['URL_Historique_actvites'] or ''
if not senior.id:
created += 1
senior.just_created()
senior.store()
senior.perform_workflow()
else:
if old_data != senior.data:
updated += 1
senior.store()
logger.info('Created %d seniors.', created)
logger.info('Updated %d seniors.', updated)
seniors = SeniorData.select([st.Contains('user_id', [str(id) for id in users_by_id])])
senior_by_id = {str(senior.id): senior for senior in seniors}
senior_by_guid = {users_by_id[senior.user_id].name_identifiers[0]: senior for senior in seniors}
# Set conjoint later
logger.info('Adding conjoints...')
added = 0
for row in data:
# ID_Conjoint -> ItemField conjoint_senior
if row['conjoint_index']:
conjoint_row = ppids[row['ID_Conjoint']]
conjoint_guid = conjoint_row['guid']
conjoint_senior = senior_by_guid[conjoint_guid]
assert guids[conjoint_guid]['ppid'] == row['ID_Conjoint']
senior = senior_by_guid[row['guid']]
old_data = senior.data.copy()
senior.data[fields['conjoint_senior'].id] = str(conjoint_senior.id)
senior.data[fields['conjoint_senior'].id + '_display'] = conjoint_senior.digest
if senior.data != old_data:
added += 1
senior.store()
logger.info('Added %d conjoints.', added)

View File

@ -8,11 +8,10 @@ import uuid
logger = logging.getLogger('eudonet')
logger.propagate = False
handler = logging.StreamHandler()
handler.level = logging.WARNING
handler.setFormatter(
logging.Formatter('%(asctime)-15s %(levelname)s %(message)s'))
handler.level = logging.INFO
handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)s %(message)s'))
logger.addHandler(handler)
# Accepte_Doc
# Adr_Personnelle
@ -95,7 +94,7 @@ def telephone(row, key):
else:
mobile = re.sub(r'[\s.-]', '', mobile).strip()
if not mobile.isascii() or not mobile.isdigit() or len(mobile) not in (9, 10):
logger.warning(f'line {row["line"]} : invalid {key} {row[key]}')
logger.warning(f'line {row["line"]} ppid {row["ppid"]} : invalid {key} {row[key]}')
mobile = ''
if len(mobile) == 9:
mobile = '0' + mobile
@ -129,13 +128,13 @@ def normalize(row, ppid_index):
email = row['Email']
if email:
if not re.match(r'^[a-zA-Z0-9_.-]+@[a-zA-Z0-9.-]+$', email):
logger.warning(f'line {line} : invalid Email {row["Email"]}')
logger.warning(f'line {line} ppid {row["ppid"]} : invalid Email {row["Email"]}')
email = None
row['email'] = email
# Archivé
if row['Archivé'] == 'OUI':
logger.warning('line {line} Archivé==OUI')
logger.warning(f'line {line} ppid {row["ppid"]} : Archivé==OUI')
row['import'] = False
# UUID
@ -156,7 +155,7 @@ def normalize(row, ppid_index):
if id_conjoint:
id_conjoint = ppid_index.get(id_conjoint)
if not id_conjoint:
logger.warning(f'line {line} : unknown ID_Conjoint {row["ID_Conjoint"]}')
logger.warning(f'line {line} ppid {row["ppid"]} : unknown ID_Conjoint {row["ID_Conjoint"]}')
row['conjoint_index'] = id_conjoint
# Date_Naissance
@ -169,7 +168,7 @@ def normalize(row, ppid_index):
try:
birthdate = datetime.datetime.strptime(birthdate, '%Y-%m-%d').date()
except ValueError:
logger.warning(f'line {row["line"]} : invalid Date_Naissance {row["Date_Naissance"]}')
logger.warning(f'line {row["line"]} ppid {row["ppid"]} : invalid Date_Naissance {row["Date_Naissance"]}')
row['birthdate'] = birthdate
# convert Accepte_Doc to three fields
@ -194,13 +193,16 @@ def normalize(row, ppid_index):
raise ValueError('invalid Accepte_Doc %r' % accepte_doc)
def load(filename):
def load(filename, number_of_rows=None):
logger.info(f'Loading rows of {filename}')
with open(filename) as fd:
reader = csv.DictReader(fd)
data = list(reader)
if number_of_rows:
data = [row for i, row in zip(range(number_of_rows), reader)]
else:
data = list(reader)
ppid_index = {row['ppid'].strip(): i for i, row in enumerate(data)}
@ -210,7 +212,7 @@ def load(filename):
error = False
for i, row in enumerate(data):
row['line'] = (i + 1)
logger.info(f'Loading row {i + 1:05d}')
logger.debug(f'Loading row {i + 1:05d}')
try:
normalize(row, ppid_index)
except ValueError: