226 lines
7.5 KiB
Python
226 lines
7.5 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
import pytest
|
|
|
|
import utils
|
|
|
|
from django.utils.http import urlencode
|
|
|
|
from passerelle.apps.atos_genesys.models import Resource, Link
|
|
|
|
# Base URL handed to the test resource as ``webservice_base_url`` and used as
# the prefix of every URL mocked in this module.
FAKE_URL = 'https://sirus.fr/genesys/'
|
|
|
|
|
|
@pytest.fixture
def genesys(db):
    """Return a ``Resource`` built by ``utils.make_resource`` and pointing at ``FAKE_URL``.

    The ``db`` fixture is requested only for its side effect
    (presumably pytest-django database access -- confirm in conftest).
    """
    resource_kwargs = {
        'title': 'Test 1',
        'slug': 'test1',
        'description': 'Connecteur de test',
        'webservice_base_url': FAKE_URL,
    }
    return utils.make_resource(Resource, **resource_kwargs)
|
|
|
|
|
|
@pytest.fixture
def mock_codifications_ok():
    """Mock the ``selectCodifications`` URL with the canned XML data file.

    The content of ``data/genesys_select_codifications.xml`` (located next to
    this module) is served for ``FAKE_URL + 'selectCodifications'`` for the
    duration of the test; the mock object is yielded to the test.
    """
    xml_path = os.path.join(
        os.path.dirname(__file__),
        'data',
        'genesys_select_codifications.xml')
    # Read through a context manager so the file handle is closed right away
    # instead of being left to the garbage collector.
    with open(xml_path) as fd:
        response = fd.read()
    with utils.mock_url(FAKE_URL + 'selectCodifications', response) as mock:
        yield mock
|
|
|
|
|
|
def test_ws_categories(app, genesys, mock_codifications_ok):
    """The ``codifications`` endpoint succeeds and its data contains ``MOT_APA``."""
    endpoint = utils.generic_endpoint_url('atos-genesys', 'codifications', slug=genesys.slug)
    response = app.get(endpoint)

    assert response.json['err'] == 0
    entries = response.json['data']
    assert entries
    assert any(entry for entry in entries if entry['id'] == 'MOT_APA')
|
|
|
|
|
|
def test_ws_codifications(app, genesys, mock_codifications_ok):
    """The ``codifications/MOT_APA/`` sub-path succeeds and its data contains ``AGG_APA``."""
    base_url = utils.generic_endpoint_url('atos-genesys', 'codifications', slug=genesys.slug)
    response = app.get(base_url + '/MOT_APA/')

    assert response.json['err'] == 0
    entries = response.json['data']
    assert entries
    assert any(entry for entry in entries if entry['id'] == 'AGG_APA')
|
|
|
|
|
|
def test_ws_codifications_failure(app, genesys, mock_500):
    """With the ``mock_500`` fixture active, the codifications endpoint reports ``err == 1``."""
    from django.core.cache import cache

    # Empty the Django cache before issuing the request.
    cache.clear()

    base_url = utils.generic_endpoint_url('atos-genesys', 'codifications', slug=genesys.slug)
    response = app.get(base_url + '/MOT_APA/')
    assert response.json['err'] == 1
|
|
|
|
|
|
RESPONSE_UNKNOWN_LOGIN = '''<return><ROWSET><ROW>
|
|
<CD_RET>6</CD_RET>
|
|
<LB_RET>Identifiant inconnu de Genesis</LB_RET>
|
|
</ROW> </ROWSET></return>'''
|
|
|
|
|
|
def test_ws_link_unknown_appairage(app, genesys):
    """Posting to ``link`` while the service answers ``RESPONSE_UNKNOWN_LOGIN`` yields an error.

    The JSON reply must have ``err == 1``, code ``'6'`` and a non-empty label.
    """
    link_url = utils.generic_endpoint_url('atos-genesys', 'link', slug=genesys.slug)
    query = urlencode({
        'NameID': 'zob',
        'login': '1234',
        'password': 'xyz',
        'email': 'john.doe@example.com'
    })
    with utils.mock_url(FAKE_URL, RESPONSE_UNKNOWN_LOGIN):
        response = app.post(link_url + '?' + query)

    assert response.json['err'] == 1
    error_data = response.json['data']
    assert error_data['code'] == '6'
    assert error_data['label']
