Version initiale

This commit is contained in:
Etienne Loupias 2018-09-25 12:06:56 +02:00
parent 1979ce4939
commit 2b2473d532
4 changed files with 270 additions and 0 deletions

View File

@ -0,0 +1,88 @@
# Copyright (C) 2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime
def is_required(value):
if not value:
raise ValueError('is required')
return value
def to_datetime(value):
if not value:
return
return datetime.strptime(value[:19], '%Y-%m-%dT%H:%M:%S')
def default_to_now(value):
if not value:
return datetime.now()
return value
CREATION_SCHEMA = (
)
def list_schema_fields(schema):
for fieldname in schema:
yield fieldname[0] if isinstance(fieldname, tuple) else fieldname
class FormData(object):
def __init__(self, formdata, schema):
if not isinstance(formdata, dict):
raise ValueError('formdata must be a dict')
if 'fields' in formdata and isinstance(formdata['fields'], dict):
values = formdata['fields']
if 'extra' in formdata:
values.update(formdata['extra'])
else:
values = formdata
# extract/create/validate fields according to schema
self.fields = {}
for fieldname in schema:
if isinstance(fieldname, tuple):
value = values.get(fieldname[0])
for modifier in fieldname[1:]:
try:
value = modifier(value)
except ValueError as e:
raise ValueError('%s: %s' % (fieldname[0], e.message))
fieldname = fieldname[0]
else:
value = values.get(fieldname)
if value is not None:
self.fields[fieldname] = value
# keep all form values
self.values = values
'''
for key, value in values.items():
print >> open('/home/grandlyon/log/elyx.debug', 'a+'), datetime.now(), key, ": ", value
'''
# extract attachments
self.attachments = {}
attachments = {
key: value
for key, value in values.items()
if isinstance(value, dict) and ('filename' in value and
'content_type' in value and
'content' in value)
}
for key in sorted(attachments.keys()):
self.attachments[key] = attachments[key]

View File

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('base', '0008_auto_20180227_1502'),
]
operations = [
migrations.CreateModel(
name='TestConnector',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('title', models.CharField(max_length=50, verbose_name='Title')),
('description', models.TextField(verbose_name='Description')),
('slug', models.SlugField(unique=True)),
('log_level', models.CharField(default=b'INFO', max_length=10, verbose_name='Log Level', choices=[(b'NOTSET', b'NOTSET'), (b'DEBUG', b'DEBUG'), (b'INFO', b'INFO'), (b'WARNING', b'WARNING'), (b'ERROR', b'ERROR'), (b'CRITICAL', b'CRITICAL'), (b'FATAL', b'FATAL')])),
('token_url', models.URLField(max_length=256, verbose_name='Token URL')),
('token_authorization', models.CharField(max_length=128, verbose_name='Token Authorization')),
('wsdl_url', models.CharField(max_length=256, verbose_name='WSDL URL')),
('verify_cert', models.BooleanField(default=True, verbose_name='Check HTTPS Certificate validity')),
('users', models.ManyToManyField(to='base.ApiUser', blank=True)),
],
options={
'verbose_name': 'Connecteur Elyx Grand Lyon',
},
),
]

View File

150
grandlyon_elyx/models.py Normal file
View File

