diff --git a/grandlyon_elyx/formdata.py b/grandlyon_elyx/formdata.py new file mode 100644 index 0000000..20361e3 --- /dev/null +++ b/grandlyon_elyx/formdata.py @@ -0,0 +1,88 @@ +# Copyright (C) 2016 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from datetime import datetime + + +def is_required(value): + if not value: + raise ValueError('is required') + return value + +def to_datetime(value): + if not value: + return + return datetime.strptime(value[:19], '%Y-%m-%dT%H:%M:%S') + +def default_to_now(value): + if not value: + return datetime.now() + return value + + +CREATION_SCHEMA = ( +) + +def list_schema_fields(schema): + for fieldname in schema: + yield fieldname[0] if isinstance(fieldname, tuple) else fieldname + +class FormData(object): + + def __init__(self, formdata, schema): + if not isinstance(formdata, dict): + raise ValueError('formdata must be a dict') + + if 'fields' in formdata and isinstance(formdata['fields'], dict): + values = formdata['fields'] + if 'extra' in formdata: + values.update(formdata['extra']) + else: + values = formdata + + # extract/create/validate fields according to schema + self.fields = {} + for fieldname in schema: + if isinstance(fieldname, tuple): + value = values.get(fieldname[0]) + for modifier in fieldname[1:]: + try: + value = modifier(value) + except ValueError as e: + raise ValueError('%s: %s' % (fieldname[0], e.message)) + fieldname = fieldname[0] + else: + value = values.get(fieldname) + if value is not None: + self.fields[fieldname] = value + + # keep all form values + self.values = values + ''' + for key, value in values.items(): + print >> open('/home/grandlyon/log/elyx.debug', 'a+'), datetime.now(), key, ": ", value + ''' + + # extract attachments + self.attachments = {} + attachments = { + key: value + for key, value in values.items() + if isinstance(value, dict) and ('filename' in value and + 'content_type' in value and + 'content' in value) + } + for key in sorted(attachments.keys()): + self.attachments[key] = attachments[key] diff --git a/grandlyon_elyx/migrations/0001_initial.py b/grandlyon_elyx/migrations/0001_initial.py new file mode 100644 index 0000000..0d6169d --- /dev/null +++ b/grandlyon_elyx/migrations/0001_initial.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('base', '0008_auto_20180227_1502'), + ] + + operations = [ + migrations.CreateModel( + name='TestConnector', + fields=[ + ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), + ('title', models.CharField(max_length=50, verbose_name='Title')), + ('description', models.TextField(verbose_name='Description')), + ('slug', models.SlugField(unique=True)), + ('log_level', models.CharField(default=b'INFO', max_length=10, verbose_name='Log Level', choices=[(b'NOTSET', b'NOTSET'), (b'DEBUG', b'DEBUG'), (b'INFO', b'INFO'), (b'WARNING', b'WARNING'), (b'ERROR', b'ERROR'), (b'CRITICAL', b'CRITICAL'), (b'FATAL', b'FATAL')])), + ('token_url', models.URLField(max_length=256, verbose_name='Token URL')), + ('token_authorization', models.CharField(max_length=128, verbose_name='Token Authorization')), + ('wsdl_url', models.CharField(max_length=256, verbose_name='WSDL URL')), + ('verify_cert', models.BooleanField(default=True, verbose_name='Check HTTPS Certificate validity')), + ('users', models.ManyToManyField(to='base.ApiUser', blank=True)), + ], + options={ + 'verbose_name': 'Connecteur Elyx Grand Lyon', + }, + ), + ] diff --git a/grandlyon_elyx/migrations/__init__.py b/grandlyon_elyx/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/grandlyon_elyx/models.py b/grandlyon_elyx/models.py new file mode 100644 index 0000000..5f4d7ef --- /dev/null +++ b/grandlyon_elyx/models.py @@ -0,0 +1,150 @@ +# coding: utf8 + +import base64 +import requests +import json +import datetime + +from django.db import models +from django.utils.translation import ugettext_lazy as _ +from django.core.cache import cache + +from passerelle.base.models import BaseResource +from passerelle.utils.api import endpoint +from passerelle.utils.jsonresponse import APIError + +from .formdata import FormData, CREATION_SCHEMA, list_schema_fields + +class ParameterTypeError(Exception): + http_status = 400 + log_error = False + + +class grandlyonElyx(BaseResource): + token_url = models.URLField(_('Token URL'), max_length=256) + token_authorization = models.CharField(_('Token Authorization'), max_length=128) + wsdl_url = models.CharField(_('WSDL URL'), max_length=256) # not URLField, it can be file:// + verify_cert = models.BooleanField(default=True, + verbose_name=_('Check HTTPS Certificate validity')) + + category = _('Business Process Connectors') + + class Meta: + verbose_name = 'Connecteur Elyx Grand Lyon' + + def get_token(self, renew=False): + cache_key = 'elyx-%s-token' % self.id + if not renew: + token = cache.get(cache_key) + if token: + return token + headers = {'Authorization': 'Basic %s' % self.token_authorization} + resp = self.requests.post(self.token_url, headers=headers, + data={'grant_type': 'password', 'username': 'svc-elxws-dev', 'password': 't5ZMrzgB'}, + verify=self.verify_cert).json() + token = '%s %s' % (resp.get('token_type'), resp.get('access_token')) + timeout = int(resp.get('expires_in')) + cache.set(cache_key, token, timeout) + self.logger.debug('new token: %s (timeout %ss)', token, timeout) + return token + + def send_get(self, request): + request.headers['Authorization'] = self.get_token() + resp = self.requests.get(request.url, data=request.data, + headers=request.headers, + verify=self.verify_cert) + if resp.status_code == 401: + # ask for a new token, and retry + request.headers['Authorization'] = self.get_token(renew=True) + resp = self.instance.requests.get(request.url, data=request.data, + headers=request.headers, + verify=self.verify_cert) + return resp + + def send_post(self, request): + request.headers['Authorization'] = self.get_token() + resp = self.requests.post(request.url, data=request.data, + headers=request.headers, + verify=self.verify_cert) + if resp.status_code == 401: + # ask for a new token, and retry + request.headers['Authorization'] = self.get_token(renew=True) + resp = self.instance.requests.post(request.url, data=request.data, + headers=request.headers, + verify=self.verify_cert) + return resp + + def none_to_str(self, input): + if input is None: + input = '' + return input + + @endpoint() + def info(self, request): + return {'hello': datetime.datetime.now().strftime('%Y%m%d%H%M%S')} + + @endpoint(perm='can_access', methods=['post']) + def create(self, request): + # get creation fields from payload + try: + formdata = FormData(json.loads(request.body), CREATION_SCHEMA) + except ValueError as e: + raise ParameterTypeError(e.message) + + timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + + print >> open('/home/grandlyon/log/elyx.debug', 'a+'), datetime.datetime.now(), "(['boite_postale'])", formdata.values['boite_postale'] + print >> open('/home/grandlyon/log/elyx.debug', 'a+'), datetime.datetime.now(), "self.none_to_str(formdata.values['boite_postale']):", self.none_to_str(formdata.values['boite_postale']) + + liste_parcelles = '' + for parcelle in formdata.values['parcelles']: + liste_parcelles += parcelle[0] + ',' + + # Par defaut si user non connecte + user_email = 'gnm@grandlyon.com' + if 'user_email' in formdata.values: + user_email = formdata.values['user_email'] + + print >> open('/home/grandlyon/log/elyx.debug', 'a+'), datetime.datetime.now(), "user_email:", user_email + + send_request = requests.Request() + send_request.headers['Content-Type'] = 'application/json' + #TODO: param + send_request.url = 'https://api-rec.grandlyon.com/mercure_dev/1.0/bureau_notaires/1.0/rr/Demandes?operation=envoi' + #TODO: reference, login, civilite, plusieurs parcelles, mail + send_request.data = u'{"objet_json":"demandePortail","ID_REQ_SIG":"'+user_email+'_'+timestamp+'",' + send_request.data += u'"ID_REQ_DEMANDE":"'+formdata.values['type_de_demande']+'-'+formdata.values['tracking_code']+'","TYPE":"'+formdata.values['type_de_demande']+'","DATE_DEM":"'+datetime.datetime.now().strftime('%d/%m/%Y')+'",' + send_request.data += u'"CIVILITE":"'+self.none_to_str(formdata.values['civilite_code'])+'","NOM":"'+self.none_to_str(formdata.values['nom'])+'","PRENOM":"'+self.none_to_str(formdata.values['prenom'])+'","ADRESSE":"'+self.none_to_str(formdata.values['numero_voie'])+' '+self.none_to_str(formdata.values['voie'])+'","BP":"'+self.none_to_str(formdata.values['boite_postale']) + send_request.data += u'","CP":"'+self.none_to_str(formdata.values['code_postal'])+'","VILLE":"'+self.none_to_str(formdata.values['commune'])+'",' + send_request.data += u'"REFERENCE":"'+self.none_to_str(formdata.values['reference'])+'","CLASSE_OBJETS":"ICPARCEL","LISTE_PARCELLES":"'+liste_parcelles+'","MAIL_DEMANDEUR":"'+user_email+'",' + send_request.data += u'"TYPE_CERTIF":"'+formdata.values['type_certificat'].replace(u'é',u'e').upper()+'",' + send_request.data += u'"ENV":"'+formdata.values['env']+'"}' + + send_request.data = send_request.data.encode('utf-8') + print >> open('/home/grandlyon/log/elyx.debug', 'a+'), datetime.datetime.now(), "send_request.data:", send_request.data + + resp = self.send_post(send_request) + return {'data': json.loads(resp.content)} + + @endpoint(perm='can_access') + def status(self, request, iddemande): + file = None + + send_request = requests.Request() + #TODO: param + send_request.url = 'https://api-rec.grandlyon.com/mercure_dev/1.0/bureau_notaires/1.0/rr/Demandes?operation=suivi&id='+iddemande + resp = self.send_get(send_request) + data = json.loads(resp.content) + + + if data['EtatDemande'] == "DEMANDE_DISPO": + file_request = requests.Request() + #TODO: param + file_request.url = 'https://api-rec.grandlyon.com/mercure_dev/1.0/bureau_notaires/1.0/rs/ReadResource?url='+data['Url'] + #print >> open('/home/grandlyon/log/elyx.debug', 'a+'), datetime.datetime.now(), "file url:", file_request.url + file_resp = self.send_get(file_request) + file = {'b64_content': base64.b64encode(file_resp.content), 'filename': 'certificat.pdf'} + + #print >> open('/home/grandlyon/log/elyx.pdf', 'a+'), datetime.datetime.now(), file_resp.content + + return {'data': data, 'file': file}