passerelle/passerelle/apps/api_entreprise/models.py

471 lines
18 KiB
Python

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
'''Gateway to API-Entreprise web-service from SGMAP:
https://entreprise.api.gouv.fr
'''
import urllib.parse
import requests
from django.core import signing
from django.db import models
from django.http import Http404, HttpResponse
from django.urls import reverse
from django.utils.timezone import datetime, make_aware, timedelta
from django.utils.translation import gettext_lazy as _
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError, exception_to_text
from passerelle.views import WrongParameter
from .utils import levenshtein_match, simple_match
# Maximum age accepted by APIEntreprise.document() when it verifies the signed
# document identifiers produced by APIEntreprise.documents_associations().
DOCUMENT_SIGNATURE_MAX_AGE = timedelta(days=7)
def normalize_results(data):
    """Normalize in place a JSON object returned by API-Entreprise.

    The mapping is walked recursively (nested dicts, and dicts found inside
    lists) and modified as follows:

    * a non-empty list whose first element is a string is replaced by that
      first string;
    * an integer stored under a key starting with ``date`` (and not ending
      with ``timestamp``) is read as a Unix timestamp and replaced by the
      matching ``date``;
    * for every ``<prefix>timestamp`` key holding a positive integer (or
      anything ``int()`` turns into one), a ``<prefix>datetime`` key holding
      the matching aware ``datetime`` is added; the original key is kept.

    Values that cannot be converted are left untouched. Nothing is returned.
    """
    # fromtimestamp() raises OverflowError or OSError (not only ValueError)
    # when the value does not fit the platform time functions, and int()
    # raises OverflowError on infinity: a single unusable value must not
    # abort the normalization of the whole payload
    conversion_errors = (ValueError, TypeError, OverflowError, OSError)
    timestamp_to_datetime = {}
    for key in data:
        value = data[key]
        if isinstance(value, dict):
            normalize_results(value)
        elif isinstance(value, list):
            if value and isinstance(value[0], str):
                # keep only the first value
                value = data[key] = value[0]
            else:
                for item in value:
                    if isinstance(item, dict):
                        normalize_results(item)

        if key.endswith('timestamp'):
            # timestamps can be integers or strings or null
            # convert only if it's a positive integer
            try:
                tstamp = int(value)
            except conversion_errors:
                continue
            if tstamp <= 0:
                continue
            try:
                aware_date = make_aware(datetime.fromtimestamp(tstamp))
            except conversion_errors:
                continue
            timestamp_to_datetime[key[: -len('timestamp')] + 'datetime'] = aware_date
        elif key.startswith('date') and isinstance(value, int):
            try:
                data[key] = datetime.fromtimestamp(value).date()
            except conversion_errors:
                pass
    # add converted timestamps to initial data, once the iteration is over
    data.update(timestamp_to_datetime)
class APIEntreprise(BaseResource):
    """Connector resource giving access to the API-Entreprise web-service."""

    # NOTE(review): presumably read by BaseResource to decide whether failed
    # HTTP requests get logged -- confirm against the base class
    log_requests_errors = False

    # base URL the endpoint paths are joined to
    url = models.URLField(
        _('API URL'),
        max_length=256,
        default='https://entreprise.api.gouv.fr/',
    )
    # token sent with every request
    token = models.CharField(verbose_name=_('API token'), max_length=1024)
    # recipient sent when a request does not provide its own
    recipient = models.CharField(
        verbose_name=_('Recipient'),
        max_length=1024,
        blank=False,
        help_text=_('default value'),
    )

    category = _('Business Process Connectors')

    class Meta:
        verbose_name = _('API Entreprise')
