passerelle/passerelle/apps/api_impot_particulier/models.py

307 lines
11 KiB
Python

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import hashlib
import uuid
from urllib.parse import urljoin
import requests
from django.core.cache import cache
from django.db import models
from django.utils.translation import gettext_lazy as _
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
from passerelle.utils.timeout import Timeout
class ServiceIsDown(APIError):
def __init__(self):
super().__init__(_('API Impot Particulier service is unavailable'))
def __str__(self):
if self.__context__:
return f'{super().__str__()}: {self.__context__}'
return super().__str__()
class Resource(BaseResource):
api_url = models.URLField(
_('DGFIP API base URL'),
max_length=256,
default='https://gw.dgfip.finances.gouv.fr/impotparticulier/1.0',
)
oauth_username = models.CharField(_('DGFIP API Username'), max_length=128)
oauth_password = models.CharField(_('DGFIP API Password'), max_length=128)
oauth_scopes = models.CharField(_('DGFIP API Scopes'), max_length=128, blank=True)
id_teleservice = models.TextField(_('DGFIP API ID_Teleservice'), max_length=128, blank=True)
log_requests_errors = False
requests_timeout = 30
requests_max_retries = {
'total': 3,
'backoff_factor': 0.5,
'allowed_methods': ['GET', 'POST'],
# retry after: 0.5, 1.5 and 3.5 seconds
'status_forcelist': [413, 429, 503, 504],
}
category = _('Business Process Connectors')
class Meta:
verbose_name = _('API Impot Particulier')
@classmethod
def parse_numero_fiscal(cls, value):
value = value.strip().replace(' ', '')
if not (value and value.isascii() and value.isdigit()):
raise APIError(_('invalid numero_fiscal'))
return value
@classmethod
def parse_annee_de_revenu(cls, value):
try:
value = int(value)
except (TypeError, ValueError):
raise APIError(_('invalid annee_de_revenu'))
today = datetime.date.today()
if not (0 < today.year - value < 10):
raise APIError(_('invalid annee_de_revenu'))
return value
@endpoint(
name='spi-situations-ir-assiettes-annrev',
description=_('Provides revenue tax situation for a specific year.'),
parameters={
'numero_fiscal': {
'description': _('Tax number of the person'),
},
'annee_de_revenu': {
'description': _('Income year'),
},
},
)
def spi_situations_ir_assiettes_annrev(self, request, numero_fiscal, annee_de_revenu):
numero_fiscal = self.parse_numero_fiscal(numero_fiscal)
annee_de_revenu = self.parse_annee_de_revenu(annee_de_revenu)
return {
'data': self.get_spi_situations_ir_assiettes_annrev(
numero_fiscal=numero_fiscal, annee_de_revenu=annee_de_revenu, timeout=Timeout(20)
)
}
def get_spi_situations_ir_assiettes_annrev(self, numero_fiscal, annee_de_revenu, timeout=None):
return self.call(
name='spi-situations-ir-assiettes-deuxans',
endpoint_template='spi/{spi}/situations/ir/assiettes/annrev/{annrev}',
timeout=timeout,
spi=numero_fiscal,
annrev=annee_de_revenu,
accept='application/prs.dgfip.part.situations.ir.assiettes.v1+json',
)
@endpoint(
name='spi-situations-th-assiettes-principale-annrev',
description=_('Provides housing tax situation for a specific year.'),
parameters={
'numero_fiscal': {
'description': _('Tax number of the person'),
},
'annee_de_revenu': {
'description': _('Income year'),
},
},
)
def spi_situations_th_assiettes_principale_annrev(self, request, numero_fiscal, annee_de_revenu):
numero_fiscal = self.parse_numero_fiscal(numero_fiscal)
annee_de_revenu = self.parse_annee_de_revenu(annee_de_revenu)
return {
'data': self.get_spi_situations_th_assiettes_principale_annrev(
numero_fiscal=numero_fiscal, annee_de_revenu=annee_de_revenu, timeout=Timeout(20)
)
}
def get_spi_situations_th_assiettes_principale_annrev(self, numero_fiscal, annee_de_revenu, timeout=None):
return self.call(
name='spi-situations-th-assiettes-principale-deuxans',
endpoint_template='spi/{spi}/situations/th/assiettes/principale/annrev/{annrev}',
timeout=timeout,
spi=numero_fiscal,
annrev=annee_de_revenu,
accept='application/prs.dgfip.part.situations.th.assiettes.v1+json',
)
def call(self, name, endpoint_template, timeout=None, **kwargs):
correlation_id = str(uuid.uuid4().hex)
kwargs_formatted = ', '.join(f'{key}={value}' for key, value in kwargs.items())
try:
data = self.get_tax_data(
session=self.requests,
base_url=self.api_url,
access_token=self._get_access_token(timeout=timeout),
correlation_id=correlation_id,
endpoint_template=endpoint_template,
id_teleservice=self.id_teleservice,
timeout=timeout,
**kwargs,
)
except ServiceIsDown as e:
self.logger.warning(
'%s(%s) failed: %s',
name,
kwargs_formatted,
e,
extra={
'correlation_id': correlation_id,
'id_teleservice': self.id_teleservice,
'kwargs': kwargs,
},
)
raise
else:
self.logger.warning(
'%s(%s) success',
name,
kwargs_formatted,
extra={
'data': data,
'correlation_id': correlation_id,
'id_teleservice': self.id_teleservice,
'kwargs': kwargs,
},
)
return data
@classmethod
def get_tax_data(
cls,
session,
base_url,
access_token,
correlation_id,
endpoint_template,
accept,
id_teleservice=None,
headers=None,
timeout=None,
**kwargs,
):
headers = {
**(headers or {}),
'Authorization': f'Bearer {access_token}',
'X-Correlation-ID': correlation_id,
'Accept': accept,
}
if id_teleservice:
headers['ID_Teleservice'] = id_teleservice
endpoint = endpoint_template.format(**kwargs)
if not base_url.endswith('/'):
base_url += '/'
url = urljoin(base_url, endpoint)
if timeout is not None:
timeout = float(timeout)
# api-impot-particulier error reporting is byzantine, some errors are
# accompanied by a 4xx code, some others with a 20x code, some have a
# JSON content, other are only identified by a codeapp header on
# the response
try:
response = session.get(url, headers=headers, timeout=timeout)
response.raise_for_status()
except requests.HTTPError:
try:
content = response.json()['erreur']
except (ValueError, KeyError):
try:
raise APIError(
'api-impot-particulier error', data={'codeapp': response.headers['codeapp']}
)
except KeyError:
pass
raise ServiceIsDown
raise APIError('api-impot-particulier-error', data=content)
except requests.RequestException:
raise ServiceIsDown
if response.status_code != 200:
try:
content = response.json()['erreur']
except (ValueError, KeyError):
try:
raise APIError(
'api-impot-particulier-error', data={'codeapp': response.headers['codeapp']}
)
except KeyError:
raise ServiceIsDown
raise APIError('api-impot-particulier error', data=content)
try:
response_data = response.json()
except ValueError:
raise ServiceIsDown
return response_data
def _get_access_token(self, timeout=None):
key = (
'dgfip-at-'
+ hashlib.sha256(
f'{self.oauth_username}-{self.oauth_password}-{self.api_url}'.encode()
).hexdigest()
)
access_token = cache.get(key)
if not access_token:
access_token = self.get_access_token(
session=self.requests,
base_url=self.api_url,
username=self.oauth_username,
password=self.oauth_password,
scope=self.oauth_scopes,
timeout=timeout,
)
cache.set(key, access_token, 300)
return access_token
@classmethod
def get_access_token(cls, session, base_url, username, password, scope, timeout=None):
data = {
'grant_type': 'client_credentials',
}
if scope:
data['scope'] = scope
url = urljoin(base_url, '/token')
if timeout is not None:
timeout = float(timeout)
try:
response = session.post(url, data=data, auth=(username, password), timeout=timeout)
response.raise_for_status()
except requests.RequestException:
raise ServiceIsDown
try:
response_data = response.json()
access_token = response_data['access_token']
response_data = response.json()
except (ValueError, KeyError, TypeError):
raise ServiceIsDown
return access_token