passerelle/tests/test_atos_genesys.py

683 lines
26 KiB
Python

import os
import random
from unittest import mock
import pytest
import requests
from django.utils.http import urlencode
import tests.utils
from passerelle.apps.atos_genesys.models import Link, Resource
FAKE_URL = 'https://sirus.fr/'
@pytest.fixture
def genesys(db):
return tests.utils.make_resource(
Resource, title='Test 1', slug='test1', description='Connecteur de test', webservice_base_url=FAKE_URL
)
@pytest.fixture
def mock_codifications_ok():
with open(os.path.join(os.path.dirname(__file__), 'data', 'genesys_select_codifications.xml')) as fd:
response = fd.read()
with tests.utils.mock_url(FAKE_URL, response) as mock:
yield mock
def test_base_url_normalization(db):
# db is necessary because Resource.__init__ set resource.logger which does DB queries :/
resource = Resource(title='t', slug='t', description='t')
resource.webservice_base_url = 'http://localhost/WSUsagerPublik/services/PublikService/'
assert (
resource.select_usager_by_ref_url
== 'http://localhost/WSUsagerPublik/services/PublikService/selectUsagerByRef'
)
resource.webservice_base_url = 'http://localhost/'
assert (
resource.select_usager_by_ref_url
== 'http://localhost/WSUsagerPublik/services/PublikService/selectUsagerByRef'
)
def test_ws_categories(app, genesys, mock_codifications_ok):
url = tests.utils.generic_endpoint_url('atos-genesys', 'codifications', slug=genesys.slug)
response = app.get(url)
assert response.json['err'] == 0
assert response.json['data']
assert any(x for x in response.json['data'] if x['id'] == 'MOT_APA')
def test_ws_codifications(app, genesys, mock_codifications_ok):
url = tests.utils.generic_endpoint_url('atos-genesys', 'codifications', slug=genesys.slug)
url += '/MOT_APA/'
response = app.get(url)
assert response.json['err'] == 0
assert response.json['data']
assert any(x for x in response.json['data'] if x['id'] == 'AGG_APA')
def test_ws_codifications_failure(app, genesys, mock_500):
from django.core.cache import cache
cache.clear()
url = tests.utils.generic_endpoint_url('atos-genesys', 'codifications', slug=genesys.slug)
url += '/MOT_APA/'
response = app.get(url)
assert response.json['err'] == 1
RESPONSE_UNKNOWN_LOGIN = '''<return><ROWSET><ROW>
<CD_RET>6</CD_RET>
<LB_RET>Identifiant inconnu de Genesis</LB_RET>
</ROW> </ROWSET></return>'''
def test_ws_link_unknown_appairage(app, genesys):
url = tests.utils.generic_endpoint_url('atos-genesys', 'link', slug=genesys.slug)
with tests.utils.mock_url(FAKE_URL, RESPONSE_UNKNOWN_LOGIN):
response = app.post(
url
+ '?'
+ urlencode(
{'NameID': 'zob', 'login': '1234', 'password': 'xyz', 'email': 'john.doe@example.com'}
)
)
assert response.json['err'] == 1
assert response.json['data']['code'] == '6'
assert response.json['data']['label']
RESPONSE_CREATED = '''<return><ROWSET><ROW>
<ID_PER>789</ID_PER>
<CD_RET>5</CD_RET>
<LB_RET>Identifiant et mot de passe Genesis corrects. Le compte de l'usager vient d'être créé dans l'Extranet.</LB_RET>
</ROW> </ROWSET></return>'''
def test_ws_link_created(app, genesys):
url = tests.utils.generic_endpoint_url('atos-genesys', 'link', slug=genesys.slug)
assert Link.objects.count() == 0
with tests.utils.mock_url(FAKE_URL, RESPONSE_CREATED):
response = app.post(
url
+ '?'
+ urlencode(
{'NameID': 'zob', 'login': '1234', 'password': 'xyz', 'email': 'john.doe@example.com'}
)
)
link = Link.objects.latest('pk')
assert response.json['err'] == 0
assert response.json['link_id'] == link.pk
assert response.json['new']
assert Link.objects.filter(name_id='zob', id_per='789', resource=genesys).count() == 1
with tests.utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER):
dossiers_url = tests.utils.generic_endpoint_url('atos-genesys', 'dossiers', slug=genesys.slug)
response = app.get(
dossiers_url
+ '?'
+ urlencode(
{
'NameID': 'zob',
}
)
)
url = tests.utils.generic_endpoint_url('atos-genesys', 'unlink', slug=genesys.slug)
response = app.post(
url
+ '?'
+ urlencode(
{
'NameID': 'zob',
'link_id': response.json['data'][0]['id'],
}
)
)
assert response.json['err'] == 0
assert response.json['deleted'] == 1
assert Link.objects.count() == 0
with open(os.path.join(os.path.dirname(__file__), 'data', 'genesys_select_usager.xml')) as fd:
RESPONSE_SELECT_USAGER = fd.read()
def test_ws_dossiers(app, genesys):
link = Link.objects.create(resource=genesys, name_id='zob', id_per='1234')
url = tests.utils.generic_endpoint_url('atos-genesys', 'dossiers', slug=genesys.slug)
with tests.utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER):
response = app.get(
url
+ '?'
+ urlencode(
{
'NameID': 'zob',
}
)
)
assert response.json['err'] == 0
assert response.json['data']
assert len(response.json['data']) == 1
assert response.json['data'][0]['id_per'] == '1234'
assert response.json['data'][0]['dossier']
assert response.json['data'][0]['id'] == str(link.id)
assert response.json['data'][0]['text'] == '%s - John DOE' % link.id_per
assert response.json['data'][0]['dossier']['IDENTIFICATION'][0]['CIVILITE'] == 'Madame'
assert len(response.json['data'][0]['dossier']['DEMANDES']) == 1
assert len(response.json['data'][0]['dossier']['DEMANDES']['AD']) == 1
assert len(response.json['data'][0]['dossier']['DROITS']) == 1
assert len(response.json['data'][0]['dossier']['DROITS']['PH']) == 1
link2 = Link.objects.create(resource=genesys, name_id='zob', id_per='4567')
with tests.utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER):
response = app.get(
url
+ '?'
+ urlencode(
{
'NameID': 'zob',
}
)
)
assert response.json['err'] == 0
assert response.json['data']
assert len(response.json['data']) == 2
assert response.json['data'][0]['id_per'] == '1234'
assert response.json['data'][0]['dossier']
assert response.json['data'][0]['id'] == str(link.id)
assert response.json['data'][0]['text'] == '%s - John DOE' % link.id_per
assert response.json['data'][1]['id_per'] == '4567'
assert response.json['data'][1]['dossier']
assert response.json['data'][1]['id'] == str(link2.id)
assert response.json['data'][1]['text'] == '%s - John DOE' % link2.id_per
with tests.utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER):
response = app.get(
url
+ '?'
+ urlencode(
{
'NameID': 'zob',
'link_id': link2.id,
}
)
)
assert response.json['err'] == 0
assert response.json['data']
assert response.json['data']['id_per'] == '4567'
assert response.json['data']['dossier']
assert response.json['data']['id'] == str(link2.id)
assert response.json['data']['text'] == '%s - John DOE' % link2.id_per
def test_ws_dossiers_timeout_error(app, genesys):
Link.objects.create(resource=genesys, name_id='zob', id_per='1234')
url = tests.utils.generic_endpoint_url('atos-genesys', 'dossiers', slug=genesys.slug)
with mock.patch('passerelle.utils.Request.request') as request:
request.side_effect = requests.Timeout
response = app.get(
url
+ '?'
+ urlencode(
{
'NameID': 'zob',
}
)
)
assert response.json['err_desc'] == 'genesys is down'
def test_row_locked_cache(genesys, freezer):
import time
from passerelle.apps.atos_genesys.utils import RowLockedCache
freezer.move_to('2018-01-01 00:00:00')
link = Link.objects.create(resource=genesys, name_id='zob', id_per='4567')
class F:
calls = 0
value = 1
def __call__(self):
time.sleep(0.05)
self.calls += 1
return self.value
f = F()
# Check that cache works, f() is called only one time during the cache duration (60 seconds)
rlc = RowLockedCache(duration=60, row=link, function=f, key_prefix='cache')
value = rlc()
assert value == 1
assert f.calls == 1
freezer.move_to('2018-01-01 00:00:59')
for i in range(5):
assert rlc() == 1
assert f.calls == 1
# Check that with cache update f() is called only once again
freezer.move_to('2018-01-01 00:02:00')
F.value = 2
counter = 0
while rlc() == 1 or counter < 5:
counter += 1
assert rlc() == 2
assert f.calls == 2
RESPONSE_SEARCH = '''<?xml version="1.0" encoding="UTF-8"?><return><ROWSET>
<ROW num="1">
<NOMPER>John</NOMPER>
<PRENOMPER>Doe</PRENOMPER>
<DATE_NAISSANCE>01/01/1925</DATE_NAISSANCE>
<REF_PER>951858</REF_PER>
<LIEU_NAIS>ANTIBES (006)</LIEU_NAIS>
<PAY_NAIS>FRANCE</PAY_NAIS>
<ID_PER>1234</ID_PER>
</ROW>
</ROWSET>
</return>'''
RESPONSE_SEARCH_TOO_MANY = '''<?xml version="1.0" encoding="UTF-8"?><return><ROWSET>
<ROW num="1">
<NOMPER>John</NOMPER>
<PRENOMPER>Doe</PRENOMPER>
<DATE_NAISSANCE>01/01/1925</DATE_NAISSANCE>
<REF_PER>951858</REF_PER>
<LIEU_NAIS>ANTIBES (006)</LIEU_NAIS>
<PAY_NAIS>FRANCE</PAY_NAIS>
<ID_PER>1234</ID_PER>
</ROW>
<ROW num="2">
<NOMPER>Johnny</NOMPER>
<PRENOMPER>Doe</PRENOMPER>
<DATE_NAISSANCE>01/01/1925</DATE_NAISSANCE>
<REF_PER>951858</REF_PER>
<LIEU_NAIS>ANTIBES (006)</LIEU_NAIS>
<PAY_NAIS>FRANCE</PAY_NAIS>
<ID_PER>1234</ID_PER>
</ROW>
</ROWSET>
</return>'''
RESPONSE_SELECT_USAGER_NO_CONTACTS = '''<?xml version="1.0"?>
<return><ROWSET>
<ROW num="1">
<IDENTIFICATION>
<IDENTIFICATION_ROW num="1">
<ID_PER>1234</ID_PER>
</IDENTIFICATION_ROW>
</IDENTIFICATION>
</ROW>
</ROWSET></return>'''
def test_ws_search(app, genesys):
url = tests.utils.generic_endpoint_url('atos-genesys', 'search', slug=genesys.slug)
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH
):
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager', RESPONSE_SELECT_USAGER
):
response = app.get(
url
+ '?'
+ urlencode(
{
'first_name': 'John',
'last_name': 'Doe',
'date_of_birth': '1925-01-01',
'commune_naissance': 'NïCe',
}
)
)
assert response.json['err'] == 0
assert response.json['already_paired'] is False
assert response.json['link_id'] is None
assert len(response.json['data']) == 3
data = response.json['data']
assert data == [
{
'id': 'tel1',
'id_per': '1234',
'nom': 'DOE',
'nom_naissance': 'TEST',
'phone': '0655555555',
'prenom': 'John',
'text': 'par SMS vers 06*****555',
},
{
'id': 'tel2',
'id_per': '1234',
'nom': 'DOE',
'nom_naissance': 'TEST',
'phone': '0644444444',
'prenom': 'John',
'text': 'par SMS vers 06*****444',
},
{
'email': 'test@sirus.fr',
'id': 'email1',
'id_per': '1234',
'nom': 'DOE',
'nom_naissance': 'TEST',
'prenom': 'John',
'text': 'par courriel vers te***@***.fr',
},
]
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH
):
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager', RESPONSE_SELECT_USAGER
):
response = app.get(
url
+ '?'
+ urlencode(
{
'first_name': 'John',
'last_name': 'Doe',
'date_of_birth': '1925-01-01',
'commune_naissance': 'Cassis',
}
)
)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'not-found'
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH
):
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager', RESPONSE_SELECT_USAGER
):
response = app.get(
url
+ '?'
+ urlencode(
{
'first_name': 'John',
'last_name': 'Doe',
'date_of_birth': '1925-01-02',
}
)
)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'not-found'
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH
):
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager',
RESPONSE_SELECT_USAGER_NO_CONTACTS,
):
response = app.get(
url
+ '?'
+ urlencode(
{
'first_name': 'John',
'last_name': 'Doe',
'date_of_birth': '1925-01-01',
}
)
)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'no-contacts'
def test_ws_link_by_id_per(app, genesys):
url = tests.utils.generic_endpoint_url('atos-genesys', 'link-by-id-per', slug=genesys.slug)
assert Link.objects.count() == 0
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager', RESPONSE_SELECT_USAGER
):
response = app.post(
url
+ '?'
+ urlencode(
{
'NameID': 'zob',
'id_per': '1234',
}
)
)
assert response.json['err'] == 0
assert Link.objects.count() == 1
link = Link.objects.get()
data = response.json
assert data['new']
assert data['link_id'] == link.pk
url = tests.utils.generic_endpoint_url('atos-genesys', 'search', slug=genesys.slug)
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH
):
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager', RESPONSE_SELECT_USAGER
):
response = app.get(
url
+ '?'
+ urlencode(
{
'first_name': 'John',
'last_name': 'Doe',
'date_of_birth': '1925-01-01',
'NameID': 'zob',
}
)
)
assert response.json['err'] == 0
assert response.json['already_paired'] is True
assert response.json['link_id'] == link.id
assert len(response.json['data']) == 3
def test_ws_dossier_by_pair(app, genesys):
url = '/atos-genesys/test1/dossier-by-pair/'
with tests.utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER) as mock:
id_per = 57879
p2 = random.randint(100, 999)
p1 = id_per * p2
response = app.get(url + f'?p1={p1}&p2={p2}')
assert response.json == {
'data': {
'dossier': {
'DEMANDES': {
'AD': [
{
'CATEGORIE': 'Décision',
'CD_CODMES_MES': 'ADAPD ',
'CODE_CATEGORIE': 'DEC_APA',
'CODE_ETAPE': 'DEC_APA01',
'COD_APPLI': 'AD',
'DT_DMD': '08/12/2016',
'DT_DMD_ORDER': '2016-12-08 ' '00:00:00.0',
'DT_FIN': '31/12/2019',
'DT_MINI_DMD': '01/01/2015',
'ETAPE': 'Décision',
'GROUPE': 'BANDEAU',
'ID_MES': '1234',
'LISTE_ETAPES': [
{
'A_SURLIGNER': 'N',
'CATEGORIE': 'Instruction ' 'administrative ' 'de ' 'la ' 'demande',
'CODE_CATEGORIE': 'INDMD',
'GROUPE': 'BANDEAU',
'ID_EGST_MES': '1234',
'LIBELLE_MES': 'A.P.A. ' 'à ' 'Domicile ' '- ' 'Révision',
'NUM_CAT': '1',
'REF_EXT_MES': '1234',
},
{
'A_SURLIGNER': 'N',
'CATEGORIE': 'Évaluation ' 'médico-sociale ' 'de ' 'la ' 'demande',
'CODE_CATEGORIE': 'EVMSD',
'GROUPE': 'BANDEAU',
'ID_EGST_MES': '1234',
'LIBELLE_MES': 'A.P.A. ' 'à ' 'Domicile ' '- ' 'Révision',
'NUM_CAT': '2',
'REF_EXT_MES': '1234',
},
{
'A_SURLIGNER': 'O',
'CATEGORIE': 'Décision',
'CATEGORIE_ETP': 'Décision',
'CD_ETP': 'DEC_APA01',
'CODE_CATEGORIE': 'DEC_APA',
'ETP': [
{
'CODE_CHAMP': 'LABEL1',
'ETAPE': 'Décision',
'LIBELLE_CHAMP': 'Votre '
'demande '
'a '
'fait '
"l'objet "
"d'une "
'décision '
'qui '
'vous '
'sera '
'adressée, '
'prochainement, '
'par '
'voie '
'postale.',
'NUM_CHAMP': '1',
'TYP_CHAMP': 'L',
}
],
'GROUPE': 'BANDEAU',
'ID_EGST_MES': '1234',
'LB_ETP': 'Décision',
'LIBELLE_MES': 'A.P.A. ' 'à ' 'Domicile ' '- ' 'Révision',
'NUM_CAT': '3',
'REF_EXT_MES': '1234',
},
],
'REF_EXT_MES': '1234',
'TYPE_MESURE': 'A.P.A. à Domicile - ' 'Révision',
}
]
},
'DEMARCHES': [{'COD_FORM': 'F_RNV_APA'}],
'DROITS': {
'PH': [
{
'COD_APPLI': 'PH',
'DATE_DEBUT': '01/01/2017',
'DATE_FIN': '31/12/2019',
'ID_PRE': '1234',
'LIBELLE': 'Un service prestataire ',
'PRESTATAIRE': 'CLUB AZUR SERVICES ',
'TYPPREST': 'Aide à domicile PA-PH',
}
]
},
'IDENTIFICATION': [
{
'CAISSE_RETRAITE': 'CRAM CAGNES SUR ' 'MER',
'CD_CMU_NAIS': '12345',
'CD_COD_VOIE_ADR': 'BLD',
'CD_PAY_ADR': '100 ',
'CD_SIT_FAM': 'SITFAM_C',
'CIVILITE': 'Madame',
'CMU_NAIS': 'NICE',
'CODE_POSTAL': '06000',
'CODPOS_NAIS': '12345',
'COMPLEMENT_ADRESSE': 'RESIDENCE ' 'BLABLA',
'CONJOINT': [
{
'CAISSE_RETRAITE': 'CRAM ' 'CAGNES ' 'SUR ' 'MER',
'CD_COD_VOIE_ADR': 'BLD',
'CODE_POSTAL': '06000',
'COMPLEMENT_ADRESSE': 'RESIDENCE ' 'ALSACE ',
'DATE_NAISSANCE': '01/01/1960',
'LB_COD_BTQ_ADR': 'Bis',
'LB_COD_VOIE_ADR': 'Boulevard',
'LB_VOIE_ADR': 'du ' 'Revestel',
'MAIL': 'test@sirus.fr',
'NATIONALITE': 'FRANCE',
'NOM': 'NOM conjoint',
'NOM_NAISSANCE': 'NOM ',
'NSS': '253077507300584',
'NUM_AFFILIATION': '123456789',
'NU_VOIE_ADR': '65',
'PRENOM': 'Prénom',
'TEL_FIXE': '06.61.75.69.51',
'TEL_MOBILE': '06.61.75.69.51',
'VILLE': 'NICE',
}
],
'CONTACTS': [
{
'CD_COD_VOIE_ADR': 'BLD',
'CODE_POSTAL': '06000',
'COMPLEMENT_ADRESSE': 'RESIDENCE ' 'BLABLA',
'LB_COD_BTQ_ADR': 'Bis',
'LB_COD_VOIE_ADR': 'Boulevard',
'LB_VOIE_ADR': 'GAMBETTA',
'MAIL': 'test@sirus.fr',
'NOM': 'NOM ',
'NU_VOIE_ADR': '65',
'PRENOM': 'Prénom',
'RELATION': 'TYP_CNT_AID',
'TEL_FIXE': '06.22.22.22.22',
'TEL_MOBILE': '06.11.11.11.11',
'VILLE': 'NICE',
}
],
'DATE_NAISSANCE': '01/01/1923',
'DPT_NAIS': 'ALPES-MARITMES',
'IDDOS': '12345',
'ID_PER': '1234',
'LB_COD_BTQ_ADR': 'Bis',
'LB_COD_VOIE_ADR': 'Boulevard',
'LB_PAY_ADR': 'FRANCE',
'LB_VOIE_ADR': 'du Revestel',
'MAIL': 'test@sirus.fr',
'NATIONALITE': 'Française (Valeurs '
'Possibles\xa0: '
'Française, '
'Européenne,Autre)',
'NOM': 'DOE',
'NOM_NAISSANCE': 'TEST',
'NSS': '253077507312383',
'NUM_AFFILIATION': '123456789',
'NU_VOIE_ADR': '65',
'PAYS_NAIS': 'FRANCE',
'PRENOM': 'John',
'REF_PH_DOS': '123456',
'REF_UTI_DOS': '123456',
'SEXE': 'F',
'SIT_FAM': 'Célibat',
'TEL_FIXE': '06.44.44.44.44',
'TEL_MOBILE': '06.55.55.55.55',
'VILLE': 'NICE',
}
],
},
'id_per': 57879,
},
'err': 0,
}
assert f'idPer={id_per}&' in mock.handlers[0].call['requests'][0].url