zoo/zoo/zoo_nanterre/saga.py

264 lines
9.8 KiB
Python

# -*- coding: utf-8 -*-
import datetime
import decimal
import urllib.parse
import xml.etree.ElementTree as ET
from django.conf import settings
from django.utils import timezone
import requests
from collections import namedtuple
# Some POPOs!
# Immutable record for one SAGA invoice, as built by Saga.factures():
# dates are datetime.date, amounts are decimal.Decimal, incident_paiement
# is a bool, creances is a list of Creance and extra starts as an empty dict.
Facture = namedtuple(
    'Facture',
    'date_facture date_limite_recouvrement etat incident_paiement '
    'montant_initial num reste_a_payer creances commentaire extra',
)
# Immutable record for one receivable line of an invoice, as built by
# Saga.factures(): montant is a decimal.Decimal and champs_dynamiques is a
# dict mapping each dynamic field identifier to its value.
Creance = namedtuple(
    'Creance',
    'imputation libelle montant num_creance commentaire champs_dynamiques',
)
class Saga(object):
    """Small SOAP client for the SAGA invoice and online payment services.

    Remote operations return a ``(result, error)`` pair: on success ``error``
    is ``None``; on an expected failure (network error, non-200 status,
    malformed or error response) ``result`` is ``None`` and ``error`` is a
    human-readable message.
    """

    def __init__(self, url, ns=None, base_uri=None, num_service=None, timeout=None):
        """Store the connection parameters.

        url         - base URL of the SAGA server
        ns          - XML namespace used to locate the content node in
                      responses (defaults to 'ns')
        base_uri    - first path segment of the services
                      (defaults to 'saga_web_testrsu')
        num_service - service number sent with requests (defaults to '67')
        timeout     - default HTTP timeout; falls back to
                      settings.ZOO_NANTERRE_RSU_TIMEOUT when falsy
        """
        self.url = url
        self.base_uri = base_uri or 'saga_web_testrsu'
        self.num_service = num_service or '67'
        self.ns = ns or 'ns'
        self.timeout = timeout or settings.ZOO_NANTERRE_RSU_TIMEOUT

    @property
    def creance_url(self):
        """URL of the invoice/receivable status service."""
        return urllib.parse.urljoin(
            self.url,
            '/%s/services/etat_facture_creance_literal' % self.base_uri)

    @property
    def paiement_url(self):
        """URL of the online payment service."""
        return urllib.parse.urljoin(
            self.url,
            '/%s/services/paiement_internet_ws_literal' % self.base_uri)

    def soap_call(self, url, body, content_tag, timeout=None, **kwargs):
        """POST a SOAP request and return the XML document embedded in the reply.

        url         - endpoint to POST to
        body        - str.format template of the SOAP body, filled with kwargs
        content_tag - local name of the response element (looked up in the
                      self.ns namespace) whose text is itself an XML document
        timeout     - HTTP timeout, defaults to self.timeout

        Returns (tree, None) where tree is the parsed embedded document, or
        (None, error_message) on any failure.
        """
        # NOTE(review): the default namespace below is hard-coded to "ns" while
        # responses are searched with self.ns - confirm whether the request
        # should follow self.ns too.
        wrapper = '''<soap11:Envelope xmlns:soap11="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xs="http://www.w3.org/2001/XMLSchema"
xmlns:tns1="{tns1}"
xmlns:tns0="{tns0}"
xmlns="ns">
<soap11:Body>
{body}
</soap11:Body>
</soap11:Envelope>'''
        # NOTE(review): kwargs are interpolated without XML escaping; a value
        # containing '&' or '<' yields an invalid request - confirm whether
        # callers escape beforehand.
        body = body.format(**kwargs)
        body = wrapper.format(body=body, tns0=self.creance_url, tns1=self.paiement_url)
        if timeout is None:
            timeout = self.timeout
        try:
            response = requests.post(
                url, body,
                timeout=timeout,
                headers={
                    'Content-Type': 'application/xml',
                    'SOAPAction': "'\"'\"",
                })
        except requests.RequestException as e:
            return None, u'SAGA connexion impossible: %r' % e
        if response.status_code != 200:
            return None, u'SAGA response is not 200: %s %r' % (response.status_code,
                                                               response.text[:1024])
        try:
            et = ET.fromstring(response.text)
        except Exception:
            return None, u'SAGA invalid XML content: %r' % response.text[:1024]
        content_node = et.find('.//{%s}%s' % (self.ns, content_tag))
        if content_node is None:
            return None, u'SAGA no content node: %r' % response.text[:1024]
        # The payload is an XML document serialized as the text of the node.
        content = content_node.text
        if content is None:
            # An empty node would otherwise crash on the substring test below.
            return None, u'SAGA empty content node: %r' % response.text[:1024]
        # Re-encode with the encoding announced by the embedded document so
        # that its XML declaration matches the bytes handed to the parser.
        encoding = 'latin1' if 'ISO-8859-1' in content else 'utf-8'
        try:
            tree = ET.fromstring(content.encode(encoding))
        except (ET.ParseError, UnicodeError, LookupError):
            return None, u'SAGA invalid embedded XML content: %r' % content[:1024]
        if tree.tag == 'erreur':
            return None, u'SAGA erreur: %s' % tree.text
        return tree, None

    def resolve_code_tiers(self, code_tiers):
        """Ask SAGA for the federated code matching code_tiers.

        Returns (code_tiers_federe_text, None) or (None, error_message).
        """
        body = '''
<codeTiersFedere>
<num_tiers>{code_tiers}</num_tiers>
</codeTiersFedere>'''
        tree, error = self.soap_call(self.creance_url, body, 'codeTiersFedereReturn', ns=self.ns,
                                     code_tiers=code_tiers)
        if tree is None:
            return None, error
        if tree.tag != 'code_tiers_federe':
            return None, u'SAGA no code_tiers_federe node: %r' % ET.tostring(tree)
        return tree.text, None

    def get_child_content(self, tree, child_name):
        """Return the text of the first child_name element of tree, or None.

        None is returned when the child is missing (or has no text).
        """
        child = tree.find(child_name)
        # Compare with None explicitly: an Element without sub-elements is
        # falsy, so a plain truth test would discard every leaf node.
        if child is not None:
            return child.text
        return None

    @staticmethod
    def _parse_date(value):
        """Parse a 'dd/mm/YYYY' string into a datetime.date."""
        return datetime.datetime.strptime(value, '%d/%m/%Y').date()

    def _parse_creance(self, node):
        """Build a Creance from a <creance> element."""
        attrs = node.attrib
        champs_dynamiques = {}
        for champ_dynamique in node.findall('.//champ_dynamique'):
            identifiant = champ_dynamique.attrib['identifiant']
            champs_dynamiques[identifiant] = champ_dynamique.attrib['valeur']
        return Creance(
            imputation=attrs['imputation'],
            libelle=attrs['libelle'],
            montant=decimal.Decimal(attrs['montant']),
            num_creance=attrs['num_creance'],
            commentaire=self.get_child_content(node, 'commentaire'),
            champs_dynamiques=champs_dynamiques)

    def _parse_facture(self, node, today):
        """Build a Facture from a <facture> element; today is the reference date."""
        attrs = node.attrib
        etat = attrs['etat']
        date_limite_recouvrement = self._parse_date(attrs['date_limite_recouvrement'])
        # An invoice still "en cours" after its collection deadline is
        # reported as overdue.
        if today > date_limite_recouvrement and etat == 'en cours':
            etat = u'dépassée'
        return Facture(
            date_facture=self._parse_date(attrs['date_facture']),
            date_limite_recouvrement=date_limite_recouvrement,
            etat=etat,
            incident_paiement=attrs['incident_paiement'] != 'non',
            montant_initial=decimal.Decimal(attrs['montant_initial']),
            num=attrs['num'],
            reste_a_payer=decimal.Decimal(attrs['reste_a_payer']),
            commentaire=self.get_child_content(node, 'commentaire'),
            creances=[self._parse_creance(c) for c in node.findall('.//creance')],
            extra={})

    def factures(self, federation, debut=None, fin=None, timeout=None):
        """List the invoices of a federated third party.

        federation - string, federated third-party number
        debut      - optional datetime, start of the period
        fin        - optional datetime, end of the period
        timeout    - optional HTTP timeout for this call

        Returns (list_of_Facture, None) or (None, error_message).
        """
        body = '''
<etatFactureParTiersFedere>
<num_tiers>{federation}</num_tiers>
<num_service>{num_service}</num_service>
<type_facture>toute</type_facture>
<periode_debut>{periode_debut}</periode_debut>
<periode_fin>{periode_fin}</periode_fin>
<detail>oui</detail>
</etatFactureParTiersFedere>'''
        tree, error = self.soap_call(
            self.creance_url, body, 'etatFactureParTiersFedereReturn',
            timeout=timeout,
            num_service=self.num_service,
            periode_debut=debut.strftime('%d/%m/%Y') if debut else '',
            periode_fin=fin.strftime('%d/%m/%Y') if fin else '',
            federation=federation, ns=self.ns)
        if tree is None:
            return None, error
        # Read the clock once so every invoice is compared to the same day.
        # NOTE(review): with USE_TZ enabled timezone.now() is in UTC, so this
        # date may differ from the local one around midnight - confirm.
        today = timezone.now().date()
        return [self._parse_facture(t, today) for t in tree.findall('.//facture')], None

    def transaction(self, factures, urlretour_asynchrone, urlretour_synchrone, email):
        """Open a payment transaction covering the given invoices.

        factures - non-empty iterable of objects exposing num and
                   reste_a_payer; the amounts are summed into one payment.

        Returns (payment_url, None) or (None, error_message).
        Raises AssertionError when factures is empty.
        """
        body = '''
<Transaction>
<num_service>{num_service}</num_service>
<id_facture>{id_facture}</id_facture>
<montant>{montant}</montant>
<urlretour_asynchrone>{urlretour_asynchrone}</urlretour_asynchrone>
<email>{email}</email>
<urlretour_synchrone>{urlretour_synchrone}</urlretour_synchrone>
</Transaction>'''
        if not factures:
            # Raised explicitly (rather than with an ``assert`` statement) so
            # the check survives ``python -O``; the exception type is unchanged.
            raise AssertionError(u'factures ne doit pas être vide')
        id_facture = u'--'.join(str(facture.num) for facture in factures)
        montant = sum(facture.reste_a_payer for facture in factures)
        tree, error = self.soap_call(
            self.paiement_url, body, 'TransactionReturn',
            num_service=self.num_service,
            id_facture=id_facture,
            montant=montant,
            urlretour_asynchrone=urlretour_asynchrone,
            urlretour_synchrone=urlretour_synchrone,
            email=email)
        if tree is None:
            return None, error
        if tree.tag != 'url':
            return None, u'SAGA tag is not url: %r' % ET.tostring(tree)
        return tree.text, None

    @staticmethod
    def _page_retour_result(tree, montant_as_decimal):
        """Turn an <ok> payment-return element into a plain dict.

        When montant_as_decimal is true, each present montant_initial is
        converted to decimal.Decimal; a missing attribute stays None.
        """
        def montant_initial(node):
            value = node.attrib.get('montant_initial')
            if montant_as_decimal and value is not None:
                return decimal.Decimal(value)
            return value

        return {
            'code_tiers': tree.attrib.get('code_tiers'),
            'etat': tree.attrib['etat'],
            'email': tree.attrib.get('email'),
            'num_service': tree.attrib.get('num_service'),
            'factures': [{
                'num': t.attrib.get('num'),
                'montant_initial': montant_initial(t),
            } for t in tree.findall('.//facture')],
        }

    def page_retour_asynchrone(self, idop):
        """Fetch the asynchronous payment result for operation idop.

        Returns (result_dict, None) or (None, error_message); invoice amounts
        are left as the raw attribute strings.
        """
        body = '''
<PageRetourAsynchrone>
<idop>{idop}</idop>
</PageRetourAsynchrone>'''
        tree, error = self.soap_call(
            self.paiement_url, body, 'PageRetourAsynchroneReturn',
            idop=idop)
        if tree is None:
            return None, error
        if tree.tag != 'ok':
            return None, u'SAGA tag is not ok: %r' % ET.tostring(tree)
        return self._page_retour_result(tree, montant_as_decimal=False), None

    def page_retour_synchrone(self, idop):
        """Fetch the synchronous payment result for operation idop.

        Returns (result_dict, None) or (None, error_message); invoice amounts
        are converted to decimal.Decimal (None when the attribute is absent).
        """
        body = '''
<PageRetourSynchrone>
<idop>{idop}</idop>
</PageRetourSynchrone>'''
        tree, error = self.soap_call(
            self.paiement_url, body, 'PageRetourSynchroneReturn',
            idop=idop)
        if tree is None:
            return None, error
        if tree.tag != 'ok':
            return None, u'SAGA tag is not ok: %r' % ET.tostring(tree)
        return self._page_retour_result(tree, montant_as_decimal=True), None