misc-cazino/venissieux-technocarte/run.py

212 lines
6.0 KiB
Python

import base64
import csv
import datetime
import hashlib
import hmac
import random
import sys
import time
import urllib.parse
import requests
from wcs.carddef import CardDef
CARD_ENFANT_SLUG = 'enfants'
CARD_ADULTE_SLUG = 'adultes'
CARD_FAMILLE_SLUG = 'familles'
USER = 'admin'
WCS_BASE_URL = 'https://@wcs.dev.publik.love/api/'
EMAIL = 'admin@localhost'
WCS_ORIG = 'wcs.dev.publik.love'
WCS_KEY = 'c25fc1f82bd56b01e7cf62d785ae4410b0f7fdbebfb92a986d3fd6a150f3ba0e'
FILEPATH = None
ACTION = sys.argv[1]
assert ACTION in ('import', 'reset')
if ACTION == 'import':
FILEPATH = sys.argv[2]
def sign_url(url, key, algo='sha256', orig=None, timestamp=None, nonce=None):
parsed = urllib.parse.urlparse(url)
new_query = sign_query(parsed.query, key, algo, orig, timestamp, nonce)
return urllib.parse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
def sign_query(query, key, algo='sha256', orig=None, timestamp=None, nonce=None):
if timestamp is None:
timestamp = datetime.datetime.utcnow()
timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
if nonce is None:
nonce = hex(random.getrandbits(128))[2:].rstrip('L')
new_query = query
if new_query:
new_query += '&'
new_query += urllib.parse.urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
if orig is not None:
new_query += '&' + urllib.parse.urlencode({'orig': orig})
signature = base64.b64encode(sign_string(new_query, key, algo=algo))
new_query += '&' + urllib.parse.urlencode({'signature': signature})
return new_query
def sign_string(s, key, algo='sha256', timedelta=30):
if not isinstance(key, bytes):
key = key.encode('utf-8')
if not isinstance(s, bytes):
s = s.encode('utf-8')
digestmod = getattr(hashlib, algo)
hash = hmac.HMAC(key, digestmod=digestmod, msg=s)
return hash.digest()
def api_call(url, data=None, method='post'):
url = sign_url(url + '?email=%s' % EMAIL, WCS_KEY, orig=WCS_ORIG)
if method == 'post':
resp = requests.post(url, json={'data': data})
elif method == 'get':
resp = requests.get(url)
resp.raise_for_status()
return resp
def get_field_value(carddata, varname):
field = None
for field in carddata.formdef.get_all_fields():
if field.varname == varname:
break
assert field is not None
return carddata.data.get(field.id)
def get_rows():
with open(FILEPATH) as csvfile:
reader = csv.DictReader(csvfile, delimiter=';', quotechar='"')
for i, row in enumerate(reader):
if i > 100:
break
yield row
def init_human_data(row):
data = {
'nom': row['NOMENF'],
'prenom': row['PREENF']
}
codsex = row['CODSEX']
genre = ''
if codsex == 'M':
genre = 'Homme'
elif codsex == 'F':
genre = 'Femme'
data['genre'] = genre
if row['DATNAI']:
data['date_naissance'] = time.strftime(
'%Y-%m-%d',
time.strptime(row['DATNAI'][:10], '%d/%m/%Y')
)
return data
def create_adultes():
adultes = {}
url = WCS_BASE_URL + 'cards/adultes/submit'
for row in get_rows():
if row['ADULTE'] != '1':
continue
data = init_human_data(row)
data['telephone'] = row['NUMTEL']
data['courriel'] = row['MAIL']
data['numero_voie'] = "%s %s" % (row['NUMVOI'], row['ADRENF1'])
data['code_postal'] = row['CODPOSENF']
data['commune'] = row['COMMUNE']
resp = api_call(url, data)
adultes[row['CODENF']] = {
'technocarte_id': row['CODENF'],
'publik_id': str(resp.json()['data']['id']),
'technocarte_famille_id': row['CODFAM'],
'nom': row['NOMENF']
}
return adultes
def create_enfants():
enfants = {}
url = WCS_BASE_URL + 'cards/%s/submit' % CARD_ENFANT_SLUG
for row in get_rows():
if row['ADULTE'] == '1':
continue
data = init_human_data(row)
resp = api_call(url, data)
enfant = {
'technocarte_id': row['CODENF'],
'publik_id': str(resp.json()['data']['id']),
'technocarte_famille_id': row['CODFAM']
}
if row['CODPEREBIO']:
enfant['pere_technocarte_id'] = row['CODPEREBIO']
if row['CODMEREBIO']:
enfant['mere_technocarte_id'] = row['CODMEREBIO']
enfants[row['CODENF']] = enfant
return enfants
def create_familles(adultes, enfants):
familles_map = {}
url = WCS_BASE_URL + 'cards/%s/submit' % CARD_FAMILLE_SLUG
for adulte_technocarte_id, adulte in adultes.items():
technocarte_famille_id = adulte['technocarte_famille_id']
if technocarte_famille_id not in familles_map:
# création famille
data = {
'famille': adulte['nom'],
'adulte1': adulte['publik_id']
}
url = WCS_BASE_URL + 'cards/%s/submit' % CARD_FAMILLE_SLUG
resp = api_call(url, data)
familles_map[technocarte_famille_id] = str(resp.json()['data']['id'])
else:
# ajouter comme deuxième adulte à la famille existante
url = WCS_BASE_URL + 'cards/%s/%s/' % (
CARD_FAMILLE_SLUG, familles_map[technocarte_famille_id]
)
data = {
'adulte2': adulte['publik_id']
}
nom_famille = api_call(url, method='get').json()['fields']['famille']
if nom_famille != adulte['nom']:
# les gens ne se marient plus
data['famille'] = "%s %s" % (nom_famille, adulte['nom'])
api_call(url, data)
def import_data():
adultes = create_adultes()
enfants = create_enfants()
create_familles(adultes, enfants)
def reset():
CardDef.get_by_urlname(CARD_ENFANT_SLUG).data_class().wipe()
CardDef.get_by_urlname(CARD_ADULTE_SLUG).data_class().wipe()
CardDef.get_by_urlname(CARD_FAMILLE_SLUG).data_class().wipe()
if ACTION == 'import':
import_data()
elif ACTION == 'reset':
reset()