"""Tests for the atos-genesys connector endpoints and its row-locked cache."""

import os
import random
from unittest import mock

import pytest
import requests
from django.utils.http import urlencode

import tests.utils
from passerelle.apps.atos_genesys.models import Link, Resource

FAKE_URL = 'https://sirus.fr/'

# Web-service URLs mocked individually by the search / link-by-id-per tests.
_SEARCH_WS_URL = FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire'
_SELECT_USAGER_WS_URL = FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager'

# Query parameters shared by the two "link" endpoint tests.
_LINK_PARAMS = {'NameID': 'zob', 'login': '1234', 'password': 'xyz', 'email': 'john.doe@example.com'}


def _data_path(filename):
    """Return the path of *filename* inside the ``data`` directory next to this module."""
    return os.path.join(os.path.dirname(__file__), 'data', filename)


def _endpoint(name, resource):
    """Return the URL of the atos-genesys endpoint *name* for *resource*."""
    return tests.utils.generic_endpoint_url('atos-genesys', name, slug=resource.slug)


def _with_query(url, params):
    """Return *url* followed by ``?`` and the URL-encoded *params*."""
    return url + '?' + urlencode(params)


@pytest.fixture
def genesys(db):
    """Create the ``test1`` connector resource pointing at ``FAKE_URL``."""
    return tests.utils.make_resource(
        Resource,
        title='Test 1',
        slug='test1',
        description='Connecteur de test',
        webservice_base_url=FAKE_URL,
    )


@pytest.fixture
def mock_codifications_ok():
    """Mock ``FAKE_URL`` with the content of ``data/genesys_select_codifications.xml``."""
    with open(_data_path('genesys_select_codifications.xml')) as fd:
        payload = fd.read()
    with tests.utils.mock_url(FAKE_URL, payload) as mocked:
        yield mocked


def test_base_url_normalization(db):
    """Both forms of base URL must yield the same selectUsagerByRef URL."""
    # db is necessary because Resource.__init__ sets resource.logger which does DB queries :/
    resource = Resource(title='t', slug='t', description='t')
    expected = 'http://localhost/WSUsagerPublik/services/PublikService/selectUsagerByRef'
    for base_url in (
        'http://localhost/WSUsagerPublik/services/PublikService/',
        'http://localhost/',
    ):
        resource.webservice_base_url = base_url
        assert resource.select_usager_by_ref_url == expected


def test_ws_categories(app, genesys, mock_codifications_ok):
    """The codifications endpoint lists a ``MOT_APA`` entry."""
    response = app.get(_endpoint('codifications', genesys))
    payload = response.json
    assert payload['err'] == 0
    assert payload['data']
    assert any(entry['id'] == 'MOT_APA' for entry in payload['data'])


def test_ws_codifications(app, genesys, mock_codifications_ok):
    """The ``MOT_APA`` codification lists an ``AGG_APA`` entry."""
    response = app.get(_endpoint('codifications', genesys) + '/MOT_APA/')
    payload = response.json
    assert payload['err'] == 0
    assert payload['data']
    assert any(entry['id'] == 'AGG_APA' for entry in payload['data'])


def test_ws_codifications_failure(app, genesys, mock_500):
    """With a cleared cache and the ``mock_500`` fixture, the endpoint reports ``err == 1``."""
    from django.core.cache import cache

    cache.clear()
    response = app.get(_endpoint('codifications', genesys) + '/MOT_APA/')
    assert response.json['err'] == 1


# NOTE(review): the inline response payloads in this module appear as flattened
# plain text; presumably the web service returns XML -- confirm these literals
# against the real fixtures.
RESPONSE_UNKNOWN_LOGIN = ''' 6 Identifiant inconnu de Genesis '''


def test_ws_link_unknown_appairage(app, genesys):
    """An unknown login makes the link endpoint return an error with code ``'6'``."""
    url = _endpoint('link', genesys)
    with tests.utils.mock_url(FAKE_URL, RESPONSE_UNKNOWN_LOGIN):
        response = app.post(_with_query(url, _LINK_PARAMS))
    payload = response.json
    assert payload['err'] == 1
    assert payload['data']['code'] == '6'
    assert payload['data']['label']


RESPONSE_CREATED = ''' 789 5 Identifiant et mot de passe Genesis corrects. Le compte de l'usager vient d'être créé dans l'Extranet. '''


