# Source file: passerelle/passerelle/apps/atos_genesys/models.py
#
# Repository viewer metadata: 432 lines, 16 KiB, Python

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2018 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import urlparse
import requests
import xml.etree.ElementTree as ET
import jsonfield
from django.db import models
from django.utils import six
from django.utils.translation import ugettext_lazy as _
from passerelle.utils import xml as xmlutils
from passerelle.utils.jsonresponse import APIError
from passerelle.utils.api import endpoint
from passerelle.base.models import BaseResource, HTTPResource
from . import utils
class Resource(BaseResource, HTTPResource):
    """Connector to the ATOS Genesys ``WSUsagerPublik`` web service.

    Provides endpoints to list codifications, to link/unlink a Publik NameID
    with Genesys person identifiers (``ID_PER``), to retrieve the dossiers of
    linked persons and to search for beneficiaries.
    """
    category = _('Business Process Connectors')

    webservice_base_url = models.URLField(_('Webservice Base URL'))
    cod_rgp = models.CharField(_('Code RGP'), max_length=64, default='RGP_PUB')

    class Meta:
        verbose_name = _('ATOS Genesys')

    # ------------------------------------------------------------------
    # URL helpers
    # ------------------------------------------------------------------

    @property
    def base_url(self):
        """Configured URL, cut just before ``WSUsagerPublik`` when present."""
        # str.split() returns the whole string as first item when the
        # separator is absent, so no explicit membership test is needed.
        return self.webservice_base_url.split('WSUsagerPublik', 1)[0]

    def _service_url(self, operation):
        """Return the URL of the PublikService operation named `operation`."""
        return urlparse.urljoin(
            self.base_url,
            'WSUsagerPublik/services/PublikService/' + operation)

    @property
    def select_codifications_url(self):
        """URL of the ``selectCodifications`` operation."""
        return self._service_url('selectCodifications')

    @property
    def select_appairage_url(self):
        """URL of the ``selectAppairage`` operation."""
        return self._service_url('selectAppairage')

    @property
    def select_usager_url(self):
        """URL of the ``selectUsager`` operation."""
        return self._service_url('selectUsager')

    @property
    def select_usager_by_ref_url(self):
        """URL of the ``selectUsagerByRef`` operation."""
        return self._service_url('selectUsagerByRef')

    @property
    def cherche_beneficiaire_url(self):
        """URL of the ``chercheBeneficiaire`` operation."""
        return self._service_url('chercheBeneficiaire')

    # ------------------------------------------------------------------
    # Low-level XML requests
    # ------------------------------------------------------------------

    def _xml_request_base(self, url, *args, **kwargs):
        """GET `url` and parse the response body as XML.

        Extra positional and keyword arguments are passed to
        ``self.requests.get``.

        Returns a ``(root, response)`` pair where `root` is the parsed
        ``<return>`` element.

        Raises APIError when the HTTP request fails, when the body is not
        well-formed XML or when the root element is not ``<return>``.
        """
        try:
            response = self.requests.get(url, *args, **kwargs)
            response.raise_for_status()
        except requests.RequestException as e:
            raise APIError('HTTP request failed', data={'exception': six.text_type(e)})
        try:
            root = ET.fromstring(response.content)
        except ET.ParseError as e:
            raise APIError('XML parsing failed', data={'exception': six.text_type(e)})
        if root.tag != 'return':
            # only the first KiB of the body is reported
            raise APIError('root XML node is not return', data={'content': response.text[:1024]})
        return root, response

    def xml_request(self, url, *args, **kwargs):
        """Return the first ``ROWSET/ROW`` element of the response to `url`.

        Raises APIError if the request fails or no such element exists.
        """
        root, response = self._xml_request_base(url, *args, **kwargs)
        row = root.find('ROWSET/ROW')
        if row is None:
            raise APIError('no ROWSET/ROW node', data={'content': response.text[:1024]})
        return row

    def xml_request_multiple(self, url, *args, **kwargs):
        """Return the list of ``ROW`` elements under ``ROWSET`` (may be empty).

        Raises APIError if the request fails or there is no ``ROWSET`` element.
        """
        root, response = self._xml_request_base(url, *args, **kwargs)
        rowset = root.find('ROWSET')
        if rowset is None:
            raise APIError('no ROWSET node', data={'content': response.text[:1024]})
        return rowset.findall('ROW')

    # ------------------------------------------------------------------
    # Codifications
    # ------------------------------------------------------------------

    def call_select_codifications(self):
        """Fetch codifications from the web service, grouped by category.

        Returns a dict mapping each category code to
        ``{'label': ..., 'codifications': [{'code', 'label', 'enabled'}, ...]}``.
        Malformed categories/codifications and codifications referencing an
        unknown category are logged and skipped.
        """
        root = self.xml_request(self.select_codifications_url)
        categories = {}
        for category in root.findall('CATEGORIES/CATEGORIES_ROW'):
            code = category.find('CD_CAT_CODIF')
            label = category.find('LB_CAT_CODIF')
            if code is None or label is None:
                self.logger.warning('invalid category: %s', ET.tostring(category))
                continue
            categories[xmlutils.text_content(code)] = {
                'label': xmlutils.text_content(label),
                'codifications': [],
            }
        for codification in root.findall('CODIFICATIONS/CODIFICATIONS_ROW'):
            code = codification.find('CD_CODIF')
            label = codification.find('LB_CODIF')
            in_val = codification.find('IN_VAL_CODIF')
            category_cod = codification.find('CD_CAT_CODIF')
            if code is None or label is None or category_cod is None:
                self.logger.warning('invalid codification: %s', ET.tostring(codification))
                continue
            category_cod = xmlutils.text_content(category_cod)
            if category_cod not in categories:
                self.logger.warning('unknown category: %s', category_cod)
                continue
            if in_val is None:
                # no IN_VAL_CODIF node: the codification is considered enabled
                enabled = True
            else:
                enabled = xmlutils.text_content(in_val).strip().lower() == 'o'
            categories[category_cod]['codifications'].append({
                'code': xmlutils.text_content(code),
                'label': xmlutils.text_content(label),
                'enabled': enabled,
            })
        return categories

    def get_codifications(self):
        """Return codifications through a ``utils.RowLockedCache`` keyed on this resource."""
        cache = utils.RowLockedCache(
            function=self.call_select_codifications,
            row=self,
            key_prefix='atos-genesys-codifications',
            logger=self.logger)
        return cache()

    @endpoint(name='codifications',
              description=_('List of codifications categories'))
    def codifications(self, request):
        """List codification categories as ``{'id', 'label'}``, sorted by label."""
        items = [
            {'id': code, 'label': category['label']}
            for code, category in self.get_codifications().items()
        ]
        items.sort(key=lambda c: c['label'])
        return {'data': items}

    @endpoint(name='codifications',
              pattern=r'^(?P<category>[\w-]+)/$',
              example_pattern='{category}/',
              description=_('List of codifications'),
              parameters={
                  'category': {
                      'description': _('Category of codification'),
                      'example_value': u'MOT_APA',
                  }
              })
    def codifications_list(self, request, category):
        """List codifications of `category` as ``{'id', 'text'}``.

        An unknown category yields an empty list.
        """
        codifications = self.get_codifications().get(category, {}).get('codifications', [])
        items = [
            {'id': codification['code'], 'text': codification['label']}
            for codification in codifications
        ]
        return {'data': items}

    def check_status(self):
        """Call the web service directly (no cache); true if any category came back."""
        return bool(self.call_select_codifications())

    # ------------------------------------------------------------------
    # Linking
    # ------------------------------------------------------------------

    def call_select_appairage(self, login, password, email):
        """Call ``selectAppairage`` and return ``(code, label, id_per)``.

        `code` is the ``CD_RET`` value, guaranteed to be one of '1'..'6';
        `id_per` is guaranteed non-empty when `code` is '2', '3' or '5'.

        Raises APIError otherwise, or if the underlying request fails.
        """
        row = self.xml_request(self.select_appairage_url, params={
            'login': login,
            'pwd': password,
            'email': email,
        })
        row_d = xmlutils.to_json(row)
        id_per = row_d.get('ID_PER', '').strip()
        code = row_d.get('CD_RET', '').strip()
        label = row_d.get('LB_RET', '').strip()
        error = None
        if code not in ['1', '2', '3', '4', '5', '6']:
            # must be a plain string: a stray trailing comma here used to
            # turn the message into a 1-tuple
            error = 'invalid CD_RET: %s' % code
        elif code in ['2', '3', '5'] and not id_per:
            error = 'missing ID_PER'
        if error:
            raise APIError(error, data={'response': repr(ET.tostring(row))})
        return code, label, id_per

    @endpoint(name='link',
              methods=['post'],
              description=_('Create link with an extranet account'),
              perm='can_access',
              parameters={
                  'NameID': {
                      'description': _('Publik NameID'),
                      'example_value': 'xyz24d934',
                  },
                  'email': {
                      'description': _('Publik known email'),
                      'example_value': 'john.doe@example.com',
                  },
                  'login': {
                      'description': _('ATOS Genesys extranet login'),
                      'example_value': '1234',
                  },
                  'password': {
                      'description': _('ATOS Genesys extranet password'),
                      'example_value': 'password',
                  }
              })
    def link(self, request, NameID, email, login, password):
        """Link `NameID` to the person matching the extranet credentials.

        Return codes '2', '3' and '5' create (or reuse) a Link; '6' raises
        APIError('unknown-login'); '1' and '4' raise APIError('invalid-password').
        """
        code, label, id_per = self.call_select_appairage(login, password, email)
        if code in ['2', '3', '5']:
            link, created = Link.objects.get_or_create(
                resource=self,
                name_id=NameID,
                id_per=id_per)
            return {'link_id': link.pk, 'new': created, 'code': code, 'label': label}
        elif code == '6':
            raise APIError('unknown-login', data={'code': code, 'label': label})
        elif code in ['4', '1']:
            raise APIError('invalid-password', data={'code': code, 'label': label})
        # no other value is possible: call_select_appairage() rejects them

    @endpoint(name='unlink',
              methods=['post'],
              description=_('Delete link with an extranet account'),
              perm='can_access',
              parameters={
                  'NameID': {
                      'description': _('Publik NameID'),
                      'example_value': 'xyz24d934',
                  },
                  'link_id': {
                      'description': _('Identifier of the link'),
                      'example_value': '1',
                  },
              })
    def unlink(self, request, NameID, link_id):
        """Delete the link `link_id` of `NameID`; return the number of links removed.

        Raises APIError('invalid link_id') if `link_id` is not an integer.
        """
        try:
            link_id = int(link_id.strip())
        except ValueError:
            raise APIError('invalid link_id')
        qs = Link.objects.filter(
            resource=self,
            name_id=NameID,
            pk=link_id)
        count = qs.count()
        qs.delete()
        return {'deleted': count}

    # ------------------------------------------------------------------
    # Dossiers
    # ------------------------------------------------------------------

    def call_select_usager(self, id_per):
        """Call ``selectUsager`` for `id_per` and return the converted row."""
        row = self.xml_request(self.select_usager_url, params={
            'idPer': id_per,
            'codRgp': self.cod_rgp,
        })
        return self._select_usager_row_to_json(row)

    def call_select_usager_by_ref(self, ref_per):
        """Call ``selectUsagerByRef`` for `ref_per` and return the converted row."""
        row = self.xml_request(self.select_usager_by_ref_url, params={
            'refPer': ref_per,
            'codRgp': self.cod_rgp,
        })
        return self._select_usager_row_to_json(row)

    def _select_usager_row_to_json(self, row):
        """Convert a usager ``ROW`` element to a dict and post-process it.

        ``DEMANDES`` and ``DROITS`` lists are regrouped into dicts keyed by
        ``COD_APPLI`` and a ``CIVILITE`` key is added to each
        ``IDENTIFICATION`` entry.
        """
        d = xmlutils.to_json(row)
        # group demandes and droits by COD_APPLI ('' when missing)
        for key in ('DEMANDES', 'DROITS'):
            grouped = {}
            for item in d.get(key, []):
                grouped.setdefault(item.get('COD_APPLI', ''), []).append(item)
            d[key] = grouped
        # create CIVILITE; None when SEXE is missing or not 'M'/'F'
        civilites = {'M': u'Monsieur', 'F': u'Madame'}
        for identification in d.get('IDENTIFICATION', []):
            identification['CIVILITE'] = civilites.get(identification.get('SEXE'))
        return d

    @endpoint(name='dossiers',
              description=_('Get datas for all links'),
              perm='can_access',
              parameters={
                  'NameID': {
                      'description': _('Publik NameID'),
                      'example_value': 'xyz24d934',
                  },
              })
    def dossiers(self, request, NameID, link_id=None):
        """Return the dossiers of every link of `NameID`.

        Without `link_id`, ``data`` is a list of
        ``{'id', 'text', 'id_per', 'dossier'}`` dicts. With `link_id`, ``data``
        is the single matching dict, or None when there is no such link.

        Raises APIError('invalid-link-id') if `link_id` is not an integer.
        """
        qs = Link.objects.filter(
            resource=self,
            name_id=NameID)
        # decided once, before conversion, so that a link_id of '0' is still
        # handled as a single-link request
        single = bool(link_id)
        if single:
            try:
                link_id = int(link_id)
            except ValueError:
                raise APIError('invalid-link-id')
            qs = qs.filter(id=link_id)
        data = []
        for link in qs:
            cache = utils.RowLockedCache(
                function=self.call_select_usager,
                row=link,
                key_prefix='atos-genesys-usager',
                logger=self.logger)
            dossier = cache(link.id_per)
            # build text as "id_per - Prenom NOM"
            # six.text_type: id_per is text and str() would fail on
            # non-ASCII content under Python 2
            text_parts = [six.text_type(link.id_per), '-']
            identifications = dossier.get('IDENTIFICATION') or [{}]
            identification = identifications[0]
            prenom = identification.get('PRENOM')
            nom = identification.get('NOM')
            if prenom:
                text_parts.append(prenom.title())
            if nom:
                text_parts.append(nom.upper())
            data.append({
                'id': str(link.id),
                'text': u' '.join(text_parts),
                'id_per': link.id_per,
                'dossier': dossier,
            })
        if single:
            return {'data': data[0] if data else None}
        return {'data': data}

    # ------------------------------------------------------------------
    # Search
    # ------------------------------------------------------------------

    def call_cherche_beneficiaire(self, prenom, nom, dob):
        """Call ``chercheBeneficiaire`` and return the matching rows as dicts.

        `dob` must provide ``strftime``; it is sent as ``DD/MM/YYYY``.
        """
        rows = self.xml_request_multiple(self.cherche_beneficiaire_url, params={
            'nmPer': nom,
            'prPer': prenom,
            'dtNaissance': dob.strftime('%d/%m/%Y'),
        })
        return [xmlutils.to_json(row) for row in rows]

    @endpoint(name='search',
              description=_('Search for beneficiaries'),
              perm='can_access',
              parameters={
                  'first_name': {
                      'description': _('Beneficiary first name'),
                      'example_value': 'John',
                  },
                  'last_name': {
                      'description': _('Beneficiary last name'),
                      'example_value': 'Doe',
                  },
                  'date_of_birth': {
                      'description': _('Beneficiary date of birth'),
                      'example_value': '1987-10-23',
                  }
              })
    def search(self, request, first_name, last_name, date_of_birth):
        """Search beneficiaries by name and date of birth (``YYYY-MM-DD``).

        Each result having a ``REF_PER`` is completed with ``ID_PER``,
        ``TEL_FIXE``, ``TEL_MOBILE`` and ``MAIL`` taken from the first
        ``IDENTIFICATION`` entry of its dossier.

        Raises APIError if `date_of_birth` cannot be parsed.
        """
        try:
            date_of_birth = datetime.datetime.strptime(date_of_birth, '%Y-%m-%d')
        except (ValueError, TypeError):
            raise APIError('invalid date_of_birth: %r' % date_of_birth)
        beneficiaires = self.call_cherche_beneficiaire(
            prenom=first_name,
            nom=last_name,
            dob=date_of_birth)
        for beneficiaire in beneficiaires:
            ref_per = beneficiaire.get('REF_PER')
            if not ref_per:
                continue
            # one extra web service call per beneficiary
            dossier = self.call_select_usager_by_ref(ref_per)
            identification = dossier['IDENTIFICATION'][0]
            beneficiaire['ID_PER'] = identification['ID_PER']
            beneficiaire['TEL_FIXE'] = identification.get('TEL_FIXE', '')
            beneficiaire['TEL_MOBILE'] = identification.get('TEL_MOBILE', '')
            beneficiaire['MAIL'] = identification.get('MAIL', '')
        return {'data': beneficiaires}

    @endpoint(name='link-by-id-per',
              methods=['post'],
              description=_('Create link with an extranet account'),
              perm='can_access',
              parameters={
                  'NameID': {
                      'description': _('Publik NameID'),
                      'example_value': 'xyz24d934',
                  },
                  'id_per': {
                      'description': _('ATOS Genesys ID_PER'),
                      'example_value': '767676',
                  }
              })
    def link_by_id_per(self, request, NameID, id_per):
        """Link `NameID` directly to `id_per`, without extranet credentials.

        Raises APIError if the web service returns no row for `id_per`.
        """
        # Result deliberately ignored: the call is only made because it
        # raises APIError when the request fails or returns no ROWSET/ROW.
        self.call_select_usager(id_per)
        link, created = Link.objects.get_or_create(
            resource=self,
            name_id=NameID,
            id_per=id_per)
        return {'link_id': link.pk, 'new': created}
class Link(models.Model):
    """Association between a Publik NameID and an ATOS Genesys person (ID_PER).

    A given (resource, name_id, id_per) triple can exist only once; links are
    ordered by creation date by default.
    """
    # connector instance owning the link; links are deleted with it
    resource = models.ForeignKey(Resource, on_delete=models.CASCADE)
    # Publik user identifier
    name_id = models.CharField(verbose_name=_('NameID'), blank=False, max_length=256)
    # ATOS Genesys person identifier
    id_per = models.CharField(verbose_name=_('ID Per'), blank=False, max_length=64)
    # set automatically when the row is first saved
    created = models.DateTimeField(verbose_name=_('Creation date'), auto_now_add=True)
    # free-form JSON payload, may be NULL
    extra = jsonfield.JSONField(verbose_name=_('Anything'), null=True)

    class Meta:
        unique_together = (
            'resource', 'name_id', 'id_per',
        )
        ordering = ['created']