misc-cazino/venissieux-technocarte/wcs_run.py

312 lines
10 KiB
Python

"""
Usage :
wcs-manage runscript -d wcs.dev.publik.love wcs_run.py reset
wcs-manage runscript -d wcs.dev.publik.love wcs_run.py import --filepath ~/notes/clients/venissieux/famille/technocarte.csv --api-url=https://wcs.dev.publik.love/api/ --email=admin@localhost --orig=wcs.dev.publik.love --key=c25fc1f82bd56b01e7cf62d785ae4410b0f7fdbebfb92a986d3fd6a150f3ba0e
"""
import argparse
import base64
import collections
import csv
import datetime
import hashlib
import hmac
import random
import time
import urllib.parse
import requests
from wcs.carddef import CardDef
CARD_ADULTE_SLUG = 'adultes'
CARD_ENFANT_SLUG = 'enfants'
CARD_FAMILLE_SLUG = 'familles'
CARD_GARDE_SLUG = 'gardes'
def sign_url(url, key, algo='sha256', orig=None, timestamp=None, nonce=None):
parsed = urllib.parse.urlparse(url)
new_query = sign_query(parsed.query, key, algo, orig, timestamp, nonce)
return urllib.parse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
def sign_query(query, key, algo='sha256', orig=None, timestamp=None, nonce=None):
if timestamp is None:
timestamp = datetime.datetime.utcnow()
timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
if nonce is None:
nonce = hex(random.getrandbits(128))[2:].rstrip('L')
new_query = query
if new_query:
new_query += '&'
new_query += urllib.parse.urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
if orig is not None:
new_query += '&' + urllib.parse.urlencode({'orig': orig})
signature = base64.b64encode(sign_string(new_query, key, algo=algo))
new_query += '&' + urllib.parse.urlencode({'signature': signature})
return new_query
def sign_string(s, key, algo='sha256', timedelta=30):
if not isinstance(key, bytes):
key = key.encode('utf-8')
if not isinstance(s, bytes):
s = s.encode('utf-8')
digestmod = getattr(hashlib, algo)
hash = hmac.HMAC(key, digestmod=digestmod, msg=s)
return hash.digest()
def api_call(url, args, data=None, user_email=None, method='post'):
url = sign_url(url + '?email=%s' % args.email, args.key, orig=args.orig)
if method == 'post':
post_data = {'data': data}
if user_email:
post_data['user'] = {'email': user_email}
resp = requests.post(url, json=post_data)
elif method == 'get':
resp = requests.get(url)
resp.raise_for_status()
return resp
def get_rows(args):
with open(args.filepath) as csvfile:
reader = csv.DictReader(csvfile, delimiter=';', quotechar='"')
for i, row in enumerate(reader):
if args.mode != 'full' and i > args.sample_numlines:
break
yield row
def build_email(first_name, last_name):
return 'vpf-%s-%s@yopmail.com' % (first_name, last_name)
def init_human_data(row):
data = {
'nom': row['NOMENF'],
'prenom': row['PREENF']
}
codsex = row['CODSEX']
genre = ''
if codsex == 'M':
genre = 'Homme'
elif codsex == 'F':
genre = 'Femme'
data['genre'] = genre
if row['DATNAI']:
data['date_naissance'] = time.strftime(
'%Y-%m-%d',
time.strptime(row['DATNAI'][:10], '%d/%m/%Y')
)
return data
def create_adultes(args):
adultes = {}
familles_adultes = collections.defaultdict(list)
url = args.api_url + 'cards/adultes/submit'
for row in get_rows(args):
if row['ADULTE'] != '1':
continue
code_famille = row['CODFAM']
if len(familles_adultes[code_famille]) >= 2:
# gestion des doublons
continue
email = build_email(row['PREENF'], row['NOMENF'])
data = init_human_data(row)
data['telephone'] = row['NUMTEL']
data['courriel'] = email
data['numero_voie'] = "%s %s" % (row['NUMVOI'], row['ADRENF1'])
numbis = row['NUMBIS']
if numbis:
numbis = numbis.capitalize()
if numbis in ('Bis', 'Ter', 'Quater'):
data['bis'] = numbis
data['code_postal'] = row['CODPOSENF']
data['commune'] = row['COMMUNE']
if row['NUMTEL2']:
data['portable'] = row['NUMTEL2']
civilite = row['FORPOL']
civilite_clean = None
if civilite == 'M.':
civilite_clean = 'Monsieur'
elif civilite == 'Mme':
civilite_clean = 'Madame'
if civilite_clean:
data['civilite'] = civilite_clean
resp = api_call(url, args, data=data, user_email=email)
adulte = {
'technocarte_id': row['CODENF'],
'publik_id': str(resp.json()['data']['id']),
'technocarte_famille_id': code_famille,
'nom': row['NOMENF'],
'email': email
}
adultes[row['CODENF']] = adulte
familles_adultes[code_famille].append(adulte)
return adultes, familles_adultes
def create_enfants(args, familles_adultes):
enfants = {}
familles_enfants = collections.defaultdict(list)
url = args.api_url + 'cards/%s/submit' % CARD_ENFANT_SLUG
for row in get_rows(args):
if row['ADULTE'] == '1':
continue
data = init_human_data(row)
if row['ZONSCOL']:
data['zone_scolaire'] = row['ZONSCOL']
if row['ECOLE']:
data['ecole'] = row['ECOLE']
if row['INSCRITSCO']:
data['date_premiere_inscription'] = time.strftime(
'%Y-%m-%d',
time.strptime(row['INSCRITSCO'][:10], '%d/%m/%Y')
)
if row['GRPSCO']:
data['groupe_scolaire'] = row['GRPSCO']
technocarte_famille_id = row['CODFAM']
# find email of the first parent, for ownership of the card
user_email = None
for adulte in familles_adultes.get(technocarte_famille_id, []):
user_email = adulte['email']
break
resp = api_call(url, args, data, user_email=user_email)
enfant = {
'technocarte_id': row['CODENF'],
'publik_id': str(resp.json()['data']['id']),
'technocarte_famille_id': technocarte_famille_id
}
if row['CODPEREBIO']:
enfant['pere_technocarte_id'] = row['CODPEREBIO']
if row['CODMEREBIO']:
enfant['mere_technocarte_id'] = row['CODMEREBIO']
if row['CAF']:
enfant['numero_allocataire_caf'] = row['CAF']
enfants[row['CODENF']] = enfant
familles_enfants[technocarte_famille_id].append(enfant)
return enfants, familles_enfants
def create_familles(args, adultes, familles_enfants):
familles_map = {}
url = args.api_url + 'cards/%s/submit' % CARD_FAMILLE_SLUG
for adulte_technocarte_id, adulte in adultes.items():
technocarte_famille_id = adulte['technocarte_famille_id']
if technocarte_famille_id not in familles_map:
# création famille
data = {
'famille': adulte['nom'],
'adulte1': adulte['publik_id']
}
# try to grab a numero_allocataire_caf
numero_allocataire_caf = None
for enfant in familles_enfants.get(technocarte_famille_id, []):
numero_allocataire_caf = enfant.get('numero_allocataire_caf')
if numero_allocataire_caf:
break
if numero_allocataire_caf:
data['numero_allocataire_caf'] = numero_allocataire_caf
url = args.api_url + 'cards/%s/submit' % CARD_FAMILLE_SLUG
resp = api_call(url, args, data, user_email=adulte['email'])
familles_map[technocarte_famille_id] = str(resp.json()['data']['id'])
else:
url = args.api_url + 'cards/%s/%s/' % (
CARD_FAMILLE_SLUG, familles_map[technocarte_famille_id]
)
card = api_call(url, args, method='get').json()
if card['fields']['adulte2']:
# gestion des doublons, s'il ya déjà un deuxième adulte, on zappe
continue
# ajouter comme deuxième adulte à la famille existante
data = {
'adulte2': adulte['publik_id']
}
if card['fields']['famille'] != adulte['nom']:
# les gens ne se marient plus
data['famille'] = "%s %s" % (card['fields']['famille'], adulte['nom'])
api_call(url, args, data)
return familles_map
def create_gardes(args, enfants, familles, familles_adultes):
gardes = {}
url = args.api_url + 'cards/%s/submit' % CARD_GARDE_SLUG
for _, enfant in enfants.items():
publik_id = enfant['publik_id']
technocarte_famille_id = enfant['technocarte_famille_id']
publik_famille_id = familles.get(technocarte_famille_id)
if publik_famille_id:
# création garde
data = {
'enfant': publik_id,
'famille': publik_famille_id
}
# find email of the first parent, for ownership of the card
user_email = None
for adulte in familles_adultes.get(technocarte_famille_id, []):
user_email = adulte['email']
break
resp = api_call(url, args, data, user_email=user_email)
gardes[publik_id] = str(resp.json()['data']['id'])
return gardes
def import_data(args):
adultes, familles_adultes = create_adultes(args)
enfants, familles_enfants = create_enfants(args, familles_adultes)
familles = create_familles(args, adultes, familles_enfants)
create_gardes(args, enfants, familles, familles_adultes)
def reset(args):
CardDef.get_by_urlname(CARD_ENFANT_SLUG).data_class().wipe()
CardDef.get_by_urlname(CARD_ADULTE_SLUG).data_class().wipe()
CardDef.get_by_urlname(CARD_FAMILLE_SLUG).data_class().wipe()
CardDef.get_by_urlname(CARD_GARDE_SLUG).data_class().wipe()
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers()
parser_import = subparsers.add_parser('import')
parser_import.set_defaults(func=import_data)
parser_import.add_argument('--filepath')
parser_import.add_argument('--mode', default='sample', choices=('sample', 'full'))
parser_import.add_argument('--sample-numlines', default=100, type=int)
parser_import.add_argument('--api-url')
parser_import.add_argument('--email')
parser_import.add_argument('--orig')
parser_import.add_argument('--key')
parser_reset = subparsers.add_parser('reset')
parser_reset.set_defaults(func=reset)
args = parser.parse_args()
args.func(args)