154 lines
7.4 KiB
Python
154 lines
7.4 KiB
Python
# coding: utf8
|
|
|
|
import base64
|
|
import requests
|
|
import json
|
|
import datetime
|
|
|
|
from django.db import models
|
|
from django.utils.translation import ugettext_lazy as _
|
|
from django.core.cache import cache
|
|
|
|
from passerelle.base.models import BaseResource
|
|
from passerelle.utils.api import endpoint
|
|
from passerelle.utils.jsonresponse import APIError
|
|
|
|
from .formdata import FormData, CREATION_SCHEMA, list_schema_fields
|
|
|
|
class ParameterTypeError(Exception):
|
|
http_status = 400
|
|
log_error = False
|
|
|
|
|
|
class GrandlyonElyx(BaseResource):
|
|
token_url = models.URLField(_('Token URL'), max_length=256)
|
|
token_authorization = models.CharField(_('Token Authorization'), max_length=128)
|
|
service_url = models.URLField('URL du service', max_length=256)
|
|
username = models.CharField(_('Username'), max_length=128)
|
|
password = models.CharField(_('Password'), max_length=128)
|
|
scope = models.CharField('Scope du service', max_length=128)
|
|
verify_cert = models.BooleanField(default=True,
|
|
verbose_name=_('Check HTTPS Certificate validity'))
|
|
|
|
category = _('Business Process Connectors')
|
|
|
|
class Meta:
|
|
verbose_name = 'Connecteur Elyx Grand Lyon'
|
|
|
|
def get_token(self, renew=False):
|
|
cache_key = 'elyx-%s-token' % self.id
|
|
if not renew:
|
|
token = cache.get(cache_key)
|
|
if token:
|
|
return token
|
|
headers = {'Authorization': 'Basic %s' % self.token_authorization}
|
|
resp = self.requests.post(self.token_url, headers=headers,
|
|
data={'grant_type': 'password', 'username': self.username, 'password': self.password, 'scope': self.scope},
|
|
verify=self.verify_cert).json()
|
|
token = '%s %s' % (resp.get('token_type'), resp.get('access_token'))
|
|
timeout = int(resp.get('expires_in'))
|
|
cache.set(cache_key, token, timeout)
|
|
self.logger.debug('new token: %s (timeout %ss)', token, timeout)
|
|
return token
|
|
|
|
def send_get(self, request):
|
|
request.headers['Authorization'] = self.get_token()
|
|
resp = self.requests.get(request.url, data=request.data,
|
|
headers=request.headers,
|
|
verify=self.verify_cert)
|
|
if resp.status_code == 401:
|
|
# ask for a new token, and retry
|
|
request.headers['Authorization'] = self.get_token(renew=True)
|
|
resp = self.instance.requests.get(request.url, data=request.data,
|
|
headers=request.headers,
|
|
verify=self.verify_cert)
|
|
return resp
|
|
|
|
def send_post(self, request):
|
|
request.headers['Authorization'] = self.get_token()
|
|
resp = self.requests.post(request.url, data=request.data,
|
|
headers=request.headers,
|
|
verify=self.verify_cert)
|
|
if resp.status_code == 401:
|
|
# ask for a new token, and retry
|
|
request.headers['Authorization'] = self.get_token(renew=True)
|
|
resp = self.requests.post(request.url, data=request.data,
|
|
headers=request.headers,
|
|
verify=self.verify_cert)
|
|
return resp
|
|
|
|
def none_to_str(self, input):
|
|
if input is None:
|
|
input = ''
|
|
return input
|
|
|
|
@endpoint()
|
|
def info(self, request):
|
|
return {'hello': datetime.datetime.now().strftime('%Y%m%d%H%M%S')}
|
|
|
|
@endpoint(perm='can_access', methods=['post'])
|
|
def create(self, request):
|
|
# get creation fields from payload
|
|
try:
|
|
formdata = FormData(json.loads(request.body), CREATION_SCHEMA)
|
|
except ValueError as e:
|
|
raise ParameterTypeError(e.message)
|
|
|
|
timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
|
|
|
|
liste_parcelles = ''
|
|
for numero in formdata.values['numero_parcelles']:
|
|
liste_parcelles += formdata.values['commune_parcelle_raw'] + formdata.values['section'] + numero[0] + ','
|
|
|
|
# Par defaut si user non connecte
|
|
user_email = 'gnm@grandlyon.com'
|
|
if 'user_email' in formdata.values:
|
|
user_email = formdata.values['user_email']
|
|
|
|
#print >> open('/home/grandlyon/log/elyx.debug', 'a+'), datetime.datetime.now(), "user_email:", user_email
|
|
|
|
send_request = requests.Request()
|
|
send_request.headers['Content-Type'] = 'application/json'
|
|
send_request.url = self.service_url+'/rr/Demandes?operation=envoi'
|
|
send_request.data = u'{"objet_json":"demandePortail","ID_REQ_SIG":"'+user_email+'_'+timestamp+'",'
|
|
send_request.data += u'"ID_REQ_DEMANDE":"'+formdata.values['type_de_demande']+'-'+formdata.values['tracking_code']+'","TYPE":"'+formdata.values['type_de_demande']+'","DATE_DEM":"'+datetime.datetime.now().strftime('%d/%m/%Y')+'",'
|
|
send_request.data += u'"CIVILITE":"'+self.none_to_str(formdata.values['civilite_code'])+'","NOM":"'+self.none_to_str(formdata.values['nom'])+'","PRENOM":"'+self.none_to_str(formdata.values['prenom'])+'","ADRESSE":"'+self.none_to_str(formdata.values['numero_voie'])+' '+self.none_to_str(formdata.values['voie'])+'","BP":"'+self.none_to_str(formdata.values['boite_postale'])
|
|
send_request.data += u'","CP":"'+self.none_to_str(formdata.values['code_postal'])+'","VILLE":"'+self.none_to_str(formdata.values['commune'])+'",'
|
|
send_request.data += u'"REFERENCE":"'+self.none_to_str(formdata.values['reference'])+'","CLASSE_OBJETS":"ICPARCEL","LISTE_PARCELLES":"'+liste_parcelles+'","MAIL_DEMANDEUR":"'+user_email+'",'
|
|
if formdata.values['type_de_demande'] == 'CA':
|
|
send_request.data += u'"TYPE_CERTIF":"ATTESTATION",'
|
|
if formdata.values['type_de_demande'] == 'RU':
|
|
send_request.data += u'"PLAN":true,'
|
|
send_request.data += u'"MODE_SYNTHETIQUE":true,'
|
|
send_request.data += u'"ADRESSE_DEFAUT":"",'
|
|
send_request.data += u'"ENV":"'+formdata.values['env']+'"}'
|
|
|
|
send_request.data = send_request.data.encode('utf-8')
|
|
#print >> open('/home/grandlyon/log/elyx.debug', 'a+'), datetime.datetime.now(), "send_request.data:", send_request.data
|
|
|
|
resp = self.send_post(send_request)
|
|
#print >> open('/home/grandlyon/log/elyx.debug', 'a+'), datetime.datetime.now(), "resp.content:", resp.content
|
|
return {'data': json.loads(resp.content)}
|
|
|
|
@endpoint(perm='can_access')
|
|
def status(self, request, iddemande):
|
|
file = None
|
|
|
|
send_request = requests.Request()
|
|
send_request.url = self.service_url+'/rr/Demandes?operation=suivi&id='+iddemande
|
|
resp = self.send_get(send_request)
|
|
data = json.loads(resp.content)
|
|
|
|
|
|
if data['EtatDemande'] == "DEMANDE_DISPO":
|
|
file_request = requests.Request()
|
|
#file_request.url = 'https://api-rec.grandlyon.com/mercure_dev/1.0/bureau_notaires/1.0/rs/ReadResource?url='+data['Url']
|
|
file_request.url = self.service_url+'/rs/ReadResource?url='+data['Url']
|
|
#print >> open('/home/grandlyon/log/elyx.debug', 'a+'), datetime.datetime.now(), "file url:", file_request.url
|
|
file_resp = self.send_get(file_request)
|
|
file = {'b64_content': base64.b64encode(file_resp.content), 'filename': 'certificat.pdf'}
|
|
|
|
#print >> open('/home/grandlyon/log/elyx.pdf', 'a+'), datetime.datetime.now(), file_resp.content
|
|
|
|
return {'data': data, 'file': file}
|