def test_ws_link_created(app, genesys):
    """A successful link creates a ``Link`` row, which the unlink endpoint then deletes."""
    assert Link.objects.count() == 0
    link_url = _endpoint('link', genesys)
    with tests.utils.mock_url(FAKE_URL, RESPONSE_CREATED):
        response = app.post(_with_query(link_url, _LINK_PARAMS))
    link = Link.objects.latest('pk')
    assert response.json['err'] == 0
    assert response.json['link_id'] == link.pk
    assert response.json['new']
    assert Link.objects.filter(name_id='zob', id_per='789', resource=genesys).count() == 1

    with tests.utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER):
        dossiers = app.get(_with_query(_endpoint('dossiers', genesys), {'NameID': 'zob'}))
        unlink_params = {
            'NameID': 'zob',
            'link_id': dossiers.json['data'][0]['id'],
        }
        response = app.post(_with_query(_endpoint('unlink', genesys), unlink_params))
    assert response.json['err'] == 0
    assert response.json['deleted'] == 1
    assert Link.objects.count() == 0


with open(_data_path('genesys_select_usager.xml')) as fd:
    RESPONSE_SELECT_USAGER = fd.read()


def test_ws_dossiers(app, genesys):
    """The dossiers endpoint returns one entry per link, or a single entry for ``link_id``."""
    url = _endpoint('dossiers', genesys)

    def fetch(params):
        with tests.utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER):
            return app.get(_with_query(url, params)).json

    # One link: a single dossier is returned.
    link = Link.objects.create(resource=genesys, name_id='zob', id_per='1234')
    payload = fetch({'NameID': 'zob'})
    assert payload['err'] == 0
    assert payload['data']
    assert len(payload['data']) == 1
    entry = payload['data'][0]
    assert entry['id_per'] == '1234'
    assert entry['dossier']
    assert entry['id'] == str(link.id)
    assert entry['text'] == f'{link.id_per} - John DOE'
    dossier = entry['dossier']
    assert dossier['IDENTIFICATION'][0]['CIVILITE'] == 'Madame'
    assert len(dossier['DEMANDES']) == 1
    assert len(dossier['DEMANDES']['AD']) == 1
    assert len(dossier['DROITS']) == 1
    assert len(dossier['DROITS']['PH']) == 1

    # Two links: both dossiers are returned, in creation order.
    link2 = Link.objects.create(resource=genesys, name_id='zob', id_per='4567')
    payload = fetch({'NameID': 'zob'})
    assert payload['err'] == 0
    assert payload['data']
    assert len(payload['data']) == 2
    for entry, (expected_link, expected_id_per) in zip(payload['data'], ((link, '1234'), (link2, '4567'))):
        assert entry['id_per'] == expected_id_per
        assert entry['dossier']
        assert entry['id'] == str(expected_link.id)
        assert entry['text'] == f'{expected_link.id_per} - John DOE'

    # Explicit link_id: data is a single mapping rather than a list.
    payload = fetch({'NameID': 'zob', 'link_id': link2.id})
    assert payload['err'] == 0
    assert payload['data']
    entry = payload['data']
    assert entry['id_per'] == '4567'
    assert entry['dossier']
    assert entry['id'] == str(link2.id)
    assert entry['text'] == f'{link2.id_per} - John DOE'


def test_ws_dossiers_timeout_error(app, genesys):
    """A ``requests.Timeout`` from the request layer is reported as ``genesys is down``."""
    Link.objects.create(resource=genesys, name_id='zob', id_per='1234')
    url = _endpoint('dossiers', genesys)
    with mock.patch('passerelle.utils.Request.request') as request:
        request.side_effect = requests.Timeout
        response = app.get(_with_query(url, {'NameID': 'zob'}))
    assert response.json['err_desc'] == 'genesys is down'