def get(self, path, raw=False, **kwargs):
    """Query API-Entreprise and return the normalized JSON payload.

    ``path`` is joined to the connector URL. ``context`` and ``object`` must
    be present and non-empty in ``kwargs``; ``recipient`` falls back on the
    connector's one and ``non_diffusables`` is forwarded as ``'true'`` when
    set. The request is issued with ``cache_duration=300``.

    Returns the decoded payload, after ``normalize_results()``, when ``raw``
    is true, and ``{'err': 0, 'data': payload}`` otherwise.

    Raises ``WrongParameter`` when ``context`` or ``object`` is missing and
    ``APIError`` on connection error, non-JSON content or non-200 status.
    """
    params = {'token': self.token}
    for param in ('context', 'object'):
        if not kwargs.get(param):
            raise WrongParameter([param], [])
        params[param] = kwargs[param]
    params['recipient'] = kwargs.get('recipient') or self.recipient
    if kwargs.get('non_diffusables'):
        params['non_diffusables'] = 'true'

    url = urllib.parse.urljoin(self.url, path)
    try:
        # NOTE(review): the parameters travel as request data, not as a query
        # string -- kept as is, confirm this is what the remote API expects
        response = self.requests.get(url, data=params, cache_duration=300)
    except requests.RequestException as e:
        raise APIError(
            'API-entreprise connection error: %s' % exception_to_text(e), data=[]
        ) from e

    try:
        data = response.json()
    except ValueError as e:
        content = response.text[:1000]
        raise APIError(
            'API-entreprise returned non-JSON content with status %s: %s'
            % (response.status_code, content),
            data={
                'status_code': response.status_code,
                'exception': exception_to_text(e),
                'content': content,
            },
        ) from e

    if response.status_code != 200:
        # an error body can be any JSON value (list, string...): only look
        # for the "not_found" marker when it really is an object
        if isinstance(data, dict) and data.get('error') == 'not_found':
            raise APIError(
                data.get('message', 'not-found'),
                data={
                    'status_code': response.status_code,
                },
            )
        raise APIError(
            'API-entreprise returned a non 200 status %s: %s' % (response.status_code, data),
            data={
                'status_code': response.status_code,
                'content': data,
            },
        )

    normalize_results(data)
    if raw:
        return data
    return {
        'err': 0,
        'data': data,
    }
# Shared descriptions of the endpoint parameters, referenced from the
# ``parameters`` argument of the @endpoint declarations.
ASSOCIATION_PARAM = {
    'description': _('association SIRET or RNA/WALDEC number'),
    'example_value': '44317013900036',
}
CONTEXT_PARAM = {
    'description': _('request context: MPS, APS...'),
    'example_value': 'APS',
}
MONTH_PARAM = {
    'description': _('requested month'),
    'example_value': '02',
}
OBJECT_PARAM = {
    'description': _('request object: form number, file identifier...'),
    'example_value': '42',
}
RECIPIENT_PARAM = {
    'description': _('request recipient: usually customer number'),
    'example_value': '44317013900036',
}
SIREN_PARAM = {
    'description': _('firm SIREN number'),
    'example_value': '443170139',
}
SIRET_PARAM = {
    'description': _('firms SIRET number'),
    'example_value': '44317013900036',
}
YEAR_PARAM = {
    'description': _('requested year'),
    'example_value': '2019',
}

# parameters of the identity matching endpoint
FIRST_NAME_PARAM = {
    'description': _("matched user's first name"),
    'example_value': 'John',
}
LAST_NAME_PARAM = {
    'description': _("matched user's last name"),
    'example_value': 'Doe',
}
# NOTE(review): the label says YYYYDDMM while the example value can only be
# read as YYYYMMDD -- confirm the format expected by the matching helpers
# before fixing the label (it is also a translation msgid)
BIRTHDATE_PARAM = {
    'description': _("matched user's YYYYDDMM birthdate"),
    'example_value': '19700131',
}
METHOD_PARAM = {
    'description': _('method used for user identity matching'),
    'example_value': 'simple',
}
@endpoint(
    perm='can_access',
    pattern=r'(?P<association_id>\w+)/$',
    example_pattern='{association_id}/',
    description=_('Get association\'s documents'),
    parameters={
        'association_id': ASSOCIATION_PARAM,
        'object': OBJECT_PARAM,
        'context': CONTEXT_PARAM,
        'recipient': RECIPIENT_PARAM,
    },
)
def documents_associations(self, request, association_id, **kwargs):
    """List the documents of an association.

    Documents without a type are skipped. Every returned document gets an
    ``id`` (its ``timestamp``), a ``text`` (its ``type``) and has its ``url``
    replaced by an absolute URL of the ``document`` endpoint carrying a
    signed payload (original URL, context, object and recipient).

    Returns ``{'err': 0, 'data': documents}``, documents sorted by ``id``.
    """
    resp = self.get(
        'v3/ministere_interieur/rna/associations/%s/documents' % association_id, raw=True, **kwargs
    )
    # self.get() accepts a request without recipient and falls back on the
    # connector's one: do the same here instead of failing with a KeyError
    recipient = kwargs.get('recipient') or self.recipient

    data = []
    for entry in resp.get('data', []):
        item = entry.get('data', {})
        # ignore documents with no type
        if not item.get('type'):
            continue
        signature = signing.dumps(
            {
                'url': item['url'],
                'context': kwargs['context'],
                'object': kwargs['object'],
                'recipient': recipient,
            }
        )
        document_url = request.build_absolute_uri(
            reverse(
                'generic-endpoint',
                kwargs={
                    'connector': self.get_connector_slug(),
                    'slug': self.slug,
                    'endpoint': 'document',
                    'rest': '%s/%s/' % (association_id, signature),
                },
            )
        )
        item['id'] = item['timestamp']
        item['text'] = item['type']
        item['url'] = document_url
        data.append(item)
    # sort data by date
    data.sort(key=lambda i: i['id'])
    return {'err': 0, 'data': data}
