# misc-bdauvergne/cd06/senior/load-in-wcs.py
#
# 239 lines
# 8.4 KiB
# Python

import sys
import loader
from quixote import get_publisher
from wcs.carddef import CardDef
import qommon.storage as st
# Shared handles: the loader's logger and the current publisher.
logger = loader.logger
pub = get_publisher()

# work-around non initialization of substitutions by runscript:
# rebuild the substitution sources by hand, starting with the publisher
# itself and then one instance of every extra source it declares.
pub.substitutions.reset()
pub.substitutions.feed(pub)
for source_factory in pub.extra_sources:
    pub.substitutions.feed(source_factory(pub, None))

# User class and its profile fields, indexed by varname.
User = pub.user_class
user_fields = dict((f.varname, f) for f in User.get_fields())

# The 'senior' card definition, its data class and its fields indexed by
# varname (fields without a varname are left out).
card_def = CardDef.get_by_urlname('senior')
SeniorData = card_def.data_class()
fields = dict((f.varname, f) for f in card_def.fields if f.varname)
# Field varnames, presumably those available on the 'senior' card
# (not all of them are filled in by this script):
# adresse_erreur
# annee_cm
# archive
# canaux_contact
# ci
# cm
# conjoint_senior
# ea
# fiche_senior_signee
# lieu_inscription
# memo
# non_envoi_brochure
# okphoto
# particularites
# pc_lien
# pc_nom_prenom
# pc_telephone
# rib
# url_historique
# Read the rows from the file named by the first command-line argument.
source_path = sys.argv[1]
keys, data = loader.load(source_path)

# Row indexes: one keyed by guid, one keyed by ppid.  When two rows share a
# key the later row wins.
guids = dict((row['guid'], row) for row in data)
ppids = dict((row['ppid'], row) for row in data)
# Create/find users
logger.info('Creating users...')

# Fetch every user whose name identifiers intersect the guids of the input.
guid_criteria = [st.Intersects('name_identifiers', list(guids))]
users = User.select(guid_criteria)
users_by_guid = dict((u.name_identifiers[0], u) for u in users)

# Verify some invariants: no two users share a first identifier, and every
# matched user carries exactly one identifier.
assert len(users) == len(users_by_guid)
assert not any(len(u.name_identifiers) != 1 for u in users)
logger.info('Found %d users.', len(users_by_guid))
# Create or update one user per input row.  Rows are matched on their guid
# against users_by_guid; a row without a match gets a brand new user.

# (user field varname, input column) pairs for the profile fields that are
# stored as '' when the input value is empty.
USER_TEXT_COLUMNS = (
    ('nom_de_naissance', 'Nom_JF'),
    ('phone', 'tel_fixe'),
    ('mobile', 'tel_mobile'),
    ('title', 'Civilite'),
    ('lieu_de_naissance', 'Lieu_Naissance'),
    ('address', 'NO_Voie'),
    ('complement_d_adresse', 'Batiment_Residence'),
    ('zipcode', 'CP'),
    ('city', 'Ville'),
)


def _user_key(varname):
    """Return the form_data key (stringified field id) of a user field."""
    return str(user_fields[varname].id)


created = 0
updated = 0
for row in data:
    guid = row['guid']
    # Only build a new User when no existing one matches; a default passed to
    # dict.get() would be constructed for every row, matched or not.
    user = users_by_guid.get(guid)
    if user is None:
        user = User()
    if not user.id:
        user.form_data = {}
        user.name_identifiers = [guid]
        # Placeholder name, replaced below by "first name last name".
        user.name = 'Senior %s' % guid

    # Snapshot of the current state, used to skip the store() call when the
    # row changes nothing.
    old_email = user.email
    old_name = user.name
    # A new user needs form_data initialised (see above), so an existing user
    # that never had profile data presumably still carries None here; treat
    # that as an empty profile instead of crashing on None.copy().
    old_data = dict(user.form_data or {})
    if user.form_data is None:
        user.form_data = {}

    birthdate = row['birthdate']
    if birthdate:
        # An empty birthdate leaves any previously stored value untouched.
        user.form_data[_user_key('birthdate')] = birthdate.timetuple()
    # First and last names are copied as-is (no '' fallback).
    user.form_data[_user_key('first_name')] = row['Prenom']
    user.form_data[_user_key('last_name')] = row['Nom']
    for varname, column in USER_TEXT_COLUMNS:
        user.form_data[_user_key(varname)] = row[column] or ''
    user.email = row['email'] or ''
    user.name = '%s %s' % (row['Prenom'], row['Nom'])

    if not user.id:
        user.store()
        created += 1
    elif old_data != user.form_data or old_name != user.name or old_email != user.email:
        user.store()
        updated += 1
