341 lines
13 KiB
Python
341 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
|
# passerelle - uniform access to multiple data sources and services
|
|
# Copyright (C) 2019 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import base64
|
|
import urlparse
|
|
import datetime
|
|
import re
|
|
|
|
import requests
|
|
|
|
from django.db import models, transaction
|
|
from django.utils import six
|
|
from django.utils.translation import ugettext_lazy as _
|
|
|
|
from passerelle.utils.jsonresponse import APIError, to_json
|
|
from passerelle.utils.api import endpoint
|
|
from passerelle.base.models import BaseResource, HTTPResource
|
|
|
|
|
|
class MDPH13Resource(BaseResource, HTTPResource):
    """Connector to the MDPH CD13 "situation dossier" webservice.

    Exposes three endpoints (``link``, ``unlink`` and ``dossiers``) that
    manage ``Link`` rows associating a Publik NameID with a beneficiary
    file, and that fetch the file content from the remote webservice.
    """
    category = _('Business Process Connectors')

    webservice_base_url = models.URLField(_('Webservice Base URL'))

    # Deliberately loose check: something@something.something, no whitespace.
    EMAIL_RE = re.compile(r'^[^@\s]+@[^@\s]+\.[^@\s]+$')

    # Remote ``err_code`` value -> error description raised to the caller.
    _ERROR_MAPPING = {
        'dossier-inconnu': 'dossier-inconnu',
        'secret-invalide': 'secret-invalide',
        'dateNaissance-erronee': 'date-de-naissance-erronee',
    }

    # Lower-cased remote ``typologie`` value -> key used to group requests.
    _TYPOLOGIES = {
        u'demande en cours': 'en_cours',
        u'traitée et expédiée': 'historique',
        u'traitée non expédiée': 'historique',
    }

    class Meta:
        verbose_name = _('MDPH CD13')

    def situation_dossier_url(self, file_number):
        """Return the webservice URL of the file numbered `file_number`."""
        return urlparse.urljoin(self.webservice_base_url, 'situation/dossier/%s' % file_number)

    def url_get(self, *args, **kwargs):
        """GET an URL through ``self.requests`` and return the decoded JSON body.

        Arguments are passed unchanged to ``self.requests.get``.

        Raises APIError('HTTP request failed') on any requests exception
        (including HTTP error statuses); when a response was received, its
        status code and body (decoded JSON, or a repr of the first 1000
        bytes) are attached to the error data.
        Raises APIError('HTTP request did not respond with JSON') when a
        successful response cannot be decoded as JSON.
        """
        response = None
        try:
            response = self.requests.get(*args, **kwargs)
            response.raise_for_status()
        except requests.RequestException as e:
            data = {'exception': six.text_type(e)}
            if response is not None:
                try:
                    # Decode once; an empty/falsy JSON body is not reported.
                    body = response.json()
                    if body:
                        data['body'] = body
                except ValueError:
                    body = response.content[:1000]
                    if len(response.content) > 1000:
                        body += b'<SNIPPED>'
                    data['body'] = repr(body)
                data['status_code'] = response.status_code
            raise APIError('HTTP request failed', data=data)
        try:
            content = response.json()
        except ValueError as e:
            data = {'exception': six.text_type(e)}
            # NOTE: the marker is appended unconditionally here, even when the
            # body is shorter than 1000 bytes.
            data['body'] = '%r<SNIPPED>' % response.content[:1000]
            raise APIError('HTTP request did not respond with JSON', data=data)
        return content

    def _raise_on_err(self, content, data):
        """Raise APIError if `content` carries a non-zero ``err`` value.

        The description is the mapped ``err_code`` or 'err != 0' when the
        code is unknown; `data` is attached to the raised error.
        """
        if content.get('err') != 0:
            err_desc = self._ERROR_MAPPING.get(content.get('err_code'), 'err != 0')
            raise APIError(err_desc, data=data)

    def _reorganize_entourage(self, data, content):
        """Replace the beneficiary's entourage list by a dict, in place.

        Persons whose role is 'Père' or 'Mère' go under 'parents', all
        others under 'aidants'. Nothing is done when there is no
        beneficiary or no entourage.
        """
        beneficiaire = data.get('beneficiaire')
        if beneficiaire is None:
            return
        if not isinstance(beneficiaire, dict):
            # Report malformed remote data instead of an AttributeError.
            raise APIError('beneficiaire-must-be-a-dict', data=content)
        entourage = beneficiaire.get('entourage')
        if entourage is None:
            return
        if not isinstance(entourage, list):
            raise APIError('entourage-must-be-a-list', data=content)
        if not all(isinstance(person, dict) for person in entourage):
            raise APIError('entourage-content-must-be-dicts', data=content)
        new_entourage = {}
        for person in entourage:
            if person.get('role') in (u'Père', u'Mère'):
                group = 'parents'
            else:
                group = 'aidants'
            new_entourage.setdefault(group, []).append(person)
        beneficiaire['entourage'] = new_entourage

    def _reorganize_demandes(self, data, content):
        """Replace the list of requests by a dict grouped by typology, in place.

        Requests are grouped under 'en_cours' or 'historique' according to
        their lower-cased ``typologie``. Nothing is done when ``demandes``
        is missing or empty.
        """
        demandes = data.get('demandes')
        if not demandes:
            return
        if not isinstance(demandes, list):
            raise APIError('demandes-must-be-a-list', data=content)
        if not all(isinstance(demande, dict) for demande in demandes):
            raise APIError('demandes-content-must-be-dicts', data=content)
        if not all(isinstance(demande.get('typologie'), six.text_type) for demande in demandes):
            raise APIError('typologie-must-be-a-string', data=content)
        typologies = self._TYPOLOGIES
        unknowns = set(
            demande['typologie'].lower() for demande in demandes
            if demande['typologie'].lower() not in typologies)
        if unknowns:
            raise APIError(
                'typologie-is-unknown',
                data={
                    # Plain lists, so the payload does not carry a dict view
                    # on Python 3.
                    'unknowns': sorted(unknowns),
                    'choices': sorted(typologies),
                })
        new_demandes = {}
        for demande in demandes:
            group = typologies[demande['typologie'].lower()]
            new_demandes.setdefault(group, []).append(demande)
        data['demandes'] = new_demandes

    def call_situation_dossier(self, file_number, secret, dob, email=None):
        """Fetch a beneficiary file from the webservice and normalize it.

        `secret` is sent base64-encoded, `dob` (a date) in ISO format, and
        `email` defaults to a placeholder address when empty.

        Returns the ``data`` dict of the response, with the entourage and
        the requests regrouped into dicts.
        Raises APIError when the HTTP call fails, when the webservice
        reports an error (``err`` != 0, including in a 404 JSON body) or
        when the payload does not have the expected structure.
        """
        url = self.situation_dossier_url(file_number)
        email = email or 'appel-sans-utilisateur@cd13.fr'
        headers = {
            'X-CD13-Secret': base64.b64encode(secret.encode('utf-8')).decode('ascii'),
            'X-CD13-Email': email,
            'X-CD13-DateNaissBenef': dob.isoformat(),
        }
        try:
            content = self.url_get(url, headers=headers)
        except APIError as e:
            # A 404 with a JSON object body may carry a business error:
            # translate it, keeping the HTTP error data attached.
            if e.data and e.data.get('status_code') == 404 and isinstance(e.data.get('body'), dict):
                self._raise_on_err(e.data['body'], data=e.data)
            raise

        self._raise_on_err(content, data=content)

        data = content.get('data')
        if not isinstance(data, dict):
            raise APIError('data-must-be-a-dict', data=content)

        if str(data.get('numero')) != str(file_number):
            raise APIError('numero-must-match-numero-dossier', data=content)

        self._reorganize_entourage(data, content)
        self._reorganize_demandes(data, content)
        return data

    def check_status(self):
        """Fetch the file of this connector's most recently created link.

        Does nothing when the connector has no link; any exception raised
        by the fetch propagates to the caller.
        """
        try:
            # Only consider links of this connector: a link's file is fetched
            # through its own resource.
            link = Link.objects.filter(resource=self).latest('created')
        except Link.DoesNotExist:
            return
        # no email passed, it's a background check
        link.get_file()

    @endpoint(name='link',
              methods=['post'],
              description=_('Create link with an extranet account'),
              perm='can_access',
              parameters={
                  'NameID': {
                      'description': _('Publik NameID'),
                      'example_value': 'xyz24d934',
                  },
                  'numero_dossier': {
                      'description': _('MDPH13 beneficiary file number'),
                      'example_value': '1234',
                  },
                  'secret': {
                      'description': _('MDPH13 beneficiary secret'),
                      'example_value': 'secret',
                  },
                  'date_de_naissance': {
                      'description': _('MDPH13 beneficiary date of birth'),
                      'example_value': '1992-03-05',
                  },
                  'email': {
                      'description': _('Publik known email'),
                      'example_value': 'john.doe@example.com',
                  },
              })
    def link(self, request, NameID, numero_dossier, secret, date_de_naissance, email):
        """Create or update the link between a NameID and a beneficiary file.

        Inputs are validated (HTTP 400 APIError on failure), then the link
        is created or its secret/date of birth refreshed, and the file is
        fetched to check the credentials before anything is saved.

        Returns a dict with ``link_id``, ``created`` and ``updated``.
        """
        file_number = numero_dossier.strip()
        try:
            int(file_number)
        except ValueError:
            raise APIError('numero_dossier must be a number', http_status=400)
        try:
            dob = datetime.datetime.strptime(date_de_naissance.strip(), '%Y-%m-%d').date()
        except ValueError:
            raise APIError('date_de_naissance must be a date YYYY-MM-DD', http_status=400)
        email = email.strip()
        if not self.EMAIL_RE.match(email):
            raise APIError('email is not valid', http_status=400)
        with transaction.atomic():
            link, created = Link.objects.get_or_create(
                resource=self,
                name_id=NameID,
                file_number=file_number,
                defaults={
                    'secret': secret,
                    'dob': dob,
                })
            updated = False
            if not created and (link.secret != secret or link.dob != dob):
                link.secret = secret
                link.dob = dob
                updated = True
            # email is necessary for audit purpose
            # An exception here leaves the atomic block before any save().
            link.get_file(email=email)
            if updated:
                link.save()
        return {'link_id': link.pk, 'created': created, 'updated': updated}

    @endpoint(name='unlink',
              methods=['post', 'delete'],
              description=_('Delete link with an extranet account'),
              perm='can_access',
              parameters={
                  'NameID': {
                      'description': _('Publik NameID'),
                      'example_value': 'xyz24d934',
                  },
                  'link_id': {
                      'description': _('Identifier of the link'),
                      'example_value': '1',
                  },
              })
    def unlink(self, request, NameID, link_id):
        """Delete one link of a NameID, or all of them when link_id is 'all'.

        Returns ``{'deleted': n}`` where n is the number of links matched
        before deletion. Raises APIError when link_id is neither 'all' nor
        an integer.
        """
        qs = Link.objects.filter(resource=self, name_id=NameID)
        if link_id != 'all':
            try:
                link_id = int(link_id.strip())
            except ValueError:
                raise APIError('link_id-must-be-a-number')
            qs = qs.filter(pk=link_id)
        count = qs.count()
        qs.delete()
        return {'deleted': count}

    @endpoint(name='dossiers',
              description=_('Get datas for all links, or for a specified one'),
              perm='can_access',
              parameters={
                  'NameID': {
                      'description': _('Publik NameID'),
                      'example_value': 'xyz24d934',
                  },
                  'email': {
                      'description': _('Publik known email'),
                      'example_value': 'john.doe@example.com',
                  },
                  'link_id': {
                      'description': _('Link identifier'),
                      'example_value': '1',
                  }
              })
    def dossiers(self, request, NameID, email, link_id=None):
        """Return the file(s) linked to a NameID.

        Without link_id, returns ``{'data': [...]}`` with one entry per
        link; a link whose fetch fails gets the serialized error instead
        of the file. With link_id, returns ``{'data': entry}`` for that
        link only (``None`` if it does not exist) and fetch errors
        propagate.
        """
        email = email.strip()
        if not self.EMAIL_RE.match(email):
            raise APIError('email is not valid', http_status=400)

        qs = Link.objects.filter(
            resource=self,
            name_id=NameID)
        # Decide the mode before the int conversion: '0' is a truthy string
        # but converts to a falsy integer.
        single_link = bool(link_id)
        if single_link:
            try:
                link_id = int(link_id)
            except ValueError:
                raise APIError('invalid-link-id', http_status=400)
            qs = qs.filter(id=link_id)
        data = []
        for link in qs:
            file_data = {
                'id': str(link.id),
                'err': 0,
            }
            try:
                mdph_file = link.get_file(email=email)
            except Exception as e:
                if single_link:
                    raise
                # Best effort in list mode: report the error for this link
                # and keep going with the others.
                file_data.update(to_json().err_to_response(e))
            else:
                file_data.update({
                    'numero_dossier': link.file_number,
                    'date_de_naissance': link.dob.isoformat(),
                    'dossier': mdph_file,
                })
            data.append(file_data)
        if single_link:
            return {'data': data[0] if data else None}
        return {'data': data}
|
|
|
|
|
|
class Link(models.Model):
    """Association between a Publik NameID and a beneficiary file.

    Stores the file number, secret and date of birth that are passed to
    the owning resource when the file is fetched.
    """
    resource = models.ForeignKey(MDPH13Resource, on_delete=models.CASCADE)
    name_id = models.CharField(max_length=256, verbose_name=_('NameID'))
    file_number = models.CharField(max_length=64, verbose_name=_('MDPH beneficiary file number'))
    secret = models.CharField(max_length=64, verbose_name=_('MDPH beneficiary secret'))
    dob = models.DateField(verbose_name=_('MDPH beneficiary date of birth'))
    created = models.DateTimeField(auto_now_add=True, verbose_name=_('Creation date'))

    class Meta:
        ordering = ['file_number']
        unique_together = ('resource', 'name_id', 'file_number')

    def get_file(self, email=None):
        """Fetch this link's file through its resource and return the result.

        `email` is forwarded as is; the original comment notes it is
        necessary for audit purposes.
        """
        credentials = {
            'file_number': self.file_number,
            'secret': self.secret,
            'dob': self.dob,
            'email': email,
        }
        return self.resource.call_situation_dossier(**credentials)
|