solis: add RSA endpoints (#32877)

This commit is contained in:
Thomas NOËL 2019-05-09 00:13:09 +02:00
parent 1b58e3ddcd
commit d810f6799a
4 changed files with 390 additions and 4 deletions

View File

@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.20 on 2019-05-09 07:18
from __future__ import unicode_literals
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('solis', '0005_remove_solis_log_level'),
]
operations = [
migrations.CreateModel(
name='SolisRSALink',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name_id', models.CharField(max_length=256)),
('user_id', models.CharField(max_length=64)),
('code', models.CharField(max_length=64)),
('text', models.CharField(max_length=256)),
('resource', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='solis.Solis')),
],
),
]

View File

@ -96,6 +96,7 @@ class Solis(BaseResource):
verbose_name=_('Proxy URL'))
text_template_name = 'solis/apa_user_text.txt'
text_template_name_rsa = 'solis/rsa_user_text.txt'
category = _('Business Process Connectors')
@ -440,6 +441,130 @@ class Solis(BaseResource):
'files_failed_pdf_conversion': files_failed_pdf_conversion
}
#
# RSA endpoints
#
def rsa_token(self, user_id, code):
response = self.request('referentiels/grsa/token', data={
'indexIndividu': user_id,
'codeConfidentiel': code,
})
return response.get('token')
def rsa_get_information(self, information, user_id=None, code=None, token=None,
index='search'):
# simulate "individu" referential: get user details from civi/individu/user_id
if information == 'individu':
if not user_id:
raise APIError('missing user_id to get civi/individu')
endpoint = 'referentiels/%s/%s/%s/' % ('civi', 'individu', user_id)
content = self.request(endpoint)
if not isinstance(content, dict):
raise APIError('civi/individu response is not a dictionnary',
data={'json_content': content})
return content
if token is None:
token = self.rsa_token(user_id, code)
endpoint = 'referentiels/grsa/' + information + '/' + index + '/'
args = [('token', token)]
if index.startswith('search'): # it can be "search/next" in rdvs referential
args = [('indexIndividu', user_id)] + args
endpoint += '?' + urlencode(args)
return self.request(endpoint)
@endpoint(name='rsa-link', methods=['post'], perm='can_access',
description=_('Create link between name_id and '
'Solis RSA. Payload: name_id, user_id, code'))
def rsa_link(self, request):
try:
data = json.loads(request.body)
except ValueError:
raise APIError('payload is not a JSON dict')
if not isinstance(data, dict):
raise APIError('payload is not a JSON dict')
if not data.get('name_id'):
raise APIError('missing name_id')
if not data.get('user_id') or not data.get('code'):
raise APIError('missing user_id/code credentials')
name_id, user_id, code = data['name_id'], data['user_id'], data['code']
self.rsa_token(user_id, code) # invalid credentials raise APIError here
information = self.rsa_get_information('individu', user_id, code)
text = get_template(self.text_template_name_rsa).render(information).strip()
link, created = SolisRSALink.objects.update_or_create(resource=self, name_id=name_id,
user_id=user_id,
defaults={'code': code,
'text': text})
return {'data': {'user_id': user_id,
'text': text,
'created': created,
'updated': not created}}
@endpoint(name='rsa-unlink', methods=['post'], perm='can_access',
description=_('Delete a Solis RSA link. Payload: name_id, user_id'))
def rsa_unlink(self, request):
try:
data = json.loads(request.body)
except ValueError:
raise APIError('payload is not a JSON dict')
if not isinstance(data, dict):
raise APIError('payload is not a JSON dict')
if not data.get('name_id'):
raise APIError('missing name_id')
if not data.get('user_id'):
raise APIError('missing user_id')
name_id, user_id = data['name_id'], data['user_id']
SolisRSALink.objects.filter(resource=self, name_id=name_id, user_id=user_id).delete()
return {'data': {'user_id': user_id, 'deleted': True}}
@endpoint(name='rsa-links', perm='can_access',
description=_('List linked Solis RSA users'),
parameters={
'name_id': {
'description': _('user identifier'),
'example_value': '3eb56fc'
}
})
def rsa_links(self, request, name_id):
return {'data': [{'id': link.user_id, 'text': link.text}
for link in SolisRSALink.objects.filter(resource=self, name_id=name_id)]}
@endpoint(name='rsa-user-info', perm='can_access',
description=_('Get informations about a linked Solis RSA user'),
parameters={
'name_id': {
'description': _('user identifier'),
'example_value': '3eb56fc'
},
'user_id': {
'description': _('Solis RSA user identifier'),
'example_value': '4273',
},
'information': {
'description': _('individu, actions, allocataires, engagements, '
'evaluations, evenements, indus, menages, presences, rdvs'),
'example_value': 'allocataires',
},
'index': {
'description': _('get a specific item, if applicable'),
}
})
def rsa_user_info(self, request, name_id, user_id, information='individu',
index='search'):
try:
link = SolisRSALink.objects.get(resource=self, name_id=name_id, user_id=user_id)
except SolisRSALink.DoesNotExist:
raise APIError('unknown link')
response = self.rsa_get_information(information=information,
user_id=user_id, code=link.code,
index=index)
if information == 'individu':
text = get_template(self.text_template_name_rsa).render(response).strip()
if text != link.text:
link.text = text
link.save()
return {'data': response}
class SolisAPALink(models.Model):
resource = models.ForeignKey(Solis)
@ -447,3 +572,11 @@ class SolisAPALink(models.Model):
user_id = models.CharField(blank=False, max_length=64)
code = models.CharField(blank=False, max_length=64)
text = models.CharField(blank=False, max_length=256)
class SolisRSALink(models.Model):
resource = models.ForeignKey(Solis)
name_id = models.CharField(blank=False, max_length=256)
user_id = models.CharField(blank=False, max_length=64)
code = models.CharField(blank=False, max_length=64)
text = models.CharField(blank=False, max_length=256)