@endpoint(
    pattern=r'(?P<association_id>\w+)/(?P<document_id>[\:\w-]+)/$',
    example_pattern='{association_id}/{document_id}/',
    description=_('Get association\'s document'),
    parameters={
        'association_id': ASSOCIATION_PARAM,
        'document_id': {
            'description': _('document id'),
            'example_value': 'A1500660325',
        },
        'object': OBJECT_PARAM,
        'context': CONTEXT_PARAM,
        'recipient': RECIPIENT_PARAM,
    },
)
def document(self, request, association_id, document_id, **kwargs):
    """Serve the document designated by the signed ``document_id``.

    ``document_id`` is verified with ``signing.loads()`` and must not be
    older than ``DOCUMENT_SIGNATURE_MAX_AGE``; the ``url`` it contains is
    then fetched and its response is returned as ``application/pdf``.

    Raises ``Http404`` when the signature is rejected or when the fetched
    response is not OK.
    """
    try:
        signed = signing.loads(document_id, max_age=DOCUMENT_SIGNATURE_MAX_AGE)
    except signing.BadSignature:
        raise Http404('document not found')

    upstream = self.requests.get(signed['url'])
    if not upstream.ok:
        raise Http404('document not found')
    return HttpResponse(upstream, content_type='application/pdf')
@endpoint(
    name='document_association',
    pattern=r'(?P<association_id>\w+)/get-last/$',
    example_pattern='{association_id}/get-last/',
    description=_('Get association\'s last document of type'),
    parameters={
        'association_id': ASSOCIATION_PARAM,
        'document_type': {
            'description': _('document type'),
            'example_value': 'Statuts',
        },
        'object': OBJECT_PARAM,
        'context': CONTEXT_PARAM,
        'recipient': RECIPIENT_PARAM,
    },
)
def get_last_document_of_type(self, request, association_id, document_type, **kwargs):
    """Return the association document of ``document_type`` with the
    greatest ``timestamp``.

    Returns ``{'data': document}``, or ``{'data': None}`` when the
    association has no document of that type.
    """
    path = 'v3/ministere_interieur/rna/associations/%s/documents' % association_id
    resp = self.get(path, raw=True, **kwargs)

    candidates = []
    for entry in resp.get('data', []):
        if entry.get('data', {}).get('type') == document_type:
            candidates.append(entry.get('data'))

    if not candidates:
        return {'data': None}
    # ascending sort: the most recent document ends up last
    candidates.sort(key=lambda doc: doc['timestamp'])
    return {'data': candidates[-1]}
@endpoint(
    perm='can_access',
    pattern=r'(?P<siren>\w+)/$',
    example_pattern='{siren}/',
    description=_('Get firm\'s data from Infogreffe'),
    parameters={
        'siren': SIREN_PARAM,
        'object': OBJECT_PARAM,
        'context': CONTEXT_PARAM,
        'recipient': RECIPIENT_PARAM,
    },
)
def extraits_rcs(self, request, siren, **kwargs):
    """Return the ``data`` part of the ``extrait_kbis`` resource of the
    legal unit ``siren``, as ``{'data': ...}``.
    """
    path = 'v3/infogreffe/rcs/unites_legales/%s/extrait_kbis' % siren
    payload = self.get(path, raw=True, **kwargs)
    return {'data': payload['data']}
@endpoint(
    perm='can_access',
    pattern=r'(?P<association_id>\w+)/$',
    example_pattern='{association_id}/',
    description=_('Get association\'s related informations'),
    parameters={
        'association_id': ASSOCIATION_PARAM,
        'object': OBJECT_PARAM,
        'context': CONTEXT_PARAM,
        'recipient': RECIPIENT_PARAM,
    },
)
def associations(self, request, association_id, **kwargs):
    """Return the association record as ``{'data': {'association': ...}}``.

    The record ``id`` is set to the value of its ``rna_id`` key.
    """
    path = 'v3/ministere_interieur/rna/associations/%s' % association_id
    association = self.get(path, raw=True, **kwargs)['data']
    association['id'] = association['rna_id']
    return {'data': {'association': association}}