logger.info('Created %d users.', created)
logger.info('Updated %d users.', updated)
# Reload the users now that the missing ones exist, and index them both by
# their first name identifier (the guid) and by their stringified id.
users = User.select([st.Intersects('name_identifiers', list(guids))])
users_by_guid = {}
users_by_id = {}
for user in users:
    users_by_guid[user.name_identifiers[0]] = user
    users_by_id[str(user.id)] = user
# Every input row must now map to exactly one user.
assert len(data) == len(users_by_guid)
# Create cards ("fiches")
logger.info('Creating fiches...')

# Fetch the existing cards attached to the users handled by this import.
# users_by_id is keyed by str(user.id) already, so its keys are passed as-is;
# this also avoids a comprehension variable named `id`, which shadows the
# builtin (and leaks into module scope on Python 2).
seniors = SeniorData.select([st.Contains('user_id', list(users_by_id))])
senior_by_id = {str(senior.id): senior for senior in seniors}
# Index the cards by the guid (first name identifier) of their user.
senior_by_guid = {users_by_id[senior.user_id].name_identifiers[0]: senior for senior in seniors}
# At most one card per user is expected.
assert len(seniors) == len(senior_by_guid)
logger.info('Found %d seniors.', len(seniors))

# Options of the lieu_inscription field, indexed by a normalised label
# (dashes replaced by spaces, lower-cased) for lookup from the input rows.
lieux_d_accueil = fields['lieu_inscription'].get_extended_options()
lieux_d_accueil_by_text = {x['text'].replace('-', ' ').lower(): x for x in lieux_d_accueil}
# Create or update one 'senior' card per input row.  Rows are matched on
# their guid against senior_by_guid; a row without a match gets a new card
# attached to the user carrying that guid.

# Card fields driven by the Accepte_Doc column.  They are resolved once,
# before the loop, so a missing card field fails before any card is written.
canaux_contact = fields['canaux_contact']
non_envoi_brochure = fields['non_envoi_brochure']
adresse_erreur = fields['adresse_erreur']

# Accepte_Doc value -> (canaux_contact ids, canaux_contact display text,
# boolean card field to set to True, or None).
# NOTE(review): 'Par email' stores ['email', 'sms'] but displays only
# 'Courriel', unlike the 'Courrier,SMS' entries -- kept as found, to confirm.
ACCEPTE_DOC = {
    'Brochure au conjoint': (['sms'], 'SMS', non_envoi_brochure),
    'Par email': (['email', 'sms'], 'Courriel', None),
    'Erreur adresse': (['courrier', 'sms'], 'Courrier,SMS', adresse_erreur),
    'Par courrier': (['courrier', 'sms'], 'Courrier,SMS', None),
}

# Lower-cased Certificat_Medical value -> annee_cm value.
ANNEE_CM = {
    'oui 2018/2019': '2018/2019',
    'oui 2017': '2017/2018',
    'oui 2019/2020': '2019/2020',
}

