zoo/tests/test_nanterre_fragments.py

309 lines
11 KiB
Python

# -*- coding: utf-8 -*-
import json
import httmock
from django.urls import reverse
from zoo.zoo_nanterre.fragments import Synchronization
from zoo.models import Job
def test_synchro(nanterre_classic_family):
f = nanterre_classic_family
sync = Synchronization.create('technocarte', [
f['marie'], f['kevin']])
old_message = sync.message()
new_sync = Synchronization.from_json(sync.to_json())
new_message = new_sync.message()
old_message['metadonnees']['date-soumission'] = None
new_message['metadonnees']['date-soumission'] = None
assert old_message == new_message
assert new_sync.to_json() == sync.to_json()
def test_synchro_full(app, nanterre_classic_family):
f = nanterre_classic_family
request_bodies = []
@httmock.urlmatch()
def technocarte_ok(url, request):
request_bodies.append(json.loads(request.body))
return httmock.response(
200, [
{
'id-fragment': 1,
'id-metier': '1'
},
{
'id-fragment': 2,
'id-metier': '2',
},
{
'id-fragment': 3,
'id-metier': '3',
},
{
'id-fragment': 4,
'id-metier': '4',
},
],
{
'Content-Type': 'application/json',
})
# création des fédérations dans technocarte pour toute la famille
names = ['jean', 'marie', 'kevin', 'lilou']
with httmock.HTTMock(technocarte_ok):
response = app.post_json(
reverse('rsu-api-synchronization'),
params={
'applications': ['technocarte'],
'individus': [
f[name].id for name in names
]
})
assert response.json['err'] == 0
assert Job.objects.count() == 1
assert Job.objects.filter(state=Job.STATE_SUCCESS).count() == 1
for name in names:
ind = f[name]
ind.refresh_from_db()
assert 'technocarte' in ind.content['cles_de_federation']
assert request_bodies[-1]['metadonnees']['service'] == 'synchronisation'
assert (request_bodies[0]['fragments'][0]['fragment']['telephones'][0]['type'] ==
'professionnel')
assert (request_bodies[0]['fragments'][0]['fragment']['telephones'][0]['poste'] is None)
assert (request_bodies[0]['fragments'][0]['fragment']['telephones'][0]['numero'] ==
'0101010908')
# séparation des parents
with httmock.HTTMock(technocarte_ok):
response = app.post_json(
reverse('rsu-api-separation', kwargs={'identifier': f['jean'].id}),
params={
})
assert response.json['err'] == 0
assert Job.objects.count() == 2
assert Job.objects.filter(state=Job.STATE_SUCCESS).count() == 2
job = Job.objects.latest('created')
assert len(job.content['fragments']) == 1
assert request_bodies[-1]['metadonnees']['service'] == 'declaration-separation'
# remariage des parents
with httmock.HTTMock(technocarte_ok):
response = app.post_json(
reverse('rsu-api-declaration-union'),
params={
'individu_id_1': f['jean'].id,
'individu_id_2': f['marie'].id,
'adresse_commune': 1,
'statut': 'unionlibre',
})
assert response.json['err'] == 0
assert Job.objects.count() == 3
assert Job.objects.filter(state=Job.STATE_SUCCESS).count() == 3
job = Job.objects.latest('created')
assert len(job.content['fragments']) == 5
assert request_bodies[-1]['metadonnees']['service'] == 'declaration-union'
# déclaration de non resposanbilité sur un enfant
with httmock.HTTMock(technocarte_ok):
response = app.post_json(
reverse('rsu-api-suppression-lien-de-responsabilite',
kwargs={
'identifier': f['jean'].id,
'identifier_enfant': f['kevin'].id
}),
params={
})
assert response.json['err'] == 0
assert Job.objects.count() == 4
assert Job.objects.filter(state=Job.STATE_SUCCESS).count() == 4
job = Job.objects.latest('created')
assert len(job.content['fragments']) == 2
assert request_bodies[-1]['metadonnees']['service'] == 'suppression-responsabilite-enfant'
# déclaration de non responsabilité sur un enfant
with httmock.HTTMock(technocarte_ok):
response = app.post_json(
reverse('rsu-api-declaration-responsabilite-legale',
kwargs={
'identifier': f['jean'].id,
}),
params={
'enfant_id': f['kevin'].id,
'statut': 'parent',
})
assert response.json['err'] == 0
assert Job.objects.count() == 5
assert Job.objects.filter(state=Job.STATE_SUCCESS).count() == 5
job = Job.objects.latest('created')
assert len(job.content['fragments']) == 2
assert (request_bodies[-1]['metadonnees']['service'] ==
'declaration-responsabilite-legale-enfant')
# mise à jour données personnelles
with httmock.HTTMock(technocarte_ok):
response = app.post_json(
reverse('rsu-api-reseau',
kwargs={
'identifier': f['jean'].id,
}),
params={
'date_de_naissance': '1980-09-18',
'email': 'jeannot@example.com',
})
assert response.json['err'] == 0
assert Job.objects.count() == 7
assert Job.objects.filter(state=Job.STATE_SUCCESS).count() == 7
job = Job.objects.latest('created')
assert len(job.content['fragments']) == 1
assert request_bodies[-2]['metadonnees']['service'] == 'mise-a-jour-identite'
assert request_bodies[-1]['metadonnees']['service'] == 'mise-a-jour-informations-contact'
# changement d'adresse
with httmock.HTTMock(technocarte_ok):
response = app.post_json(
reverse('rsu-api-reseau',
kwargs={
'identifier': f['jean'].id,
}),
params={
'adresse': {
'at': '',
'streetnumber': '124',
'streetnumberext': '',
'streetname': 'ALLÉE DE L\'ARLEQUIN',
'ext1': '',
'ext2': '',
'streetmatriculation': '00032',
'zipcode': '92000',
'inseecode': '92000',
'city': 'NANTERRE',
'country': 'FRANCE',
}
})
assert response.json['err'] == 0
assert Job.objects.count() == 8
assert Job.objects.filter(state=Job.STATE_SUCCESS).count() == 8
job = Job.objects.latest('created')
assert len(job.content['fragments']) == 4
id_techno_enfants = set([f[name].content['cles_de_federation']['technocarte'] for name in
['kevin', 'lilou']])
assert (set([fragment['fragment']['id-metier'] for fragment in job.content['fragments'][2:]])
== id_techno_enfants)
assert request_bodies[-1]['metadonnees']['service'] == 'signalement-changement-adresse'
def test_synchro_debug(app, nanterre_classic_family, settings):
f = nanterre_classic_family
settings.ZOO_NANTERRE_RSU_WS_DEBUG = True
# création des fédérations dans technocarte pour toute la famille
names = ['jean', 'marie', 'kevin', 'lilou']
response = app.post_json(
reverse('rsu-api-synchronization'),
params={
'applications': ['technocarte'],
'individus': [
f[name].id for name in names
]
})
assert response.json['err'] == 0
assert Job.objects.count() == 1
assert Job.objects.filter(state=Job.STATE_SUCCESS).count() == 1
for name in names:
ind = f[name]
ind.refresh_from_db()
assert 'technocarte' in ind.content['cles_de_federation']
def test_infor(app, nanterre_classic_family):
f = nanterre_classic_family
request_bodies = []
@httmock.urlmatch()
def infor_ok(url, request):
request_bodies.append(json.loads(request.body))
return httmock.response(
200, {
'http_code': 200,
'response': [
{
'id-fragment': 1,
'id-metier': '1'
},
{
'id-fragment': 2,
'id-metier': '2',
},
{
'id-fragment': 3,
'id-metier': '3',
},
{
'id-fragment': 4,
'id-metier': '4',
},
]
},
{
'Content-Type': 'application/json',
})
# création des fédérations dans technocarte pour toute la famille
names = ['jean', 'marie', 'kevin', 'lilou']
with httmock.HTTMock(infor_ok):
response = app.post_json(
reverse('rsu-api-synchronization'),
params={
'applications': ['technocarte'],
'individus': [
f[name].id for name in names
]
})
assert response.json['err'] == 0
assert Job.objects.count() == 1
assert Job.objects.filter(state=Job.STATE_SUCCESS).count() == 1
for name in names:
ind = f[name]
ind.refresh_from_db()
assert 'technocarte' in ind.content['cles_de_federation']
assert request_bodies[-1]['metadonnees']['service'] == 'synchronisation'
@httmock.urlmatch()
def infor_nok(url, request):
request_bodies.append(json.loads(request.body))
return httmock.response(
200, {
'http_code': 500,
'response': {
'technique': 'mouai',
'metier': 'coin',
}
},
{
'Content-Type': 'application/json',
})
# création des fédérations dans technocarte pour toute la famille
Job.objects.all().delete()
names = ['jean', 'marie', 'kevin', 'lilou']
with httmock.HTTMock(infor_nok):
response = app.post_json(
reverse('rsu-api-synchronization'),
params={
'applications': ['technocarte'],
'individus': [
f[name].id for name in names
]
})
assert response.json['err'] == 1
assert response.json['errors']
assert Job.objects.count() == 1
assert Job.objects.filter(state=Job.STATE_UNRECOVERABLE_ERROR).count() == 1