trivial: apply pre-commit hooks
This commit is contained in:
parent
d50a590ae3
commit
eace7885ba
|
@ -2,7 +2,11 @@ Source: passerelle-grandlyon-iodas
|
|||
Maintainer: Frederic Peters <fpeters@entrouvert.com>
|
||||
Section: python
|
||||
Priority: optional
|
||||
Build-Depends: debhelper-compat (= 12), dh-python, python3-setuptools, python3-all, python3-django
|
||||
Build-Depends: debhelper-compat (= 12),
|
||||
dh-python,
|
||||
python3-all,
|
||||
python3-django,
|
||||
python3-setuptools,
|
||||
Standards-Version: 3.9.1
|
||||
|
||||
Package: python3-passerelle-grandlyon-iodas
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
"""
|
||||
Vérifie qu'il n'existe pas déja un formulaire soumis par l'utilisateur connecté ayant le meme dpap (numéro de dossier papier)
|
||||
"""
|
||||
from wcs.qommon.storage import (Equal, NotEqual)
|
||||
from wcs.qommon.storage import Equal, NotEqual
|
||||
|
||||
# id du champ contenant le DPAP
|
||||
field_id = [x for x in form_objects.formdef.fields if x.varname == 'dpap'][0].id
|
||||
|
@ -13,6 +13,15 @@ if form.user is None:
|
|||
else:
|
||||
# Si user connecté on check un eventuel doublon : False si doublon, True si pas doublon
|
||||
criterias = [Equal('user_id', str(form.user.id)), NotEqual('status', 'draft')]
|
||||
noDuplicate = bool(len([x for x in form_objects.formdef.data_class().select(criterias) if x.data.get(field_id) == form_var_dpap]) == 0)
|
||||
noDuplicate = bool(
|
||||
len(
|
||||
[
|
||||
x
|
||||
for x in form_objects.formdef.data_class().select(criterias)
|
||||
if x.data.get(field_id) == form_var_dpap
|
||||
]
|
||||
)
|
||||
== 0
|
||||
)
|
||||
|
||||
result = noDuplicate
|
||||
|
|
|
@ -1,6 +1,3 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
|
@ -14,15 +11,37 @@ class Migration(migrations.Migration):
|
|||
migrations.CreateModel(
|
||||
name='GrandlyonIodas',
|
||||
fields=[
|
||||
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
|
||||
(
|
||||
'id',
|
||||
models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True),
|
||||
),
|
||||
('title', models.CharField(max_length=50, verbose_name='Title')),
|
||||
('description', models.TextField(verbose_name='Description')),
|
||||
('slug', models.SlugField(unique=True)),
|
||||
('log_level', models.CharField(default=b'INFO', max_length=10, verbose_name='Log Level', choices=[(b'NOTSET', b'NOTSET'), (b'DEBUG', b'DEBUG'), (b'INFO', b'INFO'), (b'WARNING', b'WARNING'), (b'ERROR', b'ERROR'), (b'CRITICAL', b'CRITICAL'), (b'FATAL', b'FATAL')])),
|
||||
(
|
||||
'log_level',
|
||||
models.CharField(
|
||||
default=b'INFO',
|
||||
max_length=10,
|
||||
verbose_name='Log Level',
|
||||
choices=[
|
||||
(b'NOTSET', b'NOTSET'),
|
||||
(b'DEBUG', b'DEBUG'),
|
||||
(b'INFO', b'INFO'),
|
||||
(b'WARNING', b'WARNING'),
|
||||
(b'ERROR', b'ERROR'),
|
||||
(b'CRITICAL', b'CRITICAL'),
|
||||
(b'FATAL', b'FATAL'),
|
||||
],
|
||||
),
|
||||
),
|
||||
('token_url', models.URLField(max_length=256, verbose_name='Token URL')),
|
||||
('token_authorization', models.CharField(max_length=128, verbose_name='Token Authorization')),
|
||||
('wsdl_url', models.CharField(max_length=256, verbose_name='WSDL URL')),
|
||||
('verify_cert', models.BooleanField(default=True, verbose_name='Check HTTPS Certificate validity')),
|
||||
(
|
||||
'verify_cert',
|
||||
models.BooleanField(default=True, verbose_name='Check HTTPS Certificate validity'),
|
||||
),
|
||||
('users', models.ManyToManyField(to='base.ApiUser', blank=True)),
|
||||
],
|
||||
options={
|
||||
|
|
|
@ -1,6 +1,4 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Generated by Django 1.11.12 on 2018-11-19 13:53
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.db import migrations
|
||||
|
||||
|
|
|
@ -1,22 +1,21 @@
|
|||
import requests
|
||||
import json
|
||||
import hashlib
|
||||
from suds.transport.http import HttpAuthenticated
|
||||
from suds.client import Client
|
||||
from suds.transport import Reply
|
||||
import suds.sudsobject
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
import dateutil.relativedelta
|
||||
import requests
|
||||
import suds.sudsobject
|
||||
from django.core.cache import cache
|
||||
from django.db import models
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
from django.core.cache import cache
|
||||
|
||||
from passerelle.base.models import BaseResource
|
||||
from passerelle.utils.api import endpoint
|
||||
from passerelle.utils.jsonresponse import APIError
|
||||
from suds.client import Client
|
||||
from suds.transport import Reply
|
||||
from suds.transport.http import HttpAuthenticated
|
||||
|
||||
from datetime import datetime
|
||||
import dateutil.relativedelta
|
||||
|
||||
def sudsobject_to_dict(sudsobject):
|
||||
out = {}
|
||||
|
@ -38,9 +37,8 @@ def sudsobject_to_dict(sudsobject):
|
|||
class GrandlyonIodas(BaseResource):
|
||||
token_url = models.URLField(_('Token URL'), max_length=256)
|
||||
token_authorization = models.CharField(_('Token Authorization'), max_length=128)
|
||||
wsdl_url = models.CharField(_('WSDL URL'), max_length=256) # not URLField, it can be file://
|
||||
verify_cert = models.BooleanField(default=True,
|
||||
verbose_name=_('Check HTTPS Certificate validity'))
|
||||
wsdl_url = models.CharField(_('WSDL URL'), max_length=256) # not URLField, it can be file://
|
||||
verify_cert = models.BooleanField(default=True, verbose_name=_('Check HTTPS Certificate validity'))
|
||||
|
||||
category = _('Business Process Connectors')
|
||||
|
||||
|
@ -54,9 +52,12 @@ class GrandlyonIodas(BaseResource):
|
|||
if token:
|
||||
return token
|
||||
headers = {'Authorization': 'Basic %s' % self.token_authorization}
|
||||
resp = self.requests.post(self.token_url, headers=headers,
|
||||
data={'grant_type': 'client_credentials'},
|
||||
verify=self.verify_cert).json()
|
||||
resp = self.requests.post(
|
||||
self.token_url,
|
||||
headers=headers,
|
||||
data={'grant_type': 'client_credentials'},
|
||||
verify=self.verify_cert,
|
||||
).json()
|
||||
token = '%s %s' % (resp.get('token_type'), resp.get('access_token'))
|
||||
timeout = int(resp.get('expires_in'))
|
||||
cache.set(cache_key, token, timeout)
|
||||
|
@ -71,15 +72,21 @@ class GrandlyonIodas(BaseResource):
|
|||
|
||||
def send(self, request):
|
||||
request.headers['Authorization'] = self.instance.get_token()
|
||||
resp = self.instance.requests.post(request.url, data=request.message,
|
||||
headers=request.headers,
|
||||
verify=self.instance.verify_cert)
|
||||
resp = self.instance.requests.post(
|
||||
request.url,
|
||||
data=request.message,
|
||||
headers=request.headers,
|
||||
verify=self.instance.verify_cert,
|
||||
)
|
||||
if resp.status_code == 401:
|
||||
# ask for a new token, and retry
|
||||
request.headers['Authorization'] = self.instance.get_token(renew=True)
|
||||
resp = self.instance.requests.post(request.url, data=request.message,
|
||||
headers=request.headers,
|
||||
verify=self.instance.verify_cert)
|
||||
resp = self.instance.requests.post(
|
||||
request.url,
|
||||
data=request.message,
|
||||
headers=request.headers,
|
||||
verify=self.instance.verify_cert,
|
||||
)
|
||||
|
||||
return Reply(resp.status_code, resp.headers, resp.content)
|
||||
|
||||
|
@ -89,42 +96,79 @@ class GrandlyonIodas(BaseResource):
|
|||
def getProcedures(self, request, nom, pren, datenais, dpap, typepro):
|
||||
# Params in the order required by the WSDL from stambia
|
||||
resp = self.get_client().service.ODA_getProceduresSIH(dpap, typepro, datenais, nom, pren)
|
||||
data = sudsobject_to_dict(resp)
|
||||
data = sudsobject_to_dict(resp)
|
||||
# Counts procedures to get the last procedure
|
||||
nbProc = (len(data['procedures']['procedures']['procedure']) - 1) if 'procedures' in data else ''
|
||||
# recupere la liste des droits en cours de toutes les procedures d'un individu
|
||||
droits = []
|
||||
if 'procedures' in data :
|
||||
for procedure in data['procedures']['procedures']['procedure'] :
|
||||
for etape in procedure['etapes']['etape'] :
|
||||
if 'taches' in etape :
|
||||
for tache in etape['taches']['tache'] :
|
||||
if tache['idtypetache'] == 2 and datetime.strptime(tache['datearret'], "%d/%m/%Y") > datetime.now():
|
||||
for nb in [3,6] :
|
||||
deltafindroit = nb if (datetime.strptime(tache['datearret'], "%d/%m/%Y") - dateutil.relativedelta.relativedelta(months=nb)) == datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) else 0
|
||||
droits.append({"libl":tache['produit']['libl'], "dateproposition":tache['dateproposition'], "datearret":tache['datearret'], "dateeffet":tache['dateeffet'], "deltafindroit":deltafindroit})
|
||||
if 'procedures' in data:
|
||||
for procedure in data['procedures']['procedures']['procedure']:
|
||||
for etape in procedure['etapes']['etape']:
|
||||
if 'taches' in etape:
|
||||
for tache in etape['taches']['tache']:
|
||||
if (
|
||||
tache['idtypetache'] == 2
|
||||
and datetime.strptime(tache['datearret'], "%d/%m/%Y") > datetime.now()
|
||||
):
|
||||
for nb in [3, 6]:
|
||||
deltafindroit = (
|
||||
nb
|
||||
if (
|
||||
datetime.strptime(tache['datearret'], "%d/%m/%Y")
|
||||
- dateutil.relativedelta.relativedelta(months=nb)
|
||||
)
|
||||
== datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
else 0
|
||||
)
|
||||
droits.append(
|
||||
{
|
||||
"libl": tache['produit']['libl'],
|
||||
"dateproposition": tache['dateproposition'],
|
||||
"datearret": tache['datearret'],
|
||||
"dateeffet": tache['dateeffet'],
|
||||
"deltafindroit": deltafindroit,
|
||||
}
|
||||
)
|
||||
|
||||
return {'hash': force_text(hashlib.sha224(force_bytes(json.dumps(data))).hexdigest()),
|
||||
'libl': data['procedures']['procedures']['procedure'][nbProc]['libl'] if 'procedures' in data else '',
|
||||
'etapes': sorted(data['procedures']['procedures']['procedure'][nbProc]['etapes']['etape'], key=lambda x: x['id'], reverse=True) if 'procedures' in data else '',
|
||||
'droits': droits,
|
||||
'recevabilite': data['procedures']['procedures']['procedure'][nbProc]['recevabilite'] if 'procedures' in data and 'recevabilite' in data['procedures']['procedures']['procedure'][nbProc] else '',
|
||||
'found': 1 if 'procedures' in data else 0}
|
||||
return {
|
||||
'hash': force_text(hashlib.sha224(force_bytes(json.dumps(data))).hexdigest()),
|
||||
'libl': data['procedures']['procedures']['procedure'][nbProc]['libl']
|
||||
if 'procedures' in data
|
||||
else '',
|
||||
'etapes': sorted(
|
||||
data['procedures']['procedures']['procedure'][nbProc]['etapes']['etape'],
|
||||
key=lambda x: x['id'],
|
||||
reverse=True,
|
||||
)
|
||||
if 'procedures' in data
|
||||
else '',
|
||||
'droits': droits,
|
||||
'recevabilite': data['procedures']['procedures']['procedure'][nbProc]['recevabilite']
|
||||
if 'procedures' in data
|
||||
and 'recevabilite' in data['procedures']['procedures']['procedure'][nbProc]
|
||||
else '',
|
||||
'found': 1 if 'procedures' in data else 0,
|
||||
}
|
||||
|
||||
@endpoint(perm='can_access')
|
||||
def getProceduresPA(self, request, nom, pren, datenais, typepro):
|
||||
# Params in the order required by the WSDL from stambia
|
||||
resp = self.get_client().service.ODA_getProceduresPA(typepro, datenais, nom, pren)
|
||||
data = sudsobject_to_dict(resp)
|
||||
data = sudsobject_to_dict(resp)
|
||||
# Counts procedures to get the last procedure
|
||||
#nbProc = (len(data['procedurespa']['procedurespa']['procedurepa']) - 1) if 'procedurespa' in data else ''
|
||||
procedures = []
|
||||
# nbProc = (len(data['procedurespa']['procedurespa']['procedurepa']) - 1) if 'procedurespa' in data else ''
|
||||
procedures = []
|
||||
nb = 0
|
||||
for proc in data['procedurespa']['procedurespa']['procedurepa'] :
|
||||
for proc in data['procedurespa']['procedurespa']['procedurepa']:
|
||||
nb += 1
|
||||
if (nb < len(data['procedurespa']['procedurespa']['procedurepa']) and proc['libl'] != data['procedurespa']['procedurespa']['procedurepa'][nb]['libl']) or nb == len(data['procedurespa']['procedurespa']['procedurepa']) :
|
||||
if (
|
||||
nb < len(data['procedurespa']['procedurespa']['procedurepa'])
|
||||
and proc['libl'] != data['procedurespa']['procedurespa']['procedurepa'][nb]['libl']
|
||||
) or nb == len(data['procedurespa']['procedurespa']['procedurepa']):
|
||||
test = proc['libl']
|
||||
procedures.append(proc)
|
||||
return {'hash': force_text(hashlib.sha224(force_bytes(json.dumps(data))).hexdigest()),
|
||||
'procedures': procedures,
|
||||
'found': 1 if 'procedurespa' in data else 0}
|
||||
return {
|
||||
'hash': force_text(hashlib.sha224(force_bytes(json.dumps(data))).hexdigest()),
|
||||
'procedures': procedures,
|
||||
'found': 1 if 'procedurespa' in data else 0,
|
||||
}
|
||||
|
|
12
setup.py
12
setup.py
|
@ -2,12 +2,12 @@
|
|||
|
||||
import os
|
||||
import subprocess
|
||||
from distutils.cmd import Command
|
||||
from distutils.command.build import build as _build
|
||||
|
||||
from setuptools import find_packages, setup
|
||||
from setuptools.command.install_lib import install_lib as _install_lib
|
||||
from setuptools.command.sdist import sdist
|
||||
from distutils.command.build import build as _build
|
||||
from distutils.cmd import Command
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
|
||||
class eo_sdist(sdist):
|
||||
|
@ -25,10 +25,10 @@ class eo_sdist(sdist):
|
|||
|
||||
def get_version():
|
||||
'''Use the VERSION, if absent generates a version with git describe, if not
|
||||
tag exists, take 0.0- and add the length of the commit log.
|
||||
tag exists, take 0.0- and add the length of the commit log.
|
||||
'''
|
||||
if os.path.exists('VERSION'):
|
||||
with open('VERSION', 'r') as v:
|
||||
with open('VERSION') as v:
|
||||
return v.read()
|
||||
if os.path.exists('.git'):
|
||||
p = subprocess.Popen(
|
||||
|
@ -59,5 +59,5 @@ setup(
|
|||
packages=find_packages(),
|
||||
cmdclass={
|
||||
'sdist': eo_sdist,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue