300 lines
11 KiB
Python
300 lines
11 KiB
Python
# barbacompta - invoicing for dummies
# Copyright (C) 2010-2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import base64
|
|
import dataclasses
|
|
import enum
|
|
import functools
|
|
import pprint
|
|
import re
|
|
import xml.dom.pulldom
|
|
import xml.etree.ElementTree as ET
|
|
import zipfile
|
|
|
|
import requests
|
|
|
|
|
|
class URL(enum.Enum):
    """Symbolic names of the remote endpoints used by ``ChorusAPI``.

    Members are used as keys of the per-platform URL tables, so the same
    code can target any platform by looking the member up in that table.
    """

    TOKEN = 1
    DEPOSER_FLUX = 2
    TELECHARGER_ANNUAIRE_DESTINATAIRE = 3
    CONSULTER_CR = 4


class ChorusAPI:
    """HTTP client for the Chorus Pro endpoints listed in ``PLATFORMS``.

    Every call is a JSON ``POST`` authenticated with two credentials:

    * an OAuth bearer token obtained from the ``URL.TOKEN`` endpoint with the
      ``client_credentials`` grant (``piste_client_id`` / ``piste_client_secret``);
    * a ``cpro-account`` header carrying the base64 of
      ``chorus_tech_username:chorus_tech_password``.
    """

    class Error(Exception):
        """Base error of this client; may carry the HTTP response involved."""

        response = None

        def __init__(self, *args, response=None):
            self.response = response
            super().__init__(*args)

    class TransportError(Error):
        """Raised when the HTTP exchange failed with a ``requests`` exception."""

    # Endpoint URLs, keyed by platform name then by ``URL`` member.
    PLATFORMS = {
        'qualif': {
            URL.TOKEN: 'https://sandbox-oauth.aife.economie.gouv.fr/api/oauth/token',
            URL.DEPOSER_FLUX: 'https://sandbox-api.aife.economie.gouv.fr/cpro/factures/v1/deposer/flux',
            URL.TELECHARGER_ANNUAIRE_DESTINATAIRE: 'https://sandbox-api.aife.economie.gouv.fr/cpro/transverses/v1/telecharger/annuaire/destinataire',
            URL.CONSULTER_CR: 'https://sandbox-api.aife.economie.gouv.fr/cpro/transverses/v1/consulterCR',
        },
        'prod': {
            URL.TOKEN: 'https://oauth.aife.economie.gouv.fr/api/oauth/token',
            URL.DEPOSER_FLUX: 'https://api.aife.economie.gouv.fr/cpro/factures/v1/deposer/flux',
            URL.TELECHARGER_ANNUAIRE_DESTINATAIRE: 'https://api.aife.economie.gouv.fr/cpro/transverses/v1/telecharger/annuaire/destinataire',
            URL.CONSULTER_CR: 'https://api.aife.economie.gouv.fr/cpro/transverses/v1/consulterCR',
        },
    }

    def __init__(
        self,
        *,
        platform: str,
        piste_client_id: str,
        piste_client_secret: str,
        chorus_tech_username: str,
        chorus_tech_password: str,
        session: requests.Session = None,
    ):
        """Store credentials and the HTTP session to use.

        Args:
            platform: key of ``PLATFORMS`` selecting the set of URLs; it is
                not validated here, an unknown value fails on first use.
            piste_client_id: OAuth client id sent to the token endpoint.
            piste_client_secret: OAuth client secret sent to the token endpoint.
            chorus_tech_username: first half of the ``cpro-account`` header.
            chorus_tech_password: second half of the ``cpro-account`` header.
            session: optional ``requests.Session``; a new one is created
                when omitted (or falsy).
        """
        self._platform = platform
        self.piste_client_id = piste_client_id
        self.piste_client_secret = piste_client_secret
        self.chorus_tech_username = chorus_tech_username
        self.chorus_tech_password = chorus_tech_password
        self.http_session = session or requests.Session()

    @property
    def _platform_urls(self):
        """URL table of the configured platform (``KeyError`` if unknown)."""
        return self.PLATFORMS[self._platform]

    @functools.cached_property
    def _piste_access_token(self):
        """OAuth access token, fetched on first access then cached.

        Raises:
            requests.HTTPError: the token endpoint answered with an error
                status.
            KeyError / ValueError: the answer is not JSON or has no
                ``access_token`` member.
        """
        response = self.http_session.post(
            self._platform_urls[URL.TOKEN],
            data={
                'grant_type': 'client_credentials',
                'client_id': self.piste_client_id,
                'client_secret': self.piste_client_secret,
                'scope': 'openid',
            },
        )
        # Fail on the HTTP status, like _call_endpoint does, instead of
        # letting a rejected request surface as a KeyError on 'access_token'.
        # An exception here leaves the cached_property unset, so a later
        # access retries the request.
        response.raise_for_status()
        return response.json()['access_token']

    @property
    def _cpro_account_token(self):
        """Base64 of ``username:password``, value of the ``cpro-account`` header."""
        text = f'{self.chorus_tech_username}:{self.chorus_tech_password}'
        return base64.b64encode(text.encode()).decode()

    def _call_endpoint(self, *, url: str, payload: dict = None):
        """POST ``payload`` as JSON to ``url`` with both credentials attached.

        Returns:
            The ``requests`` response, only when its status is not an error.

        Raises:
            requests.RequestException: connection failure or error status
                (``raise_for_status``). No timeout is passed to ``requests``.
        """
        response = self.http_session.post(
            url,
            headers={
                'cpro-account': self._cpro_account_token,
                'authorization': f'Bearer {self._piste_access_token}',
                'content-type': 'application/json;charset=utf-8',
                'accept': 'application/json;charset=utf-8',
            },
            json=payload,
        )
        response.raise_for_status()
        return response

    def deposer_flux_facturx(self, pdf: bytes, name: str):
        """Deposit ``pdf`` on the ``DEPOSER_FLUX`` endpoint.

        The file is sent base64-encoded with syntax ``IN_DP_E2_CII_FACTURX``
        and ``avecSignature`` set to false.

        Args:
            pdf: raw content of the file to deposit.
            name: file name reported to the platform (``nomFichier``).

        Returns:
            The decoded JSON body of the response.

        Raises:
            ChorusAPI.TransportError: the HTTP exchange failed.
            ChorusAPI.Error: the response body is not JSON.
        """
        payload = {
            'fichierFlux': base64.b64encode(pdf).decode('ascii'),
            'nomFichier': name,
            'syntaxeFlux': 'IN_DP_E2_CII_FACTURX',
            'avecSignature': False,
        }
        try:
            response = self._call_endpoint(url=self._platform_urls[URL.DEPOSER_FLUX], payload=payload)
        except requests.RequestException as e:
            raise self.TransportError(str(e), response=getattr(e, 'response', None)) from e
        try:
            return response.json()
        except ValueError as e:
            # The f prefix is required for the content excerpt to be rendered.
            raise self.Error(
                f'response is not JSON {response.content[:1024]!r}', response=response
            ) from e

    def consulter_cr_facturx(self, numero_flux_depot: str, date_depot: str):
        """Query the ``CONSULTER_CR`` endpoint for a previously deposited flux.

        Args:
            numero_flux_depot: sent as ``numeroFluxDepot``.
            date_depot: sent as ``dateDepot``, passed through unchanged.

        Returns:
            The decoded JSON body of the response.

        Raises:
            requests.RequestException: the HTTP exchange failed; unlike
                ``deposer_flux_facturx`` it is not wrapped.
        """
        payload = {
            'numeroFluxDepot': numero_flux_depot,
            'dateDepot': date_depot,
            'syntaxeFlux': 'IN_DP_E2_CII_FACTURX',
        }
        response = self._call_endpoint(url=self._platform_urls[URL.CONSULTER_CR], payload=payload)
        return response.json()

    class Annuaire:
        """Parser turning a zipped XML directory into ``Structure`` objects."""

        @staticmethod
        def camel_to_snake(string):
            """Convert a CamelCase tag name to snake_case.

            For instance ``TypeIdentifiant`` gives ``type_identifiant`` and
            ``EstMOAUniquement`` gives ``est_moa_uniquement``.
            """
            string = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', string)
            string = re.sub('(.)([0-9]+)', r'\1_\2', string)
            return re.sub('([a-z0-9])([A-Z])', r'\1_\2', string).lower()

        @dataclasses.dataclass
        class Service:
            """One entry of ``Structure.services``."""

            code: str
            gestion_egmt: bool
            service_actif: bool
            nom: str = dataclasses.field(default='')  # when empty the code should be taken as the name
            adresse_postale: dict = dataclasses.field(default_factory=dict)

        @dataclasses.dataclass
        class Structure:
            """One ``CPPStructurePartenaireUnitaire`` element of the directory."""

            type_identifiant: int
            identifiant: str
            raison_sociale: str
            emetteur_edi: bool
            recepteur_edi: bool
            gestion_statut_mise_en_paiement: bool
            gestion_engagement: bool
            gestion_service: bool
            gestion_service_engagement: bool
            est_moa: bool
            est_moa_uniquement: bool
            structure_active: bool
            adresse_postale: dict = dataclasses.field(default_factory=dict)
            # The item type is named by a string; _decode_dict resolves it
            # as an attribute of Annuaire.
            services: list['Service'] = dataclasses.field(default_factory=list)

        STRUCTURE_UNITAIRE_TAG_NAME = 'CPPStructurePartenaireUnitaire'

        @classmethod
        def _parse_structure(cls, structure):
            """Convert the children of an ElementTree element to a dict.

            Keys are the snake_case child tag names. The value of a child is:

            * its text when it has no children;
            * a list, one dict per grandchild, when all its children share a
              single tag (this includes a child with exactly one child);
            * a nested dict otherwise.
            """
            d = {}
            for node in structure:
                if len(node) == 0:
                    value = node.text
                elif len({sub.tag for sub in node}) != 1:
                    value = cls._parse_structure(node)
                else:
                    value = [cls._parse_structure(sub) for sub in node]
                key = cls.camel_to_snake(node.tag)
                d[key] = value
            return d

        @classmethod
        def parse(cls, fd):
            """Yield a ``Structure`` for each matching element of a zip archive.

            Args:
                fd: anything ``zipfile.ZipFile`` accepts; every member of the
                    archive is parsed as an XML document.
            """
            with zipfile.ZipFile(fd) as zipf:
                for name in zipf.namelist():
                    with zipf.open(name) as zip_fd:
                        yield from cls._parse_annuaire_destinataire(zip_fd)

        @classmethod
        def _decode_dict(cls, data, klass):
            """Build a ``klass`` dataclass instance from a parsed dict.

            Values are converted according to the annotated field type:
            ``str`` and ``dict`` are kept as is, ``int`` goes through
            ``int()``, ``bool`` is true only for the exact text ``'true'``
            and ``list[...]`` items are decoded recursively.

            Raises:
                KeyError: ``data`` has a key which is not a field of ``klass``.
                NotImplementedError: the field type is not handled.
            """
            kwargs = {}
            fields = {field.name: field for field in dataclasses.fields(klass)}
            for key, value in data.items():
                field = fields[key]
                if field.type in (dict, str):
                    pass
                elif field.type is int:
                    value = int(value)
                elif field.type is bool:
                    value = value == 'true'
                elif (
                    getattr(field.type, '__args__', None) and getattr(field.type, '__origin__', None) is list
                ):
                    # The item type is given by name, look it up on this class.
                    field_klass = getattr(cls, field.type.__args__[0])
                    value = [cls._decode_dict(subvalue, field_klass) for subvalue in value]
                else:
                    raise NotImplementedError(f'unsupported pair {key!r}: {value!r}')
                kwargs[key] = value
            return klass(**kwargs)

        @classmethod
        def _parse_annuaire_destinataire(cls, fd):
            """Yield a ``Structure`` per ``STRUCTURE_UNITAIRE_TAG_NAME`` element.

            The document is read with ``pulldom`` and only the matching
            elements are expanded; each one is then serialized and re-parsed
            with ElementTree before conversion.
            """
            doc = xml.dom.pulldom.parse(fd)
            for event, node in doc:
                if event == xml.dom.pulldom.START_ELEMENT and node.tagName == cls.STRUCTURE_UNITAIRE_TAG_NAME:
                    doc.expandNode(node)
                    document = ET.fromstring(node.toxml())
                    structure = cls._parse_structure(document)
                    yield cls._decode_dict(structure, cls.Structure)

    def telecharger_annuaire_destinataire(self):
        """Download the directory from ``TELECHARGER_ANNUAIRE_DESTINATAIRE``.

        Returns:
            The base64-decoded ``fichierResultat`` member of the response, as
            bytes (the CLI feeds such a file to ``Annuaire.parse``).

        Raises:
            ChorusAPI.Error: ``codeRetour`` in the response is not 0.
            requests.RequestException: the HTTP exchange failed.
        """
        response = self._call_endpoint(
            url=self._platform_urls[URL.TELECHARGER_ANNUAIRE_DESTINATAIRE], payload={}
        )
        data = response.json()
        if data['codeRetour'] != 0:
            raise self.Error(f'{data["codeRetour"]=} {data["libelle"]=}')
        # Reuse the body decoded above instead of parsing it a second time.
        annuaire_b64 = data['fichierResultat']
        return base64.b64decode(annuaire_b64)


if __name__ == '__main__':
    # Command line interface; credentials are read from ~/.config/chorus.ini
    # (sections [piste] and [chorus]).
    import configparser
    import os.path

    import click

    @click.group()
    @click.pass_context
    def chorus(ctx):
        """Chorus API client configured from ~/.config/chorus.ini."""
        config = configparser.ConfigParser()
        config.read([os.path.expanduser('~/.config/chorus.ini')])
        try:
            platform = config.get('piste', 'platform')
            piste_client_id = config.get('piste', 'client_id')
            piste_client_secret = config.get('piste', 'client_secret')
            chorus_tech_username = config.get('chorus', 'tech_username')
            chorus_tech_password = config.get('chorus', 'tech_password')
        except (configparser.Error, KeyError) as e:
            # configparser.Error covers NoSectionError and also NoOptionError,
            # which config.get() raises when a section lacks an option.
            raise click.UsageError(f'~/.config/chorus.ini: not initialized, {e!r}') from e
        if platform not in ChorusAPI.PLATFORMS:
            raise click.UsageError(f'~/.config/chorus.ini: invalid platform value {platform!r}')

        chorus_api = ChorusAPI(
            platform=platform,
            piste_client_id=piste_client_id,
            piste_client_secret=piste_client_secret,
            chorus_tech_username=chorus_tech_username,
            chorus_tech_password=chorus_tech_password,
        )
        # Share the client with the sub-commands through the click context.
        ctx.ensure_object(dict)
        ctx.obj['api'] = chorus_api

    @chorus.command()
    @click.argument('output', type=click.File(mode='wb'))
    @click.pass_context
    def download_directory(ctx, output):
        """Download the directory archive and write it to OUTPUT."""
        output.write(ctx.obj['api'].telecharger_annuaire_destinataire())

    @chorus.command()
    @click.argument('directory_zip', type=click.File(mode='rb'))
    @click.pass_context
    def parse_directory(ctx, directory_zip):
        """Print structures of DIRECTORY_ZIP having a service with an address."""
        for structure in ChorusAPI.Annuaire.parse(directory_zip):
            if any(service.adresse_postale for service in structure.services):
                pprint.pprint(structure)

    @chorus.command()
    @click.argument('numero_flux_depot')
    @click.argument('date_depot')
    @click.argument('filename', type=click.File(mode='wb'), required=False)
    @click.pass_context
    def consulter_cr(ctx, numero_flux_depot, date_depot, filename=None):
        """Print the report of a deposit, saving its file to FILENAME if given."""
        compte_rendu = ctx.obj['api'].consulter_cr_facturx(
            numero_flux_depot=numero_flux_depot, date_depot=date_depot
        )
        # The base64 file is removed from the printed report and only
        # written out when a destination was given.
        fichier_cr = compte_rendu.pop('fichierCR', None)
        pprint.pprint(compte_rendu)
        if filename and fichier_cr:
            filename.write(base64.b64decode(fichier_cr))

    chorus()  # pylint: disable=E1120