toulouse-maelis: get users linked to a family (#77295)
gitea/passerelle/pipeline/head This commit looks good Details

This commit is contained in:
Nicolas Roche 2023-11-02 14:54:39 +01:00 committed by Nicolas Roche
parent aa9585071a
commit 5cd1e3aacc
3 changed files with 205 additions and 1 deletions

View File

@ -30,15 +30,18 @@ from django.core.serializers.json import DjangoJSONEncoder
from django.db import models, transaction
from django.db.models import JSONField
from django.http import Http404, HttpResponse
from django.template.loader import render_to_string as render_template_to_string
from django.utils import dateformat
from django.utils.dateparse import parse_date
from django.utils.dateparse import parse_date, parse_datetime
from django.utils.text import slugify
from django.utils.timezone import now
from requests.exceptions import RequestException
from zeep.helpers import serialize_object
from zeep.wsse.username import UsernameToken
from passerelle.apps.base_adresse.models import CityModel
from passerelle.base.models import BaseResource, HTTPResource
from passerelle.base.signature import sign_url
from passerelle.utils.api import endpoint
from passerelle.utils.conversion import simplify
from passerelle.utils.jsonresponse import APIError
@ -1181,6 +1184,58 @@ class ToulouseMaelis(BaseResource, HTTPResource):
link.delete()
return {'data': 'ok'}
@endpoint(
display_category='Famille',
description='Obtenir les comptes usager liés à une famille',
name='get-link-list',
perm='can_access',
parameters={
'NameID': {'description': 'Publik NameID'},
'family_id': {'description': 'Numéro de DUI'},
},
)
def get_link_list(self, request, NameID=None, family_id=None, text_template=None):
family_id = family_id or self.get_link(NameID).family_id
data = [
{'id': x['name_id'], 'context': {'link': x}}
for x in self.link_set.filter(family_id=family_id).values()
]
for item in data:
del item['context']['link']['id']
del item['context']['link']['resource_id']
# call authentic to add user data to context
if getattr(settings, 'KNOWN_SERVICES', {}).get('authentic'):
idp_service = list(settings.KNOWN_SERVICES['authentic'].values())[0]
for item in data:
api_url = sign_url(
urljoin(
idp_service['url'], 'api/users/%s/?orig=%s' % (item['id'], idp_service.get('orig'))
),
key=idp_service.get('secret'),
)
try:
response = self.requests.get(api_url)
except RequestException:
pass
else:
if response.status_code == 200:
item['context']['user'] = response.json()
for key in 'date_joined', 'last_login':
value = item['context']['user'].get(key)
if value:
item['context']['user'][key] = parse_datetime(value)
if item['context']['user'].get('password'):
del item['context']['user']['password']
for item in data:
item['text'] = (
render_template_to_string('toulouse_maelis/family_link_template.txt', item['context'])
.replace('\n', '')
.strip()
)
return {'data': data}
@endpoint(
display_category='Famille',
description='Rechercher un dossier famille',

View File

@ -0,0 +1,11 @@
{% if user %}
{{ user.first_name }} {{ user.last_name }} <{{ user.email }}>
{% else %}
{{ link.name_id }}
{% endif %}
(lié le {{ link.created|date:"d/m/Y" }}
{% if user %}
; compte créé le {{ user.date_joined|date:"d/m/Y" }},
dernière connexion le {{ user.last_login|date:"d/m/Y" }}
{% endif %}
)

View File

@ -157,6 +157,20 @@ def wcs_service(settings, requests_mock):
yield mock
@pytest.fixture
def authentic_service(settings, requests_mock):
service = {
'idp': {
'url': 'http://idp.example.org/',
'verif_orig': 'abc',
'secret': 'def',
},
}
settings.KNOWN_SERVICES = {'authentic': service}
with requests_mock as mock:
yield mock
@pytest.fixture(scope='module')
def django_db_setup(django_db_setup, django_db_blocker):
with django_db_blocker.unblock():
@ -710,6 +724,130 @@ def test_unlink(con, app):
assert resp.json['err_desc'] == 'User not linked to family'
def test_get_link_list(con, app, authentic_service, freezer):
authentic_service.add(
responses.GET,
'http://idp.example.org/api/users/83f6e19feb2043d2aafb041aea445b2c/',
json={
'uuid': '83f6e19feb2043d2aafb041aea445b2c',
'username': 'jdoe',
'first_name': 'Jhon',
'last_name': 'Doe',
'email': 'jdoe@example.org',
'date_joined': '2020-04-06T19:00:00.000000+02:00',
'last_login': '2023-07-10T11:00:00.000000+02:00',
'password': 'XXX',
},
status=200,
)
authentic_service.add(
responses.GET,
'http://idp.example.org/api/users/local/',
json={'result': 0, 'errors': {'detail': 'Pas trouvé.'}},
status=404,
)
authentic_service.add(
responses.GET,
'http://idp.example.org/api/users/456/',
body=CONNECTION_ERROR,
)
url = get_endpoint('get-link-list')
# link 3 time to the 1312 family
freezer.move_to('2023-07-10 15:00:00')
Link.objects.create(resource=con, family_id='1312', name_id='83f6e19feb2043d2aafb041aea445b2c')
freezer.move_to('2023-07-10 16:00:00')
Link.objects.create(resource=con, family_id='1312', name_id='local')
Link.objects.create(resource=con, family_id='1312', name_id='456')
assert Link.objects.count() == 3
resp = app.get(url + '?family_id=1312')
assert len(authentic_service.calls) == 3
assert resp.json['err'] == 0
assert resp.json['data'] == [
{
'id': '83f6e19feb2043d2aafb041aea445b2c',
'context': {
'link': {
'name_id': '83f6e19feb2043d2aafb041aea445b2c',
'family_id': '1312',
'created': '2023-07-10T15:00:00Z',
'updated': '2023-07-10T15:00:00Z',
},
'user': {
'uuid': '83f6e19feb2043d2aafb041aea445b2c',
'username': 'jdoe',
'first_name': 'Jhon',
'last_name': 'Doe',
'email': 'jdoe@example.org',
'date_joined': '2020-04-06T19:00:00+02:00',
'last_login': '2023-07-10T11:00:00+02:00',
},
},
'text': 'Jhon Doe <jdoe@example.org> (lié le 10/07/2023 ; compte créé le 06/04/2020, dernière connexion le 10/07/2023)',
},
{
'id': 'local',
'context': {
'link': {
'name_id': 'local',
'family_id': '1312',
'created': '2023-07-10T16:00:00Z',
'updated': '2023-07-10T16:00:00Z',
}
},
'text': 'local (lié le 10/07/2023)',
},
{
'id': '456',
'context': {
'link': {
'name_id': '456',
'family_id': '1312',
'created': '2023-07-10T16:00:00Z',
'updated': '2023-07-10T16:00:00Z',
}
},
'text': '456 (lié le 10/07/2023)',
},
]
resp = app.get(url + '?family_id=plop')
assert resp.json['err'] == 0
assert resp.json['data'] == []
resp = app.get(url)
assert resp.json['err'] == 1
assert resp.json['err_desc'] == 'User not linked to family'
resp = app.get(url + '?NameID=local')
assert resp.json['err'] == 0
assert [x['text'][:50] for x in resp.json['data']] == [
'Jhon Doe <jdoe@example.org> (lié le 10/07/2023 ; c',
'local (lié le 10/07/2023)',
'456 (lié le 10/07/2023)',
]
def test_get_link_list_service_error(con, app, freezer):
url = get_endpoint('get-link-list')
freezer.move_to('2023-07-10 15:00:00')
Link.objects.create(resource=con, family_id='1312', name_id='83f6e19feb2043d2aafb041aea445b2c')
freezer.move_to('2023-07-10 16:00:00')
Link.objects.create(resource=con, family_id='1312', name_id='local')
Link.objects.create(resource=con, family_id='1312', name_id='456')
assert Link.objects.count() == 3
resp = app.get(url + '?family_id=1312')
assert resp.json['err'] == 0
assert [x['text'][:50] for x in resp.json['data']] == [
'83f6e19feb2043d2aafb041aea445b2c (lié le 10/07/202',
'local (lié le 10/07/2023)',
'456 (lié le 10/07/2023)',
]
def test_get_referential(con):
assert con.get_referential('Category') == [
{'code': 'BI', 'id': 'BI', 'libelle': 'BIPARENTALE', 'text': 'BIPARENTALE'},