|
|
|
|
|
|
RESPONSE_CREATED = '''<return><ROWSET><ROW>
|
|
<ID_PER>789</ID_PER>
|
|
<CD_RET>5</CD_RET>
|
|
<LB_RET>Identifiant et mot de passe Genesis corrects. Le compte de l'usager vient d'être créé dans l'Extranet.</LB_RET>
|
|
</ROW> </ROWSET></return>'''
|
|
|
|
|
|
def test_ws_link_created(app, genesys):
    """Exercise the link -> dossiers -> unlink round trip for one user.

    1. Posting to ``link`` while the service answers ``RESPONSE_CREATED``
       creates exactly one ``Link`` row (``name_id='zob'``, ``id_per='789'``).
    2. ``dossiers`` is then queried with the canned ``genesys_select_usager.xml``
       reply to obtain the id of that link.
    3. Posting to ``unlink`` with that id deletes the row again.
    """
    url = utils.generic_endpoint_url('atos-genesys', 'link', slug=genesys.slug)
    assert Link.objects.count() == 0
    with utils.mock_url(FAKE_URL, RESPONSE_CREATED):
        response = app.post(url + '?' + urlencode({
            'NameID': 'zob',
            'login': '1234',
            'password': 'xyz',
            'email': 'john.doe@example.com'
        }))
    assert response.json['err'] == 0
    assert response.json['link_id'] == 1
    assert response.json['new']
    assert Link.objects.filter(
        name_id='zob', id_per='789', resource=genesys).count() == 1

    xml_path = os.path.join(
        os.path.dirname(__file__),
        'data',
        'genesys_select_usager.xml')
    # Read through a context manager so the file handle is closed right away
    # instead of being left to the garbage collector.
    with open(xml_path) as fd:
        xml_response = fd.read()
    with utils.mock_url(FAKE_URL, xml_response):
        dossiers_url = utils.generic_endpoint_url('atos-genesys', 'dossiers', slug=genesys.slug)
        response = app.get(dossiers_url + '?' + urlencode({
            'NameID': 'zob',
        }))

    # The unlink call reuses the link id reported by the dossiers endpoint.
    url = utils.generic_endpoint_url('atos-genesys', 'unlink', slug=genesys.slug)
    response = app.post(url + '?' + urlencode({
        'NameID': 'zob',
        'link_id': response.json['data'][0]['id'],
    }))
    assert response.json['err'] == 0
    assert response.json['deleted'] == 1
    assert Link.objects.count() == 0
