passerelle/passerelle/apps/franceconnect_data/fc.py

210 lines
7.8 KiB
Python

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2021 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import hashlib
import json
import urllib.parse
import uuid
import requests
from django.utils.translation import gettext_lazy as _
class FranceConnectError(Exception):
def __init__(self, message, **kwargs):
self.data = tuple(kwargs.items())
super().__init__(message)
class Test:
slug = 'test'
name = _('Testing')
authorize_url = 'https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize'
token_endpoint_url = 'https://fcp.integ01.dev-franceconnect.fr/api/v1/token'
user_info_endpoint_url = 'https://fcp.integ01.dev-franceconnect.fr/api/v1/userinfo'
logout_url = 'https://fcp.integ01.dev-franceconnect.fr/api/v1/logout'
class Prod:
slug = 'prod'
name = _('Production')
authorize_url = 'https://app.franceconnect.gouv.fr/api/v1/authorize'
token_endpoint_url = 'https://app.franceconnect.gouv.fr/api/v1/token'
user_info_endpoint_url = 'https://app.franceconnect.gouv.fr/api/v1/userinfo'
logout_url = 'https://app.franceconnect.gouv.fr/api/v1/logout'
PLATFORMS = [Test, Prod]
PLATFORMS_BY_SLUG = {platform.slug: platform for platform in PLATFORMS}
def base64url_decode(input):
rem = len(input) % 4
if rem > 0:
input += b'=' * (4 - rem)
return base64.urlsafe_b64decode(input)
class FranceConnect:
def __init__(self, session, logger, dgfip_api_base_url):
self.session = session
self.logger = logger
self.dgfip_api_base_url = dgfip_api_base_url
self.items = []
self.correlation_id = str(uuid.uuid4())
def authorization_request(self, platform, client_id, scopes, redirect_uri, acr_values='eidas1'):
'''Launch an authorization request to FranceConnect'''
qs = urllib.parse.urlencode(
{
'response_type': 'code',
'client_id': client_id,
'redirect_uri': redirect_uri,
'scope': 'openid ' + scopes,
'state': str(uuid.uuid4()),
'nonce': str(uuid.uuid4()),
'acr_values': acr_values,
}
)
return '%s?%s' % (platform.authorize_url, qs)
def handle_authorization_response(
self, platform, client_id, client_secret, redirect_uri, code, error, error_description
):
if error:
raise FranceConnectError(
'No authorization code', error=error, error_description=error_description
)
data = {
'grant_type': 'authorization_code',
'redirect_uri': redirect_uri,
'client_id': client_id,
'client_secret': client_secret,
'code': code,
}
response_content = self.request('token endpoint', 'POST', platform.token_endpoint_url, data=data)
try:
self.add('fc_token_endpoint_response', response_content)
self.add('fc_access_token', response_content['access_token'])
self.add('fc_id_token', response_content['id_token'])
dummy, payload, dummy = self.fc_id_token.split('.')
self.add('fc_id_token_payload', json.loads(base64url_decode(payload.encode())))
except Exception as e:
raise FranceConnectError('Error in token endpoint response', sub_exception=repr(e))
fc_user_info = self.request(
'user_info endpoint',
'GET',
platform.user_info_endpoint_url,
headers={'Authorization': 'Bearer %s' % self.fc_access_token},
)
fc_user_info['hash'] = self.make_identite_pivot_sha256(fc_user_info)
self.add('fc_user_info', fc_user_info)
def make_identite_pivot_sha256(self, user_info):
parts = []
for key in ['given_name', 'family_name', 'birthdate', 'gender', 'birthplace', 'birthcountry']:
if not user_info.get(key):
return None
parts.append('%s=%s' % (key, user_info.get(key) or ''))
data = '\n'.join(parts)
return hashlib.sha256(data.encode()).hexdigest().lower()
def request_dgfip_access_token(self, dgfip_username, dgfip_password, scope=None):
data = {
'grant_type': 'client_credentials',
}
if scope:
data['scope'] = scope
dgfip_response = self.request(
'dgfip token endpoint',
'POST',
'token',
data=data,
auth=(dgfip_username, dgfip_password),
)
self.add('dgfip_token_endpoint_response', dgfip_response)
try:
dgfip_access_token = dgfip_response['access_token']
except (TypeError, KeyError) as e:
raise FranceConnectError('dgfip token endpoint error %s' % e, response=dgfip_response)
self.add('dgfip_access_token', dgfip_access_token)
def request_dgfip_ir(self, annrev, id_teleservice=None):
headers = {
'Authorization': 'Bearer %s' % self.dgfip_access_token,
'X-FranceConnect-OAuth': self.fc_access_token,
'X-Correlation-ID': str(uuid.uuid4()),
'Accept': 'application/prs.dgfip.part.situations.ir.assiettes.v1+json',
}
if id_teleservice:
headers['ID_Teleservice'] = id_teleservice
try:
dgfip_ressource_ir_response = self.request(
'ressource IR endpoint',
'GET',
'impotparticulier/1.0/situations/ir/assiettes/annrev/%s' % annrev,
headers=headers,
)
except FranceConnectError as e:
dgfip_ressource_ir_response = {'error_desc': str(e), 'error': e.data}
# accumulate data
try:
data = self.dgfip_ressource_ir_response
except AttributeError:
data = {}
data[annrev] = dgfip_ressource_ir_response
self.add('dgfip_ressource_ir_response', data)
def __getattr__(self, name):
try:
return dict(self.items)[name]
except KeyError:
raise AttributeError(name)
def add(self, key, value):
self.items.append((key, value))
def request(self, label, method, endpoint, *args, **kwargs):
self.logger.debug('request %s %s args:%s kwargs:%s', label, method, args, kwargs)
url = urllib.parse.urljoin(self.dgfip_api_base_url, endpoint)
self.add(label.replace(' ', '_') + '_request', [method, url, args, kwargs])
try:
response = getattr(self.session, method.lower())(url, *args, **kwargs)
try:
response_content = response.json()
except ValueError:
response_content = response.text[:1024]
response.raise_for_status()
raise
else:
response.raise_for_status()
except requests.HTTPError as e:
raise FranceConnectError('%s error %s' % (label, e), response=response_content)
except requests.RequestException as e:
raise FranceConnectError('%s error %s' % (label, e))
except ValueError as e:
raise FranceConnectError('%s error %s' % (label, e), response=response_content)
return response_content