# misc-cazino/venissieux-technocarte/run.py
#
# 224 lines
# 6.4 KiB
# Python

import base64
import collections
import csv
import datetime
import hashlib
import hmac
import random
import sys
import time
import urllib.parse
import requests
from wcs.carddef import CardDef
# Slugs of the three card models used by this script (children, adults,
# families); they are interpolated into the API URLs and used by reset().
CARD_ENFANT_SLUG = 'enfants'
CARD_ADULTE_SLUG = 'adultes'
CARD_FAMILLE_SLUG = 'familles'

USER = 'admin'
# NOTE(review): the URL carries an empty userinfo part ("https://@...");
# presumably credentials were removed — confirm this is intended.
WCS_BASE_URL = 'https://@wcs.dev.publik.love/api/'
# Sent as the "email" query parameter of every API call.
EMAIL = 'admin@localhost'
# Sent as the "orig" query parameter of every signed URL.
WCS_ORIG = 'wcs.dev.publik.love'
# NOTE(review): the signing key is committed in the source; consider loading
# it from the environment or a configuration file instead.
WCS_KEY = 'c25fc1f82bd56b01e7cf62d785ae4410b0f7fdbebfb92a986d3fd6a150f3ba0e'

# Path of the CSV file to import; only set for the "import" action.
FILEPATH = None

# Validate the command line explicitly: an ``assert`` would be stripped under
# ``python -O`` and a missing argument would surface as a bare IndexError.
if len(sys.argv) < 2 or sys.argv[1] not in ('import', 'reset'):
    sys.exit('usage: run.py (import <csv-file> | reset)')
ACTION = sys.argv[1]
if ACTION == 'import':
    if len(sys.argv) < 3:
        sys.exit('usage: run.py import <csv-file>')
    FILEPATH = sys.argv[2]
def sign_url(url, key, algo='sha256', orig=None, timestamp=None, nonce=None):
    """Return *url* with its query string replaced by a signed query string.

    The query component of *url* is handed to ``sign_query`` together with
    the remaining arguments; every other URL component is kept unchanged.
    """
    parts = urllib.parse.urlparse(url)
    signed_query = sign_query(parts.query, key, algo, orig, timestamp, nonce)
    return urllib.parse.urlunparse(parts._replace(query=signed_query))
def sign_query(query, key, algo='sha256', orig=None, timestamp=None, nonce=None):
    """Append signature parameters to a query string and return the result.

    The parameters ``algo``, ``timestamp`` and ``nonce`` (and ``orig`` when it
    is not None) are appended to *query*; the resulting string is signed with
    ``sign_string`` and the base64-encoded signature is appended last as the
    ``signature`` parameter.

    query -- existing query string, possibly empty.
    key -- signing key, forwarded to ``sign_string``.
    algo -- name of the hash algorithm, forwarded to ``sign_string``.
    orig -- optional value of the ``orig`` parameter.
    timestamp -- datetime used for the ``timestamp`` parameter; defaults to
        the current UTC time.
    nonce -- value of the ``nonce`` parameter; defaults to 128 random bits
        rendered as lowercase hexadecimal.
    """
    if timestamp is None:
        # Timezone-aware replacement for the deprecated utcnow(); the
        # formatted value below is the same.
        timestamp = datetime.datetime.now(datetime.timezone.utc)
    timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
    if nonce is None:
        nonce = '%x' % random.getrandbits(128)
    new_query = query
    if new_query:
        new_query += '&'
    new_query += urllib.parse.urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
    if orig is not None:
        new_query += '&' + urllib.parse.urlencode({'orig': orig})
    # The signature covers everything built so far, so it has to be computed
    # before the signature parameter itself is appended.
    signature = base64.b64encode(sign_string(new_query, key, algo=algo))
    new_query += '&' + urllib.parse.urlencode({'signature': signature})
    return new_query
def sign_string(s, key, algo='sha256', timedelta=30):
    """Return the raw HMAC digest (bytes) of *s* computed with *key*.

    s -- message to sign; a str is encoded as UTF-8 first.
    key -- signing key; a str is encoded as UTF-8 first.
    algo -- name of a hash constructor looked up on ``hashlib``.
    timedelta -- accepted but not used by this function.
    """
    if not isinstance(key, bytes):
        key = key.encode('utf-8')
    if not isinstance(s, bytes):
        s = s.encode('utf-8')
    digestmod = getattr(hashlib, algo)
    return hmac.new(key, msg=s, digestmod=digestmod).digest()
