passerelle/tests/test_atos_genesys.py

397 lines
14 KiB
Python

# -*- coding: utf-8 -*-
import os
import pytest
import utils
from django.utils.http import urlencode
from passerelle.apps.atos_genesys.models import Resource, Link
# Base URL of the fake Genesys web service: used as the connector's
# webservice_base_url and as the URL (or URL prefix) handed to utils.mock_url
# by the tests in this module.
FAKE_URL = 'https://sirus.fr/'
@pytest.fixture
def genesys(db):
    """Return a ``Resource`` built by ``utils.make_resource`` and pointed at ``FAKE_URL``."""
    resource_kwargs = {
        'title': 'Test 1',
        'slug': 'test1',
        'description': 'Connecteur de test',
        'webservice_base_url': FAKE_URL,
    }
    return utils.make_resource(Resource, **resource_kwargs)
@pytest.fixture
def mock_codifications_ok():
    """Mock ``FAKE_URL`` so that it answers with the canned codifications XML.

    The payload is read from ``data/genesys_select_codifications.xml`` next to
    this module. Yields the object produced by ``utils.mock_url`` for the
    duration of the test.
    """
    payload_path = os.path.join(
        os.path.dirname(__file__),
        'data',
        'genesys_select_codifications.xml')
    # Read through a context manager so the file handle is closed right away
    # instead of being left to the garbage collector.
    with open(payload_path) as payload_file:
        response = payload_file.read()
    with utils.mock_url(FAKE_URL, response) as mock:
        yield mock
def test_base_url_normalization(db):
    """Both the full service URL and the bare host yield the same selectUsagerByRef URL."""
    # db is necessary because Resource.__init__ sets resource.logger, which does DB queries :/
    expected_url = 'http://localhost/WSUsagerPublik/services/PublikService/selectUsagerByRef'
    resource = Resource(title='t', slug='t', description='t')
    base_urls = (
        'http://localhost/WSUsagerPublik/services/PublikService/',
        'http://localhost/',
    )
    for base_url in base_urls:
        resource.webservice_base_url = base_url
        assert resource.select_usager_by_ref_url == expected_url
def test_ws_categories(app, genesys, mock_codifications_ok):
    """The codifications endpoint answers without error and lists an entry with id MOT_APA."""
    endpoint = utils.generic_endpoint_url('atos-genesys', 'codifications', slug=genesys.slug)
    payload = app.get(endpoint).json
    assert payload['err'] == 0
    assert payload['data']
    matching = [entry for entry in payload['data'] if entry['id'] == 'MOT_APA']
    assert any(matching)
def test_ws_codifications(app, genesys, mock_codifications_ok):
    """The MOT_APA sub-path of the codifications endpoint lists an entry with id AGG_APA."""
    base_endpoint = utils.generic_endpoint_url('atos-genesys', 'codifications', slug=genesys.slug)
    payload = app.get(base_endpoint + '/MOT_APA/').json
    assert payload['err'] == 0
    assert payload['data']
    matching = [entry for entry in payload['data'] if entry['id'] == 'AGG_APA']
    assert any(matching)
def test_ws_codifications_failure(app, genesys, mock_500):
    """With the ``mock_500`` fixture active, the codifications endpoint reports err == 1.

    NOTE(review): ``mock_500`` is defined outside this file's visible part;
    presumably it makes the web service answer with an HTTP 500 -- confirm.
    """
    from django.core.cache import cache

    # Start from an empty Django cache before calling the endpoint.
    cache.clear()
    base_endpoint = utils.generic_endpoint_url('atos-genesys', 'codifications', slug=genesys.slug)
    failing_response = app.get(base_endpoint + '/MOT_APA/')
    assert failing_response.json['err'] == 1
# Canned web service answer carrying return code 6 ("Identifiant inconnu de
# Genesis"); served by the mock in test_ws_link_unknown_appairage.
RESPONSE_UNKNOWN_LOGIN = '''<return><ROWSET><ROW>
<CD_RET>6</CD_RET>
<LB_RET>Identifiant inconnu de Genesis</LB_RET>
</ROW> </ROWSET></return>'''
def test_ws_link_unknown_appairage(app, genesys):
    """Linking with a login the web service rejects yields err == 1 and code '6'."""
    credentials = {
        'NameID': 'zob',
        'login': '1234',
        'password': 'xyz',
        'email': 'john.doe@example.com'
    }
    link_endpoint = utils.generic_endpoint_url('atos-genesys', 'link', slug=genesys.slug)
    with utils.mock_url(FAKE_URL, RESPONSE_UNKNOWN_LOGIN):
        response = app.post(link_endpoint + '?' + urlencode(credentials))
    payload = response.json
    assert payload['err'] == 1
    assert payload['data']['code'] == '6'
    assert payload['data']['label']
