# Source file: passerelle/passerelle/contrib/mdph13/models.py
# (377 lines, 14 KiB, Python)

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import re
from urllib import parse as urlparse
import requests
from django.db import models, transaction
from django.utils.translation import gettext_lazy as _
from passerelle.base.models import BaseResource, HTTPResource
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError, to_json
def json_walker(value, func, path=None):
    """Recursively visit a JSON-like structure and report its scalar leaves.

    Dicts and lists are descended into; for any other value,
    ``func(value, path)`` is called, where ``path`` is the list of steps
    leading to that value: dict keys as-is, list positions rendered as
    ``'[<index>]'``.
    """
    trail = path if path else []

    if isinstance(value, dict):
        # Look each value up by key while iterating.
        children = ((key, value[key]) for key in value)
    elif isinstance(value, list):
        children = (('[%s]' % index, item) for index, item in enumerate(value))
    else:
        # Leaf: hand it to the callback with the path accumulated so far.
        func(value, trail)
        return

    for step, child in children:
        json_walker(child, func, trail + [step])
# Maps the `err_code` values found in webservice responses to the error
# descriptions raised as APIError by MDPH13Resource.url_get(). Codes that are
# absent from this mapping get a generic "missing or unknown error code"
# description there.
ERROR_MAPPING = {
    'dossier-inconnu': 'dossier-inconnu',
    'secret-invalide': 'secret-invalide',
    'dateNaissance-erronee': 'date-de-naissance-erronee',
}
class MDPH13Resource(BaseResource, HTTPResource):
    """Connector resource for the MDPH CD13 "situation dossier" webservice.

    It exposes three endpoints (``link``, ``unlink`` and ``dossiers``) which
    associate a Publik NameID with beneficiary files, stored as ``Link`` rows,
    and fetch the content of those files from the remote webservice.
    """

    category = _('Business Process Connectors')
    webservice_base_url = models.URLField(_('Webservice Base URL'))

    # Loose syntactic check of email addresses received by the endpoints.
    EMAIL_RE = re.compile(r'^[^@\s]+@[^@\s]+\.[^@\s]+$')
    # Expected format of every `date_*` string of the webservice response.
    DATE_RE = re.compile(r'^\d{4}-\d{2}-\d{2}$')

    log_requests_errors = False

    class Meta:
        verbose_name = _('MDPH CD13')

    def situation_dossier_url(self, file_number):
        """Return the webservice URL of the file numbered `file_number`."""
        return urlparse.urljoin(self.webservice_base_url, 'situation/dossier/%s' % file_number)

    def url_get(self, *args, **kwargs):
        """GET an URL and return the decoded JSON object of the response.

        Arguments are passed unchanged to ``self.requests.get()``.

        Raises:
            requests.RequestException: the body is not JSON, or is not a JSON
                object (an HTTP error status is raised first if there is one).
            APIError: the response's `err` field is not 0; the description is
                taken from ERROR_MAPPING using the `err_code` field.
        """
        response = self.requests.get(*args, **kwargs)
        try:
            content = response.json()
        except ValueError:
            response.raise_for_status()
            raise requests.RequestException('JSON expected', response=response)
        if not isinstance(content, dict):
            # A JSON scalar or array cannot carry the err/err_code envelope;
            # report it as a protocol error instead of failing on .get().
            response.raise_for_status()
            raise requests.RequestException('JSON object expected', response=response)
        if content.get('err') != 0:
            err_desc = ERROR_MAPPING.get(content.get('err_code'), 'err != 0: missing or unknown error code')
            raise APIError(err_desc, data=content)
        response.raise_for_status()
        return content

    def call_situation_dossier(self, file_number, secret, dob, email=None, ip=None):
        """Fetch, validate and reorganize the content of a beneficiary file.

        Args:
            file_number: file number, used to build the URL and compared with
                the `numero` field of the response.
            secret: beneficiary secret, sent base64 encoded in a header.
            dob: date of birth; must provide ``isoformat()``.
            email: email sent for audit purpose; a placeholder address is sent
                when it is empty.
            ip: optional client IP, sent in a header when given.

        Returns:
            The `data` dict of the response where:
            - `beneficiaire.entourage` (if present) is turned into a dict
              grouping persons under the `parents` and `aidants` keys;
            - `demandes` (if not empty) is turned into a dict grouping the
              requests, most recent first, under the `en_cours` and
              `historique` keys.

        Raises:
            APIError: the webservice reported an error or the response does
                not have the expected structure.
        """
        url = self.situation_dossier_url(file_number)
        email = email or 'appel-sans-utilisateur@cd13.fr'
        headers = {
            'X-CD13-Secret': base64.b64encode(secret.encode('utf-8')).decode('ascii'),
            'X-CD13-Email': email,
            'X-CD13-DateNaissBenef': dob.isoformat(),
        }
        if ip:
            headers['X-CD13-IP'] = ip
        content = self.url_get(url, headers=headers)
        data = content.get('data')
        if not isinstance(data, dict):
            raise APIError('data-must-be-a-dict', data=content)
        if str(data.get('numero')) != str(file_number):
            raise APIError('numero-must-match-numero-dossier', data=content)

        # Reorganize entourage
        beneficiaire = data.get('beneficiaire', {})
        if not isinstance(beneficiaire, dict):
            raise APIError('beneficiaire-must-be-a-dict', data=data)
        entourage = beneficiaire.get('entourage')
        if entourage is not None:
            if not isinstance(entourage, list):
                raise APIError('entourage-must-be-a-list', data=content)
            if not all(isinstance(person, dict) for person in entourage):
                raise APIError('entourage-content-must-be-dicts', data=content)
            new_entourage = {}
            for person in entourage:
                group = 'parents' if person.get('role') in ('Père', 'Mère') else 'aidants'
                new_entourage.setdefault(group, []).append(person)
            beneficiaire['entourage'] = new_entourage

        # Reorganize demandes
        demandes = data.get('demandes') or []
        if demandes:
            # Validate the structure *before* sorting: the sort key calls
            # .get() on every item, so it needs a list of dicts.
            if not isinstance(demandes, list):
                raise APIError('demandes-must-be-a-list', data=content)
            if not all(isinstance(demande, dict) for demande in demandes):
                raise APIError('demandes-content-must-be-dicts', data=content)
            # ISO8601 dates are lexicographically orderable; str() keeps the
            # comparison safe if a date is unexpectedly not a string.
            demandes.sort(key=lambda demande: str(demande.get('date_demande') or ''), reverse=True)
            new_demandes = {}
            typologies = {
                'demande en cours': 'en_cours',
                'traitée et expédiée': 'historique',
                'traitée non expédiée': 'historique',
            }
            if not all(isinstance(demande.get('typologie'), str) for demande in demandes):
                raise APIError('typologie-must-be-a-string', data=content)
            unknowns = {demande['typologie'].lower() for demande in demandes} - set(typologies)
            if unknowns:
                raise APIError(
                    'typologie-is-unknown',
                    data={
                        # plain lists only: the error payload is serialized to JSON
                        'unknowns': sorted(unknowns),
                        'choices': list(typologies),
                        'response': content,
                    },
                )
            for demande in demandes:
                new_demandes.setdefault(typologies[demande['typologie'].lower()], []).append(demande)
            data['demandes'] = new_demandes

        # Check some syntaxes
        errors = []

        def check(value, path):
            # Only string values stored under a key starting with `date_`
            # are checked against DATE_RE.
            if path[-1].startswith('date_'):
                if isinstance(value, str) and not self.DATE_RE.match(value):
                    errors.append('%s is not a date string' % '.'.join(path))

        json_walker(data, check)
        if errors:
            raise APIError('invalid-response-format', data={'errors': errors, 'response': content})
        return data

    @endpoint(
        name='link',
        methods=['post'],
        description=_('Create link with an extranet account'),
        perm='can_access',
        parameters={
            'NameID': {
                'description': _('Publik NameID'),
                'example_value': 'xyz24d934',
            },
            'numero_dossier': {
                'description': _('MDPH13 beneficiary file number'),
                'example_value': '1234',
            },
            'secret': {
                'description': _('MDPH13 beneficiary secret'),
                'example_value': 'secret',
            },
            'date_de_naissance': {
                'description': _('MDPH13 beneficiary date of birth'),
                'type': 'date',
                'example_value': '1992-03-05',
            },
            'email': {
                'description': _('Publik known email'),
                'example_value': 'john.doe@example.com',
            },
            'ip': {
                'description': _('Publik client IP'),
                'example_value': '88.67.23.45',
            },
        },
    )
    def link(self, request, NameID, numero_dossier, secret, date_de_naissance, email, ip=None):
        """Create or update the link between a NameID and a beneficiary file.

        The file number must parse as an integer and the email must match
        EMAIL_RE, otherwise an APIError with HTTP status 400 is raised. The
        actual work, including the webservice call, is done by
        ``Link.create_or_update()``.

        Returns a dict with the `link_id`, `created` and `updated` keys.
        """
        file_number = numero_dossier.strip()
        try:
            int(file_number)
        except ValueError:
            raise APIError('numero_dossier must be a number', http_status=400)
        email = email.strip()
        if not self.EMAIL_RE.match(email):
            raise APIError('email is not valid', http_status=400)
        link, created, updated = Link.create_or_update(
            resource=self,
            NameID=NameID,
            file_number=file_number,
            secret=secret,
            dob=date_de_naissance,
            email=email,
            ip=ip,
        )
        return {'link_id': link.pk, 'created': created, 'updated': updated}

    @endpoint(
        name='unlink',
        methods=['post', 'delete'],
        description=_('Delete link with an extranet account'),
        perm='can_access',
        parameters={
            'NameID': {
                'description': _('Publik NameID'),
                'example_value': 'xyz24d934',
            },
            'link_id': {
                'description': _('Identifier of the link'),
                'example_value': '1',
            },
        },
    )
    def unlink(self, request, NameID, link_id):
        """Delete one link of a NameID, or all of them if `link_id` is 'all'.

        Raises APIError if `link_id` is neither 'all' nor an integer.
        Returns a dict whose `deleted` key is the number of links matched
        just before deletion.
        """
        qs = Link.objects.filter(resource=self, name_id=NameID)
        if link_id == 'all':
            pass  # unlink of all links
        else:
            try:
                link_id = int(link_id.strip())
            except ValueError:
                raise APIError('link_id-must-be-a-number')
            qs = qs.filter(pk=link_id)
        count = qs.count()
        qs.delete()
        return {'deleted': count}

    @endpoint(
        name='dossiers',
        description=_('Get datas for all links, or for a specified one'),
        perm='can_access',
        parameters={
            'NameID': {
                'description': _('Publik NameID'),
                'example_value': 'xyz24d934',
            },
            'email': {
                'description': _('Publik known email'),
                'example_value': 'john.doe@example.com',
            },
            'link_id': {
                'description': _('Link identifier'),
                'example_value': '1',
            },
            'ip': {
                'description': _('Publik client IP'),
                'example_value': '88.67.23.45',
            },
        },
    )
    def dossiers(self, request, NameID, email, link_id=None, ip=None):
        """Return the files linked to a NameID, fetched from the webservice.

        Without `link_id`, `data` is a list with one entry per link; a failure
        to fetch one file is reported inside its entry and does not prevent
        the others from being returned. With `link_id`, `data` is the single
        matching entry (or None) and a fetch failure is propagated.

        Raises APIError with HTTP status 400 if the email does not match
        EMAIL_RE or if `link_id` is not an integer.
        """
        email = email.strip()
        if not self.EMAIL_RE.match(email):
            raise APIError('email is not valid', http_status=400)
        qs = Link.objects.filter(resource=self, name_id=NameID)
        if link_id:
            try:
                link_id = int(link_id)
            except ValueError:
                raise APIError('invalid-link-id', http_status=400)
            qs = qs.filter(id=link_id)
        data = []
        for link in qs:
            file_data = {
                'id': str(link.id),
                'numero_dossier': link.file_number,
                'date_de_naissance': link.dob.isoformat(),
                'err': 0,
            }
            try:
                file_data['dossier'] = link.get_file(email=email, ip=ip)
            except Exception as e:
                if link_id:
                    raise
                # listing mode: embed the error in the entry of this link
                file_data.update(to_json().err_to_response(e))
            # do it later as get_file() can modify str(link)
            file_data['text'] = str(link)
            data.append(file_data)
        if link_id:
            return {'data': data[0] if data else None}
        return {'data': data}
