zoo/tests/test_nanterre.py

1325 lines
52 KiB
Python

# -*- coding: utf-8 -*-
import json
import datetime
import isodate
import requests
import threading
import pytest
import httmock
from django.urls import reverse
from django.utils.encoding import force_text
from django.utils.http import urlencode
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.timezone import now
from zoo.zoo_data.models import Entity, Relation, Log, Job
from zoo.zoo_nanterre.utils import (PersonSearch, adresse as get_individu_adresse, adresses_norel,
adresses)
def test_person_search(db, rsu):
search = PersonSearch()
found = list(search.search_name(rsu[0].content['prenoms'] + ' ' + rsu[0].content['nom_de_naissance']))
assert rsu[0].id == found[0].id
assert found[0].similarity == 1.0
found = list(search.search_query(rsu[0].content['prenoms'] + ' ' + rsu[0].content['nom_de_naissance']))
assert rsu[0].id == found[0].id
assert found[0].similarity == 1.0
birthdate = datetime.datetime.strptime(rsu[0].content['date_de_naissance'], '%Y-%m-%d').date()
found = list(search.search_birthdate(birthdate))
assert any(x for x in found if x.id == rsu[0].id)
assert len(found) == 1
found = list(search.search_email(rsu[0].content['email']))
assert found[0].id == rsu[0].id
found = list(search.search_identifier(rsu[0].content['cles_de_federation']['technocarte']))
assert found[0].id == rsu[0].id
found = list(search.search_identifier('%d' % rsu[0].id))
assert found[0].id == rsu[0].id
# check that a valid date restrain the scope of the search
found = list(search.search_email(rsu[0].content['email']).search_birthdate('01/01/1919'))
assert len(found) == 0
# check that an invalid date is ignored
found = list(search.search_email(rsu[0].content['email']).search_birthdate('99/01/1919'))
assert len(found) > 0
found = list(search.search_email(rsu[0].content['email']).search_birthdate('99/1919'))
assert len(found) > 0
def test_person_search_api(app, db, rsu):
url = reverse('rsu-api-search')
for statut in ['majeur', 'mineur']:
response = app.get(url + '?statut_legal=' + statut)
assert response.json['err'] == 0
assert len(response.json['data'])
for individu in response.json['data']:
assert individu['statut_legal'] == statut
assert (now() - isodate.parse_datetime(individu['date_de_creation'])).seconds < 100
assert individu['date_de_modification'] == individu['date_de_creation']
birthdate = rsu[0].content['date_de_naissance']
response = app.get(url + '?q=%s' % birthdate)
assert response.json['err'] == 0
assert len(response.json['data'])
assert any(data['id'] == rsu[0].id for data in response.json['data'])
response = app.get(url + '?nom=%s' % rsu[0].content['nom_de_naissance'])
assert len(response.json['data']) > 0
response = app.get(url + '?date_de_naissance=%s' % rsu[0].content['date_de_naissance'])
assert len(response.json['data']) > 0
response = app.get(url + '?cle=42', status=200)
response = app.get(url + '?email=%s' % rsu[0].content['email'])
assert len(response.json['data']) > 0
response = app.get(url + '?NameID=my_a2_username', status=200)
response = app.get(url + '?cle_?=*', status=200)
def test_create_individu(settings, transactional_db, app, app_noauth, rsu_schema):
def get_reseau(identifier):
reseau_url = reverse('rsu-api-reseau', kwargs={
'identifier': identifier
})
response = app.get(reseau_url)
assert response.json['err'] == 0
assert response.json['data']
return response.json['data']
# création du premier adulte
create_url = reverse('rsu-api-create-individu')
response = app.post_json(create_url, params={}, status=400)
assert response.json['err'] == 1
assert len(response.json['errors']) > 5
response = app.post_json(create_url, params={
'prenoms': 'Jean Eude',
'nom_de_naissance': 'Michalon-Gourde',
'nom_d_usage': u'Grégoire',
'date_de_naissance': '1992-03-04',
'genre': 'homme',
'email': '',
'telephones': [
{
'numero': '0609080604',
'type': 'mobile',
},
],
'adresse': {
'at': '',
'streetnumber': '123',
'streetnumberext': '',
'streetname': 'ALLe DE L\'ARLEQUIN',
'ext1': '',
'ext2': '',
'streetmatriculation': '00032',
'zipcode': '92000',
'inseecode': '92000',
'city': 'NANTERRE',
'country': 'FRANCE',
}
})
assert response.json['err'] == 0
assert 'id' in response.json['data']
first_id = response.json['data']['id']
first_adult = entity = Entity.objects.get(id=first_id)
assert entity.content['prenoms'] == 'JEAN EUDE'
assert entity.content['nom_de_naissance'] == 'MICHALON-GOURDE'
assert entity.content['nom_d_usage'] == u'GRÉGOIRE'
assert entity.content['date_de_naissance'] == '1992-03-04'
first_adult_address = get_individu_adresse(entity)
assert first_adult_address.content['streetname'] == 'ALLE DE L\'ARLEQUIN'
assert Entity.objects.filter(schema__slug='individu').count() == 1
assert Entity.objects.filter(schema__slug='adresse').count() == 1
assert Relation.objects.count() == 1
assert Log.objects.filter(entity__id=first_id).count() == 1
# extraction depuis l'API
get_url = reverse('rsu-api-reseau', kwargs={'identifier': first_id})
response = app.get(get_url)
assert response.json['err'] == 0
individu = response.json['data']
assert individu['prenoms'] == 'JEAN EUDE'
assert individu['nom_de_naissance'] == 'MICHALON-GOURDE'
assert individu['nom_d_usage'] == u'GRÉGOIRE'
assert individu['date_de_naissance'] == '1992-03-04'
assert individu['adresses'][0]['streetname'] == 'ALLE DE L\'ARLEQUIN'
assert (now() - isodate.parse_datetime(individu['date_de_creation'])).seconds < 100
assert individu['date_de_modification'] == individu['date_de_creation']
# mise à jour premier adulte
update_url = reverse('rsu-api-reseau', kwargs={
'identifier': first_id})
response = app.post_json(update_url, params={
'nom_d_usage': 'Grégorio',
'date_de_naissance': '1991-03-04',
'adresse': {
'at': '',
'streetnumber': '123',
'streetnumberext': '',
'streetname': 'ALLéE DE L\'ARLEQUIN',
'ext1': '',
'ext2': '',
'streetmatriculation': '00032',
'zipcode': '92000',
'inseecode': '92000',
'city': 'NANTERRE',
'country': 'FRANCE',
}
})
assert response.json['err'] == 0
assert response.json['data']['id'] == first_id
entity.refresh_from_db()
assert entity.content['date_de_naissance'] == '1991-03-04'
assert entity.content['nom_d_usage'] == u'GRÉGORIO'
streetname = get_individu_adresse(entity).content['streetname']
assert streetname == u'ALLÉE DE L\'ARLEQUIN'
# extraction via l'API
get_url = reverse('rsu-api-reseau', kwargs={'identifier': first_id})
response = app.get(get_url)
assert response.json['err'] == 0
individu = response.json['data']
assert individu['prenoms'] == 'JEAN EUDE'
assert individu['nom_de_naissance'] == 'MICHALON-GOURDE'
assert individu['nom_d_usage'] == u'GRÉGORIO'
assert individu['date_de_naissance'] == '1991-03-04'
assert individu['adresses'][0]['streetname'] == u'ALLÉE DE L\'ARLEQUIN'
assert (now() - isodate.parse_datetime(individu['date_de_modification'])).seconds < 100
assert individu['date_de_modification'] != individu['date_de_creation']
# vérification dans la base
assert Entity.objects.count() == 2
assert Entity.objects.filter(schema__slug='individu').count() == 1
assert Entity.objects.filter(schema__slug='adresse').count() == 1
assert Relation.objects.count() == 1
assert Log.objects.filter(entity__id=first_id).count() == 2
assert get_individu_adresse(first_adult) == first_adult_address
# création second adulte
response = app.post_json(create_url, params={
'prenoms': 'Micheline',
'nom_de_naissance': 'Michalon-Gourde',
'nom_d_usage': '',
'date_de_naissance': '1990-05-05',
'genre': 'femme',
'email': 'micheline@gmail.com',
'telephones': [
{
'numero': '0609080604',
'type': 'mobile',
},
],
'adresse': {
'at': '',
'streetnumber': '123',
'streetnumberext': '',
'streetname': 'ALLE DE L\'ARLEQUIN',
'ext1': '',
'ext2': '',
'streetmatriculation': '00032',
'zipcode': '92000',
'inseecode': '92000',
'city': 'NANTERRE',
'country': 'FRANCE',
}
})
assert Entity.objects.count() == 4
assert Entity.objects.filter(schema__slug='individu').count() == 2
assert Entity.objects.filter(schema__slug='adresse').count() == 2
assert Relation.objects.count() == 2
assert Relation.objects.filter(schema__slug='habite').count() == 2
second_id = response.json['data']['id']
second_adult = Entity.objects.get(id=second_id)
second_adult_address = get_individu_adresse(second_adult)
assert Log.objects.filter(entity__id=second_id).count() == 1
# création d'un troisième adulte
response = app.post_json(create_url, params={
'prenoms': 'John',
'nom_de_naissance': 'Doe',
'nom_d_usage': '',
'date_de_naissance': '1981-05-05',
'genre': 'homme',
'email': 'john.doe@gmail.com',
'telephones': [
],
'adresse': {
'at': '',
'streetnumber': '12',
'streetnumberext': '',
'streetname': 'RUE DU PAPE',
'ext1': '',
'ext2': '',
'streetmatriculation': '00055',
'zipcode': '92000',
'inseecode': '92000',
'city': 'NANTERRE',
'country': 'FRANCE',
}
})
assert Entity.objects.count() == 6
assert Entity.objects.filter(schema__slug='individu').count() == 3
assert Entity.objects.filter(schema__slug='adresse').count() == 3
assert Relation.objects.count() == 3
assert Relation.objects.filter(schema__slug='habite').count() == 3
third_id = response.json['data']['id']
third_adult = Entity.objects.get(id=third_id)
third_adult_address = get_individu_adresse(third_adult)
assert Log.objects.filter(entity__id=third_id).count() == 1
# rattachement du premier enfant au premier adulte
enfant_url = reverse('rsu-api-declaration-responsabilite-legale',
kwargs={'identifier': first_id})
response = app.post_json(enfant_url, params={
'statut': 'parent',
'enfant': {
'prenoms': u'kévin',
'nom_de_naissance': 'Michalon-Gourde',
'genre': 'homme',
'date_de_naissance': '2015-03-04',
'email': '',
'telephones': [],
}
})
assert response.json['err'] == 0
assert 'id' in response.json['data']
enfant_id = response.json['data']['id']
first_child = Entity.objects.get(id=enfant_id)
assert Log.objects.filter(entity__id=enfant_id).count() == 1
assert Log.objects.filter(entity__id=first_id).count() == 3
assert get_individu_adresse(first_child) == first_adult_address
# rattachement du premier enfant au deuxième adulte
enfant_url = reverse('rsu-api-declaration-responsabilite-legale',
kwargs={'identifier': second_id})
response = app.post_json(enfant_url, params={
'statut': 'parent',
'enfant_id': enfant_id,
})
assert response.json['err'] == 0
assert 'id' in response.json['data']
assert Log.objects.filter(entity__id=enfant_id).count() == 2
assert Log.objects.filter(entity__id=second_id).count() == 2
assert set(adresses_norel(first_child)) == set([first_adult_address, second_adult_address])
# détachement du premier enfant du premier adulte
suppression_responsabilite_url = reverse(
'rsu-api-suppression-lien-de-responsabilite',
kwargs={
'identifier': first_id,
'identifier_enfant': enfant_id
})
response = app.post_json(suppression_responsabilite_url)
assert response.json['err'] == 0
assert Log.objects.filter(entity__id=enfant_id).count() == 3
assert Log.objects.filter(entity__id=first_id).count() == 4
assert get_individu_adresse(first_child) == second_adult_address
# ré-attachement du premier enfant au premier adulte
enfant_url = reverse('rsu-api-declaration-responsabilite-legale',
kwargs={'identifier': first_id})
response = app.post_json(enfant_url, params={
'statut': 'parent',
'enfant_id': enfant_id,
})
assert response.json['err'] == 0
assert 'id' in response.json['data']
assert Log.objects.filter(entity__id=enfant_id).count() == 4
assert Log.objects.filter(entity__id=first_id).count() == 5
assert set(adresses_norel(first_child)) == set([first_adult_address, second_adult_address])
# rattachement d'un deuxième enfant au deuxième adulte
enfant_url = reverse('rsu-api-declaration-responsabilite-legale',
kwargs={'identifier': second_id})
response = app.post_json(enfant_url, params={
'statut': 'parent',
'enfant': {
'prenoms': u'huguette',
'nom_de_naissance': 'Michalon-Gourde',
'genre': 'femme',
'date_de_naissance': '2016-03-04',
'email': '',
'telephones': [],
}
})
assert response.json['err'] == 0
assert 'id' in response.json['data']
enfant_id_2 = response.json['data']['id']
second_child = Entity.objects.get(id=enfant_id_2)
assert get_individu_adresse(second_child) == second_adult_address
assert Entity.objects.count() == 8
assert Entity.objects.filter(schema__slug='individu').count() == 5
assert Entity.objects.filter(schema__slug='adresse').count() == 3
assert Relation.objects.count() == 9
assert Relation.objects.filter(schema__slug='habite').count() == 6
assert Relation.objects.filter(schema__slug='responsabilite-legale').count() == 3
assert Log.objects.filter(entity__id=enfant_id_2).count() == 1
assert Log.objects.filter(entity__id=second_id).count() == 3
# rattachement du premier enfant à un troisième responsable légal
enfant_url = reverse('rsu-api-declaration-responsabilite-legale',
kwargs={'identifier': third_id})
# comme parent cela doit échouer
response = app.post_json(enfant_url, params={
'statut': 'parent',
'enfant_id': enfant_id,
}, status=400)
assert response.json['err'] == 1
assert set(adresses_norel(first_child)) == set([first_adult_address, second_adult_address])
# comme tiers de confiance cela marche
response = app.post_json(enfant_url, params={
'statut': 'tiers_de_confiance',
'enfant_id': enfant_id,
})
assert response.json['err'] == 0
assert 'id' in response.json['data']
assert Log.objects.filter(entity__id=enfant_id).count() == 5
assert Log.objects.filter(entity__id=third_id).count() == 2
assert set(adresses_norel(first_child)) == set([first_adult_address, second_adult_address,
third_adult_address])
# on détache l'enfant du tiers
suppression_responsabilite_url = reverse(
'rsu-api-suppression-lien-de-responsabilite',
kwargs={
'identifier': third_id,
'identifier_enfant': enfant_id
})
response = app.post_json(suppression_responsabilite_url)
assert response.json['err'] == 0
assert Log.objects.filter(entity__id=enfant_id).count() == 6
assert Log.objects.filter(entity__id=third_id).count() == 3
assert set(adresses_norel(first_child)) == set([first_adult_address, second_adult_address])
# déclaration d'union entre les deux adultes
union_url = reverse('rsu-api-declaration-union')
# erreur: on ne peut pas se marrier avec un enfant, voyons !
response = app.post_json(union_url, params={
'individu_id_1': first_id,
'individu_id_2': enfant_id,
'statut': 'pacs/mariage',
'adresse_commune': 1,
})
assert response.json['err'] == 1
assert response.json['errors']
# ok, deux adultes
response = app.post_json(union_url, params={
'individu_id_1': first_id,
'individu_id_2': second_id,
'statut': 'unionlibre',
'adresse_commune': 1,
})
assert response.json['err'] == 0
assert Entity.objects.filter(schema__slug='individu').count() == 5
assert Entity.objects.filter(schema__slug='adresse').count() == 2
assert Entity.objects.count() == 7
assert Relation.objects.filter(schema__slug='habite').count() == 5
assert Relation.objects.filter(schema__slug='responsabilite-legale').count() == 3
assert Relation.objects.filter(schema__slug='union').count() == 1
assert Relation.objects.filter(schema__slug='union', content__statut='unionlibre').count() == 1
assert Log.objects.filter(entity__id=first_id).count() == 6
assert Log.objects.filter(entity__id=second_id).count() == 4
assert Relation.objects.count() == 9
# l'adress conservée est celle du premier adulte
assert get_individu_adresse(first_adult) == first_adult_address
assert get_individu_adresse(second_adult) == first_adult_address
assert get_individu_adresse(first_child) == first_adult_address
assert get_individu_adresse(second_child) == first_adult_address
# l'adresse du deuxième adulte a été supprimé
assert Entity.objects.filter(id=second_adult_address.id).count() == 0
# déclaration d'adresse principale pour le premier enfant
declaration_adresse_principale_url = reverse('rsu-api-declaration-adresse-principale', kwargs={
'identifier': enfant_id
})
# requette get
response = app.get(declaration_adresse_principale_url)
assert response.content_type == 'application/json'
assert not response.json['err']
data = get_reseau(enfant_id)
assert len(data['adresses']) == 1
assert data['adresses'][0]['principale'] is False
# sur première adresse ok
response = app.post_json(declaration_adresse_principale_url, params={
'adresse_principale': 1,
})
assert response.json['err'] == 0
data = get_reseau(enfant_id)
assert len(data['adresses']) == 1
assert data['adresses'][0]['principale'] is True
assert Log.objects.filter(entity__id=enfant_id).count() == 7
# pas de seconde adresse, ça doit foirer
response = app.post_json(declaration_adresse_principale_url, params={
'adresse_principale': 2,
})
assert response.json['err'] == 1
assert response.json['errors']
# adresse 0 ne veut rien dire
response = app.post_json(declaration_adresse_principale_url, params={
'adresse_principale': 0,
})
assert response.json['err'] == 1
assert response.json['errors']
# sur un adulte, pas possible
declaration_adresse_principale_url = reverse('rsu-api-declaration-adresse-principale', kwargs={
'identifier': first_id
})
response = app.post_json(declaration_adresse_principale_url, params={
'adresse_principale': 1,
})
assert response.json['err'] == 1
assert response.json['errors']
data = get_reseau(first_id)
assert data['union']
assert data['union_statut'] == 'unionlibre'
# teste du ws reseau-liste
reseau_liste_url = reverse('rsu-api-reseau-liste', kwargs={
'identifier': second_id,
})
response = app.get(reseau_liste_url)
assert response.json['err'] == 0
assert len(response.json['data']) == 4
assert all('text' in x for x in response.json['data'])
assert response.json['data'][0]['id'] == second_id
assert response.json['data'][1]['id'] == first_id
assert response.json['data'][2]['id'] == enfant_id
assert response.json['data'][3]['id'] == enfant_id_2
response = app.get(reseau_liste_url + '?conjoint')
assert response.json['err'] == 0
assert len(response.json['data']) == 1
assert all('text' in x for x in response.json['data'])
assert response.json['data'][0]['id'] == first_id
response = app.get(reseau_liste_url + '?enfants')
assert response.json['err'] == 0
assert len(response.json['data']) == 2
assert all('text' in x for x in response.json['data'])
assert response.json['data'][0]['id'] == enfant_id
assert response.json['data'][1]['id'] == enfant_id_2
enfant = Entity.objects.get(id=enfant_id)
ddn = enfant.content['date_de_naissance']
enfant.content['date_de_naissance'] = (now() + datetime.timedelta(100)).date().isoformat()
enfant.save()
response = app.get(reseau_liste_url + '?enfants')
assert response.json['err'] == 0
assert len(response.json['data']) == 2
assert all('text' in x for x in response.json['data'])
assert response.json['data'][0]['id'] == enfant_id_2
assert response.json['data'][1]['id'] == enfant_id
enfant.content['date_de_naissance'] = ddn
enfant.save()
# changement de situation maritale entre les deux adultes
changement_de_situation_maritale_url = reverse('rsu-api-changement-de-situation-maritale',
kwargs={
'identifier': first_id
})
response = app.post_json(changement_de_situation_maritale_url, params={
'statut': 'pacs/mariage'
})
assert response.json['err'] == 0
data = get_reseau(first_id)
assert data['union']
assert data['union_statut'] == 'pacs/mariage'
assert Log.objects.filter(entity__id=first_id).count() == 7
assert Log.objects.filter(entity__id=second_id).count() == 5
# pour les adresses rien ne change
assert get_individu_adresse(first_adult) == first_adult_address
assert get_individu_adresse(second_adult) == first_adult_address
assert get_individu_adresse(first_child) == first_adult_address
assert get_individu_adresse(second_child) == first_adult_address
# déclaration de séparation entre les deux adultes
separation_url = reverse('rsu-api-separation', kwargs={
'identifier': first_id
})
response = app.get(separation_url)
assert response.json['err'] == 0
data = response.json['data']
assert set(data.keys()) == set(['union', 'union_statut', 'enfants'])
assert data['union']['id'] == second_id
assert data['union_statut'] == 'pacs/mariage'
assert len(data['enfants']) == 1
assert data['enfants'][0]['id'] == enfant_id
response = app.post_json(separation_url, params={
'adresse_principale_1': [enfant_id],
})
assert Entity.objects.count() == 8
assert Entity.objects.filter(schema__slug='individu').count() == 5
assert Entity.objects.filter(schema__slug='adresse').count() == 3
assert Relation.objects.count() == 9
# le premier adulte a toujours la même adresse
assert get_individu_adresse(first_adult) == first_adult_address
# le deuxième adulte a une nouvelle adresse
assert get_individu_adresse(second_adult) != first_adult_address
assert get_individu_adresse(second_adult) != second_adult_address
second_adult_address = get_individu_adresse(second_adult)
# le premier enfant a deux adresses
assert set(adresses_norel(first_child)) == set([first_adult_address, second_adult_address])
for adress, rel in adresses(first_child):
if adress == first_adult_address:
assert rel.content['principale']
# le second enfant a une adresse
assert get_individu_adresse(second_child) == second_adult_address
# tout le monde habite quelque part, un enfant a deux logements
adresse1 = Relation.objects.filter(left_id=first_id, schema__slug='habite').get().right
adresse2 = Relation.objects.filter(left_id=second_id, schema__slug='habite').get().right
assert Relation.objects.filter(schema__slug='habite').count() == 6
# le premier adulte et le premier enfant partagent un logement
assert Relation.objects.filter(left_id=enfant_id, right_id=adresse1.id).count() == 1
# le deuxième adulte et le premier enfant partagent un logement
assert Relation.objects.filter(left_id=enfant_id, right_id=adresse2.id).count() == 1
# le deuxième adulte et le deuxième enfant partagent un logement
assert Relation.objects.filter(left_id=enfant_id_2, right_id=adresse2.id).count() == 1
# le premier et le deuxième adulte ne partagent plus de logement
assert Relation.objects.filter(left_id=first_id, right_id=adresse2.id).count() == 0
assert Relation.objects.filter(left_id=second_id, right_id=adresse1.id).count() == 0
assert Relation.objects.filter(schema__slug='responsabilite-legale').count() == 3
assert Relation.objects.filter(schema__slug='union').count() == 0
assert Log.objects.filter(entity__id=first_id).count() == 8
assert Log.objects.filter(entity__id=second_id).count() == 6
deces_url = reverse('rsu-api-declaration-de-deces',
kwargs={
'identifier': second_id,
})
response = app.post_json(deces_url, params={
'date_de_deces': datetime.date.today().isoformat(),
'journal_title': 'coucou',
})
assert response.json['err'] == 0
assert Entity.objects.filter(schema__slug='individu').count() == 5
assert Entity.objects.filter(schema__slug='adresse').count() == 3
assert Entity.objects.count() == 8
assert Relation.objects.filter(schema__slug='habite').count() == 4
# le deuxième enfant n'a plus de logement
assert Relation.objects.filter(
left=enfant_id_2, schema__slug='habite').count() == 0
# le premier adulte et le premier enfant partagent un logement
assert Relation.objects.filter(
left=first_id, right__right_relations__left=enfant_id).count() == 1
# le deuxième adulte et le premier enfant ne partagent plus de logement
assert Relation.objects.filter(
left=second_id, right__right_relations__left=enfant_id).count() == 0
# le deuxième adulte a un logment
assert Relation.objects.filter(
left=second_id, schema__slug='habite').count() == 1
# le premier et le deuxième adulte ne partagent plus de logement
assert Relation.objects.filter(
left=first_id,
right__right_relations__schema__slug='habite',
right__right_relations__left=second_id).count() == 0
assert Relation.objects.filter(schema__slug='responsabilite-legale').count() == 1
assert Relation.objects.filter(schema__slug='union').count() == 0
assert Relation.objects.count() == 5
assert Log.objects.filter(entity__id=second_id).count() == 7
assert (Log.objects.filter(entity__id=second_id).latest('timestamp').content['meta']['title']
== 'coucou')
# écriture dans le journal du premier adulte
journal_url = reverse('rsu-api-journal', kwargs={'identifier': first_id})
for i in range(30):
response = app.post_json(journal_url, params={
'id': i,
})
assert response.json['err'] == 0
assert Log.objects.filter(entity__id=first_id)[0].content['id'] == i
qs = Log.objects.filter(entity__id=first_id)
more = journal_url + '?limit=10'
count = 0
all_data = []
while more:
response = app.get(more)
assert response.json['err'] == 0
assert len(response.json['data'])
count += len(response.json['data'])
all_data.extend(response.json['data'])
more = response.json.get('more')
if more:
assert 'cookie' in response.json
parsed = urlparse.urlparse(response.json['more'])
query = parsed.query
assert urlparse.parse_qs(query)['cookie'] == [response.json['cookie']]
assert sorted(d['id'] for d in all_data) == sorted(qs.values_list('id', flat=True))
assert count == qs.count()
assert 'more' not in response.json
# lecture avec filtrage
response = app.get(journal_url + '?' + urlencode({'filter_text': u'Déclaration d\'union'}))
assert 'more' not in response.json
assert response.json['data']
assert len(response.json['data']) == 1
synchronization_url = reverse('rsu-api-synchronization')
@httmock.urlmatch()
def connection_error(url, request):
raise requests.ConnectionError
with httmock.HTTMock(connection_error):
response = app.post_json(synchronization_url, params={
'applications': ['technocarte'],
'individus': [first_id, enfant_id],
})
assert response.json['err'] == 1
assert len(response.json['errors']) == 1
assert u'irrécupérable' in response.json['errors'][0]
job = Job.objects.get()
assert job.state == Job.STATE_UNRECOVERABLE_ERROR
assert job.content['error']['code'] == 'transport-error'
@httmock.urlmatch()
def technocarte_ok(url, request):
return httmock.response(
200, [
{
'id-fragment': 1,
'id-metier': '1234'
},
{
'id-fragment': 2,
'id-metier': '5678',
},
],
{
'Content-Type': 'application/json',
})
def get_content(key):
return Entity.objects.get(id=key).content
assert 'technocarte' not in get_content(first_id)['cles_de_federation']
assert 'technocarte' not in get_content(enfant_id)['cles_de_federation']
with httmock.HTTMock(technocarte_ok):
job = Job.objects.get()
job.state = Job.STATE_TODO
job.do()
job = Job.objects.get()
assert job.state == Job.STATE_SUCCESS
assert get_content(first_id)['cles_de_federation']['technocarte'] == '1234'
assert get_content(enfant_id)['cles_de_federation']['technocarte'] == '5678'
assert 'anciennes_cles_de_federation' not in get_content(first_id)
assert 'anciennes_cles_de_federation' not in get_content(enfant_id)
with httmock.HTTMock(technocarte_ok):
job = Job.objects.get()
job.state = Job.STATE_TODO
job.do()
job = Job.objects.get()
assert job.state == Job.STATE_SUCCESS
assert get_content(first_id)['cles_de_federation']['technocarte'] == '1234'
assert get_content(enfant_id)['cles_de_federation']['technocarte'] == '5678'
assert 'anciennes_cles_de_federation' in get_content(first_id)
assert 'anciennes_cles_de_federation' in get_content(enfant_id)
assert get_content(first_id)['anciennes_cles_de_federation'][0][0] == 'technocarte'
assert get_content(first_id)['anciennes_cles_de_federation'][0][2] == '1234'
assert get_content(enfant_id)['anciennes_cles_de_federation'][0][0] == 'technocarte'
assert get_content(enfant_id)['anciennes_cles_de_federation'][0][2] == '5678'
counter = [0]
@httmock.urlmatch()
def technocarte_ok2(url, request):
counter[0] += 1
if counter[0] == 1:
content = [
{
'id-fragment': 1,
'id-metier': str(1234 + counter[0]),
},
{
'id-fragment': 2,
'id-metier': str(5678 + counter[0]),
},
]
else:
content = []
return httmock.response(
200, content,
{
'Content-Type': 'application/json',
})
with httmock.HTTMock(technocarte_ok2):
responses = []
def doit():
responses.append(app.post_json(synchronization_url, params={
'applications': ['infor'],
'individus': [first_id, enfant_id],
}))
threads = [threading.Thread(target=doit) for i in range(10)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
assert len(responses) == 10
for i, r in enumerate(responses):
assert r.json['err'] == 0
# test obtention de clés de fédération
def get_federation(uuid, **kwargs):
return app_noauth.get('/rsu/individu/%s/federation/technocarte/' % uuid, **kwargs).json
first = Entity.objects.get(id=first_id)
first.content['cles_de_federation']['authentic'] = 'abcd'
first.save()
enfant = Entity.objects.get(id=enfant_id)
enfant.content['cles_de_federation']['authentic'] = 'efgh'
enfant.save()
assert get_federation('abcd', status=403)['err'] == 1
assert get_federation('efgh', status=403)['err'] == 1
assert get_federation('abcd', params={'apikey': 'xyz'}, status=403)['err'] == 1
settings.ZOO_NANTERRE_APPLICATIONS['technocarte']['apikey'] = 'xyz'
assert get_federation('abcd', status=401)['err'] == 1
assert get_federation('efgh', status=401)['err'] == 1
assert get_federation('abcd', params={'apikey': 'xyz'})['cle_de_federation'] == '1234'
assert get_federation('efgh', params={'apikey': 'xyz'})['cle_de_federation'] == '5678'
settings.ZOO_NANTERRE_APPLICATIONS['technocarte']['apikey'] = ''
assert get_federation('abcd', params={'apikey': 'xyz'}, status=403)['err'] == 1
@pytest.mark.django_db(True)
def test_cles_de_federations(app, rsu_schema):
# creation avec une clé authentic
create_url = reverse('rsu-api-create-individu')
individu = {
'prenoms': 'Jean Eude',
'nom_de_naissance': 'Michalon-Gourde',
'nom_d_usage': u'Grégoire',
'date_de_naissance': '1992-03-04',
'genre': 'homme',
'email': '',
'telephones': [
{
'numero': '0609080604',
'type': 'mobile',
},
],
'adresse': {
'at': '',
'streetnumber': '123',
'streetnumberext': '',
'streetname': 'ALLe DE L\'ARLEQUIN',
'ext1': '',
'ext2': '',
'streetmatriculation': '00032',
'zipcode': '92000',
'inseecode': '92000',
'city': 'NANTERRE',
'country': 'FRANCE',
}
}
individu['cles_de_federation'] = {'authentic': '1234'}
response = app.post_json(create_url, params=individu, status=400)
assert response.json['err'] == 1
assert 'data' not in response.json
assert response.json['errors'][0] == u'clés de fédération non admises lors de la création d\'un individu'
individu['cles_de_federation'] = {}
response = app.post_json(create_url, params=individu)
assert response.json['err'] == 0
assert 'id' in response.json['data']
first_id = response.json['data']['id']
# ajout d'une clé de fédération
update_url = reverse('rsu-api-reseau', kwargs={'identifier': first_id})
response = app.post_json(update_url, params={
'cles_de_federation': {
'authentic': '1234',
},
})
entity = Entity.objects.get(id=first_id)
assert entity.content['cles_de_federation']['authentic'] == '1234'
assert Entity.objects.filter(content__cles_de_federation__authentic='1234').count() == 1
assert Entity.objects.filter(content__cles_de_federation__authentic='1234').get().id == first_id
# création second adulte
response = app.post_json(create_url, params={
'prenoms': 'Micheline',
'nom_de_naissance': 'Michalon-Gourde',
'nom_d_usage': '',
'date_de_naissance': '1990-05-05',
'genre': 'femme',
'email': 'micheline@gmail.com',
'telephones': [
{
'numero': '0609080604',
'type': 'mobile',
},
],
'adresse': {
'at': '',
'streetnumber': '123',
'streetnumberext': '',
'streetname': 'ALLE DE L\'ARLEQUIN',
'ext1': '',
'ext2': '',
'streetmatriculation': '00032',
'zipcode': '92000',
'inseecode': '92000',
'city': 'NANTERRE',
'country': 'FRANCE',
}
})
second_id = response.json['data']['id']
# ajout d'une clé de fédération : refus car déjà utilisée par individu 1
update_url = reverse('rsu-api-reseau', kwargs={'identifier': second_id})
response = app.post_json(update_url, params={
'cles_de_federation': {
'authentic': '1234',
},
}, status=400)
assert response.json['err'] == 1
assert 'data' not in response.json
assert response.json['errors'][0] == u'la clé authentic 1234 est déjà utilisée par l\'individu #%s' % first_id
# ajout d'une clé de fédération
update_url = reverse('rsu-api-reseau', kwargs={'identifier': second_id})
response = app.post_json(update_url, params={
'cles_de_federation': {
'authentic': '4321',
},
})
entity2 = Entity.objects.get(id=second_id)
assert entity2.content['cles_de_federation']['authentic'] == '4321'
assert Entity.objects.filter(content__cles_de_federation__authentic='4321').get().id == second_id
# mise à jour clés individu 1 : refus car déjà utilisée sur individu 2
update_url = reverse('rsu-api-reseau', kwargs={'identifier': first_id})
response = app.post_json(update_url, params={
'cles_de_federation': {
'authentic': '4321',
'technocarte': 'technoid',
},
}, status=400)
assert response.json['err'] == 1
assert 'data' not in response.json
assert response.json['errors'][0] == u'la clé authentic 4321 est déjà utilisée par l\'individu #%s' % second_id
# mise à jour clés individu 1
response = app.post_json(update_url, params={
'cles_de_federation': {
'authentic': '12345',
'technocarte': 'technoid',
},
})
assert response.json['err'] == 0
assert response.json['data']['id'] == first_id
entity.refresh_from_db()
assert entity.content['cles_de_federation']['authentic'] == '12345'
assert entity.content['cles_de_federation']['technocarte'] == 'technoid'
assert Entity.objects.filter(content__cles_de_federation__authentic='1234').count() == 0
assert Entity.objects.filter(content__cles_de_federation__authentic='4321').count() == 1
assert Entity.objects.filter(content__cles_de_federation__authentic='12345').count() == 1
assert Entity.objects.filter(content__cles_de_federation__authentic='12345').get().id == first_id
assert Entity.objects.filter(content__cles_de_federation__technocarte='technoid').count() == 1
assert Entity.objects.filter(content__cles_de_federation__technocarte='technoid').get().id == first_id
# mise à jour clés individu 1 : modification de technocarte mais pas d'authentic
response = app.post_json(update_url, params={
'cles_de_federation': {
'authentic': '12345',
'technocarte': 'idtechno',
},
})
assert response.json['err'] == 0
assert response.json['data']['id'] == first_id
entity.refresh_from_db()
assert entity.content['cles_de_federation']['authentic'] == '12345'
assert entity.content['cles_de_federation']['technocarte'] == 'idtechno'
assert Entity.objects.filter(content__cles_de_federation__authentic='12345').count() == 1
assert Entity.objects.filter(content__cles_de_federation__authentic='12345').get().id == first_id
assert Entity.objects.filter(content__cles_de_federation__technocarte='technoid').count() == 0
assert Entity.objects.filter(content__cles_de_federation__technocarte='idtechno').count() == 1
assert Entity.objects.filter(content__cles_de_federation__technocarte='idtechno').get().id == first_id
# suppression clé authentic
update_url = reverse('rsu-api-reseau', kwargs={'identifier': first_id})
response = app.post_json(update_url, params={
'cles_de_federation': {
'authentic': ''
},
})
assert response.json['err'] == 0
assert response.json['data']['id'] == first_id
entity.refresh_from_db()
assert 'authentic' not in entity.content['cles_de_federation']
assert entity.content['cles_de_federation']['technocarte'] == 'idtechno'
assert Entity.objects.filter(content__cles_de_federation__authentic='1234').count() == 0
assert Entity.objects.filter(content__cles_de_federation__authentic='12345').count() == 0
assert Entity.objects.filter(content__cles_de_federation__technocarte='idtechno').count() == 1
assert Entity.objects.filter(content__cles_de_federation__technocarte='idtechno').get().id == first_id
# suppression d'un individu
suppression_url = reverse('rsu-api-suppression-individu', kwargs={
'identifier': first_id})
response = app.post(suppression_url)
# impossible il y a toujours une clé
assert response.json['err'] == 1
assert len(response.json['errors']) == 1
# on supprimer la clé technocarte
response = app.post_json(update_url, params={
'cles_de_federation': {
'technocarte': ''
},
})
entities = Entity.objects.all()
relations = Relation.objects.all()
# on a initialement 2 individus avec 2 adresses
assert entities.count() == 4
# on retente
response = app.post(suppression_url)
# c'est ok
assert response.json['err'] == 0
# on vérifie qu'il n'y a plus qu'une fiche et qu'une adresse et qu'ils sont liés
assert entities.filter(schema__slug='individu').count() == 1
assert entities.filter(id=second_id).count() == 1
assert entities.filter(schema__slug='adresse').count() == 1
assert relations.count() == 1
assert relations.filter(left__id=second_id, schema__slug='habite').count() == 1
def test_separation(db, app, rsu_schema):
# création du premier adulte
create_url = reverse('rsu-api-create-individu')
response = app.post_json(create_url, params={
'prenoms': 'Jean Eude',
'nom_de_naissance': 'Michalon-Gourde',
'nom_d_usage': u'Grégoire',
'date_de_naissance': '1992-03-04',
'genre': 'homme',
'email': '',
'telephones': [
{
'numero': '0609080604',
'type': 'mobile',
},
],
'adresse': {
'at': '',
'streetnumber': '123',
'streetnumberext': '',
'streetname': 'ALLe DE L\'ARLEQUIN',
'ext1': '',
'ext2': '',
'streetmatriculation': '00032',
'zipcode': '92000',
'inseecode': '92000',
'city': 'NANTERRE',
'country': 'FRANCE',
}
})
assert response.json['err'] == 0
first_adult_id = response.json['data']['id']
first_adult = entity = Entity.objects.get(id=first_adult_id)
first_adult_address = get_individu_adresse(entity)
# création second adulte
response = app.post_json(create_url, params={
'prenoms': 'Micheline',
'nom_de_naissance': 'Michalon-Gourde',
'nom_d_usage': '',
'date_de_naissance': '1990-05-05',
'genre': 'femme',
'email': 'micheline@gmail.com',
'telephones': [
{
'numero': '0609080604',
'type': 'mobile',
},
],
'adresse': {
'at': '',
'streetnumber': '123',
'streetnumberext': '',
'streetname': 'ALLE DE L\'ARLEQUIN',
'ext1': '',
'ext2': '',
'streetmatriculation': '00032',
'zipcode': '92000',
'inseecode': '92000',
'city': 'NANTERRE',
'country': 'FRANCE',
}
})
second_adult_id = response.json['data']['id']
second_adult = Entity.objects.get(id=second_adult_id)
second_adult_address = get_individu_adresse(second_adult)
# déclaration d'union entre les deux adultes
union_url = reverse('rsu-api-declaration-union')
response = app.post_json(union_url, params={
'individu_id_1': first_adult_id,
'individu_id_2': second_adult_id,
'statut': 'pacs/mariage',
'adresse_commune': 2,
})
assert response.json['err'] == 0
assert get_individu_adresse(first_adult) == second_adult_address
assert get_individu_adresse(second_adult) == second_adult_address
assert Entity.objects.filter(id=first_adult_address.id).count() == 0
# on ne peut supprimer aucun des deux
suppression_url = reverse('rsu-api-suppression-individu', kwargs={
'identifier': first_adult_id})
response = app.post(suppression_url)
# impossible il y a une union
assert response.json['err'] == 1
assert len(response.json['errors']) == 1
assert 'conjoint' in response.json['errors'][0]
suppression_url = reverse('rsu-api-suppression-individu', kwargs={
'identifier': second_adult_id})
response = app.post(suppression_url)
# impossible il y a une union
assert response.json['err'] == 1
assert response.json['err'] == 1
assert len(response.json['errors']) == 1
assert 'conjoint' in response.json['errors'][0]
# rattachement du premier enfant au premier adulte
enfant_url = reverse('rsu-api-declaration-responsabilite-legale',
kwargs={'identifier': first_adult_id})
response = app.post_json(enfant_url, params={
'statut': 'parent',
'enfant': {
'prenoms': u'kévin',
'nom_de_naissance': 'Michalon-Gourde',
'genre': 'homme',
'date_de_naissance': '2015-03-04',
'email': '',
'telephones': [],
}
})
assert response.json['err'] == 0
assert 'id' in response.json['data']
first_child_id = response.json['data']['id']
first_child = Entity.objects.get(id=first_child_id)
assert get_individu_adresse(first_child) == second_adult_address
# on ne peut supprimer le parent, il a un enfant et un conjoint
suppression_url = reverse('rsu-api-suppression-individu', kwargs={
'identifier': first_adult_id})
response = app.post(suppression_url)
# impossible il y a une union
assert response.json['err'] == 1
assert len(response.json['errors']) == 2
assert 'enfant' in response.json['errors'][0]
assert 'conjoint' in response.json['errors'][1]
# on ne peut supprimer l'enfant, il a un parent
suppression_url = reverse('rsu-api-suppression-individu', kwargs={
'identifier': first_child_id})
response = app.post(suppression_url)
# impossible il y a une union
assert response.json['err'] == 1
assert len(response.json['errors']) == 1
assert 'avec un parent' in response.json['errors'][0]
# rattachement du premier enfant au deuxième adulte
enfant_url = reverse('rsu-api-declaration-responsabilite-legale',
kwargs={'identifier': second_adult_id})
response = app.post_json(enfant_url, params={
'statut': 'parent',
'enfant_id': first_child_id,
})
assert response.json['err'] == 0
assert get_individu_adresse(first_child) == second_adult_address
# déclaration de séparation entre les deux adultes
separation_url = reverse('rsu-api-separation', kwargs={
'identifier': second_adult_id,
})
response = app.post_json(separation_url, params={
'adresse_principale_1': [first_child_id],
})
# le premier adulte a une nouvelle adresse
assert get_individu_adresse(first_adult) != second_adult_address
assert get_individu_adresse(first_adult) != first_adult_address
# le second adulte a toujours la même adresse
assert get_individu_adresse(second_adult) == second_adult_address
# le premier enfant a deux adresses
first_adult_address = get_individu_adresse(first_adult)
assert set(adresses_norel(first_child)) == set([first_adult_address, second_adult_address])
for adress, rel in adresses(first_child):
if adress == second_adult_address:
assert rel.content['principale']
def test_rsu_cron(db, settings, nanterre_classic_family):
import datetime
from django.utils.timezone import now
from zoo.zoo_data.models import Log
from django.core.management import call_command
d = nanterre_classic_family
settings.ZOO_NANTERRE_LOG_EXPIRATION_DAYS = 9
n = now()
logs = [Log.objects.create(entity=d['jean'], content={}) for i in range(20)]
# modify timestamp
for i, l in enumerate(logs):
l.timestamp = n - datetime.timedelta(days=i)
l.save()
assert Log.objects.count() == 20
call_command('rsu-cron', verbosity=0)
assert Log.objects.count() == 10
def test_passage_a_la_majorite(db, settings, nanterre_classic_family, freezer):
from zoo.zoo_nanterre.utils import passage_a_la_majorite
from django.core.management import call_command
freezer.move_to('2019-01-01')
# set dummy cles de federation with technocarte
for individu in Entity.objects.filter(schema__slug='individu'):
individu.content['cles_de_federation'] = {'technocarte': str(individu.id)}
individu.save()
for key in nanterre_classic_family:
nanterre_classic_family[key].refresh_from_db()
assert Entity.objects.filter(schema__slug='individu').count() == 4
assert Entity.objects.filter(content__statut_legal='majeur').count() == 2
assert Entity.objects.filter(schema__slug='adresse').count() == 1
assert Relation.objects.filter(schema__slug='habite').count() == 4
assert Relation.objects.filter(schema__slug='responsabilite-legale').count() == 4
# passage à la majorité de lilou
requests = []
assert passage_a_la_majorite() is None
settings.ZOO_NANTERRE_PASSAGE_A_LA_MAJORITE = True
result = passage_a_la_majorite()
assert result['updated_entities'] == 1
assert result['deleted_relations'] == 2
assert Entity.objects.filter(schema__slug='individu').count() == 4
assert Entity.objects.filter(schema__slug='adresse').count() == 2
assert Relation.objects.filter(schema__slug='habite').count() == 4
assert Relation.objects.filter(schema__slug='responsabilite-legale').count() == 2
assert Entity.objects.filter(content__statut_legal='majeur').count() == 3
assert Entity.objects.filter(content__prenoms='LILOU', content__statut_legal='majeur').count() == 1
assert Job.objects.count() == 1
assert Job.objects.todo().count() == 1
assert len(requests) == 0
@httmock.urlmatch()
def technocarte_ok(url, request):
requests.append(request)
return httmock.response(
200, 'null',
{
'Content-Type': 'application/json',
})
with httmock.HTTMock(technocarte_ok):
Job.redo(timestamp=now() + datetime.timedelta(seconds=20))
assert len(requests) == 1
req_content = json.loads(force_text(requests[0].body))
assert req_content['metadonnees']['service'] == 'passage-majorite'
assert len(req_content['fragments']) == 3
assert req_content['fragments'][0]['type'] == 'maj-adresse'
assert req_content['fragments'][1]['type'] == 'suppression-relation'
assert (req_content['fragments'][1]['fragment']['beneficiaire1']
== nanterre_classic_family['jean'].content['cles_de_federation']['technocarte'])
assert (req_content['fragments'][1]['fragment']['beneficiaire2']
== nanterre_classic_family['lilou'].content['cles_de_federation']['technocarte'])
assert req_content['fragments'][2]['type'] == 'suppression-relation'
assert (req_content['fragments'][2]['fragment']['beneficiaire1']
== nanterre_classic_family['marie'].content['cles_de_federation']['technocarte'])
assert (req_content['fragments'][2]['fragment']['beneficiaire2']
== nanterre_classic_family['lilou'].content['cles_de_federation']['technocarte'])
freezer.move_to('2025-12-31')
result = passage_a_la_majorite()
assert result['updated_entities'] == 0
assert result['deleted_relations'] == 0
assert Entity.objects.filter(content__statut_legal='majeur').count() == 3
assert Entity.objects.filter(content__prenoms='LILOU', content__statut_legal='majeur').count() == 1
# passage à la majorité de Kévin le jour de son anniversaire
freezer.move_to('2026-01-01')
call_command('rsu-cron')
assert Entity.objects.filter(content__statut_legal='majeur').count() == 4
assert Entity.objects.filter(content__statut_legal='mineur').count() == 0
assert Entity.objects.filter(content__prenoms=u'KÉVIN', content__statut_legal='majeur').count() == 1