created = 0
updated = 0
for row in data:
    guid = row['guid']
    # Only build a new card when no existing one matches; a default passed to
    # dict.get() would be constructed for every row, matched or not.
    senior = senior_by_guid.get(guid)
    if senior is None:
        senior = SeniorData()
    is_new = not senior.id
    if is_new:
        senior.user_id = str(users_by_guid[guid].id)
        senior.data = {}
    else:
        # Snapshot used to skip store() when the row changes nothing.
        old_data = senior.data.copy()

    # Autorise_Photos -> okphoto ('OUI' means yes, anything else means no)
    field = fields['okphoto']
    photo = 'Oui' if row['Autorise_Photos'] == 'OUI' else 'Non'
    senior.data[field.id] = photo
    senior.data[field.id + '_display'] = photo

    # Lieu_Inscription -> lieu_inscription
    field = fields['lieu_inscription']
    lieu = row['Lieu_Inscription']
    if lieu:
        # Same normalisation as the option index, plus the 'msd' -> 'mds'
        # spelling correction.
        lieu_key = lieu.replace('-', ' ').lower().replace('msd', 'mds')
        try:
            option = lieux_d_accueil_by_text[lieu_key]
        except KeyError:
            # Report the offending value like the other columns do, rather
            # than letting a bare KeyError escape.
            raise ValueError('unknown Lieu_Inscription %s' % lieu)
        senior.data[field.id] = option['id']
        senior.data[field.id + '_display'] = option['text']
        senior.data[field.id + '_structured'] = option

    # Accepte_Doc -> canaux_contact (+ non_envoi_brochure / adresse_erreur)
    # Empty or 'non' (any case) leaves the contact channels untouched.
    accepte_doc = row['Accepte_Doc']
    if accepte_doc and accepte_doc.lower() != 'non':
        try:
            channels, channels_display, flag_field = ACCEPTE_DOC[accepte_doc]
        except KeyError:
            raise ValueError('unknown Accepte_Doc %s' % accepte_doc)
        # Copy the list so cards never share the table's list object.
        senior.data[canaux_contact.id] = list(channels)
        senior.data[canaux_contact.id + '_display'] = channels_display
        if flag_field is not None:
            senior.data[flag_field.id] = True

    # Certificat_Medical -> cm (+ annee_cm)
    # Empty or 'non' (any case) means no certificate; any other value must be
    # one of the known 'oui <year>' spellings.  Unknown values raise
    # ValueError (an assert would be skipped when running with -O).
    certificat_medical = row['Certificat_Medical']
    has_cm = bool(certificat_medical) and certificat_medical.lower() != 'non'
    senior.data[fields['cm'].id] = has_cm
    if has_cm:
        try:
            annee = ANNEE_CM[certificat_medical.lower()]
        except KeyError:
            raise ValueError('unknown Certificat_Medical %s' % certificat_medical)
        senior.data[fields['annee_cm'].id] = annee
        senior.data[fields['annee_cm'].id + '_display'] = annee

    # Contact person, memo and history URL: copied, '' when empty.
    senior.data[fields['pc_nom_prenom'].id] = row['Personne_A_Contacter'] or ''
    senior.data[fields['pc_telephone'].id] = row['tel_a_contacter'] or ''
    senior.data[fields['pc_lien'].id] = row['Lien_relationnel'] or ''
    senior.data[fields['memo'].id] = row['Annotations_particulières2'] or ''
    senior.data[fields['url_historique'].id] = row['URL_Historique_actvites'] or ''

    if is_new:
        created += 1
        senior.just_created()
        senior.store()
        senior.perform_workflow()
    elif old_data != senior.data:
        updated += 1
        senior.store()
logger.info('Created %d seniors.', created)
logger.info('Updated %d seniors.', updated)
# Reload the cards now that the missing ones exist, and rebuild the indexes.
# users_by_id is keyed by str(user.id) already, so its keys are passed as-is;
# this also avoids a comprehension variable named `id`, which shadows the
# builtin (and leaks into module scope on Python 2).
seniors = SeniorData.select([st.Contains('user_id', list(users_by_id))])
senior_by_id = {str(senior.id): senior for senior in seniors}
# Cards indexed by the guid (first name identifier) of their user.
senior_by_guid = {users_by_id[senior.user_id].name_identifiers[0]: senior for senior in seniors}
# Spouse ("conjoint") links are set in a last pass, once every card exists
# and can be referenced by id.
logger.info('Adding conjoints...')
added = 0
for row in data:
    # ID_Conjoint -> ItemField conjoint_senior
    if not row['conjoint_index']:
        # No spouse recorded for this row.
        continue

    # Resolve the spouse: ppid -> row -> guid -> card.
    spouse_ppid = row['ID_Conjoint']
    conjoint_guid = ppids[spouse_ppid]['guid']
    conjoint_senior = senior_by_guid[conjoint_guid]
    # The guid and ppid indexes must agree on the spouse's row.
    assert guids[conjoint_guid]['ppid'] == spouse_ppid

    senior = senior_by_guid[row['guid']]
    previous = senior.data.copy()
    conjoint_field_id = fields['conjoint_senior'].id
    senior.data[conjoint_field_id] = str(conjoint_senior.id)
    senior.data[conjoint_field_id + '_display'] = conjoint_senior.digest
    # Only store (and count) cards whose link or display text changed.
    if previous != senior.data:
        added += 1
        senior.store()
logger.info('Added %d conjoints.', added)