@endpoint(
    perm='can_access',
    pattern=r'(?P<siren>\w+)/$',
    example_pattern='{siren}/',
    description=_('Get firm\'s related informations'),
    parameters={
        'siren': SIREN_PARAM,
        'object': OBJECT_PARAM,
        'context': CONTEXT_PARAM,
        'recipient': RECIPIENT_PARAM,
        'include_private': {'description': _('Include private informations'), 'example_value': 'true'},
        'mandataires': {'description': _('Include mandataires informations'), 'example_value': 'true'},
    },
)
def entreprises(self, request, siren, include_private=False, mandataires=False, **kwargs):
    """Return a firm's legal unit and head office.

    The legal unit is read from the ``diffusibles`` resource unless
    ``include_private`` is enabled; its head office comes from the
    ``siege_social`` sub-resource and, when ``mandataires`` is enabled, its
    representatives from Infogreffe. Flattened keys (``forme_juridique``,
    ``naf_entreprise``, ``raison_sociale``, address lines ``l1``..``l7``,
    ...) are added next to the original ones.

    Returns ``{'data': {'entreprise': ..., 'etablissement_siege': ...}}``.
    Raises ``APIError`` when ``siren`` is not 9 characters long.
    """

    def is_enabled(flag):
        # the flags may reach this method as raw query-string text, where
        # "false" or "0" would otherwise count as true; booleans pass through
        if isinstance(flag, str):
            return flag.strip().lower() not in ('', 'false', '0', 'no', 'off')
        return bool(flag)

    if len(siren) != 9:
        raise APIError(_('invalid SIREN length (must be 9 characters)'))

    if is_enabled(include_private):
        unite_legale_url = 'v3/insee/sirene/unites_legales/%s' % siren
    else:
        unite_legale_url = 'v3/insee/sirene/unites_legales/diffusibles/%s' % siren

    # legal unit: sub-objects may be absent or null, read them as empty
    data = self.get(unite_legale_url, raw=True, **kwargs).get('data')
    forme_juridique = data.get('forme_juridique') or {}
    activite_principale = data.get('activite_principale') or {}
    personne_morale = data.get('personne_morale_attributs') or {}
    naf = activite_principale.get('code', '')
    data['forme_juridique'] = forme_juridique.get('libelle', '')
    data['forme_juridique_code'] = forme_juridique.get('code', '')
    data['naf_entreprise'] = naf.replace('.', '')
    data['naf_point_entreprise'] = naf
    data['libelle_naf_entreprise'] = activite_principale.get('libelle', '')
    data['raison_sociale'] = personne_morale.get('raison_sociale', '')
    data['tranche_effectif_salarie_entreprise'] = data.get('tranche_effectif_salarie', {})

    # head office
    raw_siege_data = self.get('%s/siege_social' % unite_legale_url, raw=True, **kwargs)
    siege_data = raw_siege_data.get('data')
    adresse = siege_data.get('adresse') or {}
    adresse['nom_voie'] = adresse.get('libelle_voie', '')
    adresse['localite'] = adresse.get('libelle_commune', '')
    adresse['code_insee_localite'] = adresse.get('code_commune', '')
    acheminement_postal = adresse.get('acheminement_postal') or {}
    for i in range(1, 8):
        adresse['l%s' % i] = acheminement_postal.get('l%s' % i)
    siege_data['adresse'] = adresse
    # the response may come without "meta": do not fail on it
    meta = raw_siege_data.get('meta') or {}
    siege_data['date_mise_a_jour'] = meta.get('date_derniere_mise_a_jour', '')

    # representatives, only on demand (one more remote call)
    mandataires_sociaux = []
    if is_enabled(mandataires):
        mandataires_data = (
            self.get(
                'v3/infogreffe/rcs/unites_legales/%s/mandataires_sociaux' % siren, raw=True, **kwargs
            ).get('data')
            or []
        )
        for mandataire in mandataires_data:
            for key in ('nom', 'prenom', 'fonction'):
                mandataire.setdefault(key, '')
            mandataires_sociaux.append(mandataire)
    data['mandataires_sociaux'] = mandataires_sociaux

    return {'data': {'entreprise': data, 'etablissement_siege': siege_data}}
