passerelle/passerelle/apps/avis_imposition/models.py

479 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2017 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
'''Interface with "Service de vérification des avis" of impots.gouv.fr
See also:
https://cfsmsp.impots.gouv.fr/secavis/
https://www.economie.gouv.fr/particuliers/authenticite-avis-impot-svair
Original code:
https://github.com/betagouv/svair-api
https://github.com/bdauvergne/svair-api (fork if upstream disapear)
'''
import datetime
import logging
import re
import urllib.parse
import lxml.html
import requests
from django.conf import settings
from django.core.cache import cache
from django.utils.translation import gettext_lazy as _
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
from passerelle.utils.json import unflatten
from passerelle.utils.jsonresponse import APIError
from passerelle.utils.xml import text_content
AVIS_IMPOSITION_GOUV_FR_URL = 'https://cfsmsp.impots.gouv.fr/secavis/'
def remove_spaces(value):
return re.sub(r'\s', '', value, flags=re.U)
def simplify_spaces(value):
return re.sub(r'\s+', ' ', value, flags=re.U)
class NotFound(APIError):
def __init__(self):
super().__init__(_('These fiscal number and tax notice number are unknown.'))
VERIFY_SCHEMA_RESPONSE_NUMBERS = [
"dateEtablissement_year",
"dateRecouvrement_year",
"foyerFiscal__year",
"impotRevenuNetAvantCorrections",
"nombreParts",
"nombrePersonnesCharge",
"revenuBrutGlobal",
"revenuFiscalReference",
"revenuImposable",
]
ISO_DATE_PATTERN = {'type': 'string', 'pattern': r'\d\d\d\d-\d\d-\d\d'}
FR_DATE_PATTERN = {'type': 'string', 'pattern': r'\d\d/\d\d/\d\d\d\d'}
VERIFY_SCHEMA_OTHERS = {
"montantImpot": {
'oneOf': [
{'type': 'string'},
{'enum': ['Nonimposable']},
]
},
"situationFamille_simple": {
'enum': [
'pacs/mariage',
'divorce',
'celibataire',
]
},
"dateEtablissement_iso": ISO_DATE_PATTERN,
"dateRecouvrement_iso": ISO_DATE_PATTERN,
"declarant1__dateNaissance_iso": ISO_DATE_PATTERN,
"declarant2__dateNaissance_iso": ISO_DATE_PATTERN,
"dateEtablissement": FR_DATE_PATTERN,
"dateRecouvrement": FR_DATE_PATTERN,
"declarant1__dateNaissance": FR_DATE_PATTERN,
"declarant2__dateNaissance": FR_DATE_PATTERN,
}
VERIFY_SCHEMA_RESPONSE_STRINGS = [
'numero_fiscal',
'reference_avis',
"declarant1",
"declarant1__nom",
"declarant1__nomNaissance",
"declarant1__prenom",
"declarant2__nom",
"declarant2__nomNaissance",
"declarant2__prenom",
"foyerFiscal__adresse",
"foyerFiscal__adresse1",
"foyerFiscal__adresse2",
"foyerFiscal__adresse3",
"situationFamille",
"situationPartielle",
]
def generate_verify_schema_response():
schema = {
'type': 'object',
}
def set_key(name, value):
parts = name.split('__')
d = schema
for part in parts[:-1]:
d = d.setdefault('properties', {}).setdefault(part, {})
d['type'] = 'object'
d.setdefault('properties', {})[parts[-1]] = value
for key in VERIFY_SCHEMA_RESPONSE_NUMBERS:
set_key(key, {'type': 'number'})
for key in VERIFY_SCHEMA_RESPONSE_STRINGS:
set_key(key, {'type': 'string'})
for key, value in VERIFY_SCHEMA_OTHERS.items():
set_key(key, value)
return schema
VERIFY_RESPONSE_SCHEMA = generate_verify_schema_response()
# Map fields by tr/td indexes
FIELD_INDEXES = {
'declarant1/nom': (1, 1),
'declarant2/nom': (1, 2),
'declarant1/nomNaissance': (2, 1),
'declarant2/nomNaissance': (2, 2),
'declarant1/prenom': (3, 1),
'declarant2/prenom': (3, 2),
'declarant1/dateNaissance': (4, 1),
'declarant2/dateNaissance': (4, 2),
'declarant1/dateNaissance_iso': (4, 1),
'declarant2/dateNaissance_iso': (4, 2),
'foyerFiscal/adresse1': (5, 1),
'foyerFiscal/adresse2': (6, 1),
'foyerFiscal/adresse3': (7, 1),
'dateRecouvrement': (9, 1),
'dateEtablissement': (10, 1),
'dateRecouvrement_iso': (9, 1),
'dateEtablissement_iso': (10, 1),
'dateRecouvrement_year': (9, 1),
'dateEtablissement_year': (10, 1),
'nombreParts': (11, 1),
'situationFamille': (12, 1),
'situationFamille_simple': (12, 1),
'nombrePersonnesCharge': (13, 1),
'revenuBrutGlobal': (14, 1),
'revenuImposable': (15, 1),
'impotRevenuNetAvantCorrections': (16, 1),
'montantImpot': (17, 1),
'revenuFiscalReference': (18, 1),
}
def get_field(name, td_contents, tr_length):
# adjust tr/td indexes based on the number of address lines deduced from
# the number of tr elements
i, j = FIELD_INDEXES[name]
# address has 3 lines
if tr_length == 19:
return td_contents[(i, j)]
# address has 2 lines
if tr_length == 18:
if i < 5:
return td_contents[(i, j)]
if i == 5:
return ''
if i > 5:
return td_contents[(i - 1, j)]
# address has 1 line
if tr_length == 17:
if i < 5:
return td_contents[(i, j)]
if i in (5, 6):
return ''
if i > 6:
return td_contents[(i - 2, j)]
raise NotImplementedError
def nombre_de_parts(value):
try:
return float(value)
except (ValueError, TypeError):
return None
def euros(value):
if not value:
return value
value = remove_spaces(value)
value = value.rstrip('')
try:
return int(value)
except ValueError:
return value
def situation_familiale(value):
if value == 'Pacs\xe9(e)s':
return 'pacs/mariage'
if value == 'Mari\xe9(e)s':
return 'pacs/mariage'
if value == 'Divorc\xe9(e)':
return 'divorce'
if value == 'C\xe9libataire':
return 'celibataire'
raise NotImplementedError(value)
def date(value):
if not value:
return ''
try:
return datetime.datetime.strptime(value, '%d/%m/%Y').date()
except (ValueError, TypeError):
return ''
def year(value):
if not value:
return ''
try:
return datetime.datetime.strptime(value, '%d/%m/%Y').date().year
except (ValueError, TypeError):
return ''
ADAPTERS = {
'nombreParts': nombre_de_parts,
'revenuBrutGlobal': euros,
'revenuImposable': euros,
'impotRevenuNetAvantCorrections': euros,
'revenuFiscalReference': euros,
'montantImpot': euros,
'situationFamille_simple': situation_familiale,
'dateEtablissement_iso': date,
'dateRecouvrement_iso': date,
'dateEtablissement_year': year,
'dateRecouvrement_year': year,
'declarant1/dateNaissance_iso': date,
'declarant2/dateNaissance_iso': date,
'nombrePersonnesCharge': nombre_de_parts,
}
def get_form(session, logger=None):
cache_key = 'avis-imposition-form'
try:
action_url, data = cache.get(cache_key)
except TypeError:
pass
else:
return action_url, data
logger = logger or logging
try:
response = session.get(AVIS_IMPOSITION_GOUV_FR_URL, timeout=15)
response.raise_for_status()
except requests.RequestException as e:
raise APIError('service-is-down', data=str(e))
if 'Saisissez les identifiants' not in response.text:
logger.error(_('https://cfsmsp.impots.gouv.fr/secavis/: service has changed, %s'), response.text)
raise RuntimeError('service has changed')
html = lxml.html.fromstring(response.content)
data = {}
for form_elt in html.xpath('//form'):
if 'action' not in form_elt.attrib:
continue
action_url = form_elt.attrib['action']
break
else:
logger.error(
_('https://cfsmsp.impots.gouv.fr/secavis/: service has changed, form[action] not found, %s'),
response.text,
)
raise RuntimeError('service has changed')
logger.debug('using found action_url %s', action_url)
fields = ['j_id_7:spi', 'j_id_7:num_facture']
for input_elt in html.xpath('//input'):
if 'value' in input_elt.attrib or input_elt.attrib['name'] in fields:
data[input_elt.attrib['name']] = input_elt.attrib.get('value', '')
for field in fields:
if field not in data:
logger.error(
_('https://cfsmsp.impots.gouv.fr/secavis/: service has changed, field %s not found, %s'),
field,
response.text,
)
raise RuntimeError('service has changed')
cache.set(cache_key, (action_url, data), 180)
return action_url, data
def get_avis_imposition(session, numero_fiscal, reference_avis, logger=None):
logger = logger or logging
action_url, data = get_form(session, logger=logger)
data['j_id_7:spi'] = numero_fiscal
data['j_id_7:num_facture'] = reference_avis
url = urllib.parse.urljoin(AVIS_IMPOSITION_GOUV_FR_URL, action_url)
try:
response = session.post(url, params=data)
response.raise_for_status()
except requests.RequestException as e:
raise APIError('service-is-down', data=str(e))
if 'Saisissez les identifiants' in response.text:
raise NotFound
response_html = lxml.html.fromstring(response.content)
td_contents = {}
tr_elements = list(response_html.xpath('//tr'))
for i, tr_element in enumerate(tr_elements):
for j, td_element in enumerate(tr_element.xpath('td')):
td_contents[(i, j)] = simplify_spaces(text_content(td_element).strip())
logger.debug('got td_contents %s', td_contents)
data = {
# https://github.com/betagouv/svair-api/blob/master/utils/year.js
'foyerFiscal/year': int('20' + reference_avis[:2]),
}
for field, coordinates in FIELD_INDEXES.items():
try:
data[field] = get_field(field, td_contents, len(tr_elements))
except IndexError:
logger.error(
_(
'https://cfsmsp.impots.gouv.fr/secavis/: service has changed, data field %s(%s) not found, %s'
),
field,
coordinates,
response.text,
)
raise RuntimeError('service has changed')
if field in ADAPTERS:
data[field] = ADAPTERS[field](data[field])
for situation_partielle_elt in response_html.xpath('//*[@id="situationPartielle"]'):
data['situationPartielle'] = text_content(situation_partielle_elt).strip()
break
# add reference number to result
data['numero_fiscal'] = numero_fiscal
data['reference_avis'] = reference_avis
# restore structure to the data
unflattened = unflatten(data)
# flatten address for simple use case
foyer_fiscal = unflattened['foyerFiscal']
foyer_fiscal['adresse'] = ' '.join(
value for key, value in sorted(foyer_fiscal.items()) if key.startswith('adresse') and value
)
return unflattened
def get_fake_data(numero_fiscal, reference_avis):
fake_data = getattr(settings, 'AVIS_IMPOSITION_FAKE_DATA', [])
for data in fake_data or []:
if data['numero_fiscal'] == numero_fiscal and data['reference_avis'] == reference_avis:
return data
class AvisImposition(BaseResource):
log_requests_errors = True
requests_timeout = 10
requests_max_retries = {
'total': 5,
'backoff_factor': 0.5,
'allowed_methods': ['GET', 'POST'],
# retry after: 0.5, 1.5 and 3.5 seconds
'status_forcelist': [413, 429, 503, 504],
}
category = _('Business Process Connectors')
class Meta:
verbose_name = _('French tax notice validator on impots.gouv.fr')
@endpoint(
perm='can_access',
description=_('Get citizen\'s fiscal informations'),
parameters={
'numero_fiscal': {
'description': _('fiscal identifier'),
'example_value': '1562456789521',
},
'reference_avis': {
'description': _('tax notice number'),
'example_value': '1512456789521',
},
},
json_schema_response=VERIFY_RESPONSE_SCHEMA,
)
def verify(self, request, numero_fiscal, reference_avis):
numero_fiscal = remove_spaces(numero_fiscal)
reference_avis = remove_spaces(reference_avis)
if not (
# See.
# https://fr.wikipedia.org/wiki/Num%C3%A9ro_d'immatriculation_fiscale#France
# a 13 digits decimal number
len(numero_fiscal) == 13
and numero_fiscal.isascii()
and numero_fiscal.isdigit()
# No clear information on the format, it seems to be a 13 digits
# decimal number too from examples. As we aren´t sure we will ask
# for a more than 10 digits long alphanumeric string.
and len(reference_avis) >= 10
and reference_avis.isascii()
and reference_avis.isalnum()
):
raise APIError(_('Fiscal identifier or tax notice number are invalid.'))
cache_key = f'avis-imposition-{numero_fiscal}-{reference_avis}'
data = cache.get(cache_key)
if data == 'not-found':
raise NotFound
if not data:
# use test vectors...
data = get_fake_data(numero_fiscal=numero_fiscal, reference_avis=reference_avis)
# or the real thing.
try:
data = data or get_avis_imposition(
self.requests, numero_fiscal=numero_fiscal, reference_avis=reference_avis
)
except NotFound:
cache.set(cache_key, 'not-found', 60)
raise
# service is slow, cache successful response for 12 hours
cache.set(cache_key, data, 3600 * 12)
return {'data': data}
def check_status(self):
get_form(self.requests)
def get_test_data(self):
fake_data = getattr(settings, 'AVIS_IMPOSITION_FAKE_DATA', [])
result = []
for data in fake_data or []:
try:
numero_fiscal = data['numero_fiscal']
reference_avis = data['reference_avis']
revenu_fiscal_de_reference = data['revenuFiscalReference']
except Exception:
pass
result.append((numero_fiscal, reference_avis, revenu_fiscal_de_reference))
return result