def test_row_locked_cache(genesys, freezer):
    """``RowLockedCache`` calls the wrapped function once per cache period."""
    import time

    from passerelle.apps.atos_genesys.utils import RowLockedCache

    freezer.move_to('2018-01-01 00:00:00')
    link = Link.objects.create(resource=genesys, name_id='zob', id_per='4567')

    class SlowCallable:
        calls = 0
        value = 1

        def __call__(self):
            time.sleep(0.05)
            self.calls += 1
            return self.value

    func = SlowCallable()

    # Check that cache works, func() is called only one time during the cache duration (60 seconds)
    rlc = RowLockedCache(duration=60, row=link, function=func, key_prefix='cache')
    assert rlc() == 1
    assert func.calls == 1
    freezer.move_to('2018-01-01 00:00:59')
    for _ in range(5):
        assert rlc() == 1
    assert func.calls == 1

    # Check that with cache update func() is called only once again
    freezer.move_to('2018-01-01 00:02:00')
    # Set on the class: the instance has no own ``value``, so it reads this one.
    SlowCallable.value = 2
    attempts = 0
    while rlc() == 1 or attempts < 5:
        attempts += 1
    assert rlc() == 2
    assert func.calls == 2


RESPONSE_SEARCH = ''' John Doe 01/01/1925 951858 ANTIBES (006) FRANCE 1234 '''

RESPONSE_SEARCH_TOO_MANY = ''' John Doe 01/01/1925 951858 ANTIBES (006) FRANCE 1234 Johnny Doe 01/01/1925 951858 ANTIBES (006) FRANCE 1234 '''

RESPONSE_SELECT_USAGER_NO_CONTACTS = ''' 1234 '''


def test_ws_search(app, genesys):
    """The search endpoint returns contact channels, ``not-found`` or ``no-contacts``."""
    url = _endpoint('search', genesys)

    def search(params, select_usager_response):
        with tests.utils.mock_url(_SEARCH_WS_URL, RESPONSE_SEARCH), tests.utils.mock_url(
            _SELECT_USAGER_WS_URL, select_usager_response
        ):
            return app.get(_with_query(url, params)).json

    payload = search(
        {
            'first_name': 'John',
            'last_name': 'Doe',
            'date_of_birth': '1925-01-01',
            'commune_naissance': 'NïCe',
        },
        RESPONSE_SELECT_USAGER,
    )
    assert payload['err'] == 0
    assert payload['already_paired'] is False
    assert payload['link_id'] is None
    assert len(payload['data']) == 3
    assert payload['data'] == [
        {
            'id': 'tel1',
            'id_per': '1234',
            'nom': 'DOE',
            'nom_naissance': 'TEST',
            'phone': '0655555555',
            'prenom': 'John',
            'text': 'par SMS vers 06*****555',
        },
        {
            'id': 'tel2',
            'id_per': '1234',
            'nom': 'DOE',
            'nom_naissance': 'TEST',
            'phone': '0644444444',
            'prenom': 'John',
            'text': 'par SMS vers 06*****444',
        },
        {
            'email': 'test@sirus.fr',
            'id': 'email1',
            'id_per': '1234',
            'nom': 'DOE',
            'nom_naissance': 'TEST',
            'prenom': 'John',
            'text': 'par courriel vers te***@***.fr',
        },
    ]

    # A different commune of birth is not found.
    payload = search(
        {
            'first_name': 'John',
            'last_name': 'Doe',
            'date_of_birth': '1925-01-01',
            'commune_naissance': 'Cassis',
        },
        RESPONSE_SELECT_USAGER,
    )
    assert payload['err'] == 1
    assert payload['err_desc'] == 'not-found'

    # A different date of birth is not found.
    payload = search(
        {
            'first_name': 'John',
            'last_name': 'Doe',
            'date_of_birth': '1925-01-02',
        },
        RESPONSE_SELECT_USAGER,
    )
    assert payload['err'] == 1
    assert payload['err_desc'] == 'not-found'

    # selectUsager mocked with the "no contacts" payload.
    payload = search(
        {
            'first_name': 'John',
            'last_name': 'Doe',
            'date_of_birth': '1925-01-01',
        },
        RESPONSE_SELECT_USAGER_NO_CONTACTS,
    )
    assert payload['err'] == 1
    assert payload['err_desc'] == 'no-contacts'


