atos_genesys: make search endpoint a datasource of contact informations (#33492)

The datasource is only provisionned if the search find one and only one
dossier, otherwise it's empty. We also return if for the given NameID
the person is already paired.
This commit is contained in:
Benjamin Dauvergne 2019-05-28 14:38:54 +02:00
parent 46235973f0
commit b4aad9cd5f
2 changed files with 230 additions and 31 deletions

View File

@ -25,8 +25,9 @@ from django.utils import six
from django.utils.translation import ugettext_lazy as _
from passerelle.utils import xml as xmlutils
from passerelle.utils.jsonresponse import APIError
from passerelle.utils.api import endpoint
from passerelle.utils.conversion import to_ascii
from passerelle.utils.jsonresponse import APIError
from passerelle.base.models import BaseResource, HTTPResource
from . import utils
@ -269,8 +270,8 @@ class Resource(BaseResource, HTTPResource):
d['DROITS'] = droits
# create CIVILITE
for identification in d.get('IDENTIFICATION', []):
sexe = identification['SEXE']
identification['CIVILITE'] = {'M': u'Monsieur', 'F': u'Madame'}.get(sexe)
sexe = identification.get('SEXE', '')
identification['CIVILITE'] = {'M': u'Monsieur', 'F': u'Madame'}.get(sexe, '')
return d
@endpoint(name='dossiers',
@ -361,33 +362,123 @@ class Resource(BaseResource, HTTPResource):
'example_value': '1987-10-23',
}
})
def search(self, request, first_name, last_name, date_of_birth):
def search(self, request, first_name, last_name, date_of_birth, NameID=None, commune_naissance=None):
try:
date_of_birth = datetime.datetime.strptime(date_of_birth, '%Y-%m-%d')
date_of_birth = datetime.datetime.strptime(date_of_birth, '%Y-%m-%d').date()
except (ValueError, TypeError):
raise APIError('invalid date_of_birth: %r' % date_of_birth)
if commune_naissance:
# convert commune_naissance to ASCII
commune_naissance = to_ascii(commune_naissance).lower()
beneficiaires = self.call_cherche_beneficiaire(
prenom=first_name,
nom=last_name,
dob=date_of_birth)
data = []
dossiers = []
# get dossiers of found beneficiaries
for beneficiaire in beneficiaires:
ref_per = beneficiaire.get('REF_PER')
if not ref_per:
id_per = beneficiaire.get('ID_PER')
if not id_per:
self.logger.warning('no ID_PER')
continue
dossier = self.call_select_usager_by_ref(ref_per)
identification = dossier['IDENTIFICATION'][0]
beneficiaire['ID_PER'] = identification['ID_PER']
beneficiaire['TEL_FIXE'] = identification.get('TEL_FIXE', '')
beneficiaire['TEL_MOBILE'] = identification.get('TEL_MOBILE', '')
beneficiaire['MAIL'] = identification.get('MAIL', '')
return {'data': beneficiaires}
try:
dob = beneficiaire['DATE_NAISSANCE']
except KeyError:
self.logger.warning('id_per %s: no DATE_NAISSANCE', id_per)
continue
try:
dob = datetime.datetime.strptime(dob, '%d/%m/%Y').date()
except (ValueError, TypeError):
self.logger.warning('id_per %s: invalid DATE_NAISSANCE', id_per)
continue
if dob != date_of_birth:
self.logger.debug('ignoring id_per %s different dob %s != %s', id_per, dob, date_of_birth)
continue
dossier = self.call_select_usager(id_per)
try:
identification = dossier['IDENTIFICATION'][0]
except KeyError:
self.logger.debug('id_per %s: dossier is empty', id_per)
continue
if not identification['ID_PER'] == id_per:
self.logger.warning('id_per %s: ID_PER differs', id_per)
continue
if commune_naissance:
cmu_nais = to_ascii(identification.get('CMU_NAIS', '')).lower()
if cmu_nais and commune_naissance != cmu_nais:
self.logger.debug(u'id_per %s: CMU_NAIS(%s) does not match commune_naissance(%s)',
id_per, cmu_nais, commune_naissance)
continue
dossiers.append(dossier)
# there must be only one
if len(dossiers) == 0:
raise APIError('not-found')
if len(dossiers) > 1:
raise APIError('too-many')
# get contact informations
identification = dossiers[0]['IDENTIFICATION'][0]
id_per = identification['ID_PER']
nom = identification.get('NOM', '')
prenom = identification.get('PRENOM', '')
nom_naissance = identification.get('NOM_NAISSANCE', '')
tel1 = ''.join(c for c in identification.get('TEL_MOBILE', '') if c.isdigit())
tel2 = ''.join(c for c in identification.get('TEL_FIXE', '') if c.isdigit())
email = identification.get('MAIL', '').strip()
if tel1 and tel1[:2] in ('06', '07'):
data.append({
'id': 'tel1',
'text': 'par SMS vers ' + tel1[:2] + '*****' + tel1[-3:],
'phone': tel1,
'id_per': id_per,
'nom': nom,
'prenom': prenom,
'nom_naissance': nom_naissance,
})
if tel2 and tel2[:2] in ('06', '07'):
data.append({
'id': 'tel2',
'text': 'par SMS vers ' + tel2[:2] + '*****' + tel2[-3:],
'phone': tel2,
'id_per': id_per,
'nom': nom,
'prenom': prenom,
'nom_naissance': nom_naissance,
})
if email:
data.append({
'id': 'email1',
'text': 'par courriel vers ' + email[:2] + '***@***' + email[-3:],
'email': email,
'id_per': id_per,
'nom': nom,
'prenom': prenom,
'nom_naissance': nom_naissance,
})
if len(data) == 0:
self.logger.debug('id_per %s: no contact information, ignored', id_per)
raise APIError('no-contacts')
try:
link = NameID and Link.objects.get(resource=self, name_id=NameID, id_per=id_per)
except Link.DoesNotExist:
link = None
return {
'data': data,
'already_paired': link is not None,
'link_id': link and link.id,
}
@endpoint(name='link-by-id-per',
methods=['post'],
description=_('Create link with an extranet account'),
perm='can_access',
parameters={
'NameID':{
'NameID': {
'description': _('Publik NameID'),
'example_value': 'xyz24d934',
},

View File

@ -231,11 +231,7 @@ def test_row_locked_cache(genesys, freezer):
assert rlc() == 2
assert f.calls == 2
def test_ws_search(app, genesys):
url = utils.generic_endpoint_url('atos-genesys', 'search', slug=genesys.slug)
RESPONSE_SEARCH = '''<?xml version="1.0" encoding="UTF-8"?><return><ROWSET>
RESPONSE_SEARCH = '''<?xml version="1.0" encoding="UTF-8"?><return><ROWSET>
<ROW num="1">
<NOMPER>John</NOMPER>
<PRENOMPER>Doe</PRENOMPER>
@ -243,29 +239,125 @@ def test_ws_search(app, genesys):
<REF_PER>951858</REF_PER>
<LIEU_NAIS>ANTIBES (006)</LIEU_NAIS>
<PAY_NAIS>FRANCE</PAY_NAIS>
<ID_PER>1234</ID_PER>
</ROW>
</ROWSET>
</return>'''
RESPONSE_SEARCH_TOO_MANY = '''<?xml version="1.0" encoding="UTF-8"?><return><ROWSET>
<ROW num="1">
<NOMPER>John</NOMPER>
<PRENOMPER>Doe</PRENOMPER>
<DATE_NAISSANCE>01/01/1925</DATE_NAISSANCE>
<REF_PER>951858</REF_PER>
<LIEU_NAIS>ANTIBES (006)</LIEU_NAIS>
<PAY_NAIS>FRANCE</PAY_NAIS>
<ID_PER>1234</ID_PER>
</ROW>
<ROW num="2">
<NOMPER>Johnny</NOMPER>
<PRENOMPER>Doe</PRENOMPER>
<DATE_NAISSANCE>01/01/1925</DATE_NAISSANCE>
<REF_PER>951858</REF_PER>
<LIEU_NAIS>ANTIBES (006)</LIEU_NAIS>
<PAY_NAIS>FRANCE</PAY_NAIS>
<ID_PER>1234</ID_PER>
</ROW>
</ROWSET>
</return>'''
RESPONSE_SELECT_USAGER_NO_CONTACTS = '''<?xml version="1.0"?>
<return><ROWSET>
<ROW num="1">
<IDENTIFICATION>
<IDENTIFICATION_ROW num="1">
<ID_PER>1234</ID_PER>
</IDENTIFICATION_ROW>
</IDENTIFICATION>
</ROW>
</ROWSET></return>'''
def test_ws_search(app, genesys):
url = utils.generic_endpoint_url('atos-genesys', 'search', slug=genesys.slug)
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH):
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsagerByRef',
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager',
RESPONSE_SELECT_USAGER):
response = app.get(url + '?' + urlencode({
'first_name': 'John',
'last_name': 'Doe',
'date_of_birth': '1925-01-01',
'commune_naissance': 'NïCe',
}))
assert response.json['err'] == 0
assert len(response.json['data']) == 1
data = response.json['data'][0]
assert data['REF_PER'] == '951858'
assert data['ID_PER'] == '1234'
assert data['NOMPER'] == 'John'
assert data['PRENOMPER'] == 'Doe'
assert data['DATE_NAISSANCE'] == '01/01/1925'
assert data['TEL_FIXE'] == '06.44.44.44.44'
assert data['TEL_MOBILE'] == '06.55.55.55.55'
assert data['MAIL'] == 'test@sirus.fr'
assert response.json['already_paired'] is False
assert response.json['link_id'] is None
assert len(response.json['data']) == 3
data = response.json['data']
assert data == [
{
u'id': u'tel1',
u'id_per': u'1234',
u'nom': u'DOE',
u'nom_naissance': u'TEST',
u'phone': u'0655555555',
u'prenom': u'John',
u'text': u'par SMS vers 06*****555'},
{
u'id': u'tel2',
u'id_per': u'1234',
u'nom': u'DOE',
u'nom_naissance': u'TEST',
u'phone': u'0644444444',
u'prenom': u'John',
u'text': u'par SMS vers 06*****444'
},
{
u'email': u'test@sirus.fr',
u'id': u'email1',
u'id_per': u'1234',
u'nom': u'DOE',
u'nom_naissance': u'TEST',
u'prenom': u'John',
u'text': u'par courriel vers te***@***.fr'
}
]
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH):
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager',
RESPONSE_SELECT_USAGER):
response = app.get(url + '?' + urlencode({
'first_name': 'John',
'last_name': 'Doe',
'date_of_birth': '1925-01-01',
'commune_naissance': 'Cassis',
}))
assert response.json['err'] == 1
assert response.json['err_desc'] == 'not-found'
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH):
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager',
RESPONSE_SELECT_USAGER):
response = app.get(url + '?' + urlencode({
'first_name': 'John',
'last_name': 'Doe',
'date_of_birth': '1925-01-02',
}))
assert response.json['err'] == 1
assert response.json['err_desc'] == 'not-found'
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire',
RESPONSE_SEARCH):
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager',
RESPONSE_SELECT_USAGER_NO_CONTACTS):
response = app.get(url + '?' + urlencode({
'first_name': 'John',
'last_name': 'Doe',
'date_of_birth': '1925-01-01',
}))
assert response.json['err'] == 1
assert response.json['err_desc'] == 'no-contacts'
def test_ws_link_by_id_per(app, genesys):
@ -285,3 +377,19 @@ def test_ws_link_by_id_per(app, genesys):
data = response.json
assert data['new']
assert data['link_id'] == link.pk
url = utils.generic_endpoint_url('atos-genesys', 'search', slug=genesys.slug)
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH):
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager',
RESPONSE_SELECT_USAGER):
response = app.get(url + '?' + urlencode({
'first_name': 'John',
'last_name': 'Doe',
'date_of_birth': '1925-01-01',
'NameID': 'zob',
}))
assert response.json['err'] == 0
assert response.json['already_paired'] is True
assert response.json['link_id'] == link.id
assert len(response.json['data']) == 3