@endpoint(
    perm='can_access',
    methods=['get'],
    pattern=r'(?P<siret>\w+)/$',
    example_pattern='{siret}/',
    description_get=_('Get firms\'s related informations'),
    parameters={
        'siret': SIRET_PARAM,
        'object': OBJECT_PARAM,
        'context': CONTEXT_PARAM,
        'recipient': RECIPIENT_PARAM,
    },
)
def etablissements(self, request, siret, **kwargs):
    """Return the (diffusible) establishment ``siret``.

    Flattened keys are added next to the original ones: ``naf``,
    ``naf_point``, ``libelle_naf``, ``date_creation_etablissement``,
    ``commune_implantation``, ``tranche_effectif_salarie_etablissement`` and,
    inside ``adresse``, ``localite``, ``code_insee_localite``, ``nom_voie``
    and the postal lines ``l1``..``l7``.

    Returns ``{'data': {'etablissement': ...}}``.
    """
    raw_data = self.get('v3/insee/sirene/etablissements/diffusibles/%s' % siret, raw=True, **kwargs)
    data = raw_data.get('data')

    activite_principale = data.get('activite_principale') or {}
    naf = activite_principale.get('code', '')
    data['naf'] = naf.replace('.', '')
    data['naf_point'] = naf
    data['libelle_naf'] = activite_principale.get('libelle', '')
    # a missing creation date must not make the whole request fail
    data['date_creation_etablissement'] = data.get('date_creation')

    # the address is both read and completed: make sure it exists, so that
    # an absent or null "adresse" does not fail on the writes below
    adresse = data.get('adresse') or {}
    data['adresse'] = adresse
    localite = adresse.get('libelle_commune', '')
    code_insee_localite = adresse.get('code_commune', '')
    adresse['localite'] = localite
    adresse['code_insee_localite'] = code_insee_localite
    adresse['nom_voie'] = adresse.get('libelle_voie', '')
    acheminement_postal = adresse.get('acheminement_postal') or {}
    for i in range(1, 8):
        adresse['l%s' % i] = acheminement_postal.get('l%s' % i)

    data['commune_implantation'] = {'code': code_insee_localite, 'value': localite}
    data['tranche_effectif_salarie_etablissement'] = data.get('tranche_effectif_salarie', {})
    return {'data': {'etablissement': data}}
@endpoint(
    perm='can_access',
    methods=['get'],
    pattern=r'(?P<siret>\w+)/$',
    example_pattern='{siret}/',
    description_get=_('Get firms\'s financial year informations'),
    parameters={
        'siret': SIRET_PARAM,
        'object': OBJECT_PARAM,
        'context': CONTEXT_PARAM,
        'recipient': RECIPIENT_PARAM,
    },
)
def exercices(self, request, siret, **kwargs):
    """Return, unchanged, the normalized payload of the
    ``chiffres_affaires`` resource of the establishment ``siret``.
    """
    path = 'v3/dgfip/etablissements/%s/chiffres_affaires' % siret
    return self.get(path, raw=True, **kwargs)
@endpoint(
    perm='can_access',
    pattern=r'(?P<siren>\w+)/$',
    example_pattern='{siren}/',
    description=_(
        'Match firm\'s society representative against local FranceConnect identity information'
    ),
    parameters={
        'siren': SIREN_PARAM,
        'first_name': FIRST_NAME_PARAM,
        'last_name': LAST_NAME_PARAM,
        'birthdate': BIRTHDATE_PARAM,
        'method': METHOD_PARAM,
        'object': OBJECT_PARAM,
        'context': CONTEXT_PARAM,
        'recipient': RECIPIENT_PARAM,
    },
)
def match_mandataire_social(
    self, request, siren, first_name, last_name, birthdate, method='simple', **kwargs
):
    """Look for the given identity among the representatives of a firm.

    ``method`` selects the matching helper: ``simple`` or ``levenshtein``.

    Returns ``{'err': 0, 'data': mandataire}`` for the first representative
    accepted by the helper, ``{'err': 0, 'data': {}}`` when none matches and
    ``{'err': 1, 'err_desc': ...}`` when ``method`` is unknown.
    """
    methods = {
        'simple': simple_match,
        'levenshtein': levenshtein_match,
    }
    # reject an unknown method before querying the remote API for nothing
    matcher = methods.get(method)
    if matcher is None:
        return {'err': 1, 'err_desc': 'method %s not implemented' % method}

    mandataires = (
        self.get(
            'v3/infogreffe/rcs/unites_legales/%s/mandataires_sociaux' % siren, raw=True, **kwargs
        ).get('data')
        or []
    )
    for mandataire in mandataires:
        if matcher(mandataire, first_name, last_name, birthdate):
            return {'err': 0, 'data': mandataire}
    return {'err': 0, 'data': {}}