diff --git a/debian/control b/debian/control index 40262e2..272920e 100644 --- a/debian/control +++ b/debian/control @@ -2,7 +2,11 @@ Source: passerelle-grandlyon-iodas Maintainer: Frederic Peters Section: python Priority: optional -Build-Depends: debhelper-compat (= 12), dh-python, python3-setuptools, python3-all, python3-django +Build-Depends: debhelper-compat (= 12), + dh-python, + python3-all, + python3-django, + python3-setuptools, Standards-Version: 3.9.1 Package: python3-passerelle-grandlyon-iodas diff --git a/extra/scripts/checkDuplicate.py b/extra/scripts/checkDuplicate.py index 1d1594b..eec9b1c 100644 --- a/extra/scripts/checkDuplicate.py +++ b/extra/scripts/checkDuplicate.py @@ -1,7 +1,7 @@ """ Vérifie qu'il n'existe pas déja un formulaire soumis par l'utilisateur connecté ayant le meme dpap (numéro de dossier papier) """ -from wcs.qommon.storage import (Equal, NotEqual) +from wcs.qommon.storage import Equal, NotEqual # id du champ contenant le DPAP field_id = [x for x in form_objects.formdef.fields if x.varname == 'dpap'][0].id @@ -13,6 +13,15 @@ if form.user is None: else: # Si user connecté on check un eventuel doublon : False si doublon, True si pas doublon criterias = [Equal('user_id', str(form.user.id)), NotEqual('status', 'draft')] - noDuplicate = bool(len([x for x in form_objects.formdef.data_class().select(criterias) if x.data.get(field_id) == form_var_dpap]) == 0) + noDuplicate = bool( + len( + [ + x + for x in form_objects.formdef.data_class().select(criterias) + if x.data.get(field_id) == form_var_dpap + ] + ) + == 0 + ) result = noDuplicate diff --git a/grandlyon_iodas/migrations/0001_initial.py b/grandlyon_iodas/migrations/0001_initial.py index 1d3bef8..ef81702 100644 --- a/grandlyon_iodas/migrations/0001_initial.py +++ b/grandlyon_iodas/migrations/0001_initial.py @@ -1,6 +1,3 @@ -# -*- coding: utf-8 -*- -from __future__ import unicode_literals - from django.db import migrations, models @@ -14,15 +11,37 @@ class Migration(migrations.Migration): migrations.CreateModel( name='GrandlyonIodas', fields=[ - ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), + ( + 'id', + models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True), + ), ('title', models.CharField(max_length=50, verbose_name='Title')), ('description', models.TextField(verbose_name='Description')), ('slug', models.SlugField(unique=True)), - ('log_level', models.CharField(default=b'INFO', max_length=10, verbose_name='Log Level', choices=[(b'NOTSET', b'NOTSET'), (b'DEBUG', b'DEBUG'), (b'INFO', b'INFO'), (b'WARNING', b'WARNING'), (b'ERROR', b'ERROR'), (b'CRITICAL', b'CRITICAL'), (b'FATAL', b'FATAL')])), + ( + 'log_level', + models.CharField( + default=b'INFO', + max_length=10, + verbose_name='Log Level', + choices=[ + (b'NOTSET', b'NOTSET'), + (b'DEBUG', b'DEBUG'), + (b'INFO', b'INFO'), + (b'WARNING', b'WARNING'), + (b'ERROR', b'ERROR'), + (b'CRITICAL', b'CRITICAL'), + (b'FATAL', b'FATAL'), + ], + ), + ), ('token_url', models.URLField(max_length=256, verbose_name='Token URL')), ('token_authorization', models.CharField(max_length=128, verbose_name='Token Authorization')), ('wsdl_url', models.CharField(max_length=256, verbose_name='WSDL URL')), - ('verify_cert', models.BooleanField(default=True, verbose_name='Check HTTPS Certificate validity')), + ( + 'verify_cert', + models.BooleanField(default=True, verbose_name='Check HTTPS Certificate validity'), + ), ('users', models.ManyToManyField(to='base.ApiUser', blank=True)), ], options={ diff --git a/grandlyon_iodas/migrations/0002_auto_20181119_1453.py b/grandlyon_iodas/migrations/0002_auto_20181119_1453.py index 167ccb2..68fec13 100644 --- a/grandlyon_iodas/migrations/0002_auto_20181119_1453.py +++ b/grandlyon_iodas/migrations/0002_auto_20181119_1453.py @@ -1,6 +1,4 @@ -# -*- coding: utf-8 -*- # Generated by Django 1.11.12 on 2018-11-19 13:53 -from __future__ import unicode_literals from django.db import migrations diff --git a/grandlyon_iodas/models.py b/grandlyon_iodas/models.py index 2577697..ee2f09d 100644 --- a/grandlyon_iodas/models.py +++ b/grandlyon_iodas/models.py @@ -1,22 +1,21 @@ -import requests -import json import hashlib -from suds.transport.http import HttpAuthenticated -from suds.client import Client -from suds.transport import Reply -import suds.sudsobject +import json +from datetime import datetime +import dateutil.relativedelta +import requests +import suds.sudsobject +from django.core.cache import cache from django.db import models from django.utils.encoding import force_bytes, force_text from django.utils.translation import ugettext_lazy as _ -from django.core.cache import cache - from passerelle.base.models import BaseResource from passerelle.utils.api import endpoint from passerelle.utils.jsonresponse import APIError +from suds.client import Client +from suds.transport import Reply +from suds.transport.http import HttpAuthenticated -from datetime import datetime -import dateutil.relativedelta def sudsobject_to_dict(sudsobject): out = {} @@ -38,9 +37,8 @@ def sudsobject_to_dict(sudsobject): class GrandlyonIodas(BaseResource): token_url = models.URLField(_('Token URL'), max_length=256) token_authorization = models.CharField(_('Token Authorization'), max_length=128) - wsdl_url = models.CharField(_('WSDL URL'), max_length=256) # not URLField, it can be file:// - verify_cert = models.BooleanField(default=True, - verbose_name=_('Check HTTPS Certificate validity')) + wsdl_url = models.CharField(_('WSDL URL'), max_length=256) # not URLField, it can be file:// + verify_cert = models.BooleanField(default=True, verbose_name=_('Check HTTPS Certificate validity')) category = _('Business Process Connectors') @@ -54,9 +52,12 @@ class GrandlyonIodas(BaseResource): if token: return token headers = {'Authorization': 'Basic %s' % self.token_authorization} - resp = self.requests.post(self.token_url, headers=headers, - data={'grant_type': 'client_credentials'}, - verify=self.verify_cert).json() + resp = self.requests.post( + self.token_url, + headers=headers, + data={'grant_type': 'client_credentials'}, + verify=self.verify_cert, + ).json() token = '%s %s' % (resp.get('token_type'), resp.get('access_token')) timeout = int(resp.get('expires_in')) cache.set(cache_key, token, timeout) @@ -71,15 +72,21 @@ class GrandlyonIodas(BaseResource): def send(self, request): request.headers['Authorization'] = self.instance.get_token() - resp = self.instance.requests.post(request.url, data=request.message, - headers=request.headers, - verify=self.instance.verify_cert) + resp = self.instance.requests.post( + request.url, + data=request.message, + headers=request.headers, + verify=self.instance.verify_cert, + ) if resp.status_code == 401: # ask for a new token, and retry request.headers['Authorization'] = self.instance.get_token(renew=True) - resp = self.instance.requests.post(request.url, data=request.message, - headers=request.headers, - verify=self.instance.verify_cert) + resp = self.instance.requests.post( + request.url, + data=request.message, + headers=request.headers, + verify=self.instance.verify_cert, + ) return Reply(resp.status_code, resp.headers, resp.content) @@ -89,42 +96,79 @@ class GrandlyonIodas(BaseResource): def getProcedures(self, request, nom, pren, datenais, dpap, typepro): # Params in the order required by the WSDL from stambia resp = self.get_client().service.ODA_getProceduresSIH(dpap, typepro, datenais, nom, pren) - data = sudsobject_to_dict(resp) + data = sudsobject_to_dict(resp) # Counts procedures to get the last procedure nbProc = (len(data['procedures']['procedures']['procedure']) - 1) if 'procedures' in data else '' # recupere la liste des droits en cours de toutes les procedures d'un individu droits = [] - if 'procedures' in data : - for procedure in data['procedures']['procedures']['procedure'] : - for etape in procedure['etapes']['etape'] : - if 'taches' in etape : - for tache in etape['taches']['tache'] : - if tache['idtypetache'] == 2 and datetime.strptime(tache['datearret'], "%d/%m/%Y") > datetime.now(): - for nb in [3,6] : - deltafindroit = nb if (datetime.strptime(tache['datearret'], "%d/%m/%Y") - dateutil.relativedelta.relativedelta(months=nb)) == datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) else 0 - droits.append({"libl":tache['produit']['libl'], "dateproposition":tache['dateproposition'], "datearret":tache['datearret'], "dateeffet":tache['dateeffet'], "deltafindroit":deltafindroit}) + if 'procedures' in data: + for procedure in data['procedures']['procedures']['procedure']: + for etape in procedure['etapes']['etape']: + if 'taches' in etape: + for tache in etape['taches']['tache']: + if ( + tache['idtypetache'] == 2 + and datetime.strptime(tache['datearret'], "%d/%m/%Y") > datetime.now() + ): + for nb in [3, 6]: + deltafindroit = ( + nb + if ( + datetime.strptime(tache['datearret'], "%d/%m/%Y") + - dateutil.relativedelta.relativedelta(months=nb) + ) + == datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + else 0 + ) + droits.append( + { + "libl": tache['produit']['libl'], + "dateproposition": tache['dateproposition'], + "datearret": tache['datearret'], + "dateeffet": tache['dateeffet'], + "deltafindroit": deltafindroit, + } + ) - return {'hash': force_text(hashlib.sha224(force_bytes(json.dumps(data))).hexdigest()), - 'libl': data['procedures']['procedures']['procedure'][nbProc]['libl'] if 'procedures' in data else '', - 'etapes': sorted(data['procedures']['procedures']['procedure'][nbProc]['etapes']['etape'], key=lambda x: x['id'], reverse=True) if 'procedures' in data else '', - 'droits': droits, - 'recevabilite': data['procedures']['procedures']['procedure'][nbProc]['recevabilite'] if 'procedures' in data and 'recevabilite' in data['procedures']['procedures']['procedure'][nbProc] else '', - 'found': 1 if 'procedures' in data else 0} + return { + 'hash': force_text(hashlib.sha224(force_bytes(json.dumps(data))).hexdigest()), + 'libl': data['procedures']['procedures']['procedure'][nbProc]['libl'] + if 'procedures' in data + else '', + 'etapes': sorted( + data['procedures']['procedures']['procedure'][nbProc]['etapes']['etape'], + key=lambda x: x['id'], + reverse=True, + ) + if 'procedures' in data + else '', + 'droits': droits, + 'recevabilite': data['procedures']['procedures']['procedure'][nbProc]['recevabilite'] + if 'procedures' in data + and 'recevabilite' in data['procedures']['procedures']['procedure'][nbProc] + else '', + 'found': 1 if 'procedures' in data else 0, + } @endpoint(perm='can_access') def getProceduresPA(self, request, nom, pren, datenais, typepro): # Params in the order required by the WSDL from stambia resp = self.get_client().service.ODA_getProceduresPA(typepro, datenais, nom, pren) - data = sudsobject_to_dict(resp) + data = sudsobject_to_dict(resp) # Counts procedures to get the last procedure - #nbProc = (len(data['procedurespa']['procedurespa']['procedurepa']) - 1) if 'procedurespa' in data else '' - procedures = [] + # nbProc = (len(data['procedurespa']['procedurespa']['procedurepa']) - 1) if 'procedurespa' in data else '' + procedures = [] nb = 0 - for proc in data['procedurespa']['procedurespa']['procedurepa'] : + for proc in data['procedurespa']['procedurespa']['procedurepa']: nb += 1 - if (nb < len(data['procedurespa']['procedurespa']['procedurepa']) and proc['libl'] != data['procedurespa']['procedurespa']['procedurepa'][nb]['libl']) or nb == len(data['procedurespa']['procedurespa']['procedurepa']) : + if ( + nb < len(data['procedurespa']['procedurespa']['procedurepa']) + and proc['libl'] != data['procedurespa']['procedurespa']['procedurepa'][nb]['libl'] + ) or nb == len(data['procedurespa']['procedurespa']['procedurepa']): test = proc['libl'] procedures.append(proc) - return {'hash': force_text(hashlib.sha224(force_bytes(json.dumps(data))).hexdigest()), - 'procedures': procedures, - 'found': 1 if 'procedurespa' in data else 0} + return { + 'hash': force_text(hashlib.sha224(force_bytes(json.dumps(data))).hexdigest()), + 'procedures': procedures, + 'found': 1 if 'procedurespa' in data else 0, + } diff --git a/setup.py b/setup.py index fb983ad..8ecb983 100644 --- a/setup.py +++ b/setup.py @@ -2,12 +2,12 @@ import os import subprocess +from distutils.cmd import Command +from distutils.command.build import build as _build +from setuptools import find_packages, setup from setuptools.command.install_lib import install_lib as _install_lib from setuptools.command.sdist import sdist -from distutils.command.build import build as _build -from distutils.cmd import Command -from setuptools import setup, find_packages class eo_sdist(sdist): @@ -25,10 +25,10 @@ class eo_sdist(sdist): def get_version(): '''Use the VERSION, if absent generates a version with git describe, if not - tag exists, take 0.0- and add the length of the commit log. + tag exists, take 0.0- and add the length of the commit log. ''' if os.path.exists('VERSION'): - with open('VERSION', 'r') as v: + with open('VERSION') as v: return v.read() if os.path.exists('.git'): p = subprocess.Popen( @@ -59,5 +59,5 @@ setup( packages=find_packages(), cmdclass={ 'sdist': eo_sdist, - } + }, )