passerelle-grandlyon-iodas/grandlyon_iodas/models.py

202 lines
8.4 KiB
Python

import hashlib
import json
from datetime import datetime
import dateutil.relativedelta
import requests
import suds.sudsobject
from django.core.cache import cache
from django.db import models
from django.utils.encoding import force_bytes, force_text
from django.utils.translation import ugettext_lazy as _
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
from suds.client import Client
from suds.transport import Reply
from suds.transport.http import HttpAuthenticated
def sudsobject_to_dict(sudsobject):
    """Recursively convert a suds object into a plain ``dict``.

    Members that are themselves suds objects (detected through their
    ``__keylist__`` attribute) are converted recursively.  For list members,
    each item that is a suds object is converted and every other item is
    kept as is.  Any other value is copied unchanged.
    """

    def convert(node):
        # suds objects expose ``__keylist__``; anything else is passed through.
        if hasattr(node, '__keylist__'):
            return sudsobject_to_dict(node)
        return node

    converted = {}
    for name, member in suds.sudsobject.asdict(sudsobject).items():
        if isinstance(member, list) and not hasattr(member, '__keylist__'):
            converted[name] = [convert(entry) for entry in member]
        else:
            converted[name] = convert(member)
    return converted
class GrandlyonIodas(BaseResource):
    """Connector resource calling the IODAS SOAP web service.

    SOAP requests are sent with an ``Authorization`` header whose value is
    obtained from ``token_url`` (client-credentials grant) and kept in the
    Django cache until it expires.
    """

    token_url = models.URLField(_('Token URL'), max_length=256)
    token_authorization = models.CharField(_('Token Authorization'), max_length=128)
    wsdl_url = models.CharField(_('WSDL URL'), max_length=256)  # not URLField, it can be file://
    verify_cert = models.BooleanField(default=True, verbose_name=_('Check HTTPS Certificate validity'))

    category = _('Business Process Connectors')

    class Meta:
        verbose_name = 'Connecteur IODAS Grand Lyon'

    def get_token(self, renew=False):
        """Return the ``Authorization`` header value for the SOAP service.

        The value is ``"<token_type> <access_token>"`` as returned by
        ``token_url``.  It is cached under a per-connector key for
        ``expires_in`` seconds.

        :param renew: when true, skip the cache and request a new token.
        :raises APIError: if the token endpoint response has no
            ``access_token``.
        """
        cache_key = 'iodas-%s-token' % self.id
        if not renew:
            token = cache.get(cache_key)
            if token:
                return token
        headers = {'Authorization': 'Basic %s' % self.token_authorization}
        resp = self.requests.post(
            self.token_url,
            headers=headers,
            data={'grant_type': 'client_credentials'},
            verify=self.verify_cert,
        ).json()
        access_token = resp.get('access_token')
        if not access_token:
            # Never build (or cache) a bogus "None None" header value.
            raise APIError('IODAS token endpoint returned no access_token')
        token = '%s %s' % (resp.get('token_type'), access_token)
        timeout = int(resp.get('expires_in'))
        cache.set(cache_key, token, timeout)
        # NOTE(review): this writes the bearer token to the debug log;
        # consider masking it.
        self.logger.debug('new token: %s (timeout %ss)', token, timeout)
        return token

    def get_client(self):
        """Build a suds ``Client`` for ``wsdl_url``.

        The client uses a transport whose ``send`` posts the SOAP message
        through ``self.requests`` with the token from :meth:`get_token`.
        """

        class Transport(HttpAuthenticated):
            def __init__(self, instance):
                self.instance = instance
                HttpAuthenticated.__init__(self)

            def _post(self, request, renew=False):
                # (Re)sign the request, then post it through the connector's
                # own requests wrapper.
                request.headers['Authorization'] = self.instance.get_token(renew=renew)
                return self.instance.requests.post(
                    request.url,
                    data=request.message,
                    headers=request.headers,
                    verify=self.instance.verify_cert,
                )

            def send(self, request):
                resp = self._post(request)
                if resp.status_code == 401:
                    # ask for a new token, and retry
                    resp = self._post(request, renew=True)
                return Reply(resp.status_code, resp.headers, resp.content)

        return Client(url=self.wsdl_url, transport=Transport(self))

    @staticmethod
    def _delta_fin_droit(end_date, today):
        """Return 3 or 6 if ``end_date`` is exactly that many months after ``today``, else 0."""
        for months in (3, 6):
            if end_date - dateutil.relativedelta.relativedelta(months=months) == today:
                return months
        return 0

    @classmethod
    def _current_rights(cls, procedures):
        """List the rights still running, across all the given procedures.

        A task counts as a running right when its ``idtypetache`` is 2, its
        ``datearret`` (``dd/mm/YYYY``) is in the future and it has no
        ``naturearret`` key.  One entry is produced per such task.
        """
        now = datetime.now()
        today = now.replace(hour=0, minute=0, second=0, microsecond=0)
        droits = []
        for procedure in procedures:
            for etape in procedure['etapes']['etape']:
                if 'taches' not in etape:
                    continue
                for tache in etape['taches']['tache']:
                    if tache['idtypetache'] != 2 or 'naturearret' in tache:
                        continue
                    end_date = datetime.strptime(tache['datearret'], '%d/%m/%Y')
                    if end_date <= now:
                        continue
                    droits.append(
                        {
                            'procedure_libl': procedure['libl'],
                            'libl': tache['produit']['libl'],
                            'dateproposition': tache['dateproposition'],
                            'datearret': tache['datearret'],
                            'dateeffet': tache['dateeffet'],
                            'deltafindroit': cls._delta_fin_droit(end_date, today),
                        }
                    )
        return droits

    @staticmethod
    def _select_procedure(procedures, typepro):
        """Return the last procedure whose ``idtypeproc`` is listed in ``typepro``.

        ``typepro`` is a comma-separated list of type ids.  When no procedure
        matches, the last procedure of the list is returned.
        """
        wanted = typepro.split(',')
        for procedure in reversed(procedures):
            if str(procedure['idtypeproc']) in wanted:
                return procedure
        return procedures[-1]

    @endpoint(perm='can_access')
    def getProcedures(self, request, nom, pren, datenais, dpap, typepro):
        """Endpoint: describe a person's latest procedure and running rights.

        Calls ``ODA_getProceduresSIH`` and returns a dict with:

        * ``hash``: SHA-224 hex digest of the JSON dump of the whole response;
        * ``found``: 1 when the response contains procedures, else 0;
        * ``droits``: the running rights of all procedures;
        * ``libl``, ``datedepot``, ``etapes`` (sorted by ``id``, descending)
          and ``recevabilite`` of the selected procedure;
        * ``dpap``, ``datenais``, ``nom``, ``pren`` echoed from the response
          (only when found).

        The selected procedure is the last one whose type is in ``typepro``
        (comma-separated ids), or the last procedure if none matches.
        """
        # Params in the order required by the WSDL from stambia
        resp = self.get_client().service.ODA_getProceduresSIH(dpap, typepro, datenais, nom, pren)
        data = sudsobject_to_dict(resp)

        result = {
            'hash': force_text(hashlib.sha224(force_bytes(json.dumps(data))).hexdigest()),
            'libl': '',
            'datedepot': '',
            'etapes': '',
            'droits': [],
            'recevabilite': '',
            'found': 0,
        }
        if 'procedures' not in data:
            return result

        procedures = data['procedures']['procedures']['procedure']
        if not procedures:
            # Nothing to select from: report "not found" rather than failing
            # on an out-of-range index.
            return result

        # Collect the currently running rights of all the person's procedures.
        result['droits'] = self._current_rights(procedures)

        # Get the last procedure filtered by typepro
        procedure = self._select_procedure(procedures, typepro)
        result.update(
            {
                'dpap': data['procedures'].get('dpap', ''),
                'datenais': data['procedures'].get('datenais', ''),
                'nom': data['procedures'].get('nom', ''),
                'pren': data['procedures'].get('pren', ''),
                'libl': procedure.get('libl', ''),
                'datedepot': procedure.get('datedepot', ''),
                'etapes': sorted(
                    procedure.get('etapes', {}).get('etape', []),
                    key=lambda x: x.get('id', 0),
                    reverse=True,
                ),
                'recevabilite': procedure.get('recevabilite', ''),
                'found': 1,
            }
        )
        return result

    @endpoint(perm='can_access')
    def getProceduresPA(self, request, nom, pren, datenais, typepro):
        """Endpoint: list a person's "PA" procedures.

        Calls ``ODA_getProceduresPA`` and returns a dict with:

        * ``hash``: SHA-224 hex digest of the JSON dump of the whole response;
        * ``procedures``: for each run of consecutive procedures sharing the
          same ``libl``, the last procedure of the run;
        * ``found``: 1 when the response contains ``procedurespa``, else 0
          (``procedures`` is then empty).
        """
        # Params in the order required by the WSDL from stambia
        resp = self.get_client().service.ODA_getProceduresPA(typepro, datenais, nom, pren)
        data = sudsobject_to_dict(resp)

        found = 'procedurespa' in data
        all_procs = data['procedurespa']['procedurespa']['procedurepa'] if found else []
        total = len(all_procs)
        # ``position`` is 1-based, so ``all_procs[position]`` is the next item.
        procedures = [
            proc
            for position, proc in enumerate(all_procs, 1)
            if position == total or proc['libl'] != all_procs[position]['libl']
        ]
        return {
            'hash': force_text(hashlib.sha224(force_bytes(json.dumps(data))).hexdigest()),
            'procedures': procedures,
            'found': 1 if found else 0,
        }