def api_call(url, data=None, method='post'):
    """Send a signed request to *url* and return the ``requests`` response.

    The ``email`` query parameter is appended to *url* (which is therefore
    expected to carry no query string of its own) and the result is signed
    with ``sign_url`` using ``WCS_KEY`` and ``WCS_ORIG``.

    url -- endpoint URL without query string.
    data -- payload sent as ``{"data": data}`` JSON body for POST requests;
        ignored for GET requests.
    method -- ``'post'`` (default) or ``'get'``.

    Raises ValueError for any other *method* and whatever
    ``raise_for_status()`` raises for an HTTP error response.
    """
    # Reject unknown methods before doing any work; previously they fell
    # through both branches and failed on an unbound ``resp``.
    if method not in ('post', 'get'):
        raise ValueError('unsupported HTTP method: %r' % (method,))
    url = sign_url(url + '?email=%s' % EMAIL, WCS_KEY, orig=WCS_ORIG)
    if method == 'post':
        resp = requests.post(url, json={'data': data})
    else:
        resp = requests.get(url)
    resp.raise_for_status()
    return resp
def get_field_value(carddata, varname):
    """Return the value stored in *carddata* for the field named *varname*.

    The fields returned by ``carddata.formdef.get_all_fields()`` are searched
    for the first one whose ``varname`` equals *varname*; its ``id`` is then
    looked up in ``carddata.data`` (None when the id is absent).

    Raises KeyError when no field has that varname. Previously the last field
    of the definition was silently used in that case.
    """
    for field in carddata.formdef.get_all_fields():
        if field.varname == varname:
            return carddata.data.get(field.id)
    raise KeyError(varname)
def get_rows(filepath=None, limit=101):
    """Yield the rows of a semicolon-delimited CSV file as dicts.

    filepath -- path of the CSV file; defaults to the module-level FILEPATH.
    limit -- maximum number of rows to yield, or None for no limit. The
        default of 101 matches the cap that used to be hard-coded here.
        NOTE(review): that cap looks like a development leftover — confirm
        whether a full import should pass ``limit=None``.
    """
    if filepath is None:
        filepath = FILEPATH
    # newline='' lets the csv module do its own newline handling, which is
    # needed for quoted fields containing line breaks.
    with open(filepath, newline='') as csvfile:
        reader = csv.DictReader(csvfile, delimiter=';', quotechar='"')
        for i, row in enumerate(reader):
            if limit is not None and i >= limit:
                break
            yield row
def init_human_data(row):
    """Build the card fields shared by adults and children from a CSV row.

    Reads the NOMENF, PREENF, CODSEX and DATNAI columns. The result always
    holds ``nom``, ``prenom`` and ``genre`` ('Homme' for 'M', 'Femme' for 'F',
    empty string otherwise); ``date_naissance`` (YYYY-MM-DD) is added only
    when DATNAI is not empty.
    """
    genres = {'M': 'Homme', 'F': 'Femme'}
    person = {'nom': row['NOMENF'], 'prenom': row['PREENF']}
    person['genre'] = genres.get(row['CODSEX'], '')
    if row['DATNAI']:
        # Only the first ten characters (dd/mm/YYYY) are parsed; anything
        # after them is ignored.
        birth = time.strptime(row['DATNAI'][:10], '%d/%m/%Y')
        person['date_naissance'] = time.strftime('%Y-%m-%d', birth)
    return person