# Canned web service answer carrying ID_PER 789 and return code 5 (login and
# password accepted, account just created); served by the mock in
# test_ws_link_created.
RESPONSE_CREATED = '''<return><ROWSET><ROW>
<ID_PER>789</ID_PER>
<CD_RET>5</CD_RET>
<LB_RET>Identifiant et mot de passe Genesis corrects. Le compte de l'usager vient d'être créé dans l'Extranet.</LB_RET>
</ROW> </ROWSET></return>'''
def test_ws_link_created(app, genesys):
    """Create a link through the ``link`` endpoint, then remove it through ``unlink``.

    The link id passed to ``unlink`` is taken from the first entry returned by
    the ``dossiers`` endpoint.
    """
    link_endpoint = utils.generic_endpoint_url('atos-genesys', 'link', slug=genesys.slug)
    assert Link.objects.count() == 0

    # Step 1: pair the user; the mocked web service accepts the credentials.
    link_query = urlencode({
        'NameID': 'zob',
        'login': '1234',
        'password': 'xyz',
        'email': 'john.doe@example.com'
    })
    with utils.mock_url(FAKE_URL, RESPONSE_CREATED):
        link_response = app.post(link_endpoint + '?' + link_query)
    created_link = Link.objects.latest('pk')
    link_payload = link_response.json
    assert link_payload['err'] == 0
    assert link_payload['link_id'] == created_link.pk
    assert link_payload['new']
    stored_links = Link.objects.filter(name_id='zob', id_per='789', resource=genesys)
    assert stored_links.count() == 1

    # Step 2: list the dossiers to learn the id under which the link is exposed.
    dossiers_url = utils.generic_endpoint_url('atos-genesys', 'dossiers', slug=genesys.slug)
    with utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER):
        dossiers_response = app.get(dossiers_url + '?' + urlencode({
            'NameID': 'zob',
        }))

    # Step 3: unlink using that id and check that nothing is left in the DB.
    unlink_endpoint = utils.generic_endpoint_url('atos-genesys', 'unlink', slug=genesys.slug)
    unlink_query = urlencode({
        'NameID': 'zob',
        'link_id': dossiers_response.json['data'][0]['id'],
    })
    unlink_payload = app.post(unlink_endpoint + '?' + unlink_query).json
    assert unlink_payload['err'] == 0
    assert unlink_payload['deleted'] == 1
    assert Link.objects.count() == 0
# Canned XML answer served by the mocks in the dossiers, search and link tests
# below; loaded once at import time from data/genesys_select_usager.xml next
# to this module. The context manager closes the file handle as soon as the
# content has been read instead of leaving it open.
with open(os.path.join(
        os.path.dirname(__file__),
        'data',
        'genesys_select_usager.xml')) as _select_usager_file:
    RESPONSE_SELECT_USAGER = _select_usager_file.read()
def test_ws_dossiers(app, genesys):
    """Exercise the ``dossiers`` endpoint with one link, two links, then an explicit link_id."""
    link = Link.objects.create(
        resource=genesys,
        name_id='zob',
        id_per='1234')
    url = utils.generic_endpoint_url('atos-genesys', 'dossiers', slug=genesys.slug)

    def fetch(params):
        # Query the endpoint while the web service is mocked with the canned usager XML.
        with utils.mock_url(FAKE_URL, RESPONSE_SELECT_USAGER):
            return app.get(url + '?' + urlencode(params)).json

    # One link: a single dossier is returned, with its full content.
    payload = fetch({'NameID': 'zob'})
    assert payload['err'] == 0
    assert payload['data']
    assert len(payload['data']) == 1
    only_entry = payload['data'][0]
    assert only_entry['id_per'] == '1234'
    assert only_entry['dossier']
    assert only_entry['id'] == str(link.id)
    assert only_entry['text'] == u'%s - John DOE' % link.id_per
    dossier = only_entry['dossier']
    assert dossier['IDENTIFICATION'][0]['CIVILITE'] == 'Madame'
    assert len(dossier['DEMANDES']) == 1
    assert len(dossier['DEMANDES']['AD']) == 1
    assert len(dossier['DROITS']) == 1
    assert len(dossier['DROITS']['PH']) == 1

    # Two links for the same NameID: both dossiers are returned, in creation order.
    link2 = Link.objects.create(
        resource=genesys,
        name_id='zob',
        id_per='4567')
    payload = fetch({'NameID': 'zob'})
    assert payload['err'] == 0
    assert payload['data']
    assert len(payload['data']) == 2
    expectations = ((link, '1234'), (link2, '4567'))
    for entry, (expected_link, expected_id_per) in zip(payload['data'], expectations):
        assert entry['id_per'] == expected_id_per
        assert entry['dossier']
        assert entry['id'] == str(expected_link.id)
        assert entry['text'] == u'%s - John DOE' % expected_link.id_per

    # Explicit link_id: data is a single dossier rather than a list.
    payload = fetch({
        'NameID': 'zob',
        'link_id': link2.id,
    })
    assert payload['err'] == 0
    assert payload['data']
    selected = payload['data']
    assert selected['id_per'] == '4567'
    assert selected['dossier']
    assert selected['id'] == str(link2.id)
    assert selected['text'] == u'%s - John DOE' % link2.id_per
