passerelle-grandlyon-elyx/grandlyon_elyx/models.py

154 lines
7.5 KiB
Python

# coding: utf8
import base64
import requests
import json
import datetime
from django.db import models
from django.utils.translation import ugettext_lazy as _
from django.core.cache import cache
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
from .formdata import FormData, CREATION_SCHEMA, list_schema_fields
class ParameterTypeError(Exception):
    """Signal that the request payload could not be turned into form data.

    The ``create`` endpoint raises it when parsing the JSON body or building
    the form data fails with a ``ValueError``.
    """

    # NOTE(review): presumably read by passerelle's endpoint error handling
    # (HTTP status of the error response / whether the error is logged) --
    # confirm against passerelle.utils.api.
    http_status = 400
    log_error = False
class GrandlyonElyx(BaseResource):
    """Connector to the Grand Lyon Elyx service.

    Obtains an access token from ``token_url`` (password grant, cached per
    connector instance) and uses it to submit requests to, and poll requests
    on, the API rooted at ``service_url``.
    """

    token_url = models.URLField(_('Token URL'), max_length=256)
    # Sent verbatim after "Basic " in the Authorization header of token requests.
    token_authorization = models.CharField(_('Token Authorization'), max_length=128)
    service_url = models.URLField('URL du service', max_length=256)
    username = models.CharField(_('Username'), max_length=128)
    password = models.CharField(_('Password'), max_length=128)
    scope = models.CharField('Scope du service', max_length=128)
    verify_cert = models.BooleanField(default=True,
                                      verbose_name=_('Check HTTPS Certificate validity'))

    category = _('Business Process Connectors')

    class Meta:
        verbose_name = 'Connecteur Elyx Grand Lyon'

    def get_token(self, renew=False):
        """Return the value to send in the ``Authorization`` header.

        :param renew: when true, skip the cache and request a fresh token.
        :returns: ``"<token_type> <access_token>"`` as given by the token
            endpoint.
        :raises APIError: if the token endpoint does not answer with a JSON
            object holding both ``token_type`` and ``access_token``.
        """
        cache_key = 'elyx-%s-token' % self.id
        if not renew:
            token = cache.get(cache_key)
            if token:
                return token

        headers = {'Authorization': 'Basic %s' % self.token_authorization}
        credentials = {
            'grant_type': 'password',
            'username': self.username,
            'password': self.password,
            'scope': self.scope,
        }
        response = self.requests.post(self.token_url, headers=headers,
                                      data=credentials,
                                      verify=self.verify_cert)
        try:
            resp = response.json()
        except ValueError:
            raise APIError('Elyx token endpoint returned a non-JSON response (HTTP %s)'
                           % response.status_code)
        if (not isinstance(resp, dict) or not resp.get('token_type')
                or not resp.get('access_token')):
            # Typically an error payload (bad credentials, bad scope, ...);
            # never build a bogus "None None" header out of it.
            raise APIError('Elyx token endpoint did not return a token (HTTP %s)'
                           % response.status_code)
        token = '%s %s' % (resp['token_type'], resp['access_token'])

        try:
            timeout = int(resp.get('expires_in'))
        except (TypeError, ValueError):
            timeout = None
        if timeout is not None and timeout > 0:
            cache.set(cache_key, token, timeout)
            # The token itself is a credential: keep it out of the logs.
            self.logger.debug('new token obtained (timeout %ss)', timeout)
        else:
            # No usable lifetime: use the token for this call only and make
            # sure a previously cached (possibly rejected) one is not reused.
            cache.delete(cache_key)
            self.logger.debug('new token obtained (not cached: no usable expires_in)')
        return token

    def _send(self, method, request, **kwargs):
        """Send ``request`` with a token, retrying once with a new token on 401.

        :param method: name of the ``self.requests`` method to call
            (``'get'`` or ``'post'``).
        :param request: object carrying ``url``, ``data`` and ``headers``;
            its ``Authorization`` header is overwritten.
        :param kwargs: extra keyword arguments forwarded to the HTTP call.
        :returns: the response of the last attempt.
        """
        send = getattr(self.requests, method)
        request.headers['Authorization'] = self.get_token()
        resp = send(request.url, data=request.data, headers=request.headers,
                    verify=self.verify_cert, **kwargs)
        if resp.status_code == 401:
            # ask for a new token, and retry
            request.headers['Authorization'] = self.get_token(renew=True)
            resp = send(request.url, data=request.data, headers=request.headers,
                        verify=self.verify_cert, **kwargs)
        return resp

    def send_get(self, request):
        """GET ``request.url`` with a valid token (one retry on HTTP 401)."""
        return self._send('get', request)

    def send_post(self, request):
        """POST ``request.data`` to ``request.url`` with a valid token.

        Uses a 60 second timeout and retries once on HTTP 401.
        """
        return self._send('post', request, timeout=60)

    def _decode_json(self, resp):
        """Return the JSON-decoded body of ``resp``.

        :raises APIError: if the body is not valid JSON.
        """
        try:
            return json.loads(resp.content)
        except ValueError:
            raise APIError('Elyx returned a non-JSON response (HTTP %s)'
                           % resp.status_code)

    def none_to_str(self, input):  # noqa: A002 -- name kept for keyword callers
        """Return ``''`` when ``input`` is ``None``, otherwise ``input`` unchanged."""
        return '' if input is None else input

    @endpoint()
    def info(self, request):
        """Return the current local time as ``{'hello': 'YYYYmmddHHMMSS'}``."""
        return {'hello': datetime.datetime.now().strftime('%Y%m%d%H%M%S')}

    def _build_creation_payload(self, values, now):
        """Build the ordered JSON document sent to Elyx for a new request.

        :param values: form values (``formdata.values``) of the creation call.
        :param now: ``datetime`` used for both the request id and ``DATE_DEM``.
        :returns: an ``OrderedDict`` whose key order matches the document the
            connector has always sent.
        """
        from collections import OrderedDict

        # Par defaut si user non connecte -> default address for anonymous users.
        user_email = 'gnm@grandlyon.com'
        if 'user_email' in values and values['user_email'] is not None:
            user_email = values['user_email']

        demand_type = values['type_de_demande']
        tracking_code = values['tracking_code']
        # One "<commune><SECTION><number>," item per parcel; the trailing
        # comma is part of the historical format and is kept.
        liste_parcelles = ''.join(
            values['commune_parcelle_raw'] + values['section'].upper() + numero[0] + ','
            for numero in values['numero_parcelles']
        )

        fields = [
            ('objet_json', 'demandePortail'),
            ('ID_REQ_SIG', u'_'.join([user_email, demand_type, tracking_code,
                                      now.strftime('%Y%m%d%H%M%S')])),
            ('ID_REQ_DEMANDE', demand_type + '-' + tracking_code),
            ('TYPE', demand_type),
            ('DATE_DEM', now.strftime('%d/%m/%Y')),
            ('CIVILITE', self.none_to_str(values['civilite_code'])),
            ('NOM', self.none_to_str(values['nom'])),
            ('PRENOM', self.none_to_str(values['prenom'])),
            ('ADRESSE', u'%s %s' % (self.none_to_str(values['numero_voie']),
                                    self.none_to_str(values['voie']))),
            ('BP', self.none_to_str(values['boite_postale'])),
            ('CP', self.none_to_str(values['code_postal'])),
            ('VILLE', self.none_to_str(values['commune'])),
            ('REFERENCE', self.none_to_str(values['reference'])),
            ('CLASSE_OBJETS', 'ICPARCEL'),
            ('LISTE_PARCELLES', liste_parcelles),
            ('MAIL_DEMANDEUR', user_email),
        ]
        if demand_type == 'RU':
            fields.append(('PLAN', True))
            fields.append(('MODE_SYNTHETIQUE', True))
        fields.append(('ADRESSE_DEFAUT', ''))
        fields.append(('ENV', values['env']))
        return OrderedDict(fields)

    @endpoint(perm='can_access', methods=['post'])
    def create(self, request):
        """Submit a new request to Elyx from the JSON body of ``request``.

        The body is handed to ``FormData`` together with ``CREATION_SCHEMA``;
        the resulting values are posted to
        ``<service_url>/rr/Demandes?operation=envoi``.

        :returns: ``{'data': <decoded JSON answer of Elyx>}``.
        :raises ParameterTypeError: if parsing the body or building the form
            data raises ``ValueError``.
        :raises APIError: if Elyx does not answer with JSON.
        """
        # get creation fields from payload
        try:
            formdata = FormData(json.loads(request.body), CREATION_SCHEMA)
        except ValueError as e:
            # Exceptions have no ``.message`` attribute on Python 3.
            raise ParameterTypeError(u'%s' % (e,))

        # A single clock reading, so the request id and DATE_DEM always agree.
        payload = self._build_creation_payload(formdata.values,
                                               datetime.datetime.now())

        send_request = requests.Request()
        send_request.headers['Content-Type'] = 'application/json'
        send_request.url = self.service_url + '/rr/Demandes?operation=envoi'
        # json.dumps escapes quotes, backslashes and control characters in
        # user-supplied values; compact separators keep the same layout as
        # the former hand-concatenated document.
        send_request.data = json.dumps(payload, separators=(',', ':')).encode('utf-8')

        resp = self.send_post(send_request)
        return {'data': self._decode_json(resp)}

    @endpoint(perm='can_access')
    def status(self, request, iddemande):
        """Return the state of request ``iddemande`` and, if ready, its file.

        Queries ``<service_url>/rr/Demandes?operation=suivi&id=<iddemande>``.
        When the answer's ``EtatDemande`` is ``DEMANDE_DISPO``, the resource
        named by its ``Url`` is downloaded through ``/rs/ReadResource``.

        :returns: ``{'data': <decoded JSON answer>, 'file': <dict or None>}``
            where the dict holds ``b64_content`` (base64 text) and
            ``filename``.
        :raises APIError: if Elyx does not answer with JSON, or if the
            announced file cannot be downloaded.
        """
        # NOTE(review): iddemande and data['Url'] are inserted in the query
        # string without URL-encoding, as before -- confirm what the remote
        # service expects before changing this.
        send_request = requests.Request()
        send_request.url = self.service_url + '/rr/Demandes?operation=suivi&id=' + iddemande
        data = self._decode_json(self.send_get(send_request))

        attachment = None
        if isinstance(data, dict) and data.get('EtatDemande') == "DEMANDE_DISPO":
            if not data.get('Url'):
                raise APIError('Elyx reported DEMANDE_DISPO without a file Url')
            file_request = requests.Request()
            file_request.url = self.service_url + '/rs/ReadResource?url=' + data['Url']
            file_resp = self.send_get(file_request)
            if file_resp.status_code // 100 != 2:
                # Do not hand an HTTP error page over as the PDF.
                raise APIError('Elyx file download failed (HTTP %s)'
                               % file_resp.status_code)
            attachment = {
                # b64encode returns bytes; decode so the result is JSON-serialisable.
                'b64_content': base64.b64encode(file_resp.content).decode('ascii'),
                'filename': 'certificat.pdf',
            }
        return {'data': data, 'file': attachment}