@ -0,0 +1,150 @@
# coding: utf8
import base64
import requests
import json
import datetime
from django.db import models
from django.utils.translation import ugettext_lazy as _
from django.core.cache import cache
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
from .formdata import FormData, CREATION_SCHEMA, list_schema_fields
class ParameterTypeError(Exception):
http_status = 400
log_error = False
class grandlyonElyx(BaseResource):
token_url = models.URLField(_('Token URL'), max_length=256)
token_authorization = models.CharField(_('Token Authorization'), max_length=128)
wsdl_url = models.CharField(_('WSDL URL'), max_length=256) # not URLField, it can be file://
verify_cert = models.BooleanField(default=True,
verbose_name=_('Check HTTPS Certificate validity'))
category = _('Business Process Connectors')
class Meta:
verbose_name = 'Connecteur Elyx Grand Lyon'
def get_token(self, renew=False):
cache_key = 'elyx-%s-token' % self.id
if not renew:
token = cache.get(cache_key)
if token:
return token
headers = {'Authorization': 'Basic %s' % self.token_authorization}
resp = self.requests.post(self.token_url, headers=headers,
data={'grant_type': 'password', 'username': 'svc-elxws-dev', 'password': 't5ZMrzgB'},
verify=self.verify_cert).json()
token = '%s %s' % (resp.get('token_type'), resp.get('access_token'))
timeout = int(resp.get('expires_in'))
cache.set(cache_key, token, timeout)
self.logger.debug('new token: %s (timeout %ss)', token, timeout)
return token
def send_get(self, request):
request.headers['Authorization'] = self.get_token()
resp = self.requests.get(request.url, data=request.data,
headers=request.headers,
verify=self.verify_cert)
if resp.status_code == 401:
# ask for a new token, and retry
request.headers['Authorization'] = self.get_token(renew=True)
resp = self.instance.requests.get(request.url, data=request.data,
headers=request.headers,
verify=self.verify_cert)
return resp
def send_post(self, request):
request.headers['Authorization'] = self.get_token()
resp = self.requests.post(request.url, data=request.data,
headers=request.headers,
verify=self.verify_cert)
if resp.status_code == 401:
# ask for a new token, and retry
request.headers['Authorization'] = self.get_token(renew=True)
resp = self.instance.requests.post(request.url, data=request.data,
headers=request.headers,
verify=self.verify_cert)
return resp
def none_to_str(self, input):
if input is None:
input = ''
return input
@endpoint()
def info(self, request):
return {'hello': datetime.datetime.now().strftime('%Y%m%d%H%M%S')}
@endpoint(perm='can_access', methods=['post'])
def create(self, request):
# get creation fields from payload
try:
formdata = FormData(json.loads(request.body), CREATION_SCHEMA)
except ValueError as e:
raise ParameterTypeError(e.message)
timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
print >> open('/home/grandlyon/log/elyx.debug', 'a+'), datetime.datetime.now(), "(['boite_postale'])", formdata.values['boite_postale']
print >> open('/home/grandlyon/log/elyx.debug', 'a+'), datetime.datetime.now(), "self.none_to_str(formdata.values['boite_postale']):", self.none_to_str(formdata.values['boite_postale'])
liste_parcelles = ''
for parcelle in formdata.values['parcelles']:
liste_parcelles += parcelle[0] + ','
# Par defaut si user non connecte
user_email = 'gnm@grandlyon.com'
if 'user_email' in formdata.values:
user_email = formdata.values['user_email']
print >> open('/home/grandlyon/log/elyx.debug', 'a+'), datetime.datetime.now(), "user_email:", user_email
send_request = requests.Request()
send_request.headers['Content-Type'] = 'application/json'
#TODO: param
send_request.url = 'https://api-rec.grandlyon.com/mercure_dev/1.0/bureau_notaires/1.0/rr/Demandes?operation=envoi'
#TODO: reference, login, civilite, plusieurs parcelles, mail
send_request.data = u'{"objet_json":"demandePortail","ID_REQ_SIG":"'+user_email+'_'+timestamp+'",'
send_request.data += u'"ID_REQ_DEMANDE":"'+formdata.values['type_de_demande']+'-'+formdata.values['tracking_code']+'","TYPE":"'+formdata.values['type_de_demande']+'","DATE_DEM":"'+datetime.datetime.now().strftime('%d/%m/%Y')+'",'
send_request.data += u'"CIVILITE":"'+self.none_to_str(formdata.values['civilite_code'])+'","NOM":"'+self.none_to_str(formdata.values['nom'])+'","PRENOM":"'+self.none_to_str(formdata.values['prenom'])+'","ADRESSE":"'+self.none_to_str(formdata.values['numero_voie'])+' '+self.none_to_str(formdata.values['voie'])+'","BP":"'+self.none_to_str(formdata.values['boite_postale'])
send_request.data += u'","CP":"'+self.none_to_str(formdata.values['code_postal'])+'","VILLE":"'+self.none_to_str(formdata.values['commune'])+'",'
send_request.data += u'"REFERENCE":"'+self.none_to_str(formdata.values['reference'])+'","CLASSE_OBJETS":"ICPARCEL","LISTE_PARCELLES":"'+liste_parcelles+'","MAIL_DEMANDEUR":"'+user_email+'",'
send_request.data += u'"TYPE_CERTIF":"'+formdata.values['type_certificat'].replace(u'é',u'e').upper()+'",'
send_request.data += u'"ENV":"'+formdata.values['env']+'"}'
send_request.data = send_request.data.encode('utf-8')
print >> open('/home/grandlyon/log/elyx.debug', 'a+'), datetime.datetime.now(), "send_request.data:", send_request.data
resp = self.send_post(send_request)
return {'data': json.loads(resp.content)}
@endpoint(perm='can_access')
def status(self, request, iddemande):
file = None
send_request = requests.Request()
#TODO: param
send_request.url = 'https://api-rec.grandlyon.com/mercure_dev/1.0/bureau_notaires/1.0/rr/Demandes?operation=suivi&id='+iddemande
resp = self.send_get(send_request)
data = json.loads(resp.content)
if data['EtatDemande'] == "DEMANDE_DISPO":
file_request = requests.Request()
#TODO: param
file_request.url = 'https://api-rec.grandlyon.com/mercure_dev/1.0/bureau_notaires/1.0/rs/ReadResource?url='+data['Url']
#print >> open('/home/grandlyon/log/elyx.debug', 'a+'), datetime.datetime.now(), "file url:", file_request.url
file_resp = self.send_get(file_request)
file = {'b64_content': base64.b64encode(file_resp.content), 'filename': 'certificat.pdf'}
#print >> open('/home/grandlyon/log/elyx.pdf', 'a+'), datetime.datetime.now(), file_resp.content
return {'data': data, 'file': file}