def test_row_locked_cache(genesys, freezer):
    """``RowLockedCache`` calls the wrapped function once per cache period."""
    import time
    from passerelle.apps.atos_genesys.utils import RowLockedCache

    freezer.move_to('2018-01-01 00:00:00')

    link = Link.objects.create(
        resource=genesys,
        name_id='zob',
        id_per='4567')

    class SlowCallable(object):
        # Counts its invocations and returns the current ``value``.
        calls = 0
        value = 1

        def __call__(self):
            time.sleep(0.05)
            self.calls += 1
            return self.value

    slow = SlowCallable()

    # While the cache is valid (60 seconds), the wrapped callable runs only once.
    rlc = RowLockedCache(duration=60, row=link, function=slow, key_prefix='cache')
    first_value = rlc()
    assert first_value == 1
    assert slow.calls == 1

    freezer.move_to('2018-01-01 00:00:59')
    for _ in range(5):
        assert rlc() == 1
    assert slow.calls == 1

    # Once the cache has expired, the callable is invoked exactly one more time.
    freezer.move_to('2018-01-01 00:02:00')
    SlowCallable.value = 2
    attempts = 0
    # Keep polling until the new value is served and at least five iterations
    # were made. NOTE(review): the refresh presumably happens asynchronously,
    # hence the polling -- confirm against RowLockedCache.
    while True:
        if rlc() != 1 and attempts >= 5:
            break
        attempts += 1
    assert rlc() == 2
    assert slow.calls == 2
