init depot GL
This commit is contained in:
commit
6fadd7eea7
|
@ -0,0 +1,10 @@
|
||||||
|
Metadata-Version: 1.0
|
||||||
|
Name: grandlyon-iodas
|
||||||
|
Version: 0.0.0
|
||||||
|
Summary: UNKNOWN
|
||||||
|
Home-page: http://example.net/
|
||||||
|
Author: Grand Lyon
|
||||||
|
Author-email: toto@example.net
|
||||||
|
License: UNKNOWN
|
||||||
|
Description: UNKNOWN
|
||||||
|
Platform: UNKNOWN
|
|
@ -0,0 +1,8 @@
|
||||||
|
README
|
||||||
|
setup.py
|
||||||
|
grandlyon_iodas/__init__.py
|
||||||
|
grandlyon_iodas/models.py
|
||||||
|
grandlyon_iodas.egg-info/PKG-INFO
|
||||||
|
grandlyon_iodas.egg-info/SOURCES.txt
|
||||||
|
grandlyon_iodas.egg-info/dependency_links.txt
|
||||||
|
grandlyon_iodas.egg-info/top_level.txt
|
|
@ -0,0 +1 @@
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
grandlyon_iodas
|
Binary file not shown.
|
@ -0,0 +1,32 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
from django.db import migrations, models
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
|
||||||
|
dependencies = [
|
||||||
|
('base', '0008_auto_20180227_1502'),
|
||||||
|
]
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.CreateModel(
|
||||||
|
name='grandlyonIodas',
|
||||||
|
fields=[
|
||||||
|
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
|
||||||
|
('title', models.CharField(max_length=50, verbose_name='Title')),
|
||||||
|
('description', models.TextField(verbose_name='Description')),
|
||||||
|
('slug', models.SlugField(unique=True)),
|
||||||
|
('log_level', models.CharField(default=b'INFO', max_length=10, verbose_name='Log Level', choices=[(b'NOTSET', b'NOTSET'), (b'DEBUG', b'DEBUG'), (b'INFO', b'INFO'), (b'WARNING', b'WARNING'), (b'ERROR', b'ERROR'), (b'CRITICAL', b'CRITICAL'), (b'FATAL', b'FATAL')])),
|
||||||
|
('token_url', models.URLField(max_length=256, verbose_name='Token URL')),
|
||||||
|
('token_authorization', models.CharField(max_length=128, verbose_name='Token Authorization')),
|
||||||
|
('wsdl_url', models.CharField(max_length=256, verbose_name='WSDL URL')),
|
||||||
|
('verify_cert', models.BooleanField(default=True, verbose_name='Check HTTPS Certificate validity')),
|
||||||
|
('users', models.ManyToManyField(to='base.ApiUser', blank=True)),
|
||||||
|
],
|
||||||
|
options={
|
||||||
|
'verbose_name': 'Connecteur IODAS test',
|
||||||
|
},
|
||||||
|
),
|
||||||
|
]
|
Binary file not shown.
|
@ -0,0 +1,109 @@
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
import hashlib
|
||||||
|
from suds.transport.http import HttpAuthenticated
|
||||||
|
from suds.client import Client
|
||||||
|
from suds.transport import Reply
|
||||||
|
import suds.sudsobject
|
||||||
|
|
||||||
|
from django.db import models
|
||||||
|
from django.utils.translation import ugettext_lazy as _
|
||||||
|
from django.core.cache import cache
|
||||||
|
|
||||||
|
from passerelle.base.models import BaseResource
|
||||||
|
from passerelle.utils.api import endpoint
|
||||||
|
from passerelle.utils.jsonresponse import APIError
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
import dateutil.relativedelta
|
||||||
|
|
||||||
|
def sudsobject_to_dict(sudsobject):
|
||||||
|
out = {}
|
||||||
|
for key, value in suds.sudsobject.asdict(sudsobject).iteritems():
|
||||||
|
if hasattr(value, '__keylist__'):
|
||||||
|
out[key] = sudsobject_to_dict(value)
|
||||||
|
elif isinstance(value, list):
|
||||||
|
out[key] = []
|
||||||
|
for item in value:
|
||||||
|
if hasattr(item, '__keylist__'):
|
||||||
|
out[key].append(sudsobject_to_dict(item))
|
||||||
|
else:
|
||||||
|
out[key].append(item)
|
||||||
|
else:
|
||||||
|
out[key] = value
|
||||||
|
return out
|
||||||
|
|
||||||
|
class grandlyonIodas(BaseResource):
|
||||||
|
token_url = models.URLField(_('Token URL'), max_length=256)
|
||||||
|
token_authorization = models.CharField(_('Token Authorization'), max_length=128)
|
||||||
|
wsdl_url = models.CharField(_('WSDL URL'), max_length=256) # not URLField, it can be file://
|
||||||
|
verify_cert = models.BooleanField(default=True,
|
||||||
|
verbose_name=_('Check HTTPS Certificate validity'))
|
||||||
|
|
||||||
|
category = _('Business Process Connectors')
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
verbose_name = 'Connecteur IODAS Grand Lyon'
|
||||||
|
|
||||||
|
def get_token(self, renew=False):
|
||||||
|
cache_key = 'iodas-%s-token' % self.id
|
||||||
|
if not renew:
|
||||||
|
token = cache.get(cache_key)
|
||||||
|
if token:
|
||||||
|
return token
|
||||||
|
headers = {'Authorization': 'Basic %s' % self.token_authorization}
|
||||||
|
resp = self.requests.post(self.token_url, headers=headers,
|
||||||
|
data={'grant_type': 'client_credentials'},
|
||||||
|
verify=self.verify_cert).json()
|
||||||
|
token = '%s %s' % (resp.get('token_type'), resp.get('access_token'))
|
||||||
|
timeout = int(resp.get('expires_in'))
|
||||||
|
cache.set(cache_key, token, timeout)
|
||||||
|
self.logger.debug('new token: %s (timeout %ss)', token, timeout)
|
||||||
|
return token
|
||||||
|
|
||||||
|
def get_client(self):
|
||||||
|
class Transport(HttpAuthenticated):
|
||||||
|
def __init__(self, instance):
|
||||||
|
self.instance = instance
|
||||||
|
HttpAuthenticated.__init__(self)
|
||||||
|
|
||||||
|
def send(self, request):
|
||||||
|
#request.message = request.message.replace("contentType", "xm:contentType")
|
||||||
|
request.headers['Authorization'] = self.instance.get_token()
|
||||||
|
resp = self.instance.requests.post(request.url, data=request.message,
|
||||||
|
headers=request.headers,
|
||||||
|
verify=self.instance.verify_cert)
|
||||||
|
if resp.status_code == 401:
|
||||||
|
# ask for a new token, and retry
|
||||||
|
request.headers['Authorization'] = self.instance.get_token(renew=True)
|
||||||
|
resp = self.instance.requests.post(request.url, data=request.message,
|
||||||
|
headers=request.headers,
|
||||||
|
verify=self.instance.verify_cert)
|
||||||
|
|
||||||
|
return Reply(resp.status_code, resp.headers, resp.content)
|
||||||
|
|
||||||
|
return Client(url=self.wsdl_url, transport=Transport(self))
|
||||||
|
|
||||||
|
@endpoint(perm='can_access')
|
||||||
|
def getProcedures(self, request, nom, pren, datenais, dpap, typepro):
|
||||||
|
# Params in the order required by the WSDL from stambia
|
||||||
|
resp = self.get_client().service.ODA_getProcedures(dpap, typepro, pren, nom, datenais)
|
||||||
|
data = sudsobject_to_dict(resp)
|
||||||
|
# Counts procedures to get the last procedure
|
||||||
|
nbProc = (len(data['procedures']['procedures']['procedure']) - 1) if 'procedures' in data else ''
|
||||||
|
# recupere la liste des droits en cours de toutes les procedures d'un individu
|
||||||
|
droits = []
|
||||||
|
if 'procedures' in data :
|
||||||
|
for procedure in data['procedures']['procedures']['procedure'] :
|
||||||
|
for etape in procedure['etapes']['etape'] :
|
||||||
|
if 'taches' in etape :
|
||||||
|
for tache in etape['taches']['tache'] :
|
||||||
|
if tache['idtypetache'] == 2 and datetime.strptime(tache['datearret'], "%d/%m/%Y") > datetime.now():
|
||||||
|
moisfin = 1 if (datetime.strptime(tache['datearret'], "%d/%m/%Y") - dateutil.relativedelta.relativedelta(months=1)) == datetime.now() else 0
|
||||||
|
droits.append({"libl":tache['produit']['libl'], "dateproposition":tache['dateproposition'], "datearret":tache['datearret'], "dateeffet":tache['dateeffet'], "moisfin":moisfin})
|
||||||
|
|
||||||
|
return {'hash': hashlib.sha224(json.dumps(data)).hexdigest(),
|
||||||
|
'libl': data['procedures']['procedures']['procedure'][nbProc]['libl'] if 'procedures' in data else '',
|
||||||
|
'etapes': sorted(data['procedures']['procedures']['procedure'][nbProc]['etapes']['etape'], key=lambda x: x['id'], reverse=True) if 'procedures' in data else '',
|
||||||
|
'droits': droits,
|
||||||
|
'found': 1 if 'procedures' in data else 0}
|
Binary file not shown.
Loading…
Reference in New Issue