class Link(models.Model):
    """Association between a Publik NameID and an MDPH beneficiary file.

    The secret and date of birth are stored so that the file can be fetched
    again later; `display_name` holds the last label computed from the file.
    """

    resource = models.ForeignKey(MDPH13Resource, on_delete=models.CASCADE)
    name_id = models.CharField(verbose_name=_('NameID'), max_length=256)
    file_number = models.CharField(max_length=64, verbose_name=_('MDPH beneficiary file number'))
    secret = models.CharField(verbose_name=_('MDPH beneficiary secret'), max_length=64)
    dob = models.DateField(verbose_name=_('MDPH beneficiary date of birth'))
    created = models.DateTimeField(verbose_name=_('Creation date'), auto_now_add=True)
    display_name = models.CharField(verbose_name=_('Display name'), max_length=128, blank=True)

    def get_file(self, email=None, ip=None):
        """Fetch the file from the webservice and refresh `display_name`.

        The link is saved only when the computed display name differs from
        the stored one. Returns the file data.
        """
        # email is necessary for audit purpose
        mdph_file = self.resource.call_situation_dossier(
            file_number=self.file_number,
            secret=self.secret,
            dob=self.dob,
            email=email,
            ip=ip,
        )
        fresh_name = self._make_display_name(mdph_file)
        if fresh_name != self.display_name:
            self.display_name = fresh_name
            self.save()
        return mdph_file

    @classmethod
    def create_or_update(cls, resource, NameID, file_number, secret, dob, email=None, ip=None):
        """Check the credentials against the webservice, then store the link.

        The webservice is called first, so any error it raises prevents the
        link from being created or modified.

        Returns a `(link, created, updated)` tuple; `updated` is True when the
        secret, date of birth or display name of the link had to be changed.
        """
        # email is necessary for audit purpose
        mdph_file = resource.call_situation_dossier(
            file_number=file_number, secret=secret, dob=dob, email=email, ip=ip
        )
        wanted = {
            'secret': secret,
            'dob': dob,
            'display_name': cls._make_display_name(mdph_file),
        }
        with transaction.atomic():
            link, created = Link.objects.get_or_create(
                resource=resource,
                name_id=NameID,
                file_number=file_number,
                defaults=dict(wanted),
            )
            updated = any(getattr(link, field) != value for field, value in wanted.items())
            if updated:
                for field, value in wanted.items():
                    setattr(link, field, value)
                link.save()
        return link, created, updated

    @classmethod
    def _make_display_name(cls, mdph_file):
        """Build the label '<prenom> <nom> #<numero>' from a file's data.

        Missing or empty first/last names are skipped; `numero` is required.
        """
        beneficiaire = mdph_file.get('beneficiaire', {})
        names = [beneficiaire.get('prenom'), beneficiaire.get('nom')]
        parts = [name for name in names if name]
        parts.append('#%s' % mdph_file['numero'])
        return ' '.join(parts)

    def __str__(self):
        if self.display_name:
            return self.display_name
        return '#%s' % self.file_number

    class Meta:
        unique_together = (
            'resource',
            'name_id',
            'file_number',
        )
        ordering = ['file_number']