View File

@ -0,0 +1 @@
{{ etatCivil.genre }} {{ etatCivil.prenom }} {{ etatCivil.nom }}{% if etatCivil.nomNaissance and etatCivil.nomNaissance != etatCivil.nom %} ({{ etatCivil.nomNaissance }}){% endif %}

View File

@ -8,7 +8,7 @@ from StringIO import StringIO
from django.contrib.contenttypes.models import ContentType
from django.core.files import File
from passerelle.apps.solis.models import Solis, SolisAPALink, unflat
from passerelle.apps.solis.models import Solis, SolisAPALink, SolisRSALink, unflat
from passerelle.base.models import ApiUser, AccessRight
NAMEID = 'bebe'
@ -25,6 +25,37 @@ APAREQUEST = '{"demandeAsg":{"visite":{"date":"2016-07-07","heure":"1330"},"dema
DEPARTEMENTS = '{"departements":[{"code":"1","libelle":"Ain","pays":{"code":"79","libelle":"France"}},{"code":"2","libelle":"Aisne","pays":{"code":"79","libelle":"France"}},{"code":"3","libelle":"Allier","pays":{"code":"79","libelle":"France"}},{"code":"4","libelle":"Alpes de Haute Provence","pays":{"code":"79","libelle":"France"}},{"code":"5","libelle":"Hautes Alpes","pays":{"code":"79","libelle":"France"}},{"code":"6","libelle":"Alpes Maritimes","pays":{"code":"79","libelle":"France"}},{"code":"7","libelle":"Ardèche","pays":{"code":"79","libelle":"France"}},{"code":"8","libelle":"Ardennes","pays":{"code":"79","libelle":"France"}}]}'
CIVI_INDIVIDU = '{"index":4273,"referentDossier":true,"presentFoyer":true,"etatCivil":{"genre":"MME","sexe":"F","nom":"NOM","prenom":"Prenom","nomNaissance":"NEENOM","dateNaissance":"1950-12-10","lieuNaissance":"NOWHERE"}}'
RSATOKEN = '''{
"token": "e18f0967-1b8b-4ae5-8e7a-3e89076429bd",
"endDate": "2019-05-21T12:23:08.004"
}'''
RSATOKEN_403 = '''[
{
"logref": "62215971-42dc-41af-a0c2-fdbe8115be73",
"message": "rSa - Référentiels - Accès non autorisé aux référentiels rSa",
"links": []
}
]'''
RSAALLOCATAIRES = '''{
"identifiant": {
"codeAllocataire": "12345",
"dateNaissance": "10/10/1966"
},
"poleEmploi": null,
"nir": "2760150025241",
"soumisDD": "Non soumis à droit et devoir",
"oriente": true,
"_links": {
"etatCivil": {
"href": "http://solis.infodb.fr/solisapi-rsa/referentiels/civi/individu/4242/"
},
"conjoint": {
"href": "http://solis.infodb.fr/solisapi-rsa/referentiels/civi/individu/4273/"
}
}
}'''
@pytest.fixture
def solis(db):
@ -35,7 +66,7 @@ def solis(db):
def test_solis_restricted_access(app, solis):
for service in ('apa-link', 'apa-unlink'):
for service in ('apa-link', 'apa-unlink', 'rsa-link', 'rsa-unlink'):
endpoint = utils.generic_endpoint_url('solis', service, slug=solis.slug)
assert endpoint == '/solis/test/%s' % service
with mock.patch('passerelle.utils.Request.post') as requests_post:
@ -46,7 +77,7 @@ def test_solis_restricted_access(app, solis):
assert 'PermissionDenied' in resp.json['err_class']
resp = app.get(endpoint, status=405)
assert requests_get.call_count == 0
for service in ('apa-links', 'apa-user-info', 'apa-users'):
for service in ('apa-links', 'apa-user-info', 'apa-users', 'rsa-links', 'rsa-user-info'):
endpoint = utils.generic_endpoint_url('solis', service, slug=solis.slug)
assert endpoint == '/solis/test/%s' % service
with mock.patch('passerelle.utils.Request.get') as requests_get:
@ -128,7 +159,7 @@ def test_solis_ping(app, solis, ping_response):
'https': 'http://proxy:3128/'}
def test_solis_link_infos_unlink(app, solis):
def test_solis_apa_link_infos_unlink(app, solis):
# full opened access
api = ApiUser.objects.create(username='all', keytype='', key='')
obj_type = ContentType.objects.get_for_model(solis)
@ -637,3 +668,197 @@ def test_solis_apa_integration(app, solis):
requests_post.assert_called_once() # don't try to post request
assert resp.json['err'] == 1
assert resp.json['err_desc'].startswith('error status:500')
def test_solis_rsa_link_infos_unlink(app, solis):
# full opened access
api = ApiUser.objects.create(username='all', keytype='', key='')
obj_type = ContentType.objects.get_for_model(solis)
AccessRight.objects.create(codename='can_access', apiuser=api, resource_type=obj_type,
resource_pk=solis.pk)
# link
with mock.patch('passerelle.utils.Request.post') as requests_post: # get solis token
with mock.patch('passerelle.utils.Request.get') as requests_get: # get solis informations
endpoint = utils.generic_endpoint_url('solis', 'rsa-link', slug=solis.slug)
for params in (None, '', []):
resp = app.post_json(endpoint, params=params, status=200)
assert requests_post.call_count == 0
assert resp.json['err'] == 1
assert 'payload is not a JSON dict' in resp.json['err_desc']
for params in ({}, {'user_id': 'x'}, {'code': 'x'}, {'foo': 'bar'},
{'name_id': ''}, {'user_id': '', 'code': ''}):
resp = app.post_json(endpoint, params=params, status=200)
assert requests_post.call_count == 0
assert resp.json['err'] == 1
assert 'missing name_id' in resp.json['err_desc']
params['name_id'] = 'xx'
resp = app.post_json(endpoint, params=params, status=200)
assert requests_post.call_count == 0
assert resp.json['err'] == 1
assert 'missing user_id/code credentials' in resp.json['err_desc']
requests_post.return_value = utils.FakedResponse(content=RSATOKEN_403, status_code=403)
resp = app.post_json(endpoint,
params={'user_id': 'x', 'code': 'x', 'name_id': NAMEID},
status=200)
assert requests_post.call_count == 1
assert requests_get.call_count == 0
assert resp.json['err'] == 1
assert 'non autor' in resp.json['err_desc']
assert SolisRSALink.objects.count() == 0
requests_post.return_value = utils.FakedResponse(content=RSATOKEN, status_code=200)
requests_get.return_value = utils.FakedResponse(content=CIVI_INDIVIDU,
status_code=200)
resp = app.post_json(endpoint,
params={'name_id': NAMEID, 'user_id': '4273', 'code': 'foo'},
status=200)
assert requests_post.call_count == 2
assert requests_get.call_count == 1
assert resp.json['err'] == 0
assert resp.json['data']['user_id'] == '4273'
assert resp.json['data']['created']
assert not resp.json['data']['updated']
assert SolisRSALink.objects.count() == 1
assert SolisRSALink.objects.first().name_id == NAMEID
assert SolisRSALink.objects.first().user_id == '4273'
assert SolisRSALink.objects.first().code == 'foo'
assert SolisRSALink.objects.first().text == 'MME Prenom NOM (NEENOM)'
# change code
resp = app.post_json(endpoint,
params={'name_id': NAMEID, 'user_id': '4273', 'code': 'bar'},
status=200)
assert requests_post.call_count == 3
assert requests_get.call_count == 2
assert resp.json['err'] == 0
assert resp.json['data']['user_id'] == '4273'
assert not resp.json['data']['created']
assert resp.json['data']['updated']
assert SolisRSALink.objects.count() == 1
assert SolisRSALink.objects.first().name_id == NAMEID
assert SolisRSALink.objects.first().user_id == '4273'
assert SolisRSALink.objects.first().code == 'bar'
assert SolisRSALink.objects.first().text == 'MME Prenom NOM (NEENOM)'
# second link
resp = app.post_json(endpoint,
params={'name_id': NAMEID, 'user_id': '4242', 'code': 'bar'},
status=200)
assert requests_post.call_count == 4
assert requests_get.call_count == 3
assert resp.json['err'] == 0
assert resp.json['data']['user_id'] == '4242'
assert resp.json['data']['created']
assert not resp.json['data']['updated']
assert SolisRSALink.objects.count() == 2
# verify recorded names after link
assert [x['text'] for x in SolisRSALink.objects.values('text')] == \
['MME Prenom NOM (NEENOM)', 'MME Prenom NOM (NEENOM)']
endpoint = utils.generic_endpoint_url('solis', 'rsa-links', slug=solis.slug)
resp = app.get(endpoint, status=400) # missing name_id
assert resp.json['err'] == 1
endpoint += '?name_id=%s' % NAMEID
resp = app.get(endpoint, status=200)
assert resp.json['err'] == 0
assert len(resp.json['data']) == 2
assert resp.json['data'][0]['text'] == resp.json['data'][1]['text'] == \
'MME Prenom NOM (NEENOM)'
# get base informations from a linked user
changed_name = CIVI_INDIVIDU.replace('Prenom', 'Postnom')
with mock.patch('passerelle.utils.Request.get') as requests_get:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = utils.FakedResponse(content=RSATOKEN, status_code=200)
requests_get.return_value = utils.FakedResponse(content=changed_name, status_code=200)
endpoint = utils.generic_endpoint_url('solis', 'rsa-user-info', slug=solis.slug)
endpoint += '?name_id=%s&user_id=4242' % NAMEID
resp = app.get(endpoint, status=200)
assert resp.json['err'] == 0
assert resp.json['data']['etatCivil']['prenom'] == 'Postnom'
# user "text" updated in link:
assert SolisRSALink.objects.get(name_id=NAMEID, user_id='4242').text == \
'MME Postnom NOM (NEENOM)'
# get referential for a linked user
with mock.patch('passerelle.utils.Request.get') as requests_get:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = utils.FakedResponse(content=RSATOKEN, status_code=200)
endpoint_base = utils.generic_endpoint_url('solis', 'rsa-user-info', slug=solis.slug)
resp = app.get(endpoint_base, status=400) # missing name_id
assert resp.json['err'] == 1
endpoint = endpoint_base + '?name_id=%s&user_id=4242&information=allocataires' % NAMEID
requests_get.return_value = utils.FakedResponse(content=RSAALLOCATAIRES,
status_code=200)
resp = app.get(endpoint, status=200)
assert requests_post.call_count == 1 # get a token
assert requests_get.call_count == 1 # get informations
assert '/referentiels/grsa/allocataires/search/' in requests_get.call_args[0][0]
assert resp.json['err'] == 0
assert resp.json['data']
# solis api crash
requests_get.return_value = utils.FakedResponse(content='boum',
status_code=500)
resp = app.get(endpoint, status=200)
assert requests_post.call_count == 2 # get a token
assert requests_get.call_count == 2 # get informations
assert '/referentiels/grsa/allocataires/search/' in requests_get.call_args[0][0]
assert resp.json['err'] == 1
assert resp.json['err_desc'].startswith('error status:500')
assert resp.json['data'] == {'json_content': None, 'status_code': 500}
requests_get.return_value = utils.FakedResponse(content='{"error":"foobar"}',
status_code=500)
resp = app.get(endpoint, status=200)
assert resp.json['err'] == 1
assert resp.json['err_desc'].startswith('error status:500')
assert resp.json['data'] == {'json_content': {'error': 'foobar'}, 'status_code': 500}
# unknown name_id or user_id
for qs in ('name_id=%s&user_id=53' % NAMEID, 'name_id=unlinked&user_id=4242'):
endpoint = endpoint_base + '?information=allocataires&' + qs
resp = app.get(endpoint, status=200)
assert resp.json['err'] == 1
assert resp.json['err_desc'] == 'unknown link'
assert resp.json['data'] is None
# unlink
endpoint = utils.generic_endpoint_url('solis', 'rsa-unlink', slug=solis.slug)
for bad_params in ({}, {'user_id': '4273'}, {'name_id': NAMEID}):
resp = app.post_json(endpoint, params=bad_params, status=200)
assert resp.json['err'] == 1
resp = app.post_json(endpoint, params={'user_id': 'xxx', 'name_id': 'xxx'}, status=200)
assert resp.json['err'] == 0
assert resp.json['data']['deleted']
assert resp.json['data']['user_id'] == 'xxx'
assert SolisRSALink.objects.count() == 2
resp = app.post_json(endpoint, params={'user_id': '4242', 'name_id': NAMEID}, status=200)
assert resp.json['err'] == 0
assert resp.json['data']['deleted']
assert resp.json['data']['user_id'] == '4242'
assert SolisRSALink.objects.count() == 1
# unlink again, no trouble
resp = app.post_json(endpoint, params={'user_id': '4242', 'name_id': NAMEID}, status=200)
assert resp.json['err'] == 0
assert resp.json['data']['deleted']
assert resp.json['data']['user_id'] == '4242'
assert SolisRSALink.objects.count() == 1
# can not get informations from unlinked user
endpoint = utils.generic_endpoint_url('solis', 'rsa-user-info', slug=solis.slug)
endpoint += '?name_id=%s&user_id=4242' % NAMEID
resp = app.get(endpoint, status=200)
assert resp.json['err'] == 1
assert resp.json['err_desc'] == 'unknown link'
assert resp.json['data'] is None