|
|
|
|
|
|
def test_ws_dossiers(app, genesys):
    """Check the ``dossiers`` endpoint with one link, two links, and an explicit ``link_id``.

    Every request is answered by the canned ``genesys_select_usager.xml`` reply.
    Without ``link_id`` the endpoint returns a list with one entry per link of
    the ``NameID``; with ``link_id`` it returns a single entry.
    """
    link = Link.objects.create(
        resource=genesys,
        name_id='zob',
        id_per='1234')

    xml_path = os.path.join(
        os.path.dirname(__file__),
        'data',
        'genesys_select_usager.xml')
    # Read through a context manager so the file handle is closed right away
    # instead of being left to the garbage collector.
    with open(xml_path) as fd:
        xml_response = fd.read()

    url = utils.generic_endpoint_url('atos-genesys', 'dossiers', slug=genesys.slug)
    # Same query string for the first two requests.
    by_name_id = url + '?' + urlencode({
        'NameID': 'zob',
    })

    # One link: a single-element list is expected.
    with utils.mock_url(FAKE_URL, xml_response):
        response = app.get(by_name_id)
    assert response.json['err'] == 0
    assert response.json['data']
    assert len(response.json['data']) == 1
    assert response.json['data'][0]['id_per'] == '1234'
    assert response.json['data'][0]['dossier']
    assert response.json['data'][0]['id'] == str(link.id)
    assert response.json['data'][0]['text'] == u'%s - John DOE' % link.id_per
    assert response.json['data'][0]['dossier']['IDENTIFICATION'][0]['CIVILITE'] == 'Madame'
    assert len(response.json['data'][0]['dossier']['DEMANDES']) == 1
    assert len(response.json['data'][0]['dossier']['DEMANDES']['AD']) == 1
    assert len(response.json['data'][0]['dossier']['DROITS']) == 1
    assert len(response.json['data'][0]['dossier']['DROITS']['PH']) == 1

    link2 = Link.objects.create(
        resource=genesys,
        name_id='zob',
        id_per='4567')

    # Two links for the same NameID: both are listed, in creation order.
    with utils.mock_url(FAKE_URL, xml_response):
        response = app.get(by_name_id)
    assert response.json['err'] == 0
    assert response.json['data']
    assert len(response.json['data']) == 2
    assert response.json['data'][0]['id_per'] == '1234'
    assert response.json['data'][0]['dossier']
    assert response.json['data'][0]['id'] == str(link.id)
    assert response.json['data'][0]['text'] == u'%s - John DOE' % link.id_per
    assert response.json['data'][1]['id_per'] == '4567'
    assert response.json['data'][1]['dossier']
    assert response.json['data'][1]['id'] == str(link2.id)
    assert response.json['data'][1]['text'] == u'%s - John DOE' % link2.id_per

    # Explicit link_id: ``data`` is a single mapping, not a list.
    with utils.mock_url(FAKE_URL, xml_response):
        response = app.get(url + '?' + urlencode({
            'NameID': 'zob',
            'link_id': link2.id,
        }))
    assert response.json['err'] == 0
    assert response.json['data']
    assert response.json['data']['id_per'] == '4567'
    assert response.json['data']['dossier']
    assert response.json['data']['id'] == str(link2.id)
    assert response.json['data']['text'] == u'%s - John DOE' % link2.id_per
|
|
|
|
|
|
def test_row_locked_cache(genesys, freezer):
    """Check that ``RowLockedCache`` calls the wrapped function once per cache period.

    A callable that counts its invocations is wrapped with ``duration=60``.
    The clock is moved with ``freezer`` to verify the value is served without
    a new call inside the period, and that exactly one more call happens once
    the period has elapsed.
    """
    import time

    from passerelle.apps.atos_genesys.utils import RowLockedCache

    freezer.move_to('2018-01-01 00:00:00')
    link = Link.objects.create(resource=genesys, name_id='zob', id_per='4567')

    class F(object):
        # Class-level defaults: ``value`` is changed on the class further down,
        # ``calls`` becomes an instance attribute on the first increment.
        calls = 0
        value = 1

        def __call__(self):
            time.sleep(0.05)
            self.calls += 1
            return self.value

    f = F()

    # Check that cache works, f() is called only one time during the cache duration (60 seconds)
    rlc = RowLockedCache(duration=60, row=link, function=f, key_prefix='cache')
    first_value = rlc()
    assert first_value == 1
    assert f.calls == 1
    freezer.move_to('2018-01-01 00:00:59')
    for _ in range(5):
        assert rlc() == 1
    assert f.calls == 1

    # Check that with cache update f() is called only once again
    freezer.move_to('2018-01-01 00:02:00')
    F.value = 2
    attempts = 0
    # Keep calling until the old value is no longer returned, and make at
    # least five calls in total.
    while rlc() == 1 or attempts < 5:
        attempts += 1
    assert rlc() == 2
    assert f.calls == 2
|