trivial: apply pre-commit hooks

This commit is contained in:
Frédéric Péters 2022-06-29 12:10:33 +02:00
parent d50a590ae3
commit eace7885ba
6 changed files with 137 additions and 63 deletions

6
debian/control vendored
View File

@ -2,7 +2,11 @@ Source: passerelle-grandlyon-iodas
Maintainer: Frederic Peters <fpeters@entrouvert.com>
Section: python
Priority: optional
Build-Depends: debhelper-compat (= 12), dh-python, python3-setuptools, python3-all, python3-django
Build-Depends: debhelper-compat (= 12),
dh-python,
python3-all,
python3-django,
python3-setuptools,
Standards-Version: 3.9.1
Package: python3-passerelle-grandlyon-iodas

View File

@ -1,7 +1,7 @@
"""
Vérifie qu'il n'existe pas déja un formulaire soumis par l'utilisateur connecté ayant le meme dpap (numéro de dossier papier)
"""
from wcs.qommon.storage import (Equal, NotEqual)
from wcs.qommon.storage import Equal, NotEqual
# id du champ contenant le DPAP
field_id = [x for x in form_objects.formdef.fields if x.varname == 'dpap'][0].id
@ -13,6 +13,15 @@ if form.user is None:
else:
# Si user connecté on check un eventuel doublon : False si doublon, True si pas doublon
criterias = [Equal('user_id', str(form.user.id)), NotEqual('status', 'draft')]
noDuplicate = bool(len([x for x in form_objects.formdef.data_class().select(criterias) if x.data.get(field_id) == form_var_dpap]) == 0)
noDuplicate = bool(
len(
[
x
for x in form_objects.formdef.data_class().select(criterias)
if x.data.get(field_id) == form_var_dpap
]
)
== 0
)
result = noDuplicate

View File

