# passerelle/passerelle/contrib/teamnet_axel/models.py
#
# (web viewer metadata: 357 lines, 14 KiB, Python)

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import json
import logging
import xml.etree.ElementTree as ET
from datetime import datetime
from django.db import models
from django.db.models import JSONField
from django.http import HttpResponse, HttpResponseNotFound
from django.utils.encoding import smart_str
from django.utils.translation import gettext_lazy as _
from passerelle.base.models import BaseResource
from passerelle.soap import client_to_jsondict
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
from passerelle.views import WrongParameter
from . import soap
from .utils import normalize_invoice, normalize_person
# Module-level logger; used by TeamnetAxel.get_data to trace getData calls.
logger = logging.getLogger('passerelle.contrib.teamnet_axel')
# Values of the 'indtype' attribute of an individual, used by
# TeamnetAxel.get_family_data to sort individuals into adults and children.
ADULT1 = '1'
ADULT2 = '2'
CHILD = '3'
# Format used to parse the 'transaction_date' received by the pay endpoint.
DATE_IN_FORMAT = '%Y-%m-%dT%H:%M:%S'
# Format used to write DATEENVOI / DATERETOUR in the payment XML sent to Axel.
DATE_OUT_FORMAT = '%d/%m/%Y %H:%M:%S'
def get_name_id(request):
    """Return the ``NameID`` query-string parameter of ``request``.

    Raises WrongParameter (with ``NameID`` listed as missing) when the
    parameter is absent from ``request.GET``.
    """
    query = request.GET
    if 'NameID' in query:
        return query['NameID']
    raise WrongParameter(['NameID'], [])
