misc-cazino/venissieux-technocarte/run.py

398 lines
13 KiB
Python

"""
Usage :
wcs-manage runscript -d wcs.dev.publik.love run.py wcs-reset
wcs-manage runscript -d wcs.dev.publik.love run.py wcs-import --filepath ~/notes/clients/venissieux/famille/technocarte.csv --api-url=https://wcs.dev.publik.love/api/ --email=admin@localhost --orig=wcs.dev.publik.love --key=c25fc1f82bd56b01e7cf62d785ae4410b0f7fdbebfb92a986d3fd6a150f3ba0e
"""
import argparse
import base64
import collections
import csv
import datetime
import hashlib
import hmac
import random
import time
import urllib.parse
import requests
from requests.auth import HTTPBasicAuth
from wcs.carddef import CardDef
CARD_ADULTE_SLUG = 'adultes'
CARD_ENFANT_SLUG = 'enfants'
CARD_FAMILLE_SLUG = 'familles'
CARD_GARDE_SLUG = 'gardes'
def sign_url(url, key, algo='sha256', orig=None, timestamp=None, nonce=None):
parsed = urllib.parse.urlparse(url)
new_query = sign_query(parsed.query, key, algo, orig, timestamp, nonce)
return urllib.parse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
def sign_query(query, key, algo='sha256', orig=None, timestamp=None, nonce=None):
if timestamp is None:
timestamp = datetime.datetime.utcnow()
timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
if nonce is None:
nonce = hex(random.getrandbits(128))[2:].rstrip('L')
new_query = query
if new_query:
new_query += '&'
new_query += urllib.parse.urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
if orig is not None:
new_query += '&' + urllib.parse.urlencode({'orig': orig})
signature = base64.b64encode(sign_string(new_query, key, algo=algo))
new_query += '&' + urllib.parse.urlencode({'signature': signature})
return new_query
def sign_string(s, key, algo='sha256', timedelta=30):
if not isinstance(key, bytes):
key = key.encode('utf-8')
if not isinstance(s, bytes):
s = s.encode('utf-8')
digestmod = getattr(hashlib, algo)
hash = hmac.HMAC(key, digestmod=digestmod, msg=s)
return hash.digest()
def wcs_api_call(url, args, data=None, user_email=None, method='post'):
url = sign_url(url + '?email=%s' % args.email, args.key, orig=args.orig)
if method == 'post':
post_data = {'data': data}
if user_email:
post_data['user'] = {'email': user_email}
resp = requests.post(url, json=post_data)
elif method == 'get':
resp = requests.get(url)
resp.raise_for_status()
return resp
def a2_api_call(url, args, data=None, method='post'):
if method == 'post':
resp = requests.post(url, json=data, auth=HTTPBasicAuth(args.api_user, args.api_pass))
elif method == 'get':
resp = requests.get(url, auth=HTTPBasicAuth(args.api_user, args.api_pass))
elif method == 'delete':
resp = requests.delete(url, auth=HTTPBasicAuth(args.api_user, args.api_pass))
resp.raise_for_status()
return resp
def get_field_value(carddata, varname):
field = None
for field in carddata.formdef.get_all_fields():
if field.varname == varname:
break
assert field is not None
return carddata.data.get(field.id)
def get_rows(args):
with open(args.filepath) as csvfile:
reader = csv.DictReader(csvfile, delimiter=';', quotechar='"')
for i, row in enumerate(reader):
if args.mode != 'full' and i > args.sample_numlines:
break
yield row
def build_email(first_name, last_name):
return 'vpf-%s-%s@yopmail.com' % (first_name, last_name)
def init_human_data(row):
data = {
'nom': row['NOMENF'],
'prenom': row['PREENF']
}
codsex = row['CODSEX']
genre = ''
if codsex == 'M':
genre = 'Homme'
elif codsex == 'F':
genre = 'Femme'
data['genre'] = genre
if row['DATNAI']:
data['date_naissance'] = time.strftime(
'%Y-%m-%d',
time.strptime(row['DATNAI'][:10], '%d/%m/%Y')
)
return data
def create_adultes(args):
adultes = {}
familles_adultes = collections.defaultdict(list)
url = args.api_url + 'cards/adultes/submit'
for row in get_rows(args):
if row['ADULTE'] != '1':
continue
code_famille = row['CODFAM']
if len(familles_adultes[code_famille]) >= 2:
# gestion des doublons
continue
email = build_email(row['PREENF'], row['NOMENF'])
data = init_human_data(row)
data['telephone'] = row['NUMTEL']
data['courriel'] = email
data['numero_voie'] = "%s %s" % (row['NUMVOI'], row['ADRENF1'])
numbis = row['NUMBIS']
if numbis:
numbis = numbis.capitalize()
if numbis in ('Bis', 'Ter', 'Quater'):
data['bis'] = numbis
data['code_postal'] = row['CODPOSENF']
data['commune'] = row['COMMUNE']
if row['NUMTEL2']:
data['portable'] = row['NUMTEL2']
civilite = row['FORPOL']
civilite_clean = None
if civilite == 'M.':
civilite_clean = 'Monsieur'
elif civilite == 'Mme':
civilite_clean = 'Madame'
if civilite_clean:
data['civilite'] = civilite_clean
resp = wcs_api_call(url, args, data=data, user_email=email)
adulte = {
'technocarte_id': row['CODENF'],
'publik_id': str(resp.json()['data']['id']),
'technocarte_famille_id': code_famille,
'nom': row['NOMENF'],
'email': email
}
adultes[row['CODENF']] = adulte
familles_adultes[code_famille].append(adulte)
return adultes, familles_adultes
def create_enfants(args, familles_adultes):
enfants = {}
familles_enfants = collections.defaultdict(list)
url = args.api_url + 'cards/%s/submit' % CARD_ENFANT_SLUG
for row in get_rows(args):
if row['ADULTE'] == '1':
continue
data = init_human_data(row)
if row['ZONSCOL']:
data['zone_scolaire'] = row['ZONSCOL']
if row['ECOLE']:
data['ecole'] = row['ECOLE']
if row['INSCRITSCO']:
data['date_premiere_inscription'] = time.strftime(
'%Y-%m-%d',
time.strptime(row['INSCRITSCO'][:10], '%d/%m/%Y')
)
if row['GRPSCO']:
data['groupe_scolaire'] = row['GRPSCO']
technocarte_famille_id = row['CODFAM']
# find email of the first parent, for ownership of the card
user_email = None
for adulte in familles_adultes.get(technocarte_famille_id, []):
user_email = adulte['email']
break
resp = wcs_api_call(url, args, data, user_email=user_email)
enfant = {
'technocarte_id': row['CODENF'],
'publik_id': str(resp.json()['data']['id']),
'technocarte_famille_id': technocarte_famille_id
}
if row['CODPEREBIO']:
enfant['pere_technocarte_id'] = row['CODPEREBIO']
if row['CODMEREBIO']:
enfant['mere_technocarte_id'] = row['CODMEREBIO']
if row['CAF']:
enfant['numero_allocataire_caf'] = row['CAF']
enfants[row['CODENF']] = enfant
familles_enfants[technocarte_famille_id].append(enfant)
return enfants, familles_enfants
def create_familles(args, adultes, familles_enfants):
familles_map = {}
url = args.api_url + 'cards/%s/submit' % CARD_FAMILLE_SLUG
for adulte_technocarte_id, adulte in adultes.items():
technocarte_famille_id = adulte['technocarte_famille_id']
if technocarte_famille_id not in familles_map:
# création famille
data = {
'famille': adulte['nom'],
'adulte1': adulte['publik_id']
}
# try to grab a numero_allocataire_caf
numero_allocataire_caf = None
for enfant in familles_enfants.get(technocarte_famille_id, []):
numero_allocataire_caf = enfant.get('numero_allocataire_caf')
if numero_allocataire_caf:
break
if numero_allocataire_caf:
data['numero_allocataire_caf'] = numero_allocataire_caf
url = args.api_url + 'cards/%s/submit' % CARD_FAMILLE_SLUG
resp = wcs_api_call(url, args, data, user_email=adulte['email'])
familles_map[technocarte_famille_id] = str(resp.json()['data']['id'])
else:
url = args.api_url + 'cards/%s/%s/' % (
CARD_FAMILLE_SLUG, familles_map[technocarte_famille_id]
)
card = wcs_api_call(url, args, method='get').json()
if card['fields']['adulte2']:
# gestion des doublons, s'il ya déjà un deuxième adulte, on zappe
continue
# ajouter comme deuxième adulte à la famille existante
data = {
'adulte2': adulte['publik_id']
}
if card['fields']['famille'] != adulte['nom']:
# les gens ne se marient plus
data['famille'] = "%s %s" % (card['fields']['famille'], adulte['nom'])
wcs_api_call(url, args, data)
return familles_map
def create_gardes(args, enfants, familles, familles_adultes):
gardes = {}
url = args.api_url + 'cards/%s/submit' % CARD_GARDE_SLUG
for _, enfant in enfants.items():
publik_id = enfant['publik_id']
technocarte_famille_id = enfant['technocarte_famille_id']
publik_famille_id = familles.get(technocarte_famille_id)
if publik_famille_id:
# création garde
data = {
'enfant': publik_id,
'famille': publik_famille_id
}
# find email of the first parent, for ownership of the card
user_email = None
for adulte in familles_adultes.get(technocarte_famille_id, []):
user_email = adulte['email']
break
resp = wcs_api_call(url, args, data, user_email=user_email)
gardes[publik_id] = str(resp.json()['data']['id'])
return gardes
def wcs_import_data(args):
adultes, familles_adultes = create_adultes(args)
enfants, familles_enfants = create_enfants(args, familles_adultes)
familles = create_familles(args, adultes, familles_enfants)
create_gardes(args, enfants, familles, familles_adultes)
def a2_import_data(args):
familles_adulte_count = collections.defaultdict(int)
url = args.api_url + 'users/'
for row in get_rows(args):
if row['ADULTE'] != '1':
continue
code_famille = row['CODFAM']
if familles_adulte_count[code_famille] >= 2:
# gestion des doublons
continue
data = {
'last_name': row['NOMENF'],
'first_name': row['PREENF'],
'email': build_email(row['PREENF'], row['NOMENF']),
'send_registration_email': False
}
codsex = row['CODSEX']
if codsex == 'M':
data['gender'] = 1
elif codsex == 'F':
data['gender'] = 2
a2_api_call(url, args, data)
def a2_reset(args):
to_delete = []
url = args.api_url + 'users/'
next_ = True
while next_:
data = a2_api_call(url, args, method='get').json()
next_ = data['next']
for user in data['results']:
email, uuid = user['email'], user['uuid']
if email.startswith('vpf-') and email.endswith('yopmail.com'):
to_delete.append(uuid)
for uuid in to_delete:
url = args.api_url + 'users/%s/' % uuid
a2_api_call(url, args, method='delete')
def wcs_reset(args):
CardDef.get_by_urlname(CARD_ENFANT_SLUG).data_class().wipe()
CardDef.get_by_urlname(CARD_ADULTE_SLUG).data_class().wipe()
CardDef.get_by_urlname(CARD_FAMILLE_SLUG).data_class().wipe()
CardDef.get_by_urlname(CARD_GARDE_SLUG).data_class().wipe()
def add_import_parser_args(parser):
parser.add_argument('--filepath')
parser.add_argument('--mode', default='sample', choices=('sample', 'full'))
parser.add_argument('--sample-numlines', default=100, type=int)
return parser
def add_a2_parser_args(parser):
parser.add_argument('--api-url')
parser.add_argument('--api-user')
parser.add_argument('--api-pass')
return parser
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers()
wcs_parser_import = subparsers.add_parser('wcs-import')
wcs_parser_import.set_defaults(func=wcs_import_data)
wcs_parser_import = add_import_parser_args(wcs_parser_import)
wcs_parser_import.add_argument('--api-url')
wcs_parser_import.add_argument('--email')
wcs_parser_import.add_argument('--orig')
wcs_parser_import.add_argument('--key')
wcs_parser_reset = subparsers.add_parser('wcs-reset')
wcs_parser_reset.set_defaults(func=wcs_reset)
a2_parser_import = subparsers.add_parser('a2-import')
a2_parser_import.set_defaults(func=a2_import_data)
a2_parser_import = add_import_parser_args(a2_parser_import)
a2_parser_import = add_a2_parser_args(a2_parser_import)
a2_parser_reset = subparsers.add_parser('a2-reset')
a2_parser_reset.set_defaults(func=a2_reset)
a2_parser_reset = add_a2_parser_args(a2_parser_reset)
args = parser.parse_args()
args.func(args)