523 lines
20 KiB
Python
523 lines
20 KiB
Python
# passerelle - uniform access to multiple data sources and services
|
|
# Copyright (C) 2018 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import datetime
|
|
import urlparse
|
|
import requests
|
|
import xml.etree.ElementTree as ET
|
|
import jsonfield
|
|
|
|
from django.db import models
|
|
from django.utils import six
|
|
from django.utils.translation import ugettext_lazy as _
|
|
|
|
from passerelle.utils import xml as xmlutils
|
|
from passerelle.utils.api import endpoint
|
|
from passerelle.utils.conversion import to_ascii
|
|
from passerelle.utils.jsonresponse import APIError
|
|
from passerelle.base.models import BaseResource, HTTPResource
|
|
|
|
from . import utils
|
|
|
|
|
|
class Resource(BaseResource, HTTPResource):
|
|
category = _('Business Process Connectors')
|
|
|
|
webservice_base_url = models.URLField(_('Webservice Base URL'))
|
|
cod_rgp = models.CharField(_('Code RGP'), max_length=64, default='RGP_PUB')
|
|
|
|
class Meta:
|
|
verbose_name = _('ATOS Genesys')
|
|
|
|
@property
|
|
def base_url(self):
|
|
if 'WSUsagerPublik' in self.webservice_base_url:
|
|
return self.webservice_base_url.split('WSUsagerPublik')[0]
|
|
return self.webservice_base_url
|
|
|
|
@property
|
|
def select_codifications_url(self):
|
|
return urlparse.urljoin(self.base_url, 'WSUsagerPublik/services/PublikService/selectCodifications')
|
|
|
|
def xml_request(self, url, *args, **kwargs):
|
|
root, response = self._xml_request_base(url, *args, **kwargs)
|
|
row = root.find('ROWSET/ROW')
|
|
if row is None:
|
|
raise APIError('no ROWSET/ROW node', data={'content': response.text[:1024]})
|
|
return row
|
|
|
|
def xml_request_multiple(self, url, *args, **kwargs):
|
|
root, response = self._xml_request_base(url, *args, **kwargs)
|
|
rowset = root.find('ROWSET')
|
|
if rowset is None:
|
|
raise APIError('no ROWSET node', data={'content': response.text[:1024]})
|
|
rows = rowset.findall('ROW')
|
|
return rows
|
|
|
|
def _xml_request_base(self, url, *args, **kwargs):
|
|
try:
|
|
response = self.requests.get(url, *args, **kwargs)
|
|
response.raise_for_status()
|
|
except requests.RequestException as e:
|
|
raise APIError('HTTP request failed', data={'exception': six.text_type(e)})
|
|
try:
|
|
root = ET.fromstring(response.content)
|
|
except ET.ParseError as e:
|
|
raise APIError('XML parsing failed', data={'exception': six.text_type(e)})
|
|
if root.tag != 'return':
|
|
raise APIError('root XML node is not return', data={'content': response.text[:1024]})
|
|
return root, response
|
|
|
|
def call_select_codifications(self):
|
|
root = self.xml_request(self.select_codifications_url)
|
|
categories = {}
|
|
for category in root.findall('CATEGORIES/CATEGORIES_ROW'):
|
|
code = category.find('CD_CAT_CODIF')
|
|
label = category.find('LB_CAT_CODIF')
|
|
if None in (code, label):
|
|
self.logger.warning('invalid category: %s', ET.tostring(category))
|
|
continue
|
|
categories[xmlutils.text_content(code)] = {
|
|
'label': xmlutils.text_content(label),
|
|
'codifications': []
|
|
}
|
|
for codification in root.findall('CODIFICATIONS/CODIFICATIONS_ROW'):
|
|
code = codification.find('CD_CODIF')
|
|
label = codification.find('LB_CODIF')
|
|
in_val = codification.find('IN_VAL_CODIF')
|
|
category_cod = codification.find('CD_CAT_CODIF')
|
|
if None in (code, label, category_cod):
|
|
self.logger.warning('invalid codification: %s', ET.tostring(codification))
|
|
continue
|
|
category_cod = xmlutils.text_content(category_cod)
|
|
if category_cod not in categories:
|
|
self.logger.warning('unknown category: %s', category_cod)
|
|
continue
|
|
categories[category_cod]['codifications'].append({
|
|
'code': xmlutils.text_content(code),
|
|
'label': xmlutils.text_content(label),
|
|
'enabled': xmlutils.text_content(in_val).strip().lower() == 'o' if in_val is not None else True,
|
|
})
|
|
return categories
|
|
|
|
def get_codifications(self):
|
|
cache = utils.RowLockedCache(
|
|
function=self.call_select_codifications,
|
|
row=self,
|
|
key_prefix='atos-genesys-codifications',
|
|
logger=self.logger)
|
|
return cache()
|
|
|
|
@endpoint(name='codifications',
|
|
description=_('List of codifications categories'))
|
|
def codifications(self, request):
|
|
codifications = self.get_codifications()
|
|
|
|
items = []
|
|
for code, category in codifications.items():
|
|
items.append({
|
|
'id': code,
|
|
'label': category['label'],
|
|
})
|
|
items.sort(key=lambda c: c['label'])
|
|
return {'data': items}
|
|
|
|
@endpoint(name='codifications',
|
|
pattern=r'^(?P<category>[\w-]+)/$',
|
|
example_pattern='{category}/',
|
|
description=_('List of codifications'),
|
|
parameters={
|
|
'category': {
|
|
'description': _('Category of codification'),
|
|
'example_value': u'MOT_APA',
|
|
}
|
|
})
|
|
def codifications_list(self, request, category):
|
|
codifications = self.get_codifications().get(category, {}).get('codifications', [])
|
|
|
|
items = [{
|
|
'id': codification['code'],
|
|
'text': codification['label']
|
|
} for codification in codifications]
|
|
return {'data': items}
|
|
|
|
def check_status(self):
|
|
return bool(self.call_select_codifications())
|
|
|
|
@property
|
|
def select_appairage_url(self):
|
|
return urlparse.urljoin(self.base_url, 'WSUsagerPublik/services/PublikService/selectAppairage')
|
|
|
|
def call_select_appairage(self, login, password, email):
|
|
row = self.xml_request(self.select_appairage_url, params={
|
|
'login': login,
|
|
'pwd': password,
|
|
'email': email,
|
|
})
|
|
row_d = xmlutils.to_json(row)
|
|
id_per = row_d.get('ID_PER', '').strip()
|
|
code = row_d.get('CD_RET', '').strip()
|
|
label = row_d.get('LB_RET', '').strip()
|
|
|
|
error = None
|
|
if code not in ['1', '2', '3', '4', '5', '6']:
|
|
error = 'invalid CD_RET: %s' % code,
|
|
if code in ['2', '3', '5'] and not id_per:
|
|
error = 'missing ID_PER'
|
|
if error:
|
|
raise APIError(error, data={'response': repr(ET.tostring(row))})
|
|
return code, label, id_per
|
|
|
|
@endpoint(name='link',
|
|
methods=['post'],
|
|
description=_('Create link with an extranet account'),
|
|
perm='can_access',
|
|
parameters={
|
|
'NameID':{
|
|
'description': _('Publik NameID'),
|
|
'example_value': 'xyz24d934',
|
|
},
|
|
'email': {
|
|
'description': _('Publik known email'),
|
|
'example_value': 'john.doe@example.com',
|
|
},
|
|
'login': {
|
|
'description': _('ATOS Genesys extranet login'),
|
|
'example_value': '1234',
|
|
},
|
|
'password': {
|
|
'description': _('ATOS Genesys extranet password'),
|
|
'example_value': 'password',
|
|
}
|
|
})
|
|
def link(self, request, NameID, email, login, password):
|
|
code, label, id_per = self.call_select_appairage(login, password, email)
|
|
if code in ['2', '3', '5']:
|
|
link, created = Link.objects.get_or_create(
|
|
resource=self,
|
|
name_id=NameID,
|
|
id_per=id_per)
|
|
return {'link_id': link.pk, 'new': created, 'code': code, 'label': label}
|
|
elif code == '6':
|
|
raise APIError('unknown-login', data={'code': code, 'label': label})
|
|
elif code in ['4', '1']:
|
|
raise APIError('invalid-password', data={'code': code, 'label': label})
|
|
|
|
@endpoint(name='unlink',
|
|
methods=['post'],
|
|
description=_('Delete link with an extranet account'),
|
|
perm='can_access',
|
|
parameters={
|
|
'NameID':{
|
|
'description': _('Publik NameID'),
|
|
'example_value': 'xyz24d934',
|
|
},
|
|
'link_id': {
|
|
'description': _('Identifier of the link'),
|
|
'example_value': '1',
|
|
},
|
|
})
|
|
def unlink(self, request, NameID, link_id):
|
|
try:
|
|
link_id = int(link_id.strip())
|
|
except ValueError:
|
|
raise APIError('invalid link_id')
|
|
|
|
qs = Link.objects.filter(
|
|
resource=self,
|
|
name_id=NameID,
|
|
pk=link_id)
|
|
count = qs.count()
|
|
qs.delete()
|
|
return {'deleted': count}
|
|
|
|
@property
|
|
def select_usager_url(self):
|
|
return urlparse.urljoin(self.base_url, 'WSUsagerPublik/services/PublikService/selectUsager')
|
|
|
|
def call_select_usager(self, id_per):
|
|
row = self.xml_request(self.select_usager_url, params={
|
|
'idPer': id_per,
|
|
'codRgp': self.cod_rgp,
|
|
})
|
|
return self._select_usager_row_to_json(row)
|
|
|
|
def _select_usager_row_to_json(self, row):
|
|
d = xmlutils.to_json(row)
|
|
# sort demandes and droits using COD_APPLI
|
|
demandes = {}
|
|
for demande in d.get('DEMANDES', []):
|
|
cod_appli = demande.get('COD_APPLI', '')
|
|
demandes.setdefault(cod_appli, []).append(demande)
|
|
d['DEMANDES'] = demandes
|
|
droits = {}
|
|
for droit in d.get('DROITS', []):
|
|
cod_appli = droit.get('COD_APPLI', '')
|
|
droits.setdefault(cod_appli, []).append(droit)
|
|
d['DROITS'] = droits
|
|
# create CIVILITE
|
|
for identification in d.get('IDENTIFICATION', []):
|
|
sexe = identification.get('SEXE', '')
|
|
identification['CIVILITE'] = {'M': u'Monsieur', 'F': u'Madame'}.get(sexe, '')
|
|
return d
|
|
|
|
@endpoint(name='dossiers',
|
|
description=_('Get datas for all links'),
|
|
perm='can_access',
|
|
parameters={
|
|
'NameID':{
|
|
'description': _('Publik NameID'),
|
|
'example_value': 'xyz24d934',
|
|
},
|
|
})
|
|
def dossiers(self, request, NameID, link_id=None):
|
|
qs = Link.objects.filter(
|
|
resource=self,
|
|
name_id=NameID)
|
|
if link_id:
|
|
try:
|
|
link_id = int(link_id)
|
|
except ValueError:
|
|
raise APIError('invalid-link-id')
|
|
qs = qs.filter(id=link_id)
|
|
data = []
|
|
for link in qs:
|
|
cache = utils.RowLockedCache(
|
|
function=self.call_select_usager,
|
|
row=link,
|
|
key_prefix='atos-genesys-usager',
|
|
logger=self.logger)
|
|
dossier = cache(link.id_per)
|
|
# build text as "id_per - prenom - no
|
|
text_parts = [str(link.id_per), '-']
|
|
identifications = dossier.get('IDENTIFICATION') or [{}]
|
|
identification = identifications[0]
|
|
prenom = identification.get('PRENOM')
|
|
nom = identification.get('NOM')
|
|
if prenom:
|
|
text_parts.append(prenom.title())
|
|
if nom:
|
|
text_parts.append(nom.upper())
|
|
data.append({
|
|
'id': str(link.id),
|
|
'text': u' '.join(text_parts),
|
|
'id_per': link.id_per,
|
|
'dossier': dossier,
|
|
})
|
|
if link_id:
|
|
return {'data': data[0] if data else None}
|
|
return {'data': data}
|
|
|
|
@property
|
|
def select_usager_by_ref_url(self):
|
|
return urlparse.urljoin(self.base_url, 'WSUsagerPublik/services/PublikService/selectUsagerByRef')
|
|
|
|
def call_select_usager_by_ref(self, ref_per):
|
|
row = self.xml_request(self.select_usager_by_ref_url, params={
|
|
'refPer': ref_per,
|
|
'codRgp': self.cod_rgp,
|
|
})
|
|
return self._select_usager_row_to_json(row)
|
|
|
|
@property
|
|
def cherche_beneficiaire_url(self):
|
|
return urlparse.urljoin(self.base_url, 'WSUsagerPublik/services/PublikService/chercheBeneficiaire')
|
|
|
|
def call_cherche_beneficiaire(self, prenom, nom, dob):
|
|
rows = self.xml_request_multiple(self.cherche_beneficiaire_url, params={
|
|
'nmPer': nom,
|
|
'prPer': prenom,
|
|
'dtNaissance': dob.strftime('%d/%m/%Y'),
|
|
})
|
|
beneficiaires = [xmlutils.to_json(row) for row in rows]
|
|
return beneficiaires
|
|
|
|
@endpoint(name='search',
|
|
description=_('Search for beneficiaries'),
|
|
perm='can_access',
|
|
parameters={
|
|
'first_name': {
|
|
'description': _('Beneficiary first name'),
|
|
'example_value': 'John',
|
|
},
|
|
'last_name': {
|
|
'description': _('Beneficiary last name'),
|
|
'example_value': 'Doe',
|
|
},
|
|
'date_of_birth': {
|
|
'description': _('Beneficiary date of birth'),
|
|
'example_value': '1987-10-23',
|
|
}
|
|
})
|
|
def search(self, request, first_name, last_name, date_of_birth, NameID=None, commune_naissance=None):
|
|
try:
|
|
date_of_birth = datetime.datetime.strptime(date_of_birth, '%Y-%m-%d').date()
|
|
except (ValueError, TypeError):
|
|
raise APIError('invalid date_of_birth: %r' % date_of_birth)
|
|
if commune_naissance:
|
|
# convert commune_naissance to ASCII
|
|
commune_naissance = to_ascii(commune_naissance).lower()
|
|
beneficiaires = self.call_cherche_beneficiaire(
|
|
prenom=first_name,
|
|
nom=last_name,
|
|
dob=date_of_birth)
|
|
data = []
|
|
dossiers = []
|
|
# get dossiers of found beneficiaries
|
|
for beneficiaire in beneficiaires:
|
|
id_per = beneficiaire.get('ID_PER')
|
|
if not id_per:
|
|
self.logger.warning('no ID_PER')
|
|
continue
|
|
try:
|
|
dob = beneficiaire['DATE_NAISSANCE']
|
|
except KeyError:
|
|
self.logger.warning('id_per %s: no DATE_NAISSANCE', id_per)
|
|
continue
|
|
try:
|
|
dob = datetime.datetime.strptime(dob, '%d/%m/%Y').date()
|
|
except (ValueError, TypeError):
|
|
self.logger.warning('id_per %s: invalid DATE_NAISSANCE', id_per)
|
|
continue
|
|
if dob != date_of_birth:
|
|
self.logger.debug('ignoring id_per %s different dob %s != %s', id_per, dob, date_of_birth)
|
|
continue
|
|
dossier = self.call_select_usager(id_per)
|
|
try:
|
|
identification = dossier['IDENTIFICATION'][0]
|
|
except KeyError:
|
|
self.logger.debug('id_per %s: dossier is empty', id_per)
|
|
continue
|
|
if not identification['ID_PER'] == id_per:
|
|
self.logger.warning('id_per %s: ID_PER differs', id_per)
|
|
continue
|
|
if commune_naissance:
|
|
cmu_nais = to_ascii(identification.get('CMU_NAIS', '')).lower()
|
|
if cmu_nais and commune_naissance != cmu_nais:
|
|
self.logger.debug(u'id_per %s: CMU_NAIS(%s) does not match commune_naissance(%s)',
|
|
id_per, cmu_nais, commune_naissance)
|
|
continue
|
|
dossiers.append(dossier)
|
|
|
|
# there must be only one
|
|
if len(dossiers) == 0:
|
|
raise APIError('not-found')
|
|
if len(dossiers) > 1:
|
|
raise APIError('too-many')
|
|
|
|
# get contact informations
|
|
identification = dossiers[0]['IDENTIFICATION'][0]
|
|
id_per = identification['ID_PER']
|
|
nom = identification.get('NOM', '')
|
|
prenom = identification.get('PRENOM', '')
|
|
nom_naissance = identification.get('NOM_NAISSANCE', '')
|
|
tel1 = ''.join(c for c in identification.get('TEL_MOBILE', '') if c.isdigit())
|
|
tel2 = ''.join(c for c in identification.get('TEL_FIXE', '') if c.isdigit())
|
|
email = identification.get('MAIL', '').strip()
|
|
if tel1 and tel1[:2] in ('06', '07'):
|
|
data.append({
|
|
'id': 'tel1',
|
|
'text': 'par SMS vers ' + tel1[:2] + '*****' + tel1[-3:],
|
|
'phone': tel1,
|
|
|
|
'id_per': id_per,
|
|
'nom': nom,
|
|
'prenom': prenom,
|
|
'nom_naissance': nom_naissance,
|
|
})
|
|
if tel2 and tel2[:2] in ('06', '07'):
|
|
data.append({
|
|
'id': 'tel2',
|
|
'text': 'par SMS vers ' + tel2[:2] + '*****' + tel2[-3:],
|
|
'phone': tel2,
|
|
|
|
'id_per': id_per,
|
|
'nom': nom,
|
|
'prenom': prenom,
|
|
'nom_naissance': nom_naissance,
|
|
})
|
|
if email:
|
|
data.append({
|
|
'id': 'email1',
|
|
'text': 'par courriel vers ' + email[:2] + '***@***' + email[-3:],
|
|
'email': email,
|
|
|
|
'id_per': id_per,
|
|
'nom': nom,
|
|
'prenom': prenom,
|
|
'nom_naissance': nom_naissance,
|
|
})
|
|
if len(data) == 0:
|
|
self.logger.debug('id_per %s: no contact information, ignored', id_per)
|
|
raise APIError('no-contacts')
|
|
try:
|
|
link = NameID and Link.objects.get(resource=self, name_id=NameID, id_per=id_per)
|
|
except Link.DoesNotExist:
|
|
link = None
|
|
return {
|
|
'data': data,
|
|
'already_paired': link is not None,
|
|
'link_id': link and link.id,
|
|
}
|
|
|
|
@endpoint(name='link-by-id-per',
|
|
methods=['post'],
|
|
description=_('Create link with an extranet account'),
|
|
perm='can_access',
|
|
parameters={
|
|
'NameID': {
|
|
'description': _('Publik NameID'),
|
|
'example_value': 'xyz24d934',
|
|
},
|
|
'id_per': {
|
|
'description': _('ATOS Genesys ID_PER'),
|
|
'example_value': '767676',
|
|
}
|
|
})
|
|
def link_by_id_per(self, request, NameID, id_per):
|
|
dossier = self.call_select_usager(id_per)
|
|
link, created = Link.objects.get_or_create(
|
|
resource=self,
|
|
name_id=NameID,
|
|
id_per=id_per)
|
|
return {'link_id': link.pk, 'new': created}
|
|
|
|
|
|
class Link(models.Model):
|
|
resource = models.ForeignKey(
|
|
Resource,
|
|
on_delete=models.CASCADE)
|
|
name_id = models.CharField(
|
|
verbose_name=_('NameID'),
|
|
blank=False,
|
|
max_length=256)
|
|
id_per = models.CharField(
|
|
verbose_name=_('ID Per'),
|
|
blank=False,
|
|
max_length=64)
|
|
created = models.DateTimeField(
|
|
verbose_name=_('Creation date'),
|
|
auto_now_add=True)
|
|
extra = jsonfield.JSONField(
|
|
verbose_name=_('Anything'),
|
|
null=True)
|
|
|
|
class Meta:
|
|
unique_together = (
|
|
'resource', 'name_id', 'id_per',
|
|
)
|
|
ordering = ['created']
|