passerelle/passerelle/apps/franceconnect_data/models.py

285 lines
11 KiB
Python

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2021 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import json
import uuid
from django.contrib.postgres.fields import JSONField
from django.core.exceptions import PermissionDenied
from django.db import models
from django.http import HttpResponseBadRequest, HttpResponseRedirect
from django.template import Context, Template
from django.template.response import TemplateResponse
from django.urls import reverse
from django.utils.http import urlencode
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from passerelle.base.models import BaseResource
from passerelle.utils import get_trusted_services
from passerelle.utils.api import endpoint
from passerelle.utils.origin import get_url_origin, is_same_origin
from . import fc
# from passerelle.utils.jsonresponse import APIError
class Resource(BaseResource):
    """Connector exposing data obtained through a FranceConnect authorization.

    Endpoints defined here:

    * ``init_request`` redirects to the URL built by
      ``fc.FranceConnect.authorization_request``;
    * ``callback`` handles the authorization response, optionally queries the
      DGFIP API, stores the collected data in a :class:`Token` and renders
      ``franceconnect_data/callback.html``;
    * ``demo`` renders a superuser-only demo page;
    * ``data_source`` returns either a stored token content or the URL to use
      to start a new request.
    """

    category = _('Data Sources')

    fc_platform_slug = models.CharField(
        _('FranceConnect platform'),
        max_length=4,
        choices=[(platform.slug, platform.name) for platform in fc.PLATFORMS],
    )
    fc_client_id = models.CharField(_('FranceConnect client_id'), max_length=64)
    fc_client_secret = models.CharField(_('FranceConnect client_secret'), max_length=64)
    fc_scopes = models.TextField(_('FranceConnect scopes'), default='identite_pivot')
    # Django template rendered in callback() with the "fc" and "dgfip" variables.
    text_template = models.TextField(
        _('Text template'),
        default=(
            '''{{ fc.given_name }} {{ fc.family_name }} '''
            '''{% if fc.gender == 'male' %}né{% else %}née{% endif %} le {{ fc.birthdate }}'''
        ),
    )

    dgfip_api_base_url = models.URLField(
        _('DGFIP API base URL'), max_length=256, default='https://gwfc.dgfip.finances.gouv.fr/'
    )
    dgfip_username = models.CharField(_('DGFIP API Username'), max_length=64, blank=True, null=True)
    dgfip_password = models.CharField(_('DGFIP API Password'), max_length=64, blank=True, null=True)
    dgfip_scopes = models.TextField(_('DGFIP API Scopes'), blank=True, null=True)
    dgfip_id_teleservice = models.TextField(_('DGFIP API ID_Teleservice'), blank=True, null=True)

    log_requests_errors = False

    class Meta:
        verbose_name = _('Data sources through FranceConnect')

    @property
    def fc_platform(self):
        """Return the platform object from ``fc.PLATFORMS_BY_SLUG`` for the configured slug."""
        return fc.PLATFORMS_BY_SLUG[self.fc_platform_slug]

    def build_callback_url(self, request, **kwargs):
        """Return the absolute URL of the ``callback`` endpoint of this resource.

        Keyword arguments whose value is not ``None`` are URL-encoded into the
        query string. ``init_request`` and ``callback`` both build their
        ``redirect_uri`` through this method with the same arguments.
        """
        redirect_uri = request.build_absolute_uri(
            reverse(
                'generic-endpoint',
                kwargs={'slug': self.slug, 'connector': self.get_connector_slug(), 'endpoint': 'callback'},
            )
        )
        if kwargs:
            # The '?' is appended as soon as any keyword is passed, even when
            # every value is None and the encoded query string ends up empty.
            redirect_uri += '?' + urlencode(
                {key: value for key, value in kwargs.items() if value is not None}
            )
        return redirect_uri

    def is_trusted_origin(self, request, origin):
        """Tell whether ``origin`` matches a trusted service or the current request.

        Returns ``True`` when ``origin`` has the same origin as the ``url`` of
        one of the services returned by ``get_trusted_services()`` or as the
        absolute URI of ``request``, ``False`` otherwise.
        """
        for service in get_trusted_services():
            if is_same_origin(origin, service['url']):
                return True
        if is_same_origin(request.build_absolute_uri(), origin):
            return True
        return False

    @endpoint(
        description=_('Init request'),
        parameters={
            'mode': {
                'description': _('What to retrieve, default to FranceConnect identity, can be "dgfip"'),
            },
            'origin': {
                'description': _('Origin for returning results through window.postMessage'),
            },
            'test': {
                'description': _('If set to one, activate the test callback view.'),
            },
        },
    )
    def init_request(self, request, origin, mode=None, test=None):
        """Start the authorization flow.

        Returns an HTTP 400 response when the user is not a superuser and
        ``origin`` is not trusted; otherwise redirects to the value returned
        by ``fc.FranceConnect.authorization_request``. ``origin``, ``mode``
        and ``test`` are carried over to the callback URL.
        """
        if not request.user.is_superuser and not self.is_trusted_origin(request, origin):
            return HttpResponseBadRequest('Missing or invalid origin')
        redirect_uri = self.build_callback_url(request, origin=origin, mode=mode, test=test)
        franceconnect = fc.FranceConnect(
            session=self.requests, logger=self.logger, dgfip_api_base_url=self.dgfip_api_base_url
        )
        return HttpResponseRedirect(
            franceconnect.authorization_request(
                platform=self.fc_platform,
                client_id=self.fc_client_id,
                scopes=self.fc_scopes,
                redirect_uri=redirect_uri,
            )
        )

    @endpoint(
        description=_('FranceConnect callback (internal use)'),
        parameters={
            'origin': {
                'description': _('HTTP Origin, needed to secure window.postMessage'),
            },
            'mode': {
                'description': _('Mode'),
            },
            'test': {
                'description': _('Use test mode (to see exchanges)'),
            },
        },
    )
    def callback(self, request, origin, mode=None, test=None, **kwargs):
        """Handle the authorization response and render the callback page.

        Returns an HTTP 400 response when the origin is not trusted (for
        non-superusers) or when ``test`` is requested by a non-superuser.

        Otherwise the authorization response is processed; when ``mode`` is
        ``'dgfip'`` the DGFIP API is also queried for the three years before
        the current one. The collected data is stored once and its identifier
        and rendered text are put in the template context as ``data`` and
        ``data_json``. On ``fc.FranceConnectError`` the context gets
        ``error``, ``error_json`` and ``error_data_json`` instead.
        """
        if not request.user.is_superuser and not self.is_trusted_origin(request, origin):
            return HttpResponseBadRequest('Missing or invalid origin.')
        if test and not request.user.is_superuser:
            return HttpResponseBadRequest('Only admin can use test mode.')
        franceconnect = fc.FranceConnect(
            session=self.requests, logger=self.logger, dgfip_api_base_url=self.dgfip_api_base_url
        )
        # Rebuilt with the same arguments as in init_request().
        redirect_uri = self.build_callback_url(request, origin=origin, mode=mode, test=test)
        context = {
            'origin': origin,
            'franceconnect': franceconnect,
            'redirect_uri': redirect_uri,
            'test': test,
        }
        try:
            franceconnect.handle_authorization_response(
                platform=self.fc_platform,
                client_id=self.fc_client_id,
                client_secret=self.fc_client_secret,
                redirect_uri=redirect_uri,
                code=request.GET.get('code'),
                error=request.GET.get('error'),
                error_description=request.GET.get('error_description'),
            )
            token = {'franceconnect': franceconnect.fc_user_info}
            if mode == 'dgfip':
                franceconnect.request_dgfip_access_token(
                    self.dgfip_username, self.dgfip_password, scope=self.dgfip_scopes
                )
                current_year = now().year
                for year in range(current_year - 3, current_year):
                    franceconnect.request_dgfip_ir(str(year), id_teleservice=self.dgfip_id_teleservice)
                token['dgfip_ir'] = franceconnect.dgfip_ressource_ir_response
            try:
                template = Template(self.text_template)
                text_template_context = {
                    'fc': franceconnect.fc_user_info.copy(),
                    'dgfip': token.get('dgfip_ir'),
                }
                if franceconnect.fc_user_info.get('birthdate'):
                    birthdate = franceconnect.fc_user_info['birthdate']
                    try:
                        # Give the template a date object; keep the raw
                        # string when it is not in YYYY-MM-DD form.
                        text_template_context['fc']['birthdate'] = datetime.datetime.strptime(
                            birthdate, '%Y-%m-%d'
                        ).date()
                    except ValueError:
                        pass
                token['text'] = template.render(Context(text_template_context))
            except Exception as e:
                # Best effort: a broken template must not prevent returning
                # the data, but the failure is logged instead of being hidden.
                self.logger.warning('callback: failed to render text template: %s', e)
                token['text'] = '<failed to render>'
            # Store the token only once so that 'data' and 'data_json' carry
            # the same identifier and a single Token row is created.
            data = {'id': self.store(token), 'text': token['text']}
            context['data'] = data
            context['data_json'] = json.dumps(data)
        except fc.FranceConnectError as e:
            self.logger.warning('callback: error %s', e)
            context['error'] = e
            context['error_json'] = json.dumps(repr(e))
            context['error_data_json'] = json.dumps(e.data)
        return TemplateResponse(request, 'franceconnect_data/callback.html', context=context)

    @endpoint(
        description=_('Demo page (to check your configuration)'),
    )
    def demo(self, request, **kwargs):
        """Render the demo page; raises ``PermissionDenied`` for non-superusers."""
        if not request.user.is_superuser:
            raise PermissionDenied
        return TemplateResponse(
            request,
            'franceconnect_data/demo.html',
            context={'origin': request.build_absolute_uri('/'), 'resource': self},
        )

    @endpoint(
        description=_('Data source'),
    )
    def data_source(self, request, id=None, test=None, mode=None, **kwargs):
        """Return stored data for ``id``, or a placeholder row to start a request.

        With a truthy ``id`` the result is a single row made of the stored
        token content plus the ``id`` key. Without it, the single row has
        empty ``id`` and ``text``, the absolute URL of the ``init_request``
        endpoint (with ``mode=dgfip`` and/or ``test=1`` when requested) and
        the origin of the current request URL.
        """
        if id:
            # NOTE(review): retrieve() returns None when no token matches, in
            # which case dict(None, id=id) raises TypeError - confirm this is
            # the intended failure mode for unknown or expired identifiers.
            return {
                'data': [
                    dict(self.retrieve(id), id=id),
                ]
            }
        url = request.build_absolute_uri(
            reverse(
                'generic-endpoint',
                kwargs={
                    'slug': self.slug,
                    'connector': self.get_connector_slug(),
                    'endpoint': 'init_request',
                },
            )
        )
        params = {}
        if mode == 'dgfip':
            params['mode'] = 'dgfip'
        if test:
            params['test'] = '1'
        if params:
            url += '?' + urlencode(params)
        return {
            'data': [
                {
                    'id': '',
                    'text': '',
                    'init_request_url': url,
                    'service_origin': get_url_origin(request.build_absolute_uri()),
                }
            ]
        }

    def store(self, data):
        """Save ``data`` in a new :class:`Token` and return its UUID as a hex string.

        Tokens older than 60 days are deleted on each call.
        """
        # automatic cleaning of the cache
        Token.objects.filter(timestamp__lt=now() - datetime.timedelta(days=60)).delete()
        token = Token.objects.create(content=data)
        return token.guid.hex

    def retrieve(self, ref):
        """Return the content of the token whose guid is ``ref``, or ``None`` if none matches."""
        token = Token.objects.filter(guid=ref).first()
        return token and token.content
class Token(models.Model):
    """JSON payload saved under a randomly generated UUID primary key."""

    # Primary key, generated with uuid.uuid4 when not provided.
    guid = models.UUIDField(
        default=uuid.uuid4,
        primary_key=True,
        verbose_name=_('UUID'),
    )
    # Set automatically when the row is created.
    timestamp = models.DateTimeField(
        auto_now_add=True,
        verbose_name=_('Timestamp'),
    )
    content = JSONField(verbose_name=_('Content'))

    class Meta:
        verbose_name_plural = _('FranceConnect data tokens')
        verbose_name = _('FranceConnect data token')