nanterre: sécurise les créations initiales et en rejeu des fédérations (fixes #18481)

- ne pas les rejouer
- locker les individus pour lesquels on va potentiellement créer une fédération
- en cas de rejeu forcé et d'échec on nettoye la fédération
- on sauvegarde les anciennes clés en cas d'écrasement dans une
  liste 'anciennes_clés_de_fédération' composée de tuples:

	[nom de l'application, date de l'écrasement, valeur de l'ancienne clé]
This commit is contained in:
Benjamin Dauvergne 2017-09-06 02:07:57 +02:00
parent 392629ea2c
commit 5732cc61c4
5 changed files with 118 additions and 7 deletions

View File

@ -4,6 +4,7 @@ import pytest
import datetime
import isodate
import requests
import threading
import httmock
@ -58,7 +59,7 @@ def test_person_search_api(app, db, rsu):
assert (now() - isodate.parse_datetime(individu['date_de_creation'])).seconds < 100
def test_create_individu(db, app, rsu_schema):
def test_create_individu(transactional_db, app, rsu_schema):
def get_reseau(identifier):
reseau_url = reverse('rsu-api-reseau', kwargs={
@ -650,9 +651,9 @@ def test_create_individu(db, app, rsu_schema):
})
assert response.json['err'] == 1
assert len(response.json['errors']) == 1
assert 'temporaire' in response.json['errors'][0]
assert u'irrécupérable' in response.json['errors'][0]
job = Job.objects.get()
assert job.state == Job.STATE_ERROR
assert job.state == Job.STATE_UNRECOVERABLE_ERROR
assert job.content['error']['code'] == 'transport-error'
@httmock.urlmatch()
@ -679,11 +680,70 @@ def test_create_individu(db, app, rsu_schema):
assert 'technocarte' not in get_content(enfant_id)['cles_de_federation']
with httmock.HTTMock(technocarte_ok):
job = Job.objects.get()
job.state = Job.STATE_TODO
job.do()
job = Job.objects.get()
assert job.state == Job.STATE_SUCCESS
assert get_content(first_id)['cles_de_federation']['technocarte'] == '1234'
assert get_content(enfant_id)['cles_de_federation']['technocarte'] == '5678'
assert 'anciennes_cles_de_federation' not in get_content(first_id)
assert 'anciennes_cles_de_federation' not in get_content(enfant_id)
with httmock.HTTMock(technocarte_ok):
job = Job.objects.get()
job.state = Job.STATE_TODO
job.do()
job = Job.objects.get()
assert job.state == Job.STATE_SUCCESS
assert get_content(first_id)['cles_de_federation']['technocarte'] == '1234'
assert get_content(enfant_id)['cles_de_federation']['technocarte'] == '5678'
assert 'anciennes_cles_de_federation' in get_content(first_id)
assert 'anciennes_cles_de_federation' in get_content(enfant_id)
assert get_content(first_id)['anciennes_cles_de_federation'][0][0] == 'technocarte'
assert get_content(first_id)['anciennes_cles_de_federation'][0][2] == '1234'
assert get_content(enfant_id)['anciennes_cles_de_federation'][0][0] == 'technocarte'
assert get_content(enfant_id)['anciennes_cles_de_federation'][0][2] == '5678'
counter = [0]
@httmock.urlmatch()
def technocarte_ok2(url, request):
counter[0] += 1
if counter[0] == 1:
content = [
{
'id-fragment': 1,
'id-metier': str(1234 + counter[0]),
},
{
'id-fragment': 2,
'id-metier': str(5678 + counter[0]),
},
]
else:
content = []
return httmock.response(
200, content,
{
'Content-Type': 'application/json',
})
with httmock.HTTMock(technocarte_ok2):
responses = []
def doit():
responses.append(app.post_json(synchronization_url, params={
'applications': ['infor'],
'individus': [first_id, enfant_id],
}))
threads = [threading.Thread(target=doit) for i in range(10)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
assert len(responses) == 10
for i, r in enumerate(responses):
assert r.json['err'] == 0
@pytest.mark.django_db(True)

View File

@ -14,14 +14,11 @@ def test_synchro(nanterre_classic_family):
sync = Synchronization.create('technocarte', [
f['marie'], f['kevin']])
old_message = sync.message()
print json.dumps(sync.to_json(), indent=2)
print json.dumps(sync.message(), indent=2)
new_sync = Synchronization.from_json(sync.to_json())
new_message = new_sync.message()
old_message['metadonnees']['date-soumission'] = None
new_message['metadonnees']['date-soumission'] = None
assert old_message == new_message
print json.dumps(new_sync.to_json(), indent=2)
assert new_sync.to_json() == sync.to_json()
@ -307,6 +304,5 @@ def test_infor(app, nanterre_classic_family):
})
assert response.json['err'] == 1
assert response.json['errors']
print response.json
assert Job.objects.count() == 1
assert Job.objects.filter(state=Job.STATE_UNRECOVERABLE_ERROR).count() == 1

View File

@ -181,6 +181,7 @@ REST_FRAMEWORK = {
ZOO_NANTERRE_APPLICATIONS = {
'infor': {
'name': 'Infor',
'rsu_ws_url': 'http://infor.example.com/',
},
'technocarte': {
'name': 'Technocarte',

View File

@ -5,7 +5,9 @@ import datetime
import requests
from requests.exceptions import RequestException
from django.utils.timezone import now
from django.conf import settings
from django.db import DatabaseError
from zoo.models import Entity, Job
@ -106,6 +108,7 @@ class FragmentBuilder(object):
human_result = None
state_on_network_error = Job.STATE_ERROR
lock_individus = False
def __init__(self):
self.fragments = []
@ -136,6 +139,11 @@ class FragmentBuilder(object):
'application %s does not have a rsu_ws_url' % application
self.individus = individus
self.meta = meta
if self.lock_individus:
# on verrouille les entités si nécessaire
Entity.objects.filter(
id__in=[individu.id for individu in self.individus]
).select_for_update()
return self
def message(self):
@ -156,6 +164,27 @@ class FragmentBuilder(object):
return []
def do(self, job=None, **kwargs):
# verrouille les objets
try:
if self.lock_individus:
Entity.objects.filter(
id__in=[individu.id for individu in self.individus]).select_for_update(
nowait=True)
except DatabaseError:
self.human_result = (
'Erreur due à une action concurrente sur les mêmes individus dans le RSU')
self.error = {
'code': 'lock-error',
}
for individu in self.individus:
utils.journalize(
individu,
meta=self.meta,
job_url=job and job.admin_url,
transaction=job and job.transaction,
text=self.human_result,
**kwargs)
return self.state_on_network_error
error_detail = None
try:
if settings.ZOO_NANTERRE_RSU_WS_DEBUG:
@ -228,6 +257,7 @@ class FragmentBuilder(object):
'meta': self.meta,
'response': self.response,
'date_soumission': self.date_soumission,
'state_on_network_error': self.state_on_network_error,
}
for individu in sorted(self.individus, key=lambda i: i.id):
s['individus'].append(individu.id)
@ -258,6 +288,8 @@ class FragmentBuilder(object):
self.meta = serialization['meta']
self.response = serialization['response']
self.date_soumission = serialization.get('date_soumission')
self.state_on_network_error = serialization.get('state_on_network_error',
cls.state_on_network_error)
return self
def reference(self, individu):
@ -368,6 +400,10 @@ class FragmentBuilder(object):
def nouvel_individu(self, individu):
if self.application_id in individu.content['cles_de_federation']:
return # individu déjà fédéré, pas nouveau, on ne fait rien
# tout message contenant une création de fédération n'est pas rejoué
self.state_on_network_error = Job.STATE_UNRECOVERABLE_ERROR
content = individu_to_fragment(individu)
if utils.is_majeur(individu):
fragment = {
@ -462,6 +498,7 @@ class Synchronization(FragmentBuilder):
# pour chaque individu pour lequel la synchronisation est demandée et qui est créé, son
# identifiant de fragment est ajouté à la métadonnée bénéficiaires
service = 'synchronisation'
lock_individus = True
@classmethod
def create(cls, application, individus, meta=None):
@ -521,6 +558,17 @@ class Synchronization(FragmentBuilder):
return self
def handle_response_200(self, response, result, job):
state = self.handle_response_200_real(response, result, job)
if state != Job.STATE_SUCCESS:
for individu in self.fragment_to_id.values():
old_key = individu.content['cles_de_federation'].pop(self.application_id, None)
if old_key:
individu.content.setdefault('anciennes_cles_de_federation', []).insert(
0, [self.application_id, now().isoformat(), old_key])
individu.save()
return state
def handle_response_200_real(self, response, result, job):
id_metiers_map = []
# result can be "null" we must accept it
if result is None:
@ -566,7 +614,12 @@ class Synchronization(FragmentBuilder):
id_metiers_map.append((individu, id_metier))
# apply new id_metier
for individu, id_metier in id_metiers_map:
old_key = individu.content['cles_de_federation'].get(self.application_id)
individu.content['cles_de_federation'][self.application_id] = id_metier
# on sauvegarde les anciennes clés en cas d'écrasement
if old_key:
individu.content.setdefault('anciennes_cles_de_federation', []).insert(
0, [self.application_id, now().isoformat(), old_key])
individu.save()
return Job.STATE_SUCCESS

View File

@ -200,6 +200,7 @@ class CalculQF(fragments.FragmentBuilder):
qf = None
service = 'calcul-quotient-familial'
state_on_network_error = Job.STATE_UNRECOVERABLE_ERROR
lock_individus = True
@classmethod
def create(cls, application, individu,