78 lines
2.8 KiB
Python
78 lines
2.8 KiB
Python
# passerelle - uniform access to multiple data sources and services
|
|
# Copyright (C) 2023 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
|
import xmltodict
|
|
from cryptography import x509
|
|
from cryptography.hazmat.primitives import serialization
|
|
from django.db import models
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
from passerelle.base.models import BaseResource, HTTPResource
|
|
from passerelle.utils.api import endpoint
|
|
|
|
|
|
class SNE(BaseResource, HTTPResource):
    """Connector exposing housing-demand lookups from a SOAP web service.

    The remote service is described by the WSDL at ``wsdl_url``. Each call
    sends the connector's client certificate (PEM bytes) together with the
    name stored in ``certificate_name``.
    """

    # Location of the WSDL describing the remote SOAP service.
    wsdl_url = models.URLField(
        max_length=400,
        verbose_name=_('WSDL URL'),
        help_text=_('URL of the WSDL file'),
    )
    # Name sent as ``nomCertificat`` alongside the certificate itself.
    certificate_name = models.CharField(
        max_length=128,
        verbose_name=_('Client certificate name'),
    )

    category = _('Business Process Connectors')

    class Meta:
        verbose_name = _('SNE')

    @classmethod
    def get_manager_form_class(cls, **kwargs):
        """Return the parent's manager form with ``client_certificate`` mandatory."""
        manager_form = super().get_manager_form_class(**kwargs)
        # NOTE(review): ``client_certificate`` is not declared in this class;
        # presumably it is inherited from HTTPResource - confirm.
        certificate_field = manager_form.base_fields['client_certificate']
        certificate_field.required = True
        return manager_form

    @property
    def cert_public_bytes(self):
        """PEM-encoded bytes of the certificate stored in ``client_certificate``."""
        with self.client_certificate.open('rb') as pem_file:
            pem_data = pem_file.read()
            certificate = x509.load_pem_x509_certificate(pem_data)
        return certificate.public_bytes(encoding=serialization.Encoding.PEM)

    def check_status(self):
        """Fetch the WSDL URL and raise if the HTTP response is an error."""
        self.requests.get(self.wsdl_url).raise_for_status()

    @endpoint(
        name='get-demande-logement',
        description=_('Get informations on housing demand'),
        parameters={
            'demand_id': {
                'example_value': '1',
            }
        },
    )
    def get_demande_logement(self, request, demand_id, **kwargs):
        """Call ``getDemandeLogement`` and return its XML payload as a dict.

        ``demand_id`` is forwarded as ``numUnique``. The ``fichierDemande``
        value of the SOAP response is parsed with xmltodict and returned
        under the ``data`` key.
        """
        soap = self.soap_client(wsdl_url=self.wsdl_url, api_error=True)

        # The certificate is wrapped in the service's own base64Binary type.
        binary_type = soap.get_type('{http://ws.metier.nuu.application.i2/}base64Binary')
        wrapped_certificate = binary_type(_value_1=self.cert_public_bytes)

        soap_response = soap.service.getDemandeLogement(
            numUnique=demand_id,
            nomCertificat=self.certificate_name,
            certificat=wrapped_certificate,
        )
        xml_payload = soap_response['fichierDemande']['_value_1']

        # Mapping this namespace to None makes xmltodict drop it from the
        # resulting keys instead of prefixing each key with the full URI.
        ignored_namespaces = {
            'http://nuu.application.i2/': None,
        }
        parsed_demand = xmltodict.parse(
            xml_payload,
            process_namespaces=True,
            namespaces=ignored_namespaces,
        )
        return {'data': parsed_demand}
|