class TeamnetAxel(BaseResource):
    """Connector giving access to family data, account links and invoices
    of a Teamnet Axel service, through its ``getData`` SOAP operation.
    """

    wsdl_url = models.CharField(
        max_length=128, blank=False, verbose_name=_('WSDL URL'), help_text=_('Teamnet Axel WSDL URL')
    )
    verify_cert = models.BooleanField(default=True, verbose_name=_('Check HTTPS Certificate validity'))
    username = models.CharField(max_length=128, blank=True, verbose_name=_('Username'))
    password = models.CharField(max_length=128, blank=True, verbose_name=_('Password'))
    keystore = models.FileField(
        upload_to='teamnet_axel',
        null=True,
        blank=True,
        verbose_name=_('Keystore'),
        help_text=_('Certificate and private key in PEM format'),
    )
    # Read in pay_invoice: maps a regie id to the value sent as IDREGIEENC.
    billing_regies = JSONField(_('Mapping between regie ids and billing ids'))

    category = _('Business Process Connectors')
    manager_view_template_name = 'passerelle/contrib/teamnet_axel/detail.html'

    class Meta:
        verbose_name = _('Teamnet Axel')

    @classmethod
    def get_verbose_name(cls):
        """Return the verbose name declared in ``Meta``."""
        return cls._meta.verbose_name

    #
    # Axel SOAP call: getData
    #
    def get_data(self, operation, args):
        """Call the ``getData`` SOAP operation and return the ``DATA`` node.

        :param operation: stream identifier passed as ``streamId``.
        :param args: XML node (``ElementTree.Element``); it is wrapped in a
            ``PORTAIL`` element and serialized before being sent.
        :returns: the ``DATA`` element of the parsed response, or ``None``
            when the response has no such element.
        :raises APIError: when ``RESULTAT/STATUS`` is missing or is not
            ``OK``; the message is the text of ``RESULTAT/COMMENTAIRES``
            when that element exists.
        """
        portail = ET.Element('PORTAIL')
        portail.append(args)
        stream_id = operation
        xml_params = smart_str(ET.tostring(portail, encoding='UTF-8'))
        user = ''
        # NOTE(review): for ConnexionCompteFamille the serialized parameters
        # contain the PWD element, so credentials end up in debug logs.
        logger.debug('getData(streamId=%s, xmlParams=%s, user=%s)', stream_id, xml_params, user)
        result = soap.get_client(self).service.getData(stream_id, xml_params, user)
        logger.debug('getData(%s) result: %s', stream_id, smart_str(result))
        xml_result = ET.fromstring(result)
        status = xml_result.find('RESULTAT/STATUS')
        if status is None or status.text != 'OK':
            # A response without STATUS/COMMENTAIRES must be reported as an
            # API error, not crash with AttributeError on ``None.text``.
            comment = xml_result.find('RESULTAT/COMMENTAIRES')
            if comment is None:
                raise APIError('getData(%s) failed without COMMENTAIRES' % stream_id)
            raise APIError(comment.text)
        return xml_result.find('DATA')

    # Axel authentication
    def authenticate(self, login, pwd):
        """return False or an AXEL user dict:
        {
          "login": "23060A",
          "estidentifie": true,
          "estbloque": false,
          "estchangement_mdp_requis": false,
          "nbechec": "0",
          "idpersonne": "47747",
          "idfamille": "23060",
        }

        False is returned when the ``ConnexionCompteFamille`` call raises
        APIError, when the response carries no user node, when the user is
        blocked (``estbloque``) or is not identified (``estidentifie``).
        """
        xml_utilisateur = ET.Element('UTILISATEUR')
        ET.SubElement(xml_utilisateur, 'LOGIN').text = login
        ET.SubElement(xml_utilisateur, 'PWD').text = pwd
        try:
            data = self.get_data('ConnexionCompteFamille', xml_utilisateur)
        except APIError:
            return False
        xml_user = data.find('PORTAIL/UTILISATEUR') if data is not None else None
        if xml_user is None:
            # No user description in the answer: nothing proves the identity.
            return False
        user = soap.xml_to_dict(xml_user)
        # Keys starting with "est" carry the strings 'true'/'false'; turn
        # them into booleans (existing keys only, the dict size is unchanged).
        for key, value in user.items():
            if key.startswith('est'):
                user[key] = value == 'true'
        if user.get('estbloque') or not user.get('estidentifie'):
            return False
        return user

    def get_family_id(self, request):
        """Return the ``idfamille`` of the account linked to the request's NameID.

        :returns: ``None`` when no link exists for this NameID.
        :raises APIError: when several links exist, when authentication with
            the stored credentials fails, or when the user has no ``idfamille``.
        """
        nameid = get_name_id(request)
        links = Link.objects.filter(resource=self, nameid=nameid)
        if len(links) > 1:
            raise APIError('multiple links')
        if not links:
            return None
        link = links[0]
        user = self.authenticate(link.login, link.pwd)
        if not user:
            raise APIError('authentication failed')
        if 'idfamille' not in user:
            raise APIError('user without idfamille')
        return user['idfamille']

    def get_family_data(self, idfamille, annee=None):
        """Fetch the individuals of a family with the ``DonneesFamille`` operation.

        :param idfamille: family identifier sent as ``IDFAMILLE``.
        :param annee: optional value sent as ``ANNEE`` when truthy.
        :returns: ``{'family': idfamille, 'adults': [...], 'children': [...]}``
            where individuals are split on their ``indtype`` attribute and
            passed through ``normalize_person``.
        :raises APIError: when the response has no ``PORTAIL/INDIVIDUS`` node.
        """
        xml_famille = ET.Element('FAMILLE')
        ET.SubElement(xml_famille, 'IDFAMILLE').text = idfamille
        if annee:
            ET.SubElement(xml_famille, 'ANNEE').text = annee
        data = self.get_data('DonneesFamille', xml_famille)
        xml_individus = data.findall('PORTAIL/INDIVIDUS') if data is not None else []
        if not xml_individus:
            raise APIError('PORTAIL/INDIVIDUS is empty')
        individus = [{k.lower(): v for k, v in i.attrib.items()} for i in xml_individus]
        for individu in individus:
            individu['id'] = individu['idindividu']
            individu['text'] = '%(prenom)s %(nom)s' % individu
        adults = [normalize_person(i) for i in individus if i['indtype'] in (ADULT1, ADULT2)]
        children = [normalize_person(i) for i in individus if i['indtype'] == CHILD]
        return {'family': idfamille, 'adults': adults, 'children': children}

    @endpoint(perm='can_access')
    def ping(self, request, *args, **kwargs):
        """Check that a SOAP client can be built.

        With a ``debug`` query parameter the answer also contains the
        ``client_to_jsondict`` description of the client.
        """
        try:
            client = soap.get_client(self)
        except Exception as exc:
            # Any client construction failure is reported as an API error;
            # the original exception is kept as the cause for tracebacks.
            raise APIError('Client Error: %s' % exc) from exc
        res = {'ping': 'pong'}
        if 'debug' in request.GET:
            res['client'] = client_to_jsondict(client)
        return {'data': res}

    @endpoint(perm='can_access')
    def auth(self, request, *args, **kwargs):
        """Authenticate with the ``login`` and ``pwd`` query parameters."""
        login = request.GET.get('login')
        pwd = request.GET.get('pwd')
        return {'data': self.authenticate(login, pwd)}

    @endpoint(name='family', perm='can_access')
    def family_data(self, request, *args, **kwargs):
        """Return the family data of the linked account, or None without link."""
        idfamille = self.get_family_id(request)
        if not idfamille:
            return {'data': None}
        return {'data': self.get_family_data(idfamille)}

    @endpoint(name='family', perm='can_access', pattern='^adults/$')
    def family_adults(self, request, *args, **kwargs):
        """Return the adults of the linked family, or None without link."""
        idfamille = self.get_family_id(request)
        if not idfamille:
            return {'data': None}
        return {'data': self.get_family_data(idfamille).get('adults')}

    @endpoint(name='family', perm='can_access', pattern='^children/$')
    def family_children(self, request, *args, **kwargs):
        """Return the children of the linked family, or None without link."""
        idfamille = self.get_family_id(request)
        if not idfamille:
            return {'data': None}
        return {'data': self.get_family_data(idfamille).get('children')}

    @endpoint(name='family', perm='can_access', pattern='^link/$')
    def family_link(self, request, *args, **kwargs):
        """Link the request's NameID to an Axel account.

        Credentials come from the ``login`` and ``password`` query
        parameters; they are checked with ``authenticate`` and the family
        data is fetched before the link is created or updated.

        :raises APIError: on authentication failure or user without ``idfamille``.
        """
        nameid = get_name_id(request)
        login = request.GET.get('login')
        pwd = request.GET.get('password')
        user = self.authenticate(login, pwd)
        if not user:
            raise APIError('authentication failed')
        if 'idfamille' not in user:
            raise APIError('user without idfamille')
        famille = self.get_family_data(user['idfamille'])
        # NOTE(review): the password is stored as given in Link.pwd because
        # get_family_id re-authenticates with it on every request.
        Link.objects.update_or_create(resource=self, nameid=nameid, defaults={'login': login, 'pwd': pwd})
        user['_famille'] = famille
        user['_nameid'] = nameid
        return {'data': user}

    @endpoint(name='family', perm='can_access', pattern='^unlink/$')
    def family_unlink(self, request, *args, **kwargs):
        """Delete the links of the request's NameID.

        :returns: ``{'login_was': [...]}`` as data when links existed, else None.
        """
        nameid = get_name_id(request)
        links = Link.objects.filter(resource=self, nameid=nameid)
        # Read the logins before deleting so they can be reported back.
        logins = list(links.values_list('login', flat=True))
        links.delete()
        if logins:
            return {'data': {'login_was': logins}}
        return {'data': None}

    @staticmethod
    def _split_invoice_id(invoice_id):
        """Split ``<family_id>-<invoice number>`` on the first dash.

        :raises APIError: when ``invoice_id`` has no dash; the URL patterns
            accept such ids, which previously crashed on tuple unpacking.
        """
        family_id, separator, invoice = invoice_id.partition('-')
        if not separator:
            raise APIError('invalid invoice id: %s' % invoice_id)
        return family_id, invoice

    def get_teamnet_payable_invoices(self, regie_id, family_id):
        """Return the payable invoices of a family for a regie.

        Calls ``FacturesApayerRegie`` and merges the dicts produced by
        ``normalize_invoice`` for every ``PORTAIL/FACTURES`` node.
        """
        operation = 'FacturesApayerRegie'
        xml_invoices = ET.Element('LISTFACTURE')
        ET.SubElement(xml_invoices, 'IDREGIE').text = regie_id
        ET.SubElement(xml_invoices, 'IDFAMILLE').text = family_id
        data = self.get_data(operation, xml_invoices)
        payable_invoices = {}
        for xml_invoice in data.findall('PORTAIL/FACTURES') if data is not None else []:
            payable_invoices.update(normalize_invoice(xml_invoice.attrib, family_id))
        return payable_invoices

    # NOTE(review): this is the only endpoint of the class declared without
    # an explicit perm='can_access'; confirm that this is intended.
    @endpoint(name='regie', pattern=r'^(?P<regie_id>\w+)/invoices/$')
    def active_invoices(self, request, regie_id, **kwargs):
        """Return payable invoices of the linked family, most recent first.

        An empty list is returned when the NameID has no link.
        """
        family_id = self.get_family_id(request)
        if not family_id:
            return {'data': []}
        invoices = self.get_teamnet_payable_invoices(regie_id, family_id)
        invoices = sorted(invoices.values(), key=lambda invoice: invoice['created'], reverse=True)
        return {'data': invoices}

    def get_teamnet_historical_invoices(self, regie_id, family_id):
        """
        returns historical invoices for a given regie.
        The list contains also payable invoices.

        Calls ``HistoriqueFacturesRegie`` with ``NBMOIS`` set to ``12``.
        """
        operation = 'HistoriqueFacturesRegie'
        xml_invoices = ET.Element('LISTFACTURE')
        ET.SubElement(xml_invoices, 'IDREGIE').text = regie_id
        ET.SubElement(xml_invoices, 'IDFAMILLE').text = family_id
        ET.SubElement(xml_invoices, 'NBMOIS').text = '12'
        data = self.get_data(operation, xml_invoices)
        historical_invoices = {}
        for xml_invoice in data.findall('PORTAIL/FACTURES') if data is not None else []:
            historical_invoices.update(normalize_invoice(xml_invoice.attrib, family_id, historical=True))
        return historical_invoices

    @endpoint(name='regie', perm='can_access', pattern=r'^(?P<regie_id>\w+)/invoices/history/$')
    def invoices_history(self, request, regie_id, **kwargs):
        """Return historical invoices that are no longer payable, most recent first.

        An empty list is returned when the NameID has no link.
        """
        family_id = self.get_family_id(request)
        if not family_id:
            return {'data': []}
        payable = self.get_teamnet_payable_invoices(regie_id, family_id)
        historical = self.get_teamnet_historical_invoices(regie_id, family_id)
        # The history also lists payable invoices: drop them.
        historical = [invoice for key, invoice in historical.items() if key not in payable]
        invoices = sorted(historical, key=lambda invoice: invoice['created'], reverse=True)
        return {'data': invoices}

    @endpoint(
        name='regie', perm='can_access', pattern=r'^(?P<regie_id>\w+)/invoice/(?P<invoice_id>[\w,-]+)/$'
    )
    def get_invoice_details(self, request, regie_id, invoice_id, **kwargs):
        """Return one invoice, looked up in payable then historical invoices.

        :returns: the invoice as data, or None when it is found in neither.
        :raises APIError: when ``invoice_id`` contains no dash.
        """
        family_id, _invoice = self._split_invoice_id(invoice_id)
        payable = self.get_teamnet_payable_invoices(regie_id, family_id)
        if invoice_id in payable:
            return {'data': payable[invoice_id]}
        historical = self.get_teamnet_historical_invoices(regie_id, family_id)
        if invoice_id in historical:
            return {'data': historical[invoice_id]}
        return {'data': None}

    @endpoint(
        name='regie', perm='can_access', pattern=r'^(?P<regie_id>\w+)/invoice/(?P<invoice_id>[\w,-]+)/pdf/$'
    )
    def invoice_pdf(self, request, regie_id, invoice_id, **kwargs):
        """Return the PDF of an invoice as an attachment.

        The ``FILE`` attribute of ``PORTAIL/PDF`` is base64-decoded; a 404
        response is returned when the node, the attribute or the decoded
        content is missing or empty.

        :raises APIError: when ``invoice_id`` contains no dash.
        """
        family_id, invoice = self._split_invoice_id(invoice_id)
        invoice_xml = ET.Element('FACTUREPDF')
        ET.SubElement(invoice_xml, 'IDFAMILLE').text = family_id
        ET.SubElement(ET.SubElement(invoice_xml, 'FACTURES'), 'NOFACTURE').text = invoice
        data = self.get_data('FacturesPDF', invoice_xml)
        pdf = data.find('PORTAIL/PDF') if data is not None else None
        encoded = pdf.get('FILE') if pdf is not None else None
        if not encoded:
            # Missing node or attribute: there is no document to serve
            # (previously AttributeError / TypeError in b64decode).
            return HttpResponseNotFound()
        content = base64.b64decode(encoded)
        if not content:
            return HttpResponseNotFound()
        response = HttpResponse(content_type='application/pdf')
        response['Content-Disposition'] = 'attachment; filename="%s.pdf"' % invoice_id
        response.write(content)
        return response

    @endpoint(
        name='regie',
        methods=['post'],
        perm='can_access',
        pattern=r'^(?P<regie_id>\w+)/invoice/(?P<invoice_id>[\w,-]+)/pay/$',
    )
    def pay_invoice(self, request, regie_id, invoice_id, **kwargs):
        """Notify Axel of the payment of an invoice (``PaiementFactures``).

        The JSON body provides ``transaction_id``, ``transaction_date``
        (parsed with DATE_IN_FORMAT) and an optional ``email``.

        :returns: ``False`` as data when the invoice is not payable, else ``True``.
        :raises APIError: on invalid JSON payload, invalid ``invoice_id`` or
            missing/malformed ``transaction_date``, and on Axel errors.
        """
        try:
            data = json.loads(request.body)
        except ValueError as exc:
            raise APIError('invalid JSON payload') from exc
        if not isinstance(data, dict):
            raise APIError('payload must be a JSON object')
        transaction_id = data.get('transaction_id')
        transaction_date = data.get('transaction_date')
        email = data.get('email')
        family_id, invoice = self._split_invoice_id(invoice_id)
        payable_invoices = self.get_teamnet_payable_invoices(regie_id, family_id)
        if invoice_id not in payable_invoices:
            return {'data': False}
        invoice_to_pay = payable_invoices[invoice_id]
        try:
            t_date = datetime.strptime(transaction_date, DATE_IN_FORMAT)
        except (TypeError, ValueError) as exc:
            # TypeError: transaction_date absent or not a string;
            # ValueError: it does not match DATE_IN_FORMAT.
            raise APIError('invalid transaction_date') from exc
        formatted_date = t_date.strftime(DATE_OUT_FORMAT)
        payment_xml = ET.Element('PAIEMENT')
        ET.SubElement(payment_xml, 'IDDEMANDE')
        ET.SubElement(payment_xml, 'IDFAMILLE').text = family_id
        ET.SubElement(payment_xml, 'IDREGIEENC').text = self.billing_regies.get(regie_id)
        ET.SubElement(payment_xml, 'MODEREGLEMENT').text = 'PAY'
        ET.SubElement(payment_xml, 'MONTANT').text = str(invoice_to_pay['amount'])
        ET.SubElement(payment_xml, 'URL')
        # COURRIEL is always present; it is left empty without an email.
        ET.SubElement(payment_xml, 'COURRIEL').text = email or None
        ET.SubElement(payment_xml, 'REFPAIEMENT').text = transaction_id
        ET.SubElement(payment_xml, 'DATEENVOI').text = formatted_date
        ET.SubElement(payment_xml, 'DATERETOUR').text = formatted_date
        ET.SubElement(payment_xml, 'CODERETOUR')
        ET.SubElement(ET.SubElement(payment_xml, 'FACTURE'), 'NOFACTURE').text = invoice
        self.get_data('PaiementFactures', payment_xml)
        return {'data': True}
class Link(models.Model):
    """Association between a NameID and the Axel credentials of an account,
    for a given TeamnetAxel resource.
    """

    # Connector the link belongs to; links are removed with it (CASCADE).
    resource = models.ForeignKey(TeamnetAxel, on_delete=models.CASCADE)
    # Identifier read from the NameID query parameter of requests.
    nameid = models.CharField(max_length=256, blank=False)
    # Credentials passed to TeamnetAxel.authenticate.
    login = models.CharField(max_length=128, blank=False)
    # NOTE(review): stored in clear text; TeamnetAxel.get_family_id sends it
    # back to Axel on each request, so it cannot simply be hashed.
    pwd = models.CharField(max_length=128, blank=False)