def test_ws_link_by_id_per(app, genesys):
    """Linking by ``id_per`` creates a link that a later search reports as already paired."""
    assert Link.objects.count() == 0
    link_url = _endpoint('link-by-id-per', genesys)
    with tests.utils.mock_url(_SELECT_USAGER_WS_URL, RESPONSE_SELECT_USAGER):
        response = app.post(_with_query(link_url, {'NameID': 'zob', 'id_per': '1234'}))
    assert response.json['err'] == 0
    assert Link.objects.count() == 1
    link = Link.objects.get()
    data = response.json
    assert data['new']
    assert data['link_id'] == link.pk

    search_url = _endpoint('search', genesys)
    search_params = {
        'first_name': 'John',
        'last_name': 'Doe',
        'date_of_birth': '1925-01-01',
        'NameID': 'zob',
    }
    with tests.utils.mock_url(_SEARCH_WS_URL, RESPONSE_SEARCH), tests.utils.mock_url(
        _SELECT_USAGER_WS_URL, RESPONSE_SELECT_USAGER
    ):
        response = app.get(_with_query(search_url, search_params))
    assert response.json['err'] == 0
    assert response.json['already_paired'] is True
    assert response.json['link_id'] == link.id
    assert len(response.json['data']) == 3


def test_ws_dossier_by_pair(app, genesys):
    """``dossier-by-pair`` called with ``p1 = id_per * p2`` queries the service with ``idPer=id_per``."""
    url = '/atos-genesys/test1/dossier-by-pair/'

    with tests.utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER) as mocked:
        id_per = 57879
        p2 = random.randint(100, 999)
        p1 = id_per * p2
        response = app.get(url + f'?p1={p1}&p2={p2}')

    expected_dossier = {
        'DEMANDES': {
            'AD': [
                {
                    'CATEGORIE': 'Décision',
                    'CD_CODMES_MES': 'ADAPD ',
                    'CODE_CATEGORIE': 'DEC_APA',
                    'CODE_ETAPE': 'DEC_APA01',
                    'COD_APPLI': 'AD',
                    'DT_DMD': '08/12/2016',
                    'DT_DMD_ORDER': '2016-12-08 00:00:00.0',
                    'DT_FIN': '31/12/2019',
                    'DT_MINI_DMD': '01/01/2015',
                    'ETAPE': 'Décision',
                    'GROUPE': 'BANDEAU',
                    'ID_MES': '1234',
                    'LISTE_ETAPES': [
                        {
                            'A_SURLIGNER': 'N',
                            'CATEGORIE': 'Instruction administrative de la demande',
                            'CODE_CATEGORIE': 'INDMD',
                            'GROUPE': 'BANDEAU',
                            'ID_EGST_MES': '1234',
                            'LIBELLE_MES': 'A.P.A. à Domicile - Révision',
                            'NUM_CAT': '1',
                            'REF_EXT_MES': '1234',
                        },
                        {
                            'A_SURLIGNER': 'N',
                            'CATEGORIE': 'Évaluation médico-sociale de la demande',
                            'CODE_CATEGORIE': 'EVMSD',
                            'GROUPE': 'BANDEAU',
                            'ID_EGST_MES': '1234',
                            'LIBELLE_MES': 'A.P.A. à Domicile - Révision',
                            'NUM_CAT': '2',
                            'REF_EXT_MES': '1234',
                        },
                        {
                            'A_SURLIGNER': 'O',
                            'CATEGORIE': 'Décision',
                            'CATEGORIE_ETP': 'Décision',
                            'CD_ETP': 'DEC_APA01',
                            'CODE_CATEGORIE': 'DEC_APA',
                            'ETP': [
                                {
                                    'CODE_CHAMP': 'LABEL1',
                                    'ETAPE': 'Décision',
                                    'LIBELLE_CHAMP': (
                                        "Votre demande a fait l'objet d'une décision qui vous sera "
                                        'adressée, prochainement, par voie postale.'
                                    ),
                                    'NUM_CHAMP': '1',
                                    'TYP_CHAMP': 'L',
                                }
                            ],
                            'GROUPE': 'BANDEAU',
                            'ID_EGST_MES': '1234',
                            'LB_ETP': 'Décision',
                            'LIBELLE_MES': 'A.P.A. à Domicile - Révision',
                            'NUM_CAT': '3',
                            'REF_EXT_MES': '1234',
                        },
                    ],
                    'REF_EXT_MES': '1234',
                    'TYPE_MESURE': 'A.P.A. à Domicile - Révision',
                }
            ]
        },
        'DEMARCHES': [{'COD_FORM': 'F_RNV_APA'}],
        'DROITS': {
            'PH': [
                {
                    'COD_APPLI': 'PH',
                    'DATE_DEBUT': '01/01/2017',
                    'DATE_FIN': '31/12/2019',
                    'ID_PRE': '1234',
                    'LIBELLE': 'Un service prestataire ',
                    'PRESTATAIRE': 'CLUB AZUR SERVICES ',
                    'TYPPREST': 'Aide à domicile PA-PH',
                }
            ]
        },
        'IDENTIFICATION': [
            {
                'CAISSE_RETRAITE': 'CRAM CAGNES SUR MER',
                'CD_CMU_NAIS': '12345',
                'CD_COD_VOIE_ADR': 'BLD',
                'CD_PAY_ADR': '100 ',
                'CD_SIT_FAM': 'SITFAM_C',
                'CIVILITE': 'Madame',
                'CMU_NAIS': 'NICE',
                'CODE_POSTAL': '06000',
                'CODPOS_NAIS': '12345',
                'COMPLEMENT_ADRESSE': 'RESIDENCE BLABLA',
                'CONJOINT': [
                    {
                        'CAISSE_RETRAITE': 'CRAM CAGNES SUR MER',
                        'CD_COD_VOIE_ADR': 'BLD',
                        'CODE_POSTAL': '06000',
                        'COMPLEMENT_ADRESSE': 'RESIDENCE ALSACE ',
                        'DATE_NAISSANCE': '01/01/1960',
                        'LB_COD_BTQ_ADR': 'Bis',
                        'LB_COD_VOIE_ADR': 'Boulevard',
                        'LB_VOIE_ADR': 'du Revestel',
                        'MAIL': 'test@sirus.fr',
                        'NATIONALITE': 'FRANCE',
                        'NOM': 'NOM conjoint',
                        'NOM_NAISSANCE': 'NOM ',
                        'NSS': '253077507300584',
                        'NUM_AFFILIATION': '123456789',
                        'NU_VOIE_ADR': '65',
                        'PRENOM': 'Prénom',
                        'TEL_FIXE': '06.61.75.69.51',
                        'TEL_MOBILE': '06.61.75.69.51',
                        'VILLE': 'NICE',
                    }
                ],
                'CONTACTS': [
                    {
                        'CD_COD_VOIE_ADR': 'BLD',
                        'CODE_POSTAL': '06000',
                        'COMPLEMENT_ADRESSE': 'RESIDENCE BLABLA',
                        'LB_COD_BTQ_ADR': 'Bis',
                        'LB_COD_VOIE_ADR': 'Boulevard',
                        'LB_VOIE_ADR': 'GAMBETTA',
                        'MAIL': 'test@sirus.fr',
                        'NOM': 'NOM ',
                        'NU_VOIE_ADR': '65',
                        'PRENOM': 'Prénom',
                        'RELATION': 'TYP_CNT_AID',
                        'TEL_FIXE': '06.22.22.22.22',
                        'TEL_MOBILE': '06.11.11.11.11',
                        'VILLE': 'NICE',
                    }
                ],
                'DATE_NAISSANCE': '01/01/1923',
                'DPT_NAIS': 'ALPES-MARITMES',
                'IDDOS': '12345',
                'ID_PER': '1234',
                'LB_COD_BTQ_ADR': 'Bis',
                'LB_COD_VOIE_ADR': 'Boulevard',
                'LB_PAY_ADR': 'FRANCE',
                'LB_VOIE_ADR': 'du Revestel',
                'MAIL': 'test@sirus.fr',
                'NATIONALITE': 'Française (Valeurs Possibles\xa0: Française, Européenne,Autre)',
                'NOM': 'DOE',
                'NOM_NAISSANCE': 'TEST',
                'NSS': '253077507312383',
                'NUM_AFFILIATION': '123456789',
                'NU_VOIE_ADR': '65',
                'PAYS_NAIS': 'FRANCE',
                'PRENOM': 'John',
                'REF_PH_DOS': '123456',
                'REF_UTI_DOS': '123456',
                'SEXE': 'F',
                'SIT_FAM': 'Célibat',
                'TEL_FIXE': '06.44.44.44.44',
                'TEL_MOBILE': '06.55.55.55.55',
                'VILLE': 'NICE',
            }
        ],
    }
    assert response.json == {
        'data': {
            'dossier': expected_dossier,
            'id_per': 57879,
        },
        'err': 0,
    }
    assert f'idPer={id_per}&' in mocked.handlers[0].call['requests'][0].url