# -*- coding: utf-8 -*-
# Copyright (C) 2022  Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from urllib.parse import urljoin

import zeep
from django.core.cache import cache
from django.db import models
from django.utils.translation import ugettext_lazy as _
from zeep.helpers import serialize_object
from zeep.wsse.username import UsernameToken

from passerelle.base.models import BaseResource, HTTPResource
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError

from . import schemas

# Referentials answered locally instead of through the Maelis web service,
# stored as (id, text) pairs keyed by referential name.
LOCAL_REFERENTIALS = {
    'Complement': (('B', 'bis'), ('T', 'ter'), ('Q', 'quater')),
    'Sex': (('M', 'Masculin'), ('F', 'Féminin')),
}

# How long a remote referential stays in the Django cache, in seconds (two hours).
REFERENTIAL_CACHE_TIMEOUT = 3600 * 2


class ToulouseMaelis(BaseResource, HTTPResource):
    """Connector exposing Toulouse Maelis SOAP services as JSON endpoints."""

    base_wsdl_url = models.CharField(
        max_length=128,
        blank=False,
        verbose_name=_('Base WSDL URL'),
        help_text=_('Toulouse Maelis base WSDL URL'),
        default='https://demo-toulouse.sigec.fr/maelisws-toulouse/services/',
    )
    zeep_wsse_username = models.CharField(
        max_length=64, blank=True, default='', verbose_name=_('WSSE Username')
    )
    zeep_wsse_password = models.CharField(
        max_length=64, blank=True, default='', verbose_name=_('WSSE Password')
    )

    category = _('Business Process Connectors')
    _category_ordering = [_('Family'), _('Activities')]

    class Meta:
        verbose_name = _('Toulouse Maelis')

    def get_client(self, wsdl_short_name):
        """Return a SOAP client for the ``<wsdl_short_name>Service`` WSDL.

        The WSDL URL is resolved against ``base_wsdl_url`` and the client is
        given a WSSE username token built from the configured credentials.
        """
        wsse = UsernameToken(self.zeep_wsse_username, self.zeep_wsse_password)
        wsdl_url = urljoin(self.base_wsdl_url, wsdl_short_name + 'Service?wsdl')
        return self.soap_client(wsdl_url=wsdl_url, wsse=wsse)

    def call(self, wsdl_short_name, service, **kwargs):
        """Invoke the SOAP operation ``service`` and return its response.

        ``kwargs`` are passed as-is to the SOAP operation.

        Raises:
            APIError: when the service answers with a SOAP fault; the error
                code is ``<wsdl_short_name>-<service>-<fault code>``.
        """
        client = self.get_client(wsdl_short_name)
        method = getattr(client.service, service)
        try:
            return method(**kwargs)
        except zeep.exceptions.Fault as e:
            raise APIError(e.message, err_code='%s-%s-%s' % (wsdl_short_name, service, e.code)) from e

    def check_status(self):
        """Check that the Family and Activity services report they are running.

        Raises:
            AssertionError: when ``isWSRunning`` returns a falsy value.
        """
        # Raised explicitly rather than through ``assert`` so that the check
        # is still performed when Python runs with optimisations (-O).
        for wsdl_short_name in ('Family', 'Activity'):
            if not self.call(wsdl_short_name, 'isWSRunning'):
                raise AssertionError('%s web service is not running' % wsdl_short_name)

    def get_referential(self, referential_name):
        """Return a referential as ``{'list': [{'id', 'text'}, ...], 'dict': {id: text}}``.

        Referentials listed in ``LOCAL_REFERENTIALS`` are built locally; any
        other one is read from the ``read<referential_name>List`` operation of
        the Family service and cached per connector instance.
        """
        # local referentials
        local_items = LOCAL_REFERENTIALS.get(referential_name)
        if local_items is not None:
            # fresh containers on every call, so callers may alter the result
            return {
                'list': [{'id': code, 'text': text} for code, text in local_items],
                'dict': dict(local_items),
            }

        # remote referentials
        cache_key = 'maelis-%s-%s' % (self.pk, referential_name)
        data = cache.get(cache_key)
        if data is None:
            response = self.call('Family', 'read' + referential_name + 'List')
            data = {
                'list': [{'id': x.code, 'text': x.libelle} for x in response],
                'dict': {x.code: x.libelle for x in response},
            }
            cache.set(cache_key, data, REFERENTIAL_CACHE_TIMEOUT)
        return data

    def get_referential_value(self, referential_name, key):
        """Return the label of ``key`` in a referential, or ``key`` itself if unknown."""
        labels = self.get_referential(referential_name)['dict']
        try:
            return labels[key]
        except KeyError:
            # Maelis DB not properly configured
            self.logger.warning("No '%s' key into Maelis '%s' referential", key, referential_name)
            return key

    def get_link(self, NameID):
        """Return the ``Link`` of this connector for ``NameID``.

        Raises:
            APIError: with code ``not-linked`` when no link exists.
        """
        try:
            return self.link_set.get(name_id=NameID)
        except Link.DoesNotExist:
            raise APIError('User not linked to family', err_code='not-linked')

    def _add_text_value(self, referential_name, data, keys):
        """Add a ``<last key>_text`` label next to a referential code in ``data``.

        ``keys`` is the path to the code inside the nested dict ``data``.
        Nothing is done when the path cannot be followed or the code is None.
        The ``keys`` sequence is left untouched.
        """
        *parent_keys, last_key = keys
        for key in parent_keys:
            if not isinstance(data, dict) or key not in data:
                return
            data = data[key]
        if isinstance(data, dict) and data.get(last_key) is not None:
            data[last_key + '_text'] = self.get_referential_value(referential_name, data[last_key])

    def get_family(self, family_id):
        """Read a family from Maelis and return it as serialized data.

        Referential codes found in the family are completed with their label
        in a sibling ``<field>_text`` entry.
        """
        response = self.call('Family', 'readFamily', dossierNumber=family_id)
        data = serialize_object(response)

        # add text from referentials
        self._add_text_value('Category', data, ['category'])
        self._add_text_value('Situation', data, ['situation'])
        for rlg in 'RL1', 'RL2':
            self._add_text_value('Civility', data, [rlg, 'civility'])
            self._add_text_value('Quality', data, [rlg, 'quality'])
            self._add_text_value('Complement', data, [rlg, 'adresse', 'numComp'])
            self._add_text_value('CSP', data, [rlg, 'profession', 'codeCSP'])
        # tolerate a missing or None child list
        for child in data.get('childList') or []:
            self._add_text_value('Sex', child, ['sexe'])
        return data

    @endpoint(
        display_category=_('Family'),
        description='Liste des catégories',
        name='read-category-list',
        perm='can_access',
    )
    def read_category_list(self, request):
        """Return the entries of the 'Category' referential."""
        return {'data': self.get_referential('Category')['list']}

    @endpoint(
        display_category=_('Family'),
        description='Liste des civilités',
        name='read-civility-list',
        perm='can_access',
    )
    def read_civility_list(self, request):
        """Return the entries of the 'Civility' referential."""
        return {'data': self.get_referential('Civility')['list']}

    @endpoint(
        display_category=_('Family'),
        description='Liste des compléments du numéro de voie',
        name='read-complement-list',
        perm='can_access',
    )
    def read_complement_list(self, request):
        """Return the entries of the 'Complement' referential."""
        return {'data': self.get_referential('Complement')['list']}

    @endpoint(
        display_category=_('Family'),
        description='liste des catégories socio-professionnelles',
        name='read-csp-list',
        perm='can_access',
    )
    def read_csp_list(self, request):
        """Return the 'CSP' referential with stripped labels and no duplicate label.

        When several entries share the same stripped label, only the first
        one is kept.
        """
        seen_texts = set()
        unique_items = []
        for item in self.get_referential('CSP')['list']:
            text = item['text'].strip()
            if text not in seen_texts:
                seen_texts.add(text)
                # copy instead of altering the referential entry in place
                unique_items.append(dict(item, text=text))
        return {'data': unique_items}

    @endpoint(
        display_category=_('Family'),
        description='liste des qualités du référenciel',
        name='read-quality-list',
        perm='can_access',
    )
    def read_quality_list(self, request):
        """Return the entries of the 'Quality' referential."""
        return {'data': self.get_referential('Quality')['list']}

    @endpoint(
        display_category=_('Family'),
        description='Liste des sexes',
        name='read-sex-list',
        perm='can_access',
    )
    def read_sex_list(self, request):
        """Return the entries of the 'Sex' referential."""
        return {'data': self.get_referential('Sex')['list']}

    @endpoint(
        display_category=_('Family'),
        description='liste des situations',
        name='read-situation-list',
        perm='can_access',
    )
    def read_situation_list(self, request):
        """Return the entries of the 'Situation' referential."""
        return {'data': self.get_referential('Situation')['list']}

    @endpoint(
        display_category=_('Family'),
        description=_('Create link between user and family'),
        perm='can_access',
        parameters={'NameID': {'description': _('Publik ID')}},
        post={'request_body': {'schema': {'application/json': schemas.LINK_SCHEMA}}},
    )
    def link(self, request, NameID, post_data):
        """Link ``NameID`` to a family once the posted identity matches its RL1.

        First name and last name are compared upper-cased, the birth date as
        ``YYYY-MM-DD``. An existing link for ``NameID`` is updated.

        Raises:
            APIError: with code ``not-found`` when the identity does not match.
        """
        family_id = post_data['family_id']
        response = self.call('Family', 'readFamily', dossierNumber=family_id)
        rl1 = response['RL1']
        rl1_matches = (
            rl1['firstname'] == post_data['firstname'].upper()
            and rl1['lastname'] == post_data['lastname'].upper()
            # a birth date that is None cannot match; do not crash on it
            and rl1['dateBirth'] is not None
            and rl1['dateBirth'].strftime('%Y-%m-%d') == post_data['dateBirth']
        )
        if not rl1_matches:
            raise APIError("RL1 does not match '%s' family" % family_id, err_code='not-found')
        Link.objects.update_or_create(resource=self, name_id=NameID, defaults={'family_id': family_id})
        return {'data': 'ok'}

    @endpoint(
        display_category=_('Family'),
        description=_('Delete link between user and family'),
        methods=['post'],
        perm='can_access',
        parameters={
            'NameID': {'description': _('Publik ID')},
        },
    )
    def unlink(self, request, NameID):
        """Delete the link of ``NameID``; fails with ``not-linked`` if there is none."""
        link = self.get_link(NameID)
        link.delete()
        return {'data': 'ok'}

    @endpoint(
        display_category=_('Family'),
        description='Informations sur la famille',
        perm='can_access',
        name='read-family',
        parameters={'NameID': {'description': _('Publik ID')}},
    )
    def read_family(self, request, NameID):
        """Return the family linked to ``NameID``; fails with ``not-linked`` if there is none."""
        family_id = self.get_link(NameID).family_id
        data = self.get_family(family_id)
        return {'data': data}


class Link(models.Model):
    """Association between a user identifier and a Maelis family, per connector."""

    resource = models.ForeignKey(ToulouseMaelis, on_delete=models.CASCADE)
    name_id = models.CharField(blank=False, max_length=256)
    family_id = models.CharField(blank=False, max_length=128)
    created = models.DateTimeField(auto_now_add=True)
    updated = models.DateTimeField(auto_now=True)

    class Meta:
        # at most one link per user on a given connector
        unique_together = ('resource', 'name_id')