passerelle/tests/test_atos_genesys.py

226 lines
7.5 KiB
Python

# -*- coding: utf-8 -*-
import os
import pytest
import utils
from django.utils.http import urlencode
from passerelle.apps.atos_genesys.models import Resource, Link
# Base URL given to the test connector as `webservice_base_url` and used as
# the prefix of every URL intercepted with `utils.mock_url` in this module.
FAKE_URL = 'https://sirus.fr/genesys/'
@pytest.fixture
def genesys(db):
    """Return the Atos Genesys `Resource` used as connector by the tests.

    The instance is built through `utils.make_resource` and points its
    `webservice_base_url` at `FAKE_URL`.
    """
    resource_options = {
        'title': 'Test 1',
        'slug': 'test1',
        'description': 'Connecteur de test',
        'webservice_base_url': FAKE_URL,
    }
    return utils.make_resource(Resource, **resource_options)
@pytest.fixture
def mock_codifications_ok():
    """Mock the `selectCodifications` web service with a canned XML answer.

    The payload is read from `data/genesys_select_codifications.xml`, located
    next to this test module. The fixture yields the object returned by
    `utils.mock_url` and keeps the mock active for the duration of the test.
    """
    xml_path = os.path.join(
        os.path.dirname(__file__),
        'data',
        'genesys_select_codifications.xml')
    # Use a context manager so the file handle is closed deterministically
    # instead of being left to the garbage collector.
    with open(xml_path) as fd:
        response = fd.read()
    with utils.mock_url(FAKE_URL + 'selectCodifications', response) as mock:
        yield mock
def test_ws_categories(app, genesys, mock_codifications_ok):
    """The `codifications` endpoint succeeds and lists the `MOT_APA` entry."""
    endpoint = utils.generic_endpoint_url(
        'atos-genesys', 'codifications', slug=genesys.slug)
    payload = app.get(endpoint).json
    assert payload['err'] == 0
    assert payload['data']
    assert any(entry['id'] == 'MOT_APA' for entry in payload['data'])
def test_ws_codifications(app, genesys, mock_codifications_ok):
    """The `codifications/MOT_APA/` endpoint succeeds and lists `AGG_APA`."""
    base_url = utils.generic_endpoint_url(
        'atos-genesys', 'codifications', slug=genesys.slug)
    payload = app.get(base_url + '/MOT_APA/').json
    assert payload['err'] == 0
    assert payload['data']
    assert any(entry['id'] == 'AGG_APA' for entry in payload['data'])
def test_ws_codifications_failure(app, genesys, mock_500):
    """With the `mock_500` fixture active, the endpoint reports `err == 1`."""
    from django.core.cache import cache

    # Empty the Django cache before issuing the request.
    # NOTE(review): presumably this prevents a cached codifications answer
    # from hiding the failure - confirm against the connector code.
    cache.clear()

    base_url = utils.generic_endpoint_url(
        'atos-genesys', 'codifications', slug=genesys.slug)
    payload = app.get(base_url + '/MOT_APA/').json
    assert payload['err'] == 1
# Canned XML answer carrying return code 6 (CD_RET) and its label (LB_RET);
# served by the mocked web service in test_ws_link_unknown_appairage.
RESPONSE_UNKNOWN_LOGIN = '''<return><ROWSET><ROW>
<CD_RET>6</CD_RET>
<LB_RET>Identifiant inconnu de Genesis</LB_RET>
</ROW> </ROWSET></return>'''
def test_ws_link_unknown_appairage(app, genesys):
    """Posting to `link` while the service answers code 6 yields an error.

    The JSON answer must have `err == 1`, expose the code `'6'` and carry a
    non-empty label.
    """
    link_url = utils.generic_endpoint_url('atos-genesys', 'link', slug=genesys.slug)
    query = urlencode({
        'NameID': 'zob',
        'login': '1234',
        'password': 'xyz',
        'email': 'john.doe@example.com'
    })
    with utils.mock_url(FAKE_URL, RESPONSE_UNKNOWN_LOGIN):
        response = app.post(link_url + '?' + query)

    payload = response.json
    assert payload['err'] == 1
    assert payload['data']['code'] == '6'
    assert payload['data']['label']
# Canned XML answer carrying ID_PER 789 and return code 5 (CD_RET) with its
# label (LB_RET); served by the mocked web service in test_ws_link_created.
RESPONSE_CREATED = '''<return><ROWSET><ROW>
<ID_PER>789</ID_PER>
<CD_RET>5</CD_RET>
<LB_RET>Identifiant et mot de passe Genesis corrects. Le compte de l'usager vient d'être créé dans l'Extranet.</LB_RET>
</ROW> </ROWSET></return>'''
def test_ws_link_created(app, genesys):
    """Full link / dossiers / unlink round-trip for one user.

    1. Posting to `link` while the service answers `RESPONSE_CREATED` creates
       a `Link` row for NameID `zob` and id_per `789`.
    2. Querying `dossiers` returns that link, whose id is then passed to
       `unlink`.
    3. After `unlink`, no `Link` row remains.
    """
    url = utils.generic_endpoint_url('atos-genesys', 'link', slug=genesys.slug)

    assert Link.objects.count() == 0
    with utils.mock_url(FAKE_URL, RESPONSE_CREATED):
        response = app.post(url + '?' + urlencode({
            'NameID': 'zob',
            'login': '1234',
            'password': 'xyz',
            'email': 'john.doe@example.com'
        }))
    assert response.json['err'] == 0
    assert response.json['link_id'] == 1
    assert response.json['new']
    assert Link.objects.filter(
        name_id='zob', id_per='789', resource=genesys).count() == 1

    xml_path = os.path.join(
        os.path.dirname(__file__),
        'data',
        'genesys_select_usager.xml')
    # Read the fixture inside a context manager so the file handle is closed
    # instead of being leaked until garbage collection.
    with open(xml_path) as fd:
        xml_response = fd.read()

    with utils.mock_url(FAKE_URL, xml_response):
        dossiers_url = utils.generic_endpoint_url('atos-genesys', 'dossiers', slug=genesys.slug)
        response = app.get(dossiers_url + '?' + urlencode({
            'NameID': 'zob',
        }))

    # The link id to remove is taken from the first entry of the dossiers list.
    url = utils.generic_endpoint_url('atos-genesys', 'unlink', slug=genesys.slug)
    response = app.post(url + '?' + urlencode({
        'NameID': 'zob',
        'link_id': response.json['data'][0]['id'],
    }))
    assert response.json['err'] == 0
    assert response.json['deleted'] == 1
    assert Link.objects.count() == 0