@ -1,6 +1,3 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
@ -14,15 +11,37 @@ class Migration(migrations.Migration):
migrations.CreateModel(
name='GrandlyonIodas',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
(
'id',
models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True),
),
('title', models.CharField(max_length=50, verbose_name='Title')),
('description', models.TextField(verbose_name='Description')),
('slug', models.SlugField(unique=True)),
('log_level', models.CharField(default=b'INFO', max_length=10, verbose_name='Log Level', choices=[(b'NOTSET', b'NOTSET'), (b'DEBUG', b'DEBUG'), (b'INFO', b'INFO'), (b'WARNING', b'WARNING'), (b'ERROR', b'ERROR'), (b'CRITICAL', b'CRITICAL'), (b'FATAL', b'FATAL')])),
(
'log_level',
models.CharField(
default=b'INFO',
max_length=10,
verbose_name='Log Level',
choices=[
(b'NOTSET', b'NOTSET'),
(b'DEBUG', b'DEBUG'),
(b'INFO', b'INFO'),
(b'WARNING', b'WARNING'),
(b'ERROR', b'ERROR'),
(b'CRITICAL', b'CRITICAL'),
(b'FATAL', b'FATAL'),
],
),
),
('token_url', models.URLField(max_length=256, verbose_name='Token URL')),
('token_authorization', models.CharField(max_length=128, verbose_name='Token Authorization')),
('wsdl_url', models.CharField(max_length=256, verbose_name='WSDL URL')),
('verify_cert', models.BooleanField(default=True, verbose_name='Check HTTPS Certificate validity')),
(
'verify_cert',
models.BooleanField(default=True, verbose_name='Check HTTPS Certificate validity'),
),
('users', models.ManyToManyField(to='base.ApiUser', blank=True)),
],
options={

View File

@ -1,6 +1,4 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.12 on 2018-11-19 13:53
from __future__ import unicode_literals
from django.db import migrations

View File

@ -1,22 +1,21 @@
import requests
import json
import hashlib
from suds.transport.http import HttpAuthenticated
from suds.client import Client
from suds.transport import Reply
import suds.sudsobject
import json
from datetime import datetime
import dateutil.relativedelta
import requests
import suds.sudsobject
from django.core.cache import cache
from django.db import models
from django.utils.encoding import force_bytes, force_text
from django.utils.translation import ugettext_lazy as _
from django.core.cache import cache
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
from suds.client import Client
from suds.transport import Reply
from suds.transport.http import HttpAuthenticated
from datetime import datetime
import dateutil.relativedelta
def sudsobject_to_dict(sudsobject):
out = {}
@ -38,9 +37,8 @@ def sudsobject_to_dict(sudsobject):
class GrandlyonIodas(BaseResource):
token_url = models.URLField(_('Token URL'), max_length=256)
token_authorization = models.CharField(_('Token Authorization'), max_length=128)
wsdl_url = models.CharField(_('WSDL URL'), max_length=256) # not URLField, it can be file://
verify_cert = models.BooleanField(default=True,
verbose_name=_('Check HTTPS Certificate validity'))
wsdl_url = models.CharField(_('WSDL URL'), max_length=256) # not URLField, it can be file://
verify_cert = models.BooleanField(default=True, verbose_name=_('Check HTTPS Certificate validity'))
category = _('Business Process Connectors')
@ -54,9 +52,12 @@ class GrandlyonIodas(BaseResource):
if token:
return token
headers = {'Authorization': 'Basic %s' % self.token_authorization}
resp = self.requests.post(self.token_url, headers=headers,
data={'grant_type': 'client_credentials'},
verify=self.verify_cert).json()
resp = self.requests.post(
self.token_url,
headers=headers,
data={'grant_type': 'client_credentials'},
verify=self.verify_cert,
).json()
token = '%s %s' % (resp.get('token_type'), resp.get('access_token'))
timeout = int(resp.get('expires_in'))
cache.set(cache_key, token, timeout)
@ -71,15 +72,21 @@ class GrandlyonIodas(BaseResource):
def send(self, request):
request.headers['Authorization'] = self.instance.get_token()
resp = self.instance.requests.post(request.url, data=request.message,
headers=request.headers,
verify=self.instance.verify_cert)
resp = self.instance.requests.post(
request.url,
data=request.message,
headers=request.headers,
verify=self.instance.verify_cert,
)
if resp.status_code == 401:
# ask for a new token, and retry
request.headers['Authorization'] = self.instance.get_token(renew=True)
resp = self.instance.requests.post(request.url, data=request.message,
headers=request.headers,
verify=self.instance.verify_cert)
resp = self.instance.requests.post(
request.url,
data=request.message,
headers=request.headers,
verify=self.instance.verify_cert,
)
return Reply(resp.status_code, resp.headers, resp.content)
@ -89,42 +96,79 @@ class GrandlyonIodas(BaseResource):
def getProcedures(self, request, nom, pren, datenais, dpap, typepro):
# Params in the order required by the WSDL from stambia
resp = self.get_client().service.ODA_getProceduresSIH(dpap, typepro, datenais, nom, pren)
data = sudsobject_to_dict(resp)
data = sudsobject_to_dict(resp)
# Counts procedures to get the last procedure
nbProc = (len(data['procedures']['procedures']['procedure']) - 1) if 'procedures' in data else ''
# recupere la liste des droits en cours de toutes les procedures d'un individu
droits = []
if 'procedures' in data :
for procedure in data['procedures']['procedures']['procedure'] :
for etape in procedure['etapes']['etape'] :
if 'taches' in etape :
for tache in etape['taches']['tache'] :
if tache['idtypetache'] == 2 and datetime.strptime(tache['datearret'], "%d/%m/%Y") > datetime.now():
for nb in [3,6] :
deltafindroit = nb if (datetime.strptime(tache['datearret'], "%d/%m/%Y") - dateutil.relativedelta.relativedelta(months=nb)) == datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) else 0
droits.append({"libl":tache['produit']['libl'], "dateproposition":tache['dateproposition'], "datearret":tache['datearret'], "dateeffet":tache['dateeffet'], "deltafindroit":deltafindroit})
if 'procedures' in data:
for procedure in data['procedures']['procedures']['procedure']:
for etape in procedure['etapes']['etape']:
if 'taches' in etape:
for tache in etape['taches']['tache']:
if (
tache['idtypetache'] == 2
and datetime.strptime(tache['datearret'], "%d/%m/%Y") > datetime.now()
):
for nb in [3, 6]:
deltafindroit = (
nb
if (
datetime.strptime(tache['datearret'], "%d/%m/%Y")
- dateutil.relativedelta.relativedelta(months=nb)
)
== datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
else 0
)
droits.append(
{
"libl": tache['produit']['libl'],
"dateproposition": tache['dateproposition'],
"datearret": tache['datearret'],
"dateeffet": tache['dateeffet'],
"deltafindroit": deltafindroit,
}
)
return {'hash': force_text(hashlib.sha224(force_bytes(json.dumps(data))).hexdigest()),
'libl': data['procedures']['procedures']['procedure'][nbProc]['libl'] if 'procedures' in data else '',
'etapes': sorted(data['procedures']['procedures']['procedure'][nbProc]['etapes']['etape'], key=lambda x: x['id'], reverse=True) if 'procedures' in data else '',
'droits': droits,
'recevabilite': data['procedures']['procedures']['procedure'][nbProc]['recevabilite'] if 'procedures' in data and 'recevabilite' in data['procedures']['procedures']['procedure'][nbProc] else '',
'found': 1 if 'procedures' in data else 0}
return {
'hash': force_text(hashlib.sha224(force_bytes(json.dumps(data))).hexdigest()),
'libl': data['procedures']['procedures']['procedure'][nbProc]['libl']
if 'procedures' in data
else '',
'etapes': sorted(
data['procedures']['procedures']['procedure'][nbProc]['etapes']['etape'],
key=lambda x: x['id'],
reverse=True,
)
if 'procedures' in data
else '',
'droits': droits,
'recevabilite': data['procedures']['procedures']['procedure'][nbProc]['recevabilite']
if 'procedures' in data
and 'recevabilite' in data['procedures']['procedures']['procedure'][nbProc]
else '',
'found': 1 if 'procedures' in data else 0,
}
@endpoint(perm='can_access')
def getProceduresPA(self, request, nom, pren, datenais, typepro):
# Params in the order required by the WSDL from stambia
resp = self.get_client().service.ODA_getProceduresPA(typepro, datenais, nom, pren)
data = sudsobject_to_dict(resp)
data = sudsobject_to_dict(resp)
# Counts procedures to get the last procedure
#nbProc = (len(data['procedurespa']['procedurespa']['procedurepa']) - 1) if 'procedurespa' in data else ''
procedures = []
# nbProc = (len(data['procedurespa']['procedurespa']['procedurepa']) - 1) if 'procedurespa' in data else ''
procedures = []
nb = 0
for proc in data['procedurespa']['procedurespa']['procedurepa'] :
for proc in data['procedurespa']['procedurespa']['procedurepa']:
nb += 1
if (nb < len(data['procedurespa']['procedurespa']['procedurepa']) and proc['libl'] != data['procedurespa']['procedurespa']['procedurepa'][nb]['libl']) or nb == len(data['procedurespa']['procedurespa']['procedurepa']) :
if (
nb < len(data['procedurespa']['procedurespa']['procedurepa'])
and proc['libl'] != data['procedurespa']['procedurespa']['procedurepa'][nb]['libl']
) or nb == len(data['procedurespa']['procedurespa']['procedurepa']):
test = proc['libl']
procedures.append(proc)
return {'hash': force_text(hashlib.sha224(force_bytes(json.dumps(data))).hexdigest()),
'procedures': procedures,
'found': 1 if 'procedurespa' in data else 0}
return {
'hash': force_text(hashlib.sha224(force_bytes(json.dumps(data))).hexdigest()),
'procedures': procedures,
'found': 1 if 'procedurespa' in data else 0,
}

View File

@ -2,12 +2,12 @@
import os
import subprocess
from distutils.cmd import Command
from distutils.command.build import build as _build
from setuptools import find_packages, setup
from setuptools.command.install_lib import install_lib as _install_lib
from setuptools.command.sdist import sdist
from distutils.command.build import build as _build
from distutils.cmd import Command
from setuptools import setup, find_packages
class eo_sdist(sdist):
@ -25,10 +25,10 @@ class eo_sdist(sdist):
def get_version():
'''Use the VERSION, if absent generates a version with git describe, if not
tag exists, take 0.0- and add the length of the commit log.
tag exists, take 0.0- and add the length of the commit log.
'''
if os.path.exists('VERSION'):
with open('VERSION', 'r') as v:
with open('VERSION') as v:
return v.read()
if os.path.exists('.git'):
p = subprocess.Popen(
@ -59,5 +59,5 @@ setup(
packages=find_packages(),
cmdclass={
'sdist': eo_sdist,
}
},
)