misc-cazino/venissieux-technocarte/run.py

253 lines
7.3 KiB
Python

import base64
import collections
import csv
import datetime
import hashlib
import hmac
import random
import sys
import time
import urllib.parse
import requests
from wcs.carddef import CardDef
CARD_ADULTE_SLUG = 'adultes'
CARD_ENFANT_SLUG = 'enfants'
CARD_FAMILLE_SLUG = 'familles'
CARD_GARDE_SLUG = 'gardes'
USER = 'admin'
WCS_BASE_URL = 'https://@wcs.dev.publik.love/api/'
EMAIL = 'admin@localhost'
WCS_ORIG = 'wcs.dev.publik.love'
WCS_KEY = 'c25fc1f82bd56b01e7cf62d785ae4410b0f7fdbebfb92a986d3fd6a150f3ba0e'
FILEPATH = None
ACTION = sys.argv[1]
assert ACTION in ('import', 'reset')
if ACTION == 'import':
FILEPATH = sys.argv[2]
MODE = 'sample'
if ACTION == 'import' and len(sys.argv) > 3:
MODE = sys.argv[3]
assert MODE in ('sample', 'full')
def sign_url(url, key, algo='sha256', orig=None, timestamp=None, nonce=None):
parsed = urllib.parse.urlparse(url)
new_query = sign_query(parsed.query, key, algo, orig, timestamp, nonce)
return urllib.parse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
def sign_query(query, key, algo='sha256', orig=None, timestamp=None, nonce=None):
if timestamp is None:
timestamp = datetime.datetime.utcnow()
timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
if nonce is None:
nonce = hex(random.getrandbits(128))[2:].rstrip('L')
new_query = query
if new_query:
new_query += '&'
new_query += urllib.parse.urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
if orig is not None:
new_query += '&' + urllib.parse.urlencode({'orig': orig})
signature = base64.b64encode(sign_string(new_query, key, algo=algo))
new_query += '&' + urllib.parse.urlencode({'signature': signature})
return new_query
def sign_string(s, key, algo='sha256', timedelta=30):
if not isinstance(key, bytes):
key = key.encode('utf-8')
if not isinstance(s, bytes):
s = s.encode('utf-8')
digestmod = getattr(hashlib, algo)
hash = hmac.HMAC(key, digestmod=digestmod, msg=s)
return hash.digest()
def api_call(url, data=None, method='post'):
url = sign_url(url + '?email=%s' % EMAIL, WCS_KEY, orig=WCS_ORIG)
if method == 'post':
resp = requests.post(url, json={'data': data})
elif method == 'get':
resp = requests.get(url)
resp.raise_for_status()
return resp
def get_field_value(carddata, varname):
field = None
for field in carddata.formdef.get_all_fields():
if field.varname == varname:
break
assert field is not None
return carddata.data.get(field.id)
def get_rows():
with open(FILEPATH) as csvfile:
reader = csv.DictReader(csvfile, delimiter=';', quotechar='"')
for i, row in enumerate(reader):
if MODE != 'full' and i > 100:
break
yield row
def init_human_data(row):
data = {
'nom': row['NOMENF'],
'prenom': row['PREENF']
}
codsex = row['CODSEX']
genre = ''
if codsex == 'M':
genre = 'Homme'
elif codsex == 'F':
genre = 'Femme'
data['genre'] = genre
if row['DATNAI']:
data['date_naissance'] = time.strftime(
'%Y-%m-%d',
time.strptime(row['DATNAI'][:10], '%d/%m/%Y')
)
return data
def create_adultes():
adultes = {}
familles_adulte_count = collections.defaultdict(int)
url = WCS_BASE_URL + 'cards/adultes/submit'
for row in get_rows():
if row['ADULTE'] != '1':
continue
code_famille = row['CODFAM']
if familles_adulte_count[code_famille] >= 2:
# gestion des doublons
continue
data = init_human_data(row)
data['telephone'] = row['NUMTEL']
data['courriel'] = row['MAIL']
data['numero_voie'] = "%s %s" % (row['NUMVOI'], row['ADRENF1'])
data['code_postal'] = row['CODPOSENF']
data['commune'] = row['COMMUNE']
resp = api_call(url, data)
familles_adulte_count[code_famille] += 1
adultes[row['CODENF']] = {
'technocarte_id': row['CODENF'],
'publik_id': str(resp.json()['data']['id']),
'technocarte_famille_id': code_famille,
'nom': row['NOMENF']
}
return adultes
def create_enfants():
enfants = {}
url = WCS_BASE_URL + 'cards/%s/submit' % CARD_ENFANT_SLUG
for row in get_rows():
if row['ADULTE'] == '1':
continue
data = init_human_data(row)
resp = api_call(url, data)
enfant = {
'technocarte_id': row['CODENF'],
'publik_id': str(resp.json()['data']['id']),
'technocarte_famille_id': row['CODFAM']
}
if row['CODPEREBIO']:
enfant['pere_technocarte_id'] = row['CODPEREBIO']
if row['CODMEREBIO']:
enfant['mere_technocarte_id'] = row['CODMEREBIO']
enfants[row['CODENF']] = enfant
return enfants
def create_familles(adultes, enfants):
familles_map = {}
url = WCS_BASE_URL + 'cards/%s/submit' % CARD_FAMILLE_SLUG
for adulte_technocarte_id, adulte in adultes.items():
technocarte_famille_id = adulte['technocarte_famille_id']
if technocarte_famille_id not in familles_map:
# création famille
data = {
'famille': adulte['nom'],
'adulte1': adulte['publik_id']
}
url = WCS_BASE_URL + 'cards/%s/submit' % CARD_FAMILLE_SLUG
resp = api_call(url, data)
familles_map[technocarte_famille_id] = str(resp.json()['data']['id'])
else:
url = WCS_BASE_URL + 'cards/%s/%s/' % (
CARD_FAMILLE_SLUG, familles_map[technocarte_famille_id]
)
card = api_call(url, method='get').json()
if card['fields']['adulte2']:
# gestion des doublons, s'il ya déjà un deuxième adulte, on zappe
continue
# ajouter comme deuxième adulte à la famille existante
data = {
'adulte2': adulte['publik_id']
}
if card['fields']['famille'] != adulte['nom']:
# les gens ne se marient plus
data['famille'] = "%s %s" % (card['fields']['famille'], adulte['nom'])
api_call(url, data)
return familles_map
def create_gardes(enfants, familles):
gardes = {}
url = WCS_BASE_URL + 'cards/%s/submit' % CARD_GARDE_SLUG
for _, enfant in enfants.items():
publik_id = enfant['publik_id']
technocarte_famille_id = enfant['technocarte_famille_id']
publik_famille_id = familles[technocarte_famille_id]
# création garde
data = {
'enfant': publik_id,
'famille': publik_famille_id
}
resp = api_call(url, data)
gardes[publik_id] = str(resp.json()['data']['id'])
return gardes
def import_data():
adultes = create_adultes()
enfants = create_enfants()
familles = create_familles(adultes, enfants)
create_gardes(enfants, familles)
def reset():
CardDef.get_by_urlname(CARD_ENFANT_SLUG).data_class().wipe()
CardDef.get_by_urlname(CARD_ADULTE_SLUG).data_class().wipe()
CardDef.get_by_urlname(CARD_FAMILLE_SLUG).data_class().wipe()
CardDef.get_by_urlname(CARD_GARDE_SLUG).data_class().wipe()
if ACTION == 'import':
import_data()
elif ACTION == 'reset':
reset()