def test_ws_dossiers(app, genesys):
    """Check the `dossiers` endpoint with one link, two links and a `link_id`.

    The mocked web service always answers with the content of
    `data/genesys_select_usager.xml`. Without `link_id` the endpoint returns
    a list with one entry per link of the NameID; with `link_id` it returns
    the single matching entry as a dict.
    """
    link = Link.objects.create(
        resource=genesys,
        name_id='zob',
        id_per='1234')

    xml_path = os.path.join(
        os.path.dirname(__file__),
        'data',
        'genesys_select_usager.xml')
    # Read the fixture inside a context manager so the file handle is closed
    # instead of being leaked until garbage collection.
    with open(xml_path) as fd:
        xml_response = fd.read()

    url = utils.generic_endpoint_url('atos-genesys', 'dossiers', slug=genesys.slug)

    # One link: a single dossier is returned.
    with utils.mock_url(FAKE_URL, xml_response):
        response = app.get(url + '?' + urlencode({
            'NameID': 'zob',
        }))
    assert response.json['err'] == 0
    assert response.json['data']
    assert len(response.json['data']) == 1
    assert response.json['data'][0]['id_per'] == '1234'
    assert response.json['data'][0]['dossier']
    assert response.json['data'][0]['id'] == str(link.id)
    assert response.json['data'][0]['text'] == u'%s - John DOE' % link.id_per
    assert response.json['data'][0]['dossier']['IDENTIFICATION'][0]['CIVILITE'] == 'Madame'
    assert len(response.json['data'][0]['dossier']['DEMANDES']) == 1
    assert len(response.json['data'][0]['dossier']['DEMANDES']['AD']) == 1
    assert len(response.json['data'][0]['dossier']['DROITS']) == 1
    assert len(response.json['data'][0]['dossier']['DROITS']['PH']) == 1

    # Second link for the same NameID: both dossiers are returned.
    link2 = Link.objects.create(
        resource=genesys,
        name_id='zob',
        id_per='4567')
    with utils.mock_url(FAKE_URL, xml_response):
        response = app.get(url + '?' + urlencode({
            'NameID': 'zob',
        }))
    assert response.json['err'] == 0
    assert response.json['data']
    assert len(response.json['data']) == 2
    assert response.json['data'][0]['id_per'] == '1234'
    assert response.json['data'][0]['dossier']
    assert response.json['data'][0]['id'] == str(link.id)
    assert response.json['data'][0]['text'] == u'%s - John DOE' % link.id_per
    assert response.json['data'][1]['id_per'] == '4567'
    assert response.json['data'][1]['dossier']
    assert response.json['data'][1]['id'] == str(link2.id)
    assert response.json['data'][1]['text'] == u'%s - John DOE' % link2.id_per

    # Explicit link_id: `data` is the single matching entry, not a list.
    with utils.mock_url(FAKE_URL, xml_response):
        response = app.get(url + '?' + urlencode({
            'NameID': 'zob',
            'link_id': link2.id,
        }))
    assert response.json['err'] == 0
    assert response.json['data']
    assert response.json['data']['id_per'] == '4567'
    assert response.json['data']['dossier']
    assert response.json['data']['id'] == str(link2.id)
    assert response.json['data']['text'] == u'%s - John DOE' % link2.id_per
def test_row_locked_cache(genesys, freezer):
    """Check how often `RowLockedCache` calls the function it wraps.

    Within the 60 second duration the wrapped callable must run only once;
    after the duration has elapsed it must run exactly once more and the new
    value must eventually be returned.
    """
    import time

    from passerelle.apps.atos_genesys.utils import RowLockedCache

    freezer.move_to('2018-01-01 00:00:00')
    link = Link.objects.create(resource=genesys, name_id='zob', id_per='4567')

    class F(object):
        # Class-level defaults: the first `self.calls += 1` creates an instance
        # attribute, while `value` keeps being read from the class.
        calls = 0
        value = 1

        def __call__(self):
            time.sleep(0.05)
            self.calls += 1
            return self.value

    func = F()
    cached = RowLockedCache(duration=60, row=link, function=func, key_prefix='cache')

    # While the cache is valid (60 seconds) the function runs a single time.
    first_value = cached()
    assert first_value == 1
    assert func.calls == 1
    freezer.move_to('2018-01-01 00:00:59')
    for _ in range(5):
        assert cached() == 1
    assert func.calls == 1

    # Once the duration has elapsed, the function runs exactly once more.
    freezer.move_to('2018-01-01 00:02:00')
    F.value = 2
    spins = 0
    # Keep calling until the new value shows up AND at least 5 rounds are done.
    # NOTE(review): presumably the refresh is asynchronous, hence the polling -
    # confirm against RowLockedCache.
    while True:
        if cached() != 1 and spins >= 5:
            break
        spins += 1
    assert cached() == 2
    assert func.calls == 2