passerelle/tests/test_atos_genesys.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

683 lines
26 KiB
Python
Raw Permalink Normal View History

import os
import random
from unittest import mock
import pytest
import requests
from django.utils.http import urlencode
import tests.utils
from passerelle.apps.atos_genesys.models import Link, Resource
FAKE_URL = 'https://sirus.fr/'
@pytest.fixture
def genesys(db):
return tests.utils.make_resource(
Resource, title='Test 1', slug='test1', description='Connecteur de test', webservice_base_url=FAKE_URL
)
@pytest.fixture
def mock_codifications_ok():
with open(os.path.join(os.path.dirname(__file__), 'data', 'genesys_select_codifications.xml')) as fd:
response = fd.read()
with tests.utils.mock_url(FAKE_URL, response) as mock:
yield mock
def test_base_url_normalization(db):
# db is necessary because Resource.__init__ set resource.logger which does DB queries :/
resource = Resource(title='t', slug='t', description='t')
resource.webservice_base_url = 'http://localhost/WSUsagerPublik/services/PublikService/'
assert (
resource.select_usager_by_ref_url
== 'http://localhost/WSUsagerPublik/services/PublikService/selectUsagerByRef'
)
resource.webservice_base_url = 'http://localhost/'
assert (
resource.select_usager_by_ref_url
== 'http://localhost/WSUsagerPublik/services/PublikService/selectUsagerByRef'
)
def test_ws_categories(app, genesys, mock_codifications_ok):
url = tests.utils.generic_endpoint_url('atos-genesys', 'codifications', slug=genesys.slug)
response = app.get(url)
assert response.json['err'] == 0
assert response.json['data']
assert any(x for x in response.json['data'] if x['id'] == 'MOT_APA')
def test_ws_codifications(app, genesys, mock_codifications_ok):
url = tests.utils.generic_endpoint_url('atos-genesys', 'codifications', slug=genesys.slug)
url += '/MOT_APA/'
response = app.get(url)
assert response.json['err'] == 0
assert response.json['data']
assert any(x for x in response.json['data'] if x['id'] == 'AGG_APA')
def test_ws_codifications_failure(app, genesys, mock_500):
from django.core.cache import cache
2021-02-20 16:26:01 +01:00
cache.clear()
url = tests.utils.generic_endpoint_url('atos-genesys', 'codifications', slug=genesys.slug)
url += '/MOT_APA/'
response = app.get(url)
assert response.json['err'] == 1
RESPONSE_UNKNOWN_LOGIN = '''<return><ROWSET><ROW>
<CD_RET>6</CD_RET>
<LB_RET>Identifiant inconnu de Genesis</LB_RET>
</ROW> </ROWSET></return>'''
def test_ws_link_unknown_appairage(app, genesys):
url = tests.utils.generic_endpoint_url('atos-genesys', 'link', slug=genesys.slug)
with tests.utils.mock_url(FAKE_URL, RESPONSE_UNKNOWN_LOGIN):
response = app.post(
url
+ '?'
+ urlencode(
{'NameID': 'zob', 'login': '1234', 'password': 'xyz', 'email': 'john.doe@example.com'}
2021-02-20 16:26:01 +01:00
)
)
assert response.json['err'] == 1
assert response.json['data']['code'] == '6'
assert response.json['data']['label']
RESPONSE_CREATED = '''<return><ROWSET><ROW>
<ID_PER>789</ID_PER>
<CD_RET>5</CD_RET>
<LB_RET>Identifiant et mot de passe Genesis corrects. Le compte de l'usager vient d'être créé dans l'Extranet.</LB_RET>
</ROW> </ROWSET></return>'''
def test_ws_link_created(app, genesys):
url = tests.utils.generic_endpoint_url('atos-genesys', 'link', slug=genesys.slug)
assert Link.objects.count() == 0
with tests.utils.mock_url(FAKE_URL, RESPONSE_CREATED):
response = app.post(
url
+ '?'
+ urlencode(
{'NameID': 'zob', 'login': '1234', 'password': 'xyz', 'email': 'john.doe@example.com'}
2021-02-20 16:26:01 +01:00
)
)
link = Link.objects.latest('pk')
assert response.json['err'] == 0
assert response.json['link_id'] == link.pk
assert response.json['new']
assert Link.objects.filter(name_id='zob', id_per='789', resource=genesys).count() == 1
with tests.utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER):
dossiers_url = tests.utils.generic_endpoint_url('atos-genesys', 'dossiers', slug=genesys.slug)
response = app.get(
dossiers_url
+ '?'
+ urlencode(
{
'NameID': 'zob',
}
)
2021-02-20 16:26:01 +01:00
)
url = tests.utils.generic_endpoint_url('atos-genesys', 'unlink', slug=genesys.slug)
response = app.post(
url
+ '?'
+ urlencode(
{
'NameID': 'zob',
'link_id': response.json['data'][0]['id'],
}
)
2021-02-20 16:26:01 +01:00
)
assert response.json['err'] == 0
assert response.json['deleted'] == 1
assert Link.objects.count() == 0
2021-02-20 16:26:01 +01:00
with open(os.path.join(os.path.dirname(__file__), 'data', 'genesys_select_usager.xml')) as fd:
RESPONSE_SELECT_USAGER = fd.read()
def test_ws_dossiers(app, genesys):
link = Link.objects.create(resource=genesys, name_id='zob', id_per='1234')
url = tests.utils.generic_endpoint_url('atos-genesys', 'dossiers', slug=genesys.slug)
with tests.utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER):
response = app.get(
url
+ '?'
+ urlencode(
{
'NameID': 'zob',
}
)
2021-02-20 16:26:01 +01:00
)
assert response.json['err'] == 0
assert response.json['data']
assert len(response.json['data']) == 1
assert response.json['data'][0]['id_per'] == '1234'
assert response.json['data'][0]['dossier']
assert response.json['data'][0]['id'] == str(link.id)
assert response.json['data'][0]['text'] == '%s - John DOE' % link.id_per
assert response.json['data'][0]['dossier']['IDENTIFICATION'][0]['CIVILITE'] == 'Madame'
assert len(response.json['data'][0]['dossier']['DEMANDES']) == 1
assert len(response.json['data'][0]['dossier']['DEMANDES']['AD']) == 1
assert len(response.json['data'][0]['dossier']['DROITS']) == 1
assert len(response.json['data'][0]['dossier']['DROITS']['PH']) == 1
link2 = Link.objects.create(resource=genesys, name_id='zob', id_per='4567')
with tests.utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER):
response = app.get(
url
+ '?'
+ urlencode(
{
'NameID': 'zob',
}
)
2021-02-20 16:26:01 +01:00
)
assert response.json['err'] == 0
assert response.json['data']
assert len(response.json['data']) == 2
assert response.json['data'][0]['id_per'] == '1234'
assert response.json['data'][0]['dossier']
assert response.json['data'][0]['id'] == str(link.id)
assert response.json['data'][0]['text'] == '%s - John DOE' % link.id_per
assert response.json['data'][1]['id_per'] == '4567'
assert response.json['data'][1]['dossier']
assert response.json['data'][1]['id'] == str(link2.id)
assert response.json['data'][1]['text'] == '%s - John DOE' % link2.id_per
with tests.utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER):
response = app.get(
url
+ '?'
+ urlencode(
{
'NameID': 'zob',
'link_id': link2.id,
}
)
2021-02-20 16:26:01 +01:00
)
assert response.json['err'] == 0
assert response.json['data']
assert response.json['data']['id_per'] == '4567'
assert response.json['data']['dossier']
assert response.json['data']['id'] == str(link2.id)
assert response.json['data']['text'] == '%s - John DOE' % link2.id_per
def test_ws_dossiers_timeout_error(app, genesys):
Link.objects.create(resource=genesys, name_id='zob', id_per='1234')
url = tests.utils.generic_endpoint_url('atos-genesys', 'dossiers', slug=genesys.slug)
with mock.patch('passerelle.utils.Request.request') as request:
request.side_effect = requests.Timeout
response = app.get(
url
+ '?'
+ urlencode(
{
'NameID': 'zob',
}
)
)
assert response.json['err_desc'] == 'genesys is down'
def test_row_locked_cache(genesys, freezer):
import time
2021-05-07 11:53:34 +02:00
from passerelle.apps.atos_genesys.utils import RowLockedCache
freezer.move_to('2018-01-01 00:00:00')
link = Link.objects.create(resource=genesys, name_id='zob', id_per='4567')
class F:
calls = 0
value = 1
def __call__(self):
time.sleep(0.05)
self.calls += 1
return self.value
2021-02-20 16:26:01 +01:00
f = F()
# Check that cache works, f() is called only one time during the cache duration (60 seconds)
rlc = RowLockedCache(duration=60, row=link, function=f, key_prefix='cache')
value = rlc()
assert value == 1
assert f.calls == 1
freezer.move_to('2018-01-01 00:00:59')
for i in range(5):
assert rlc() == 1
assert f.calls == 1
# Check that with cache update f() is called only once again
freezer.move_to('2018-01-01 00:02:00')
F.value = 2
counter = 0
while rlc() == 1 or counter < 5:
counter += 1
assert rlc() == 2
assert f.calls == 2
2021-02-20 16:26:01 +01:00
RESPONSE_SEARCH = '''<?xml version="1.0" encoding="UTF-8"?><return><ROWSET>
<ROW num="1">
<NOMPER>John</NOMPER>
<PRENOMPER>Doe</PRENOMPER>
<DATE_NAISSANCE>01/01/1925</DATE_NAISSANCE>
<REF_PER>951858</REF_PER>
<LIEU_NAIS>ANTIBES (006)</LIEU_NAIS>
<PAY_NAIS>FRANCE</PAY_NAIS>
<ID_PER>1234</ID_PER>
</ROW>
</ROWSET>
</return>'''
RESPONSE_SEARCH_TOO_MANY = '''<?xml version="1.0" encoding="UTF-8"?><return><ROWSET>
<ROW num="1">
<NOMPER>John</NOMPER>
<PRENOMPER>Doe</PRENOMPER>
<DATE_NAISSANCE>01/01/1925</DATE_NAISSANCE>
<REF_PER>951858</REF_PER>
<LIEU_NAIS>ANTIBES (006)</LIEU_NAIS>
<PAY_NAIS>FRANCE</PAY_NAIS>
<ID_PER>1234</ID_PER>
</ROW>
<ROW num="2">
<NOMPER>Johnny</NOMPER>
<PRENOMPER>Doe</PRENOMPER>
<DATE_NAISSANCE>01/01/1925</DATE_NAISSANCE>
<REF_PER>951858</REF_PER>
<LIEU_NAIS>ANTIBES (006)</LIEU_NAIS>
<PAY_NAIS>FRANCE</PAY_NAIS>
<ID_PER>1234</ID_PER>
</ROW>
</ROWSET>
</return>'''
RESPONSE_SELECT_USAGER_NO_CONTACTS = '''<?xml version="1.0"?>
<return><ROWSET>
<ROW num="1">
<IDENTIFICATION>
<IDENTIFICATION_ROW num="1">
<ID_PER>1234</ID_PER>
</IDENTIFICATION_ROW>
</IDENTIFICATION>
</ROW>
</ROWSET></return>'''
def test_ws_search(app, genesys):
url = tests.utils.generic_endpoint_url('atos-genesys', 'search', slug=genesys.slug)
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH
):
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager', RESPONSE_SELECT_USAGER
):
response = app.get(
url
+ '?'
+ urlencode(
{
'first_name': 'John',
'last_name': 'Doe',
'date_of_birth': '1925-01-01',
'commune_naissance': 'NïCe',
}
)
2021-02-20 16:26:01 +01:00
)
assert response.json['err'] == 0
assert response.json['already_paired'] is False
assert response.json['link_id'] is None
assert len(response.json['data']) == 3
data = response.json['data']
assert data == [
{
'id': 'tel1',
'id_per': '1234',
'nom': 'DOE',
'nom_naissance': 'TEST',
'phone': '0655555555',
'prenom': 'John',
'text': 'par SMS vers 06*****555',
},
{
'id': 'tel2',
'id_per': '1234',
'nom': 'DOE',
'nom_naissance': 'TEST',
'phone': '0644444444',
'prenom': 'John',
'text': 'par SMS vers 06*****444',
},
{
'email': 'test@sirus.fr',
'id': 'email1',
'id_per': '1234',
'nom': 'DOE',
'nom_naissance': 'TEST',
'prenom': 'John',
'text': 'par courriel vers te***@***.fr',
},
]
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH
):
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager', RESPONSE_SELECT_USAGER
):
response = app.get(
url
+ '?'
+ urlencode(
{
'first_name': 'John',
'last_name': 'Doe',
'date_of_birth': '1925-01-01',
'commune_naissance': 'Cassis',
}
)
2021-02-20 16:26:01 +01:00
)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'not-found'
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH
):
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager', RESPONSE_SELECT_USAGER
):
response = app.get(
url
+ '?'
+ urlencode(
{
'first_name': 'John',
'last_name': 'Doe',
'date_of_birth': '1925-01-02',
}
)
2021-02-20 16:26:01 +01:00
)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'not-found'
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH
):
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager',
RESPONSE_SELECT_USAGER_NO_CONTACTS,
):
response = app.get(
url
+ '?'
+ urlencode(
{
'first_name': 'John',
'last_name': 'Doe',
'date_of_birth': '1925-01-01',
}
)
2021-02-20 16:26:01 +01:00
)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'no-contacts'
def test_ws_link_by_id_per(app, genesys):
url = tests.utils.generic_endpoint_url('atos-genesys', 'link-by-id-per', slug=genesys.slug)
assert Link.objects.count() == 0
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager', RESPONSE_SELECT_USAGER
):
response = app.post(
url
+ '?'
+ urlencode(
{
'NameID': 'zob',
'id_per': '1234',
}
)
2021-02-20 16:26:01 +01:00
)
assert response.json['err'] == 0
assert Link.objects.count() == 1
link = Link.objects.get()
data = response.json
assert data['new']
assert data['link_id'] == link.pk
url = tests.utils.generic_endpoint_url('atos-genesys', 'search', slug=genesys.slug)
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH
):
with tests.utils.mock_url(
FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager', RESPONSE_SELECT_USAGER
):
response = app.get(
url
+ '?'
+ urlencode(
{
'first_name': 'John',
'last_name': 'Doe',
'date_of_birth': '1925-01-01',
'NameID': 'zob',
}
)
2021-02-20 16:26:01 +01:00
)
assert response.json['err'] == 0
assert response.json['already_paired'] is True
assert response.json['link_id'] == link.id
assert len(response.json['data']) == 3
def test_ws_dossier_by_pair(app, genesys):
url = '/atos-genesys/test1/dossier-by-pair/'
with tests.utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER) as mock:
id_per = 57879
p2 = random.randint(100, 999)
p1 = id_per * p2
response = app.get(url + f'?p1={p1}&p2={p2}')
assert response.json == {
'data': {
'dossier': {
'DEMANDES': {
'AD': [
{
'CATEGORIE': 'Décision',
'CD_CODMES_MES': 'ADAPD ',
'CODE_CATEGORIE': 'DEC_APA',
'CODE_ETAPE': 'DEC_APA01',
'COD_APPLI': 'AD',
'DT_DMD': '08/12/2016',
'DT_DMD_ORDER': '2016-12-08 ' '00:00:00.0',
'DT_FIN': '31/12/2019',
'DT_MINI_DMD': '01/01/2015',
'ETAPE': 'Décision',
'GROUPE': 'BANDEAU',
'ID_MES': '1234',
'LISTE_ETAPES': [
{
'A_SURLIGNER': 'N',
'CATEGORIE': 'Instruction ' 'administrative ' 'de ' 'la ' 'demande',
'CODE_CATEGORIE': 'INDMD',
'GROUPE': 'BANDEAU',
'ID_EGST_MES': '1234',
'LIBELLE_MES': 'A.P.A. ' 'à ' 'Domicile ' '- ' 'Révision',
'NUM_CAT': '1',
'REF_EXT_MES': '1234',
},
{
'A_SURLIGNER': 'N',
'CATEGORIE': 'Évaluation ' 'médico-sociale ' 'de ' 'la ' 'demande',
'CODE_CATEGORIE': 'EVMSD',
'GROUPE': 'BANDEAU',
'ID_EGST_MES': '1234',
'LIBELLE_MES': 'A.P.A. ' 'à ' 'Domicile ' '- ' 'Révision',
'NUM_CAT': '2',
'REF_EXT_MES': '1234',
},
{
'A_SURLIGNER': 'O',
'CATEGORIE': 'Décision',
'CATEGORIE_ETP': 'Décision',
'CD_ETP': 'DEC_APA01',
'CODE_CATEGORIE': 'DEC_APA',
'ETP': [
{
'CODE_CHAMP': 'LABEL1',
'ETAPE': 'Décision',
'LIBELLE_CHAMP': 'Votre '
'demande '
'a '
'fait '
"l'objet "
"d'une "
'décision '
'qui '
'vous '
'sera '
'adressée, '
'prochainement, '
'par '
'voie '
'postale.',
'NUM_CHAMP': '1',
'TYP_CHAMP': 'L',
}
],
'GROUPE': 'BANDEAU',
'ID_EGST_MES': '1234',
'LB_ETP': 'Décision',
'LIBELLE_MES': 'A.P.A. ' 'à ' 'Domicile ' '- ' 'Révision',
'NUM_CAT': '3',
'REF_EXT_MES': '1234',
},
],
'REF_EXT_MES': '1234',
'TYPE_MESURE': 'A.P.A. à Domicile - ' 'Révision',
}
]
},
'DEMARCHES': [{'COD_FORM': 'F_RNV_APA'}],
'DROITS': {
'PH': [
{
'COD_APPLI': 'PH',
'DATE_DEBUT': '01/01/2017',
'DATE_FIN': '31/12/2019',
'ID_PRE': '1234',
'LIBELLE': 'Un service prestataire ',
'PRESTATAIRE': 'CLUB AZUR SERVICES ',
'TYPPREST': 'Aide à domicile PA-PH',
}
]
},
'IDENTIFICATION': [
{
'CAISSE_RETRAITE': 'CRAM CAGNES SUR ' 'MER',
'CD_CMU_NAIS': '12345',
'CD_COD_VOIE_ADR': 'BLD',
'CD_PAY_ADR': '100 ',
'CD_SIT_FAM': 'SITFAM_C',
'CIVILITE': 'Madame',
'CMU_NAIS': 'NICE',
'CODE_POSTAL': '06000',
'CODPOS_NAIS': '12345',
'COMPLEMENT_ADRESSE': 'RESIDENCE ' 'BLABLA',
'CONJOINT': [
{
'CAISSE_RETRAITE': 'CRAM ' 'CAGNES ' 'SUR ' 'MER',
'CD_COD_VOIE_ADR': 'BLD',
'CODE_POSTAL': '06000',
'COMPLEMENT_ADRESSE': 'RESIDENCE ' 'ALSACE ',
'DATE_NAISSANCE': '01/01/1960',
'LB_COD_BTQ_ADR': 'Bis',
'LB_COD_VOIE_ADR': 'Boulevard',
'LB_VOIE_ADR': 'du ' 'Revestel',
'MAIL': 'test@sirus.fr',
'NATIONALITE': 'FRANCE',
'NOM': 'NOM conjoint',
'NOM_NAISSANCE': 'NOM ',
'NSS': '253077507300584',
'NUM_AFFILIATION': '123456789',
'NU_VOIE_ADR': '65',
'PRENOM': 'Prénom',
'TEL_FIXE': '06.61.75.69.51',
'TEL_MOBILE': '06.61.75.69.51',
'VILLE': 'NICE',
}
],
'CONTACTS': [
{
'CD_COD_VOIE_ADR': 'BLD',
'CODE_POSTAL': '06000',
'COMPLEMENT_ADRESSE': 'RESIDENCE ' 'BLABLA',
'LB_COD_BTQ_ADR': 'Bis',
'LB_COD_VOIE_ADR': 'Boulevard',
'LB_VOIE_ADR': 'GAMBETTA',
'MAIL': 'test@sirus.fr',
'NOM': 'NOM ',
'NU_VOIE_ADR': '65',
'PRENOM': 'Prénom',
'RELATION': 'TYP_CNT_AID',
'TEL_FIXE': '06.22.22.22.22',
'TEL_MOBILE': '06.11.11.11.11',
'VILLE': 'NICE',
}
],
'DATE_NAISSANCE': '01/01/1923',
'DPT_NAIS': 'ALPES-MARITMES',
'IDDOS': '12345',
'ID_PER': '1234',
'LB_COD_BTQ_ADR': 'Bis',
'LB_COD_VOIE_ADR': 'Boulevard',
'LB_PAY_ADR': 'FRANCE',
'LB_VOIE_ADR': 'du Revestel',
'MAIL': 'test@sirus.fr',
'NATIONALITE': 'Française (Valeurs '
'Possibles\xa0: '
'Française, '
'Européenne,Autre)',
'NOM': 'DOE',
'NOM_NAISSANCE': 'TEST',
'NSS': '253077507312383',
'NUM_AFFILIATION': '123456789',
'NU_VOIE_ADR': '65',
'PAYS_NAIS': 'FRANCE',
'PRENOM': 'John',
'REF_PH_DOS': '123456',
'REF_UTI_DOS': '123456',
'SEXE': 'F',
'SIT_FAM': 'Célibat',
'TEL_FIXE': '06.44.44.44.44',
'TEL_MOBILE': '06.55.55.55.55',
'VILLE': 'NICE',
}
],
},
'id_per': 57879,
},
'err': 0,
}
assert f'idPer={id_per}&' in mock.handlers[0].call['requests'][0].url