397 lines
14 KiB
Python
397 lines
14 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
import pytest
|
|
|
|
import utils
|
|
|
|
from django.utils.http import urlencode
|
|
|
|
from passerelle.apps.atos_genesys.models import Resource, Link
|
|
|
|
FAKE_URL = 'https://sirus.fr/'
|
|
|
|
|
|
@pytest.fixture
|
|
def genesys(db):
|
|
return utils.make_resource(
|
|
Resource,
|
|
title='Test 1',
|
|
slug='test1',
|
|
description='Connecteur de test',
|
|
webservice_base_url=FAKE_URL)
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_codifications_ok():
|
|
response = open(os.path.join(
|
|
os.path.dirname(__file__),
|
|
'data',
|
|
'genesys_select_codifications.xml')).read()
|
|
with utils.mock_url(FAKE_URL, response) as mock:
|
|
yield mock
|
|
|
|
|
|
def test_base_url_normalization(db):
|
|
# db is necessary because Resource.__init__ set resource.logger which does DB queries :/
|
|
resource = Resource(title='t', slug='t', description='t')
|
|
resource.webservice_base_url = 'http://localhost/WSUsagerPublik/services/PublikService/'
|
|
assert (resource.select_usager_by_ref_url
|
|
== 'http://localhost/WSUsagerPublik/services/PublikService/selectUsagerByRef')
|
|
resource.webservice_base_url = 'http://localhost/'
|
|
assert (resource.select_usager_by_ref_url
|
|
== 'http://localhost/WSUsagerPublik/services/PublikService/selectUsagerByRef')
|
|
|
|
|
|
def test_ws_categories(app, genesys, mock_codifications_ok):
|
|
url = utils.generic_endpoint_url('atos-genesys', 'codifications', slug=genesys.slug)
|
|
response = app.get(url)
|
|
assert response.json['err'] == 0
|
|
assert response.json['data']
|
|
assert any(x for x in response.json['data'] if x['id'] == 'MOT_APA')
|
|
|
|
|
|
def test_ws_codifications(app, genesys, mock_codifications_ok):
|
|
url = utils.generic_endpoint_url('atos-genesys', 'codifications', slug=genesys.slug)
|
|
url += '/MOT_APA/'
|
|
response = app.get(url)
|
|
assert response.json['err'] == 0
|
|
assert response.json['data']
|
|
assert any(x for x in response.json['data'] if x['id'] == 'AGG_APA')
|
|
|
|
|
|
def test_ws_codifications_failure(app, genesys, mock_500):
|
|
from django.core.cache import cache
|
|
cache.clear()
|
|
|
|
url = utils.generic_endpoint_url('atos-genesys', 'codifications', slug=genesys.slug)
|
|
url += '/MOT_APA/'
|
|
response = app.get(url)
|
|
assert response.json['err'] == 1
|
|
|
|
|
|
RESPONSE_UNKNOWN_LOGIN = '''<return><ROWSET><ROW>
|
|
<CD_RET>6</CD_RET>
|
|
<LB_RET>Identifiant inconnu de Genesis</LB_RET>
|
|
</ROW> </ROWSET></return>'''
|
|
|
|
|
|
def test_ws_link_unknown_appairage(app, genesys):
|
|
url = utils.generic_endpoint_url('atos-genesys', 'link', slug=genesys.slug)
|
|
with utils.mock_url(FAKE_URL, RESPONSE_UNKNOWN_LOGIN):
|
|
response = app.post(url + '?' + urlencode({
|
|
'NameID': 'zob',
|
|
'login': '1234',
|
|
'password': 'xyz',
|
|
'email': 'john.doe@example.com'
|
|
}))
|
|
assert response.json['err'] == 1
|
|
assert response.json['data']['code'] == '6'
|
|
assert response.json['data']['label']
|
|
|
|
|
|
RESPONSE_CREATED = '''<return><ROWSET><ROW>
|
|
<ID_PER>789</ID_PER>
|
|
<CD_RET>5</CD_RET>
|
|
<LB_RET>Identifiant et mot de passe Genesis corrects. Le compte de l'usager vient d'être créé dans l'Extranet.</LB_RET>
|
|
</ROW> </ROWSET></return>'''
|
|
|
|
|
|
def test_ws_link_created(app, genesys):
|
|
url = utils.generic_endpoint_url('atos-genesys', 'link', slug=genesys.slug)
|
|
assert Link.objects.count() == 0
|
|
with utils.mock_url(FAKE_URL, RESPONSE_CREATED):
|
|
response = app.post(url + '?' + urlencode({
|
|
'NameID': 'zob',
|
|
'login': '1234',
|
|
'password': 'xyz',
|
|
'email': 'john.doe@example.com'
|
|
}))
|
|
link = Link.objects.latest('pk')
|
|
assert response.json['err'] == 0
|
|
assert response.json['link_id'] == link.pk
|
|
assert response.json['new']
|
|
assert Link.objects.filter(
|
|
name_id='zob', id_per='789', resource=genesys).count() == 1
|
|
|
|
with utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER):
|
|
dossiers_url = utils.generic_endpoint_url('atos-genesys', 'dossiers', slug=genesys.slug)
|
|
response = app.get(dossiers_url + '?' + urlencode({
|
|
'NameID': 'zob',
|
|
}))
|
|
|
|
url = utils.generic_endpoint_url('atos-genesys', 'unlink', slug=genesys.slug)
|
|
response = app.post(url + '?' + urlencode({
|
|
'NameID': 'zob',
|
|
'link_id': response.json['data'][0]['id'],
|
|
}))
|
|
assert response.json['err'] == 0
|
|
assert response.json['deleted'] == 1
|
|
assert Link.objects.count() == 0
|
|
|
|
RESPONSE_SELECT_USAGER = open(os.path.join(
|
|
os.path.dirname(__file__),
|
|
'data',
|
|
'genesys_select_usager.xml')).read()
|
|
|
|
|
|
def test_ws_dossiers(app, genesys):
|
|
link = Link.objects.create(
|
|
resource=genesys,
|
|
name_id='zob',
|
|
id_per='1234')
|
|
|
|
url = utils.generic_endpoint_url('atos-genesys', 'dossiers', slug=genesys.slug)
|
|
with utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER):
|
|
response = app.get(url + '?' + urlencode({
|
|
'NameID': 'zob',
|
|
}))
|
|
assert response.json['err'] == 0
|
|
assert response.json['data']
|
|
assert len(response.json['data']) == 1
|
|
assert response.json['data'][0]['id_per'] == '1234'
|
|
assert response.json['data'][0]['dossier']
|
|
assert response.json['data'][0]['id'] == str(link.id)
|
|
assert response.json['data'][0]['text'] == u'%s - John DOE' % link.id_per
|
|
assert response.json['data'][0]['dossier']['IDENTIFICATION'][0]['CIVILITE'] == 'Madame'
|
|
assert len(response.json['data'][0]['dossier']['DEMANDES']) == 1
|
|
assert len(response.json['data'][0]['dossier']['DEMANDES']['AD']) == 1
|
|
assert len(response.json['data'][0]['dossier']['DROITS']) == 1
|
|
assert len(response.json['data'][0]['dossier']['DROITS']['PH']) == 1
|
|
|
|
link2 = Link.objects.create(
|
|
resource=genesys,
|
|
name_id='zob',
|
|
id_per='4567')
|
|
|
|
with utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER):
|
|
response = app.get(url + '?' + urlencode({
|
|
'NameID': 'zob',
|
|
}))
|
|
assert response.json['err'] == 0
|
|
assert response.json['data']
|
|
assert len(response.json['data']) == 2
|
|
assert response.json['data'][0]['id_per'] == '1234'
|
|
assert response.json['data'][0]['dossier']
|
|
assert response.json['data'][0]['id'] == str(link.id)
|
|
assert response.json['data'][0]['text'] == u'%s - John DOE' % link.id_per
|
|
assert response.json['data'][1]['id_per'] == '4567'
|
|
assert response.json['data'][1]['dossier']
|
|
assert response.json['data'][1]['id'] == str(link2.id)
|
|
assert response.json['data'][1]['text'] == u'%s - John DOE' % link2.id_per
|
|
|
|
with utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER):
|
|
response = app.get(url + '?' + urlencode({
|
|
'NameID': 'zob',
|
|
'link_id': link2.id,
|
|
}))
|
|
assert response.json['err'] == 0
|
|
assert response.json['data']
|
|
assert response.json['data']['id_per'] == '4567'
|
|
assert response.json['data']['dossier']
|
|
assert response.json['data']['id'] == str(link2.id)
|
|
assert response.json['data']['text'] == u'%s - John DOE' % link2.id_per
|
|
|
|
|
|
def test_row_locked_cache(genesys, freezer):
|
|
import time
|
|
from passerelle.apps.atos_genesys.utils import RowLockedCache
|
|
|
|
freezer.move_to('2018-01-01 00:00:00')
|
|
link = Link.objects.create(
|
|
resource=genesys,
|
|
name_id='zob',
|
|
id_per='4567')
|
|
|
|
class F(object):
|
|
calls = 0
|
|
value = 1
|
|
|
|
def __call__(self):
|
|
time.sleep(0.05)
|
|
self.calls += 1
|
|
return self.value
|
|
f = F()
|
|
|
|
# Check that cache works, f() is called only one time during the cache duration (60 seconds)
|
|
rlc = RowLockedCache(duration=60, row=link, function=f, key_prefix='cache')
|
|
value = rlc()
|
|
assert value == 1
|
|
assert f.calls == 1
|
|
freezer.move_to('2018-01-01 00:00:59')
|
|
for i in range(5):
|
|
assert rlc() == 1
|
|
assert f.calls == 1
|
|
|
|
# Check that with cache update f() is called only once again
|
|
freezer.move_to('2018-01-01 00:02:00')
|
|
F.value = 2
|
|
counter = 0
|
|
while rlc() == 1 or counter < 5:
|
|
counter += 1
|
|
assert rlc() == 2
|
|
assert f.calls == 2
|
|
|
|
RESPONSE_SEARCH = '''<?xml version="1.0" encoding="UTF-8"?><return><ROWSET>
|
|
<ROW num="1">
|
|
<NOMPER>John</NOMPER>
|
|
<PRENOMPER>Doe</PRENOMPER>
|
|
<DATE_NAISSANCE>01/01/1925</DATE_NAISSANCE>
|
|
<REF_PER>951858</REF_PER>
|
|
<LIEU_NAIS>ANTIBES (006)</LIEU_NAIS>
|
|
<PAY_NAIS>FRANCE</PAY_NAIS>
|
|
<ID_PER>1234</ID_PER>
|
|
</ROW>
|
|
</ROWSET>
|
|
</return>'''
|
|
|
|
RESPONSE_SEARCH_TOO_MANY = '''<?xml version="1.0" encoding="UTF-8"?><return><ROWSET>
|
|
<ROW num="1">
|
|
<NOMPER>John</NOMPER>
|
|
<PRENOMPER>Doe</PRENOMPER>
|
|
<DATE_NAISSANCE>01/01/1925</DATE_NAISSANCE>
|
|
<REF_PER>951858</REF_PER>
|
|
<LIEU_NAIS>ANTIBES (006)</LIEU_NAIS>
|
|
<PAY_NAIS>FRANCE</PAY_NAIS>
|
|
<ID_PER>1234</ID_PER>
|
|
</ROW>
|
|
<ROW num="2">
|
|
<NOMPER>Johnny</NOMPER>
|
|
<PRENOMPER>Doe</PRENOMPER>
|
|
<DATE_NAISSANCE>01/01/1925</DATE_NAISSANCE>
|
|
<REF_PER>951858</REF_PER>
|
|
<LIEU_NAIS>ANTIBES (006)</LIEU_NAIS>
|
|
<PAY_NAIS>FRANCE</PAY_NAIS>
|
|
<ID_PER>1234</ID_PER>
|
|
</ROW>
|
|
</ROWSET>
|
|
</return>'''
|
|
|
|
RESPONSE_SELECT_USAGER_NO_CONTACTS = '''<?xml version="1.0"?>
|
|
<return><ROWSET>
|
|
<ROW num="1">
|
|
<IDENTIFICATION>
|
|
<IDENTIFICATION_ROW num="1">
|
|
<ID_PER>1234</ID_PER>
|
|
</IDENTIFICATION_ROW>
|
|
</IDENTIFICATION>
|
|
</ROW>
|
|
</ROWSET></return>'''
|
|
|
|
|
|
def test_ws_search(app, genesys):
|
|
url = utils.generic_endpoint_url('atos-genesys', 'search', slug=genesys.slug)
|
|
|
|
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH):
|
|
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager',
|
|
RESPONSE_SELECT_USAGER):
|
|
response = app.get(url + '?' + urlencode({
|
|
'first_name': 'John',
|
|
'last_name': 'Doe',
|
|
'date_of_birth': '1925-01-01',
|
|
'commune_naissance': 'NïCe',
|
|
}))
|
|
assert response.json['err'] == 0
|
|
assert response.json['already_paired'] is False
|
|
assert response.json['link_id'] is None
|
|
assert len(response.json['data']) == 3
|
|
data = response.json['data']
|
|
assert data == [
|
|
{
|
|
u'id': u'tel1',
|
|
u'id_per': u'1234',
|
|
u'nom': u'DOE',
|
|
u'nom_naissance': u'TEST',
|
|
u'phone': u'0655555555',
|
|
u'prenom': u'John',
|
|
u'text': u'par SMS vers 06*****555'},
|
|
{
|
|
u'id': u'tel2',
|
|
u'id_per': u'1234',
|
|
u'nom': u'DOE',
|
|
u'nom_naissance': u'TEST',
|
|
u'phone': u'0644444444',
|
|
u'prenom': u'John',
|
|
u'text': u'par SMS vers 06*****444'
|
|
},
|
|
{
|
|
u'email': u'test@sirus.fr',
|
|
u'id': u'email1',
|
|
u'id_per': u'1234',
|
|
u'nom': u'DOE',
|
|
u'nom_naissance': u'TEST',
|
|
u'prenom': u'John',
|
|
u'text': u'par courriel vers te***@***.fr'
|
|
}
|
|
]
|
|
|
|
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH):
|
|
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager',
|
|
RESPONSE_SELECT_USAGER):
|
|
response = app.get(url + '?' + urlencode({
|
|
'first_name': 'John',
|
|
'last_name': 'Doe',
|
|
'date_of_birth': '1925-01-01',
|
|
'commune_naissance': 'Cassis',
|
|
}))
|
|
assert response.json['err'] == 1
|
|
assert response.json['err_desc'] == 'not-found'
|
|
|
|
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH):
|
|
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager',
|
|
RESPONSE_SELECT_USAGER):
|
|
response = app.get(url + '?' + urlencode({
|
|
'first_name': 'John',
|
|
'last_name': 'Doe',
|
|
'date_of_birth': '1925-01-02',
|
|
}))
|
|
assert response.json['err'] == 1
|
|
assert response.json['err_desc'] == 'not-found'
|
|
|
|
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire',
|
|
RESPONSE_SEARCH):
|
|
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager',
|
|
RESPONSE_SELECT_USAGER_NO_CONTACTS):
|
|
response = app.get(url + '?' + urlencode({
|
|
'first_name': 'John',
|
|
'last_name': 'Doe',
|
|
'date_of_birth': '1925-01-01',
|
|
}))
|
|
assert response.json['err'] == 1
|
|
assert response.json['err_desc'] == 'no-contacts'
|
|
|
|
|
|
def test_ws_link_by_id_per(app, genesys):
|
|
url = utils.generic_endpoint_url('atos-genesys', 'link-by-id-per', slug=genesys.slug)
|
|
|
|
assert Link.objects.count() == 0
|
|
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager',
|
|
RESPONSE_SELECT_USAGER):
|
|
response = app.post(url + '?' + urlencode({
|
|
'NameID': 'zob',
|
|
'id_per': '1234',
|
|
}))
|
|
|
|
assert response.json['err'] == 0
|
|
assert Link.objects.count() == 1
|
|
link = Link.objects.get()
|
|
data = response.json
|
|
assert data['new']
|
|
assert data['link_id'] == link.pk
|
|
|
|
url = utils.generic_endpoint_url('atos-genesys', 'search', slug=genesys.slug)
|
|
|
|
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH):
|
|
with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager',
|
|
RESPONSE_SELECT_USAGER):
|
|
response = app.get(url + '?' + urlencode({
|
|
'first_name': 'John',
|
|
'last_name': 'Doe',
|
|
'date_of_birth': '1925-01-01',
|
|
'NameID': 'zob',
|
|
}))
|
|
assert response.json['err'] == 0
|
|
assert response.json['already_paired'] is True
|
|
assert response.json['link_id'] == link.id
|
|
assert len(response.json['data']) == 3
|