def create_adultes():
    """Create one adult card per adult row and return what was created.

    Rows whose ADULTE column is not '1' are skipped, and at most two adults
    are created per family code (CODFAM); further adults of the same family
    are ignored.

    Returns a dict keyed by the row's CODENF; each value holds
    ``technocarte_id``, ``publik_id`` (id returned by the API, as a string),
    ``technocarte_famille_id`` and ``nom``.
    """
    adultes = {}
    familles_adulte_count = collections.defaultdict(int)
    # Built from the slug constant, like the children and family endpoints.
    url = WCS_BASE_URL + 'cards/%s/submit' % CARD_ADULTE_SLUG
    for row in get_rows():
        if row['ADULTE'] != '1':
            continue
        code_famille = row['CODFAM']
        if familles_adulte_count[code_famille] >= 2:
            # duplicate handling: a family already has its two adults
            continue
        data = init_human_data(row)
        data['telephone'] = row['NUMTEL']
        data['courriel'] = row['MAIL']
        data['numero_voie'] = "%s %s" % (row['NUMVOI'], row['ADRENF1'])
        data['code_postal'] = row['CODPOSENF']
        data['commune'] = row['COMMUNE']
        resp = api_call(url, data)
        # Counted only after the card was created successfully.
        familles_adulte_count[code_famille] += 1
        adultes[row['CODENF']] = {
            'technocarte_id': row['CODENF'],
            'publik_id': str(resp.json()['data']['id']),
            'technocarte_famille_id': code_famille,
            'nom': row['NOMENF'],
        }
    return adultes
def create_enfants():
    """Create one child card per non-adult row and return what was created.

    Rows whose ADULTE column is '1' are skipped. Returns a dict keyed by the
    row's CODENF; each value holds ``technocarte_id``, ``publik_id`` (id
    returned by the API, as a string) and ``technocarte_famille_id``, plus
    ``pere_technocarte_id`` / ``mere_technocarte_id`` when the CODPEREBIO /
    CODMEREBIO columns are not empty.
    """
    submit_url = WCS_BASE_URL + 'cards/%s/submit' % CARD_ENFANT_SLUG
    parent_columns = (
        ('pere_technocarte_id', 'CODPEREBIO'),
        ('mere_technocarte_id', 'CODMEREBIO'),
    )
    created = {}
    for row in get_rows():
        if row['ADULTE'] == '1':
            continue
        response = api_call(submit_url, init_human_data(row))
        record = {
            'technocarte_id': row['CODENF'],
            'publik_id': str(response.json()['data']['id']),
            'technocarte_famille_id': row['CODFAM'],
        }
        # Parent references are optional in the source file.
        for key, column in parent_columns:
            if row[column]:
                record[key] = row[column]
        created[row['CODENF']] = record
    return created
def create_familles(adultes, enfants):
    """Create or complete family cards from the adults created earlier.

    The first adult seen for a family code creates the family card (named
    after that adult, who becomes ``adulte1``). A later adult of the same
    family is stored as ``adulte2`` unless the card already has one; when the
    names differ, the family name becomes both names joined by a space.

    adultes -- dict of adult records, as returned by ``create_adultes``.
    enfants -- accepted but not read by this function.
    """
    submit_url = WCS_BASE_URL + 'cards/%s/submit' % CARD_FAMILLE_SLUG
    # family code from the source file -> id of the family card
    family_ids = {}
    for adulte in adultes.values():
        family_code = adulte['technocarte_famille_id']
        if family_code not in family_ids:
            # create the family
            created = api_call(
                submit_url,
                {'famille': adulte['nom'], 'adulte1': adulte['publik_id']},
            )
            family_ids[family_code] = str(created.json()['data']['id'])
            continue

        card_url = WCS_BASE_URL + 'cards/%s/%s/' % (
            CARD_FAMILLE_SLUG, family_ids[family_code]
        )
        fields = api_call(card_url, method='get').json()['fields']
        if fields['adulte2']:
            # duplicate handling: a second adult is already set, skip
            continue
        # add as second adult of the existing family
        changes = {'adulte2': adulte['publik_id']}
        if fields['famille'] != adulte['nom']:
            # the two adults do not share a name: keep both
            changes['famille'] = "%s %s" % (fields['famille'], adulte['nom'])
        api_call(card_url, changes)
def import_data():
    """Run the import: adults first, then children, then families."""
    # Arguments are evaluated left to right, so adults are created before
    # children, and families last.
    create_familles(create_adultes(), create_enfants())
def reset():
    """Wipe the stored data of the children, adults and families cards."""
    for slug in (CARD_ENFANT_SLUG, CARD_ADULTE_SLUG, CARD_FAMILLE_SLUG):
        CardDef.get_by_urlname(slug).data_class().wipe()
# Run the action selected on the command line.
if ACTION == 'reset':
    reset()
elif ACTION == 'import':
    import_data()