passerelle/passerelle/contrib/toulouse_maelis/models.py

514 lines
20 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from urllib.parse import urljoin
import zeep
from django.core.cache import cache
from django.db import models
from django.utils.translation import ugettext_lazy as _
from zeep.helpers import serialize_object
from zeep.wsse.username import UsernameToken
from passerelle.base.models import BaseResource, HTTPResource
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
from . import schemas
class ToulouseMaelis(BaseResource, HTTPResource):
    """Connector exposing Toulouse Maelis SOAP services as JSON endpoints.

    The connector talks to the ``Family`` and ``Activity`` WSDL services
    located under ``base_wsdl_url`` and keeps, through the ``Link`` model,
    the association between a Publik user (``NameID``) and a Maelis family.
    """

    base_wsdl_url = models.CharField(
        max_length=128,
        blank=False,
        verbose_name=_('Base WSDL URL'),
        help_text=_('Toulouse Maelis base WSDL URL'),
        default='https://demo-toulouse.sigec.fr/maelisws-toulouse/services/',
    )
    zeep_wsse_username = models.CharField(
        max_length=64, blank=True, default='', verbose_name=_('WSSE Username')
    )
    zeep_wsse_password = models.CharField(
        max_length=64, blank=True, default='', verbose_name=_('WSSE Password')
    )

    category = _('Business Process Connectors')
    _category_ordering = [_('Family'), _('Activities')]

    class Meta:
        verbose_name = _('Toulouse Maelis')

    # ------------------------------------------------------------------
    # SOAP plumbing
    # ------------------------------------------------------------------

    def get_client(self, wsdl_short_name):
        """Return a SOAP client for ``<wsdl_short_name>Service?wsdl``.

        The WSDL URL is resolved against ``base_wsdl_url`` and the client is
        given a WSSE ``UsernameToken`` built from the configured credentials.
        The client itself comes from ``self.soap_client``, which is not
        defined in this class.
        """
        wsse = UsernameToken(self.zeep_wsse_username, self.zeep_wsse_password)
        wsdl_name = wsdl_short_name + 'Service?wsdl'
        wsdl_url = urljoin(self.base_wsdl_url, wsdl_name)
        return self.soap_client(wsdl_url=wsdl_url, wsse=wsse)

    def call(self, wsdl_short_name, service, **kwargs):
        """Call the ``service`` operation of the given WSDL and return its result.

        Raises:
            APIError: when the service answers with a SOAP fault; the error
                code is ``<wsdl_short_name>-<service>-<fault code>``.
        """
        client = self.get_client(wsdl_short_name)
        method = getattr(client.service, service)
        try:
            return method(**kwargs)
        except zeep.exceptions.Fault as e:
            raise APIError(
                e.message, err_code='%s-%s-%s' % (wsdl_short_name, service, e.code)
            ) from e

    def check_status(self):
        """Check that both the Family and Activity web services are running.

        Raises:
            AssertionError: when ``isWSRunning`` returns a false value.  The
                exception is raised explicitly rather than through ``assert``
                so that the check is still performed when Python runs with
                ``-O`` (which strips assert statements).
        """
        for wsdl_short_name in ('Family', 'Activity'):
            if not self.call(wsdl_short_name, 'isWSRunning'):
                raise AssertionError('%s web service is not running' % wsdl_short_name)

    # ------------------------------------------------------------------
    # Referentials
    # ------------------------------------------------------------------

    def get_referential(self, referential_name):
        """Return a referential as ``{'list': [...], 'dict': {...}}``.

        ``list`` holds ``{'id': ..., 'text': ...}`` items and ``dict`` maps
        each id to its text.  ``Complement`` and ``Sex`` are defined locally;
        any other name is fetched from the Family service through
        ``read<referential_name>List`` and cached for two hours.
        """
        # local referentials
        local_referentials = {
            'Complement': [
                {'id': 'B', 'text': 'bis'},
                {'id': 'T', 'text': 'ter'},
                {'id': 'Q', 'text': 'quater'},
            ],
            'Sex': [
                {'id': 'M', 'text': 'Masculin'},
                {'id': 'F', 'text': 'Féminin'},
            ],
        }
        if referential_name in local_referentials:
            response = local_referentials[referential_name]
            return {'list': response, 'dict': {x['id']: x['text'] for x in response}}

        # remote referentials
        cache_key = 'maelis-%s-%s' % (self.pk, referential_name)
        data = cache.get(cache_key)
        if data is None:
            response = self.call('Family', 'read' + referential_name + 'List')
            data = {
                'list': [{'id': x.code, 'text': x.libelle} for x in response],
                'dict': {x.code: x.libelle for x in response},
            }
            # put in cache for two hours
            cache.set(cache_key, data, 3600 * 2)
        return data

    def get_referential_value(self, referential_name, key):
        """Return the text matching ``key`` in a referential.

        When the key is unknown a warning is logged and ``key`` itself is
        returned, so callers always get a displayable value.
        """
        try:
            return self.get_referential(referential_name)['dict'][key]
        except KeyError:
            # Maelis DB not properly configured
            self.logger.warning("No '%s' key into Maelis '%s' referential", key, referential_name)
            return key

    # ------------------------------------------------------------------
    # Family helpers
    # ------------------------------------------------------------------

    def get_link(self, NameID):
        """Return the ``Link`` of this connector for the given ``NameID``.

        Raises:
            APIError: with ``err_code='not-linked'`` when no link exists.
        """
        try:
            return self.link_set.get(name_id=NameID)
        except Link.DoesNotExist:
            raise APIError('User not linked to family', err_code='not-linked')

    def get_family_raw(self, family_id):
        """Read a family from Maelis and return it as serialized Python data.

        Entries of ``emergencyPersonList`` are rewritten in place so that
        they use the same keys as ``authorizedPersonList`` entries: ``num``
        becomes ``numPerson``, ``id`` is dropped and a placeholder
        ``dateBirth`` is set.
        """
        response = self.call('Family', 'readFamily', dossierNumber=family_id)
        data = serialize_object(response)
        # make emergencyPersonList keys equal to authorizedPersonList ones
        for person in data['emergencyPersonList']:
            person['numPerson'] = person.pop('num')
            # placeholder: a date is needed here (presumably required when
            # the list is sent back through updateFamily -- TODO confirm)
            person['dateBirth'] = '1970-01-01'
            del person['id']
        return data

    def get_family(self, family_id):
        """Return the family data enriched with ``<key>_text`` labels.

        For every coded field listed below, a sibling ``<key>_text`` entry is
        added with the label found in the matching referential.  Missing or
        ``None`` values, and missing intermediate dicts, are skipped.
        """
        data = self.get_family_raw(family_id)

        def add_text_value(referential_name, data, keys):
            # walk down to the dict holding the last key, without mutating
            # the caller's list of keys
            *path, last_key = keys
            for key in path:
                if not isinstance(data, dict) or key not in data:
                    return
                data = data[key]
            if isinstance(data, dict) and data.get(last_key) is not None:
                data[last_key + '_text'] = self.get_referential_value(
                    referential_name, data[last_key]
                )

        # add text from referentials
        add_text_value('Category', data, ['category'])
        add_text_value('Situation', data, ['situation'])
        for rlg in ('RL1', 'RL2'):
            add_text_value('Civility', data, [rlg, 'civility'])
            add_text_value('Quality', data, [rlg, 'quality'])
            add_text_value('Complement', data, [rlg, 'adresse', 'numComp'])
            add_text_value('CSP', data, [rlg, 'profession', 'codeCSP'])
        for child in data['childList']:
            add_text_value('Sex', child, ['sexe'])
        for kind in ('authorized', 'emergency'):
            for person in data[kind + 'PersonList']:
                add_text_value('Civility', person, ['civility'])
                add_text_value('Quality', person, ['quality'])
        return data

    def replace_null_values(self, dico):
        """Replace ``None`` values with ``''`` in ``dico``, in place.

        Null fields are sent as empty SOAP tags to tell Maelis to empty the
        value.  Nested dicts are processed recursively.
        """
        # NOTE(review): dicts nested inside lists are not traversed; confirm
        # this is intended for list-valued payload entries.
        for key, value in dico.items():
            if isinstance(value, dict):
                self.replace_null_values(value)
            elif value is None:
                dico[key] = ''

    @staticmethod
    def _check_kind(kind):
        """Raise ``APIError`` unless ``kind`` is 'authorized' or 'emergency'."""
        if kind not in ('authorized', 'emergency'):
            raise APIError("wrong '%s' value for kind parameter" % kind)

    @staticmethod
    def _family_error(data):
        """Build the ``APIError`` describing a failed create/update family call.

        Messages from ``rl1ErrorList`` and ``childErrorList`` are joined; a
        missing or ``None`` list is treated as empty instead of failing with
        a ``TypeError``.  The error code is made of the first four characters
        of what precedes the first ':' in each message.
        """
        errors = (data.get('rl1ErrorList') or []) + (data.get('childErrorList') or [])
        err_codes = [x.split(':')[0][:4] for x in errors]
        return APIError(' ; '.join(errors), err_code=', '.join(err_codes))

    def _send_person_list(self, family_id, family, kind, person_list):
        """Send the whole ``<kind>PersonList`` of a family through updateFamily."""
        payload = {
            'dossierNumber': family_id,
            'categorie': family['category'],
            'situation': family['situation'],
            kind + 'PersonList': [{'personList': person_list}],
        }
        self.call('Family', 'updateFamily', **payload)

    # ------------------------------------------------------------------
    # Referential endpoints
    # ------------------------------------------------------------------

    @endpoint(
        display_category=_('Family'),
        description='Liste des catégories',
        name='read-category-list',
        perm='can_access',
    )
    def read_category_list(self, request):
        """Return the items of the 'Category' referential."""
        return {'data': self.get_referential('Category')['list']}

    @endpoint(
        display_category=_('Family'),
        description='Liste des civilités',
        name='read-civility-list',
        perm='can_access',
    )
    def read_civility_list(self, request):
        """Return the items of the 'Civility' referential."""
        return {'data': self.get_referential('Civility')['list']}

    @endpoint(
        display_category=_('Family'),
        description='Liste des compléments du numéro de voie',
        name='read-complement-list',
        perm='can_access',
    )
    def read_complement_list(self, request):
        """Return the items of the local 'Complement' referential."""
        return {'data': self.get_referential('Complement')['list']}

    @endpoint(
        display_category=_('Family'),
        description='liste des catégories socio-professionnelles',
        name='read-csp-list',
        perm='can_access',
    )
    def read_csp_list(self, request):
        """Return the 'CSP' referential items, stripped and de-duplicated.

        Texts are stripped of surrounding whitespace and only the first item
        of each distinct text is kept.  New dicts are built so that the
        referential items themselves are left untouched.
        """
        # remove redundant codes
        uniq_text = set()
        uniq_data = []
        for item in self.get_referential('CSP')['list']:
            text = item['text'].strip()
            if text not in uniq_text:
                uniq_text.add(text)
                uniq_data.append(dict(item, text=text))
        return {'data': uniq_data}

    @endpoint(
        display_category=_('Family'),
        description='liste des qualités du référenciel',
        name='read-quality-list',
        perm='can_access',
    )
    def read_quality_list(self, request):
        """Return the items of the 'Quality' referential."""
        return {'data': self.get_referential('Quality')['list']}

    @endpoint(
        display_category=_('Family'),
        description='Liste des sexes',
        name='read-sex-list',
        perm='can_access',
    )
    def read_sex_list(self, request):
        """Return the items of the local 'Sex' referential."""
        return {'data': self.get_referential('Sex')['list']}

    @endpoint(
        display_category=_('Family'),
        description='liste des situations',
        name='read-situation-list',
        perm='can_access',
    )
    def read_situation_list(self, request):
        """Return the items of the 'Situation' referential."""
        return {'data': self.get_referential('Situation')['list']}

    # ------------------------------------------------------------------
    # Link endpoints
    # ------------------------------------------------------------------

    @endpoint(
        display_category=_('Family'),
        description=_('Create link between user and family'),
        perm='can_access',
        parameters={'NameID': {'description': _('Publik ID')}},
        post={'request_body': {'schema': {'application/json': schemas.LINK_SCHEMA}}},
    )
    def link(self, request, NameID, post_data):
        """Link ``NameID`` to the family given in ``post_data['family_id']``.

        The link is created (or updated) only when the posted first name,
        last name (both upper-cased) and birth date match the family's RL1.

        Raises:
            APIError: with ``err_code='not-found'`` when RL1 does not match.
        """
        family_id = post_data['family_id']
        response = self.call('Family', 'readFamily', dossierNumber=family_id)
        rl1 = response['RL1']
        if not (
            rl1['firstname'] == post_data['firstname'].upper()
            and rl1['lastname'] == post_data['lastname'].upper()
            and rl1['dateBirth'].strftime('%Y-%m-%d') == post_data['dateBirth']
        ):
            raise APIError("RL1 does not match '%s' family" % family_id, err_code='not-found')
        Link.objects.update_or_create(
            resource=self, name_id=NameID, defaults={'family_id': family_id}
        )
        return {'data': 'ok'}

    @endpoint(
        display_category=_('Family'),
        description=_('Delete link between user and family'),
        methods=['post'],
        perm='can_access',
        parameters={
            'NameID': {'description': _('Publik ID')},
        },
    )
    def unlink(self, request, NameID):
        """Delete the link of ``NameID``; fails with 'not-linked' if there is none."""
        link = self.get_link(NameID)
        link.delete()
        return {'data': 'ok'}

    # ------------------------------------------------------------------
    # Read endpoints
    # ------------------------------------------------------------------

    @endpoint(
        display_category=_('Family'),
        description='Informations sur la famille',
        perm='can_access',
        name='read-family',
        parameters={'NameID': {'description': _('Publik ID')}},
    )
    def read_family(self, request, NameID):
        """Return the family linked to ``NameID``, with referential labels."""
        family_id = self.get_link(NameID).family_id
        data = self.get_family(family_id)
        return {'data': data}

    @endpoint(
        display_category=_('Family'),
        description="Informations sur un responsable légal",
        perm='can_access',
        name='read-rl',
        parameters={
            'NameID': {'description': _('Publik ID')},
            'rl_id': {'description': 'Numéro du représentant légal'},
        },
    )
    def read_rl(self, request, NameID, rl_id):
        """Return RL1 or RL2 of the linked family, selected by its ``num``.

        Numbers are compared as strings, like person numbers in the person
        endpoints, so the match does not depend on the type returned by the
        web service.

        Raises:
            APIError: with ``err_code='not-found'`` when neither RL matches.
        """
        family_id = self.get_link(NameID).family_id
        data = self.get_family(family_id)
        for rlg in ('RL1', 'RL2'):
            rl = data[rlg]
            # RL2 may be empty; an empty RL is simply skipped
            if rl and str(rl['num']) == str(rl_id):
                return {'data': rl}
        raise APIError("no '%s' RL on '%s' family" % (rl_id, family_id), err_code='not-found')

    @endpoint(
        display_category=_('Family'),
        description="Informations sur une personne autorisée à récupérer les enfants ou à prévenir en cas d'urgence",
        perm='can_access',
        name='read-person',
        parameters={
            'NameID': {'description': _('Publik ID')},
            'person_id': {'description': 'Numéro de la personne'},
            'kind': {'description': "'authorized' (par defaut) ou 'emergency'"},
        },
    )
    def read_person(self, request, NameID, person_id, kind='authorized'):
        """Return one person of the ``<kind>PersonList`` of the linked family.

        Raises:
            APIError: when ``kind`` is invalid, or with
                ``err_code='not-found'`` when no person has this number.
        """
        self._check_kind(kind)
        family_id = self.get_link(NameID).family_id
        data = self.get_family(family_id)
        for person in data[kind + 'PersonList']:
            if str(person['numPerson']) == person_id:
                return {'data': person}
        raise APIError(
            "no '%s' %s person on '%s' family" % (person_id, kind, family_id), err_code='not-found'
        )

    @endpoint(
        display_category=_('Family'),
        description="Vérifier qu'un responsable légal existe en base",
        perm='can_access',
        name='is-rl-exists',
        post={'request_body': {'schema': {'application/json': schemas.ISRLEXISTS_SCHEMA}}},
    )
    def is_rl_exists(self, request, post_data):
        """Forward ``post_data`` to the ``isRLExists`` service and return its answer."""
        response = self.call('Family', 'isRLExists', **post_data)
        return {'data': response}

    # ------------------------------------------------------------------
    # Write endpoints
    # ------------------------------------------------------------------

    @endpoint(
        display_category=_('Family'),
        description='Création de la famille',
        name='create-family',
        perm='can_access',
        parameters={'NameID': {'description': _('Publik ID')}},
        post={'request_body': {'schema': {'application/json': schemas.CREATE_FAMILY_SCHEMA}}},
    )
    def create_family(self, request, NameID, post_data):
        """Create a family in Maelis and link it to ``NameID``.

        Raises:
            APIError: with ``err_code='already-linked'`` when the user already
                has a link, or with the Maelis error codes when the response
                carries no family ``number``.
        """
        if self.link_set.filter(name_id=NameID).exists():
            raise APIError('User already linked to family', err_code='already-linked')
        response = self.call('Family', 'createFamily', **post_data)
        data = serialize_object(response)
        family_id = data.get('number')
        if not family_id:
            raise self._family_error(data)
        Link.objects.create(resource=self, name_id=NameID, family_id=family_id)
        return {'data': data}

    @endpoint(
        display_category=_('Family'),
        description='Modification de la famille',
        name='update-family',
        perm='can_access',
        parameters={'NameID': {'description': _('Publik ID')}},
        post={'request_body': {'schema': {'application/json': schemas.UPDATE_FAMILY_SCHEMA}}},
    )
    def update_family(self, request, NameID, post_data):
        """Update the family linked to ``NameID`` with ``post_data``.

        ``None`` values of ``post_data`` are first replaced by empty strings
        (see ``replace_null_values``).

        Raises:
            APIError: with the Maelis error codes when the response carries
                no family ``number``.
        """
        family_id = self.get_link(NameID).family_id
        self.replace_null_values(post_data)
        response = self.call('Family', 'updateFamily', dossierNumber=family_id, **post_data)
        data = serialize_object(response)
        if not data.get('number'):
            raise self._family_error(data)
        return {'data': data}

    @endpoint(
        display_category=_('Family'),
        description="Mise à jour des coordonnées d'une personne",
        name='update-coordinate',
        perm='can_access',
        parameters={
            'NameID': {'description': _('Publik ID')},
            'rl_id': {'description': 'Numéro du représentant légal'},
        },
        post={'request_body': {'schema': {'application/json': schemas.UPDATE_COORDINATE_SCHEMA}}},
    )
    def update_coordinate(self, request, NameID, rl_id, post_data):
        """Call ``updateCoordinate`` for person ``rl_id`` of the linked family."""
        family_id = self.get_link(NameID).family_id
        self.replace_null_values(post_data)
        self.call('Family', 'updateCoordinate', numDossier=family_id, numPerson=rl_id, **post_data)
        return {'data': 'ok'}

    @endpoint(
        display_category=_('Family'),
        description="Création d'une personne autorisée à récupérer les enfants ou à prévenir en cas d'urgence",
        name='create-person',
        perm='can_access',
        parameters={
            'NameID': {'description': _('Publik ID')},
            'kind': {'description': "'authorized' (par defaut) ou 'emergency'"},
        },
        post={'request_body': {'schema': {'application/json': schemas.FAMILYPERSON_SCHEMA}}},
    )
    def create_person(self, request, NameID, post_data, kind='authorized'):
        """Append ``post_data`` to the ``<kind>PersonList`` of the linked family.

        The current list is read from Maelis, the new person is appended and
        the whole list is sent back through ``updateFamily``.
        """
        self._check_kind(kind)
        family_id = self.get_link(NameID).family_id
        family = self.get_family_raw(family_id)
        self.replace_null_values(post_data)
        person_list = family[kind + 'PersonList']
        person_list.append(post_data)
        self._send_person_list(family_id, family, kind, person_list)
        return {'data': 'ok'}

    @endpoint(
        display_category=_('Family'),
        description="Mise à jour d'une personne autorisée à récupérer les enfants ou à prévenir en cas d'urgence",
        name='update-person',
        perm='can_access',
        parameters={
            'NameID': {'description': _('Publik ID')},
            'person_id': {'description': 'Numéro de la personne'},
            'kind': {'description': "'authorized' (par defaut) ou 'emergency'"},
        },
        post={'request_body': {'schema': {'application/json': schemas.FAMILYPERSON_SCHEMA}}},
    )
    def update_person(self, request, NameID, person_id, post_data, kind='authorized'):
        """Replace person ``person_id`` of the ``<kind>PersonList`` by ``post_data``.

        Raises:
            APIError: when ``kind`` is invalid, or with
                ``err_code='not-found'`` when no person has this number.
        """
        self._check_kind(kind)
        family_id = self.get_link(NameID).family_id
        family = self.get_family_raw(family_id)
        self.replace_null_values(post_data)
        person_list = family[kind + 'PersonList']
        for i, person in enumerate(person_list):
            if str(person['numPerson']) == person_id:
                # the posted data replaces the entry but keeps its number
                post_data['numPerson'] = person_id
                person_list[i] = post_data
                break
        else:
            raise APIError(
                "no '%s' %s person on '%s' family" % (person_id, kind, family_id),
                err_code='not-found',
            )
        self._send_person_list(family_id, family, kind, person_list)
        return {'data': 'ok'}

    @endpoint(
        display_category=_('Family'),
        description="Suppression d'une personne autorisée à récupérer les enfants ou à prévenir en cas d'urgence",
        name='delete-person',
        perm='can_access',
        parameters={
            'NameID': {'description': _('Publik ID')},
            'person_id': {'description': 'Numéro de la personne'},
            'kind': {'description': "'authorized' (par defaut) ou 'emergency'"},
        },
        methods=['post'],
    )
    def delete_person(self, request, NameID, person_id, kind='authorized'):
        """Remove person ``person_id`` from the ``<kind>PersonList`` of the family.

        Raises:
            APIError: when ``kind`` is invalid, or with
                ``err_code='not-found'`` when no person has this number.
        """
        self._check_kind(kind)
        family_id = self.get_link(NameID).family_id
        family = self.get_family_raw(family_id)
        person_list = family[kind + 'PersonList']
        for i, person in enumerate(person_list):
            if str(person['numPerson']) == person_id:
                del person_list[i]
                break
        else:
            raise APIError(
                "no '%s' %s person on '%s' family" % (person_id, kind, family_id),
                err_code='not-found',
            )
        self._send_person_list(family_id, family, kind, person_list)
        return {'data': 'ok'}
class Link(models.Model):
    """Association between a Publik user (``name_id``) and a Maelis family.

    A ``name_id`` can be linked at most once per connector instance, and
    links are deleted together with the connector they belong to.
    """

    resource = models.ForeignKey(to=ToulouseMaelis, on_delete=models.CASCADE)
    name_id = models.CharField(max_length=256, blank=False)
    family_id = models.CharField(max_length=128, blank=False)
    # timestamps maintained automatically on creation and on every save
    created = models.DateTimeField(auto_now_add=True)
    updated = models.DateTimeField(auto_now=True)

    class Meta:
        unique_together = ('resource', 'name_id')