# Canned chercheBeneficiaire answer containing a single ROW (ID_PER 1234,
# born 01/01/1925 in ANTIBES (006)); served by the mocks in the search tests.
RESPONSE_SEARCH = '''<?xml version="1.0" encoding="UTF-8"?><return><ROWSET>
<ROW num="1">
<NOMPER>John</NOMPER>
<PRENOMPER>Doe</PRENOMPER>
<DATE_NAISSANCE>01/01/1925</DATE_NAISSANCE>
<REF_PER>951858</REF_PER>
<LIEU_NAIS>ANTIBES (006)</LIEU_NAIS>
<PAY_NAIS>FRANCE</PAY_NAIS>
<ID_PER>1234</ID_PER>
</ROW>
</ROWSET>
</return>'''
# Canned search answer containing two ROW elements. It is not referenced in
# the visible part of this file; presumably meant for a "too many results"
# scenario -- confirm.
RESPONSE_SEARCH_TOO_MANY = '''<?xml version="1.0" encoding="UTF-8"?><return><ROWSET>
<ROW num="1">
<NOMPER>John</NOMPER>
<PRENOMPER>Doe</PRENOMPER>
<DATE_NAISSANCE>01/01/1925</DATE_NAISSANCE>
<REF_PER>951858</REF_PER>
<LIEU_NAIS>ANTIBES (006)</LIEU_NAIS>
<PAY_NAIS>FRANCE</PAY_NAIS>
<ID_PER>1234</ID_PER>
</ROW>
<ROW num="2">
<NOMPER>Johnny</NOMPER>
<PRENOMPER>Doe</PRENOMPER>
<DATE_NAISSANCE>01/01/1925</DATE_NAISSANCE>
<REF_PER>951858</REF_PER>
<LIEU_NAIS>ANTIBES (006)</LIEU_NAIS>
<PAY_NAIS>FRANCE</PAY_NAIS>
<ID_PER>1234</ID_PER>
</ROW>
</ROWSET>
</return>'''
# Canned selectUsager answer whose only content is an IDENTIFICATION row with
# ID_PER 1234 (no phone or email data); test_ws_search uses it for the
# 'no-contacts' case.
RESPONSE_SELECT_USAGER_NO_CONTACTS = '''<?xml version="1.0"?>
<return><ROWSET>
<ROW num="1">
<IDENTIFICATION>
<IDENTIFICATION_ROW num="1">
<ID_PER>1234</ID_PER>
</IDENTIFICATION_ROW>
</IDENTIFICATION>
</ROW>
</ROWSET></return>'''
def test_ws_search(app, genesys):
    """Exercise the ``search`` endpoint: a match, two 'not-found' cases and 'no-contacts'."""
    url = utils.generic_endpoint_url('atos-genesys', 'search', slug=genesys.slug)
    search_ws_url = FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire'
    select_usager_ws_url = FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager'

    def run_search(query, select_usager_payload):
        # Call the endpoint with both backing web services mocked.
        with utils.mock_url(search_ws_url, RESPONSE_SEARCH):
            with utils.mock_url(select_usager_ws_url, select_usager_payload):
                return app.get(url + '?' + urlencode(query)).json

    # Matching identity: three contact means (two phones, one email) come back.
    payload = run_search({
        'first_name': 'John',
        'last_name': 'Doe',
        'date_of_birth': '1925-01-01',
        'commune_naissance': 'NïCe',
    }, RESPONSE_SELECT_USAGER)
    assert payload['err'] == 0
    assert payload['already_paired'] is False
    assert payload['link_id'] is None
    assert len(payload['data']) == 3
    expected_contacts = [
        {
            u'id': u'tel1',
            u'id_per': u'1234',
            u'nom': u'DOE',
            u'nom_naissance': u'TEST',
            u'phone': u'0655555555',
            u'prenom': u'John',
            u'text': u'par SMS vers 06*****555',
        },
        {
            u'id': u'tel2',
            u'id_per': u'1234',
            u'nom': u'DOE',
            u'nom_naissance': u'TEST',
            u'phone': u'0644444444',
            u'prenom': u'John',
            u'text': u'par SMS vers 06*****444',
        },
        {
            u'email': u'test@sirus.fr',
            u'id': u'email1',
            u'id_per': u'1234',
            u'nom': u'DOE',
            u'nom_naissance': u'TEST',
            u'prenom': u'John',
            u'text': u'par courriel vers te***@***.fr',
        },
    ]
    assert payload['data'] == expected_contacts

    # A different birth town is reported as not found.
    payload = run_search({
        'first_name': 'John',
        'last_name': 'Doe',
        'date_of_birth': '1925-01-01',
        'commune_naissance': 'Cassis',
    }, RESPONSE_SELECT_USAGER)
    assert payload['err'] == 1
    assert payload['err_desc'] == 'not-found'

    # A different birth date is reported as not found.
    payload = run_search({
        'first_name': 'John',
        'last_name': 'Doe',
        'date_of_birth': '1925-01-02',
    }, RESPONSE_SELECT_USAGER)
    assert payload['err'] == 1
    assert payload['err_desc'] == 'not-found'

    # A usager record holding only identification data is reported as 'no-contacts'.
    payload = run_search({
        'first_name': 'John',
        'last_name': 'Doe',
        'date_of_birth': '1925-01-01',
    }, RESPONSE_SELECT_USAGER_NO_CONTACTS)
    assert payload['err'] == 1
    assert payload['err_desc'] == 'no-contacts'
def test_ws_link_by_id_per(app, genesys):
    """Link by ``id_per``, then check that ``search`` reports the user as already paired."""
    ws_root = FAKE_URL + 'WSUsagerPublik/services/PublikService/'
    link_endpoint = utils.generic_endpoint_url('atos-genesys', 'link-by-id-per', slug=genesys.slug)

    assert Link.objects.count() == 0
    link_query = urlencode({
        'NameID': 'zob',
        'id_per': '1234',
    })
    with utils.mock_url(ws_root + 'selectUsager', RESPONSE_SELECT_USAGER):
        link_response = app.post(link_endpoint + '?' + link_query)
    assert link_response.json['err'] == 0
    assert Link.objects.count() == 1
    link = Link.objects.get()
    link_payload = link_response.json
    assert link_payload['new']
    assert link_payload['link_id'] == link.pk

    # Searching with the same NameID must now surface the existing link.
    search_endpoint = utils.generic_endpoint_url('atos-genesys', 'search', slug=genesys.slug)
    search_query = urlencode({
        'first_name': 'John',
        'last_name': 'Doe',
        'date_of_birth': '1925-01-01',
        'NameID': 'zob',
    })
    with utils.mock_url(ws_root + 'chercheBeneficiaire', RESPONSE_SEARCH):
        with utils.mock_url(ws_root + 'selectUsager', RESPONSE_SELECT_USAGER):
            search_response = app.get(search_endpoint + '?' + search_query)
    search_payload = search_response.json
    assert search_payload['err'] == 0
    assert search_payload['already_paired'] is True
    assert search_payload['link_id'] == link.id
    assert len(search_payload['data']) == 3