650 lines
29 KiB
Python
650 lines
29 KiB
Python
# passerelle - uniform access to multiple data sources and services
|
|
# Copyright (C) 2017 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import base64
|
|
import copy
|
|
import json
|
|
import re
|
|
import unicodedata
|
|
|
|
from django.db import models
|
|
from django.template.loader import get_template
|
|
from django.utils.encoding import force_text
|
|
from django.utils.translation import ugettext_lazy as _
|
|
from django.utils.http import urlencode
|
|
|
|
from passerelle.base.models import BaseResource
|
|
from passerelle.utils.api import endpoint
|
|
from passerelle.utils.jsonresponse import APIError
|
|
from passerelle.utils.conversion import to_pdf
|
|
|
|
|
|
def simplify(s):
    '''
    Simplify a string, trying to transform it to lower ascii chars (a-z, 0-9)
    and minimize spaces. Used to compare strings on ?q=something requests.

    Falsy input (None, empty string) gives ''. Byte strings are decoded as
    UTF-8, ignoring undecodable bytes. Works on both Python 2 and Python 3.
    '''
    if not s:
        return ''
    # `unicode` on Python 2, `str` on Python 3 (the `unicode` builtin does
    # not exist on Python 3)
    text_type = type(u'')
    if not isinstance(s, text_type):
        s = text_type(s, 'utf-8', 'ignore')
    # decompose accented characters, then drop everything that is not ASCII;
    # decode back to text so that the regular expressions below get a text
    # string on Python 3 too
    s = unicodedata.normalize('NFKD', s).encode('ascii', 'ignore').decode('ascii')
    # remove punctuation, except the separators handled on the next line
    s = re.sub(r'[^\w\s\'-]', '', s)
    # every run of spaces, quotes, underscores and dashes becomes one space
    s = re.sub(r'[\s\'_-]+', ' ', s)
    return s.strip().lower()
|
|
|
|
|
|
def unflat(flatten_dict, separator='_'):
    '''
    Build a nested dict from a flat one, splitting each key on `separator`:

    >>> unflat({'foo': 'bar', 'two_foo': 'one', 'two_bar': 'two'})
    {'foo': 'bar', 'two': {'foo': 'one', 'bar': 'two'}}

    Every part of a split key is stripped of surrounding whitespace.
    '''
    tree = {}
    for flat_key, leaf_value in flatten_dict.items():
        parts = [part.strip() for part in flat_key.split(separator)]
        leaf_name = parts[-1]
        node = tree
        # walk down the branch, creating missing intermediate levels
        for branch_name in parts[:-1]:
            if branch_name not in node:
                node[branch_name] = {}
            node = node[branch_name]
        node[leaf_name] = leaf_value
    return tree
|
|
|
|
|
|
def keystore_upload_to(instance, filename):
    '''
    Return the upload path of a keystore file:
    <connector slug>/<instance id>/keystore/<filename>
    '''
    path_parts = (instance.get_connector_slug(), instance.id, filename)
    return '%s/%s/keystore/%s' % path_parts
|
|
|
|
|
|
def trusted_cas_upload_to(instance, filename):
    '''
    Return the upload path of a trusted CAs file:
    <connector slug>/<instance id>/trusted_ca/<filename>
    '''
    path_parts = (instance.get_connector_slug(), instance.id, filename)
    return '%s/%s/trusted_ca/%s' % path_parts
|
|
|
|
|
|
class Solis(BaseResource):
    '''
    Solis connector: stores the Solis API base URL and the HTTP settings
    (basic auth, client certificate, trusted CAs, proxy) declared below.
    '''
    service_url = models.URLField(
        max_length=256,
        blank=False,
        verbose_name=_('Service URL'),
        help_text=_('Solis API base URL'),
    )

    # HTTP basic authentication (both optional)
    basic_auth_username = models.CharField(
        max_length=128,
        blank=True,
        verbose_name=_('HTTP Basic Auth username'),
    )
    basic_auth_password = models.CharField(
        max_length=128,
        blank=True,
        verbose_name=_('HTTP Basic Auth password'),
    )

    # TLS settings
    client_certificate = models.FileField(
        upload_to=keystore_upload_to,
        null=True,
        blank=True,
        verbose_name=_('Client certificate'),
        help_text=_('Client certificate and private key (PEM format)'),
    )
    verify_cert = models.BooleanField(
        default=True,
        verbose_name=_('Check HTTPS Certificate validity'),
    )
    trusted_certificate_authorities = models.FileField(
        upload_to=trusted_cas_upload_to,
        null=True,
        blank=True,
        verbose_name=_('Trusted CAs'),
        help_text=_('Trusted CAs (PEM format)'),
    )

    http_proxy = models.CharField(
        max_length=128,
        blank=True,
        verbose_name=_('Proxy URL'),
    )

    # templates used to render the text stored on APA and RSA links
    text_template_name = 'solis/apa_user_text.txt'
    text_template_name_rsa = 'solis/rsa_user_text.txt'

    category = _('Business Process Connectors')

    class Meta:
        verbose_name = _('Solis')

    log_requests_errors = False
|
|
|
|
def request(self, endpoint, data=None, files=None):
    '''
    Call the Solis API on service_url + endpoint and return the decoded
    JSON response.

    POST `data` as JSON if it is not None, else POST `files` if it is not
    None, else GET. Returns None on a 204 response.
    Raises APIError on a non-2xx status or when the body is not JSON.
    '''
    target = self.service_url + endpoint
    accept_json = {'Accept': 'application/json'}
    if data is None and files is None:
        response = self.requests.get(target, headers=accept_json)
    elif data is not None:
        response = self.requests.post(target, json=data, headers=accept_json)
    else:
        response = self.requests.post(target, files=files, headers=accept_json)

    status = response.status_code
    if status // 100 != 2:
        # join the JSON error body to the exception, when there is one
        try:
            error_json = response.json()
        except ValueError:
            error_json = None
        message = 'error status:%s %r, content:%r' % (status, response.reason,
                                                      response.content[:1024])
        raise APIError(message, data={'status_code': status,
                                      'json_content': error_json})

    if status == 204:  # 204 No Content
        return None

    try:
        decoded = response.json()
    except ValueError:
        raise APIError('invalid JSON content:%r' % response.content[:1024])
    return decoded
|
|
|
|
def check_status(self):
    '''
    Call main/isAlive and raise an APIError if the answer is not the
    expected one.
    If OK, returns something usable by ping() endpoint.
    '''
    pong = self.request('main/isAlive')
    try:
        answer = pong.get('response')
        alive = answer.startswith('Solis API est op')
    except (AttributeError, KeyError):
        # pong is not a dict, or its "response" is not a string
        raise APIError('invalid response: %r' % pong, data=pong)
    if not alive:
        raise APIError('response is %r' % answer, data=pong)
    return {'data': 'pong', 'response': answer}
|
|
|
|
@endpoint(name='ping', description=_('Check Solis API availability'))
def ping(self, request):
    '''Endpoint returning the result of check_status().'''
    # FIXME: deprecate this endpoint (replaced by the 'up' endpoint)
    status = self.check_status()
    return status
|
|
|
|
@endpoint(name='referential', perm='can_access',
          pattern=r'^(?P<module>[\w-]+)/(?P<name>[\w-]+)/$',
          example_pattern='{module}/{name}/',
          description=_('Get module/name references'),
          parameters={
              'module': {'description': _('Referential module: asg, civi, trans'),
                         'example_value': 'trans'},
              'name': {'description': _('Referential name in this module'),
                       'example_value': 'lieu'},
              'codePays': {'example_value': '79'},
              'codeDepartement': {'example_value': '80'},
              'codeCommune': {'example_value': '21'},
              'q': {'description': _('Returns only items whose text matches'),
                    'example_value': 'claudel'},
              'id': {'description': _('Returns only items with this id (code)')},
              'ignore': {'description': _('Do not return items with this id, '
                                          'or multiple ids separated with commas'),
                         'example_value': '9999'},
          })
def referential(self, request, module, name, q=None, id=None, ignore=None, **kwargs):
    '''
    Return the items of the module/name referential as {'data': [items]}.

    Each item gets an "id" (copy of its "code") and a "text" (its "libelle",
    or its id if there is none). Items can then be filtered on `id`, on a
    comma separated list of ids to `ignore`, and on `q` (simplified text
    match). Extra keyword arguments whose name starts with "code" are sent
    to Solis as query string.

    Raises APIError if the Solis response is not a one-key dict holding a
    list of dicts, each with a non-empty "code".
    '''
    if (module == 'trans' and name == 'lieu' and 'codeDepartement' in kwargs
            and 'codeCommune' in kwargs and q):
        # use optimized endpoint for trans/lieu search
        endpoint = 'referentiels/trans/nomlieu/%s/%s/%s' % (kwargs.pop('codeDepartement'),
                                                           kwargs.pop('codeCommune'), q)
        q = None  # search is done by Solis, do not filter on q afterwards
    else:
        endpoint = 'referentiels/%s/%s' % (module, name)
    # NOTE(review): applied to both branches (the pop() calls above remove the
    # codes already used in the path) -- confirm against the upstream file
    args = [(code, value) for code, value in kwargs.items() if code.startswith('code')]
    if args:
        endpoint += '?' + urlencode(args)

    content = self.request(endpoint)

    # list(...) is needed on Python 3, where dict.values() is a view that
    # cannot be indexed
    items = None
    if isinstance(content, dict) and len(content) == 1:
        items = list(content.values())[0]
    if not isinstance(items, list):
        raise APIError('response is not a dictionnary with only one key '
                       'and whose value is a list', data={'json_content': content})

    if not all(isinstance(item, dict) and item.get('code') for item in items):
        raise APIError('items must be dictionnaries with a "code" key',
                       data={'json_content': content})

    for item in items:
        item['id'] = item['code']
        item['text'] = item.get('libelle', item['id'])

    ignore_ids = []
    if ignore:
        ignore_ids = [ignore_id.strip() for ignore_id in ignore.split(',')
                      if ignore_id.strip()]
    if q:
        q = simplify(q)

    def condition(item):
        if id and item['id'] != id:
            return False
        if item['id'] in ignore_ids:
            return False
        if q and q not in simplify(item['text']):
            return False
        return True

    # a real list, not a lazy filter() object: the result must be serializable
    return {'data': [item for item in items if condition(item)]}
|
|
|
|
@endpoint(name='referential-item', perm='can_access',
          pattern=r'^(?P<module>[\w-]+)/(?P<name>[\w-]+)/(?P<index>[\w-]+)/$',
          example_pattern='{module}/{name}/{index}/',
          description=_('Get an item from module/name (available only on some referentials)'),
          parameters={
              'module': {'description': _('Referential module: asg, civi, trans'),
                         'example_value': 'civi'},
              'name': {'description': _('Referential name in this module'),
                       'example_value': 'individu'},
              'index': {'description': _('Item index number'),
                        'example_value': '4273'},
          })
def referential_item(self, request, module, name, index):
    '''
    Return one item of the module/name referential as {'data': item}.
    Raises APIError if Solis does not answer with a dict.
    '''
    item = self.request('referentiels/%s/%s/%s/' % (module, name, index))
    if isinstance(item, dict):
        return {'data': item}
    raise APIError('response is not a dictionnary', data={'json_content': item})
|
|
|
|
#
|
|
# APA endpoints
|
|
#
|
|
|
|
def apa_token(self, user_id, code):
    '''
    Ask asg/apa/generationJeton for a token with the given credentials and
    return the "token" value of the response.
    '''
    credentials = {
        'indexIndividu': user_id,
        'codeConfidentiel': code,
    }
    answer = self.request('asg/apa/generationJeton', data=credentials)
    return answer.get('token')
|
|
|
|
def apa_get_information(self, information, user_id=None, code=None, token=None, index=None):
    '''
    Return the response of asg/apa/<information>/<token>[/<index>].

    When no token is given, one is requested with user_id and code.
    '''
    if token is None:
        token = self.apa_token(user_id, code)
    path = 'asg/apa/' + information + '/' + token
    if index:
        path = path + '/' + index
    return self.request(path)
|
|
|
|
@endpoint(name='apa-link', methods=['post'], perm='can_access',
          description=_('Create link between name_id and '
                        'Solis APA. Payload: name_id, user_id, code'))
def apa_link(self, request):
    '''
    Create or update the link between name_id and a Solis APA user.

    The JSON payload must be a dict with non-empty name_id, user_id and
    code values (same rule as rsa_link). Credentials are checked by asking
    Solis for a token, then the text of the link is rendered from the
    exportDonneesIndividu information.
    Raises APIError on invalid payload or invalid credentials.
    '''
    try:
        data = json.loads(request.body)
    except ValueError:
        raise APIError('payload is not a JSON dict')
    if not isinstance(data, dict):
        raise APIError('payload is not a JSON dict')
    # reject missing keys and also null/empty values, which can not
    # identify a user nor be stored in the link
    if not data.get('name_id'):
        raise APIError('missing name_id')
    if not data.get('user_id') or not data.get('code'):
        raise APIError('missing user_id/code credentials')
    name_id, user_id, code = data['name_id'], data['user_id'], data['code']
    token = self.apa_token(user_id, code)  # invalid credentials raise APIError here
    information = self.apa_get_information(information='exportDonneesIndividu', token=token)
    text = get_template(self.text_template_name).render(information).strip()
    created = SolisAPALink.objects.update_or_create(resource=self, name_id=name_id,
                                                    user_id=user_id,
                                                    defaults={'code': code,
                                                              'text': text})[1]
    return {'data': {'user_id': user_id,
                     'created': created,
                     'updated': not created}}
|
|
|
|
@endpoint(name='apa-unlink', methods=['post'], perm='can_access',
          description=_('Delete a Solis APA link. Payload: name_id, user_id'))
def apa_unlink(self, request):
    '''
    Delete the link(s) between name_id and the Solis APA user user_id.

    The JSON payload must be a dict with non-empty name_id and user_id
    values (same rule as rsa_unlink). Raises APIError otherwise.
    '''
    try:
        data = json.loads(request.body)
    except ValueError:
        raise APIError('payload is not a JSON dict')
    if not isinstance(data, dict):
        raise APIError('payload is not a JSON dict')
    # reject missing keys and also null/empty values
    if not data.get('name_id'):
        raise APIError('missing name_id')
    if not data.get('user_id'):
        raise APIError('missing user_id')
    name_id, user_id = data['name_id'], data['user_id']
    SolisAPALink.objects.filter(resource=self, name_id=name_id, user_id=user_id).delete()
    return {'data': {'user_id': user_id, 'deleted': True}}
|
|
|
|
@endpoint(name='apa-links', perm='can_access',
          description=_('List linked Solis APA users'),
          parameters={
              'name_id': {
                  'description': _('user identifier'),
                  'example_value': '3eb56fc'
              }
          })
def apa_links(self, request, name_id):
    '''
    Return the stored APA links of name_id, as a list of
    {'id': user_id, 'text': text} dicts.
    '''
    linked_users = []
    for link in SolisAPALink.objects.filter(resource=self, name_id=name_id):
        linked_users.append({'id': link.user_id, 'text': link.text})
    return {'data': linked_users}
|
|
|
|
@endpoint(name='apa-user-info', perm='can_access',
          description=_('Get informations about a linked Solis APA user'),
          parameters={
              'name_id': {
                  'description': _('user identifier'),
                  'example_value': '3eb56fc'
              },
              'user_id': {
                  'description': _('Solis APA user identifier'),
                  'example_value': '2345',
              },
              'information': {
                  'description': _('exportDonneesIndividu, consultationDeMesDroits, '
                                   'suiviDemandeEnInstruction, suiviDemandeHistorique, '
                                   'propositionPlanAide, demandeUnitaire'),
                  'example_value': 'consultationDeMesDroits',
              },
              'index': {
                  'description': _('mandatory if information is "demandeUnitaire"'),
                  'example_value': '87123'
              }
          })
def apa_user_info(self, request, name_id, user_id, information='exportDonneesIndividu',
                  index=None):
    '''
    Return an information about a linked Solis APA user as {'data': ...}.

    - "demandeUnitaire" needs an index, which is sent to Solis;
    - "exportDonneesIndividu" also refreshes the text stored on the link;
    - for the other informations, a given index selects the entry of the
      "demandeAsg" list whose demande.indexDemande matches.
    Raises APIError if the link is unknown, if the index is missing for
    "demandeUnitaire" or if no "demandeAsg" entry matches the index.
    '''
    unitary = information == 'demandeUnitaire'
    if unitary and index is None:
        raise APIError('index mandatory if information=demandeUnitaire', http_status=400)
    try:
        link = SolisAPALink.objects.get(resource=self, name_id=name_id, user_id=user_id)
    except SolisAPALink.DoesNotExist:
        raise APIError('unknown link')

    response = self.apa_get_information(information=information, user_id=user_id,
                                        code=link.code,
                                        index=index if unitary else None)

    if information == 'exportDonneesIndividu':
        # keep the text of the link up to date
        text = get_template(self.text_template_name).render(response).strip()
        if text != link.text:
            link.text = text
            link.save()
        return {'data': response}

    if index is None or unitary:
        return {'data': response}

    # search index in response
    for candidate in response.get('demandeAsg') or []:
        if str(candidate.get('demande', {}).get('indexDemande')) == index:
            return {'data': {'demandeAsg': candidate}}
    raise APIError('cannot find indexDemande=%s in demandeAsg list' % index)
|
|
|
|
@endpoint(name='apa-users', perm='can_access',
          description=_('Get exportDonneesIndividu datas about all linked Solis APA users'),
          parameters={
              'name_id': {
                  'description': _('user identifier'),
                  'example_value': '3eb56fc'
              }
          })
def apa_users(self, request, name_id):
    '''
    Return id, text and exportDonneesIndividu information of every Solis
    APA user linked to name_id. Links for which Solis raises an APIError
    are skipped; the stored text of the others is refreshed if it changed.
    '''
    text_template = get_template(self.text_template_name)
    result = []
    for link in SolisAPALink.objects.filter(resource=self, name_id=name_id):
        try:
            details = self.apa_get_information(information='exportDonneesIndividu',
                                               user_id=link.user_id, code=link.code)
        except APIError:
            # don't list unknown/unlinked users
            continue
        rendered = text_template.render(details).strip()
        if rendered != link.text:
            link.text = rendered
            link.save()
        user = {'id': link.user_id, 'text': rendered, 'information': details}
        result.append(user)
    return {'data': result}
|
|
|
|
@endpoint(name='apa-integration', perm='can_access', methods=['post'],
          description=_('Send data to "integrationDemandeApa"'))
def apa_integration(self, request):
    '''
    Send the JSON payload to asg/apa/integrationDemandeApa.

    The payload is a flat dict, expanded with unflat() before sending.
    Two kinds of keys are handled here and removed from the payload:

    - "file:<filename>": {"content": <base64>} entries are converted to PDF
      and sent first to asg/apa/piecesjointes/multiple; the id returned by
      Solis is then added to the request as "uidPiecesJointes". Entries
      that are not usable (no content, invalid base64, PDF conversion
      failure) are listed in "files_failed_pdf_conversion";
    - "del:<prefix>": if the value is true, every key starting with
      <prefix> is removed.

    Raises APIError on invalid payload or if the files are not all accepted.
    '''
    try:
        payload = json.loads(request.body)
    except ValueError:
        raise APIError('payload is not a JSON object', http_status=400)
    if not isinstance(payload, dict):
        raise APIError('payload is not a JSON dict', http_status=400)

    # handle specific file: and del: keys
    files = []
    delete_keys = []
    files_failed_pdf_conversion = []
    for key, value in payload.items():
        # extract files from payload, to send them before the request
        if key.startswith('file:'):
            delete_keys.append(key)
            if value is None:
                continue
            filename = key[5:]
            if not (isinstance(value, dict) and 'content' in value):
                files_failed_pdf_conversion.append(filename)
                continue
            try:
                content = base64.b64decode(value['content'])
            except (TypeError, ValueError):
                # invalid base64: TypeError on Python 2, binascii.Error
                # (a ValueError) on Python 3; report it, do not crash
                files_failed_pdf_conversion.append(filename)
                continue
            try:
                content = to_pdf(content)
            except ValueError:
                files_failed_pdf_conversion.append(filename)
            else:
                files.append(('files', (filename, content, 'application/pdf')))
        # Solis doesn't accept some values or dict-of-values if they are empty
        # (for example if there is no "conjoint"): remove all these keys if a
        # specific "del:key_prefix":true entry exists (for example "del:conjoint")
        if key.startswith('del:'):
            if value is True:
                prefix = key[4:]
                delete_keys.extend(k for k in payload if k.startswith(prefix))
            delete_keys.append(key)

    for key in delete_keys:
        payload.pop(key, None)

    # prepare request data
    integration_data = {'demandeApa': unflat(payload)}

    # send files before request data
    sendfiles = None
    if files:
        sendfiles = self.request('asg/apa/piecesjointes/multiple', files=files)
        if not isinstance(sendfiles, dict):
            raise APIError('fail to send files, response is not a dict', data=sendfiles)
        if sendfiles.get('rejets') or sendfiles.get('nbFichiersAcceptes') != len(files):
            raise APIError('fail to send all files', data=sendfiles)
        if not sendfiles.get('id'):
            raise APIError('fail to get uidPiecesJointes on sending files', data=sendfiles)
        # ok, add reference id in request data
        integration_data['uidPiecesJointes'] = sendfiles.get('id')

    response = self.request('asg/apa/integrationDemandeApa', data=integration_data)
    return {
        'data': response,
        'files_sent': sendfiles,
        'files_failed_pdf_conversion': files_failed_pdf_conversion
    }
|
|
|
|
#
|
|
# RSA endpoints
|
|
#
|
|
|
|
def rsa_token(self, user_id, code):
    '''
    Ask referentiels/grsa/token for a token with the given credentials and
    return the "token" value of the response.
    '''
    credentials = {
        'indexIndividu': user_id,
        'codeConfidentiel': code,
    }
    answer = self.request('referentiels/grsa/token', data=credentials)
    return answer.get('token')
|
|
|
|
def rsa_fill_with_link_content(self, link):
    '''
    Fetch the target of one link, or of each link of a list (_links entry
    in grsa referential object), and store it in the "content" key of the
    link, in place.

    Only dicts whose "href" starts with service_url are handled. If the
    request raises an APIError, an error description is stored instead.
    '''
    if isinstance(link, list):
        for sublink in link:
            self.rsa_fill_with_link_content(sublink)
        return
    if not isinstance(link, dict):
        return
    href = link.get('href')
    if not href or not href.startswith(self.service_url):
        return
    # the endpoint is the part of the href after the service URL
    relative_path = href[len(self.service_url):]
    try:
        fetched = self.request(relative_path)
    except APIError as e:  # do not raise on linked informations
        fetched = {
            'err': 1,
            'err_class': e.__class__.__name__,
            'err_desc': force_text(e)
        }
    link['content'] = fetched
|
|
|
|
def rsa_get_information(self, information, user_id=None, code=None, token=None,
                        index='search', links=None):
    '''
    Return an information about a Solis RSA user.

    "individu" is read from referentiels/civi/individu/<user_id>/ and must
    be a dict. Any other information is read from
    referentiels/grsa/<information>/<index>/ with a token, requested with
    user_id and code when not given.

    If the response is a dict with a "_links" key, a deep copy of it is
    added under "rsa_links". When `links` is not None, the listed links
    (comma separated names, all of them if empty) are filled in that copy
    with their content.
    '''
    # simulate "individu" referential: get user details from civi/individu/user_id
    if information == 'individu':
        if not user_id:
            raise APIError('missing user_id to get civi/individu')
        content = self.request('referentiels/%s/%s/%s/' % ('civi', 'individu', user_id))
        if isinstance(content, dict):
            return content
        raise APIError('civi/individu response is not a dictionnary',
                       data={'json_content': content})

    if token is None:
        token = self.rsa_token(user_id, code)
    query = [('token', token)]
    if index.startswith('search'):  # it can be "search/next" in rdvs referential
        query.insert(0, ('indexIndividu', user_id))
    endpoint = 'referentiels/grsa/' + information + '/' + index + '/'
    endpoint += '?' + urlencode(query)

    information = self.request(endpoint)

    if isinstance(information, dict) and '_links' in information:
        # return linked objects in non-underscored key, usable in Django template
        available = copy.deepcopy(information['_links'])
        information['rsa_links'] = available

        if links is not None:
            if links.strip():
                wanted = [name for name in (x.strip() for x in links.split(','))
                          if name and name in available]
            else:  # links is empty: get all
                wanted = available.keys()
            for name in wanted:
                self.rsa_fill_with_link_content(available[name])

    return information
|
|
|
|
@endpoint(name='rsa-link', methods=['post'], perm='can_access',
          description=_('Create link between name_id and '
                        'Solis RSA. Payload: name_id, user_id, code'))
def rsa_link(self, request):
    '''
    Create or update the link between name_id and a Solis RSA user.

    The JSON payload must be a dict with non-empty name_id, user_id and
    code values. Credentials are checked by asking Solis for a token, then
    the text of the link is rendered from the "individu" information.
    Raises APIError on invalid payload or invalid credentials.
    '''
    try:
        payload = json.loads(request.body)
    except ValueError:
        raise APIError('payload is not a JSON dict')
    if not isinstance(payload, dict):
        raise APIError('payload is not a JSON dict')
    if not payload.get('name_id'):
        raise APIError('missing name_id')
    if not (payload.get('user_id') and payload.get('code')):
        raise APIError('missing user_id/code credentials')
    name_id = payload['name_id']
    user_id = payload['user_id']
    code = payload['code']
    self.rsa_token(user_id, code)  # invalid credentials raise APIError here
    individu = self.rsa_get_information('individu', user_id, code)
    text = get_template(self.text_template_name_rsa).render(individu).strip()
    created = SolisRSALink.objects.update_or_create(
        resource=self, name_id=name_id, user_id=user_id,
        defaults={'code': code, 'text': text})[1]
    result = {
        'user_id': user_id,
        'text': text,
        'created': created,
        'updated': not created,
    }
    return {'data': result}
|
|
|
|
@endpoint(name='rsa-unlink', methods=['post'], perm='can_access',
          description=_('Delete a Solis RSA link. Payload: name_id, user_id'))
def rsa_unlink(self, request):
    '''
    Delete the link(s) between name_id and the Solis RSA user user_id.

    The JSON payload must be a dict with non-empty name_id and user_id
    values. Raises APIError otherwise.
    '''
    try:
        payload = json.loads(request.body)
    except ValueError:
        raise APIError('payload is not a JSON dict')
    if not isinstance(payload, dict):
        raise APIError('payload is not a JSON dict')
    if not payload.get('name_id'):
        raise APIError('missing name_id')
    if not payload.get('user_id'):
        raise APIError('missing user_id')
    name_id = payload['name_id']
    user_id = payload['user_id']
    links = SolisRSALink.objects.filter(resource=self, name_id=name_id, user_id=user_id)
    links.delete()
    return {'data': {'user_id': user_id, 'deleted': True}}
|
|
|
|
@endpoint(name='rsa-links', perm='can_access',
          description=_('List linked Solis RSA users'),
          parameters={
              'name_id': {
                  'description': _('user identifier'),
                  'example_value': '3eb56fc'
              }
          })
def rsa_links(self, request, name_id):
    '''
    Return the stored RSA links of name_id, as a list of
    {'id': user_id, 'text': text} dicts.
    '''
    linked_users = []
    for link in SolisRSALink.objects.filter(resource=self, name_id=name_id):
        linked_users.append({'id': link.user_id, 'text': link.text})
    return {'data': linked_users}
|
|
|
|
@endpoint(name='rsa-user-info', perm='can_access',
          description=_('Get informations about a linked Solis RSA user'),
          parameters={
              'name_id': {
                  'description': _('user identifier'),
                  'example_value': '3eb56fc'
              },
              'user_id': {
                  'description': _('Solis RSA user identifier'),
                  'example_value': '4273',
              },
              'information': {
                  'description': _('individu, actions, allocataires, engagements, '
                                   'evaluations, evenements, indus, menages, presences, rdvs'),
                  'example_value': 'allocataires',
              },
              'index': {
                  'description': _('get a specific item, if applicable'),
              },
              'links': {
                  'description': _('get linked informations (comma separated list, empty for all)'),
                  'example_value': 'etatCivil,conjoint',
              },
              'filters': {
                  'description': _('filter response (list), ex: idStructure=399 or '
                                   'idStructure!=399,prescriptionPlacement=Placement'),
              },
          })
def rsa_user_info(self, request, name_id, user_id, information='individu',
                  index='search', links=None, filters=None):
    '''
    Return an information about a linked Solis RSA user as {'data': ...}.

    "individu" also refreshes the text stored on the link. When the
    response is a list, `filters` (comma separated key=value or key!=value
    fragments) keeps only the items whose str(item.get(key)) is equal,
    or not equal, to the value.
    Raises APIError if the link is unknown or if a filter has no "=".
    '''
    try:
        link = SolisRSALink.objects.get(resource=self, name_id=name_id, user_id=user_id)
    except SolisRSALink.DoesNotExist:
        raise APIError('unknown link')
    response = self.rsa_get_information(information=information,
                                        user_id=user_id, code=link.code,
                                        index=index, links=links)
    if information == 'individu':
        text = get_template(self.text_template_name_rsa).render(response).strip()
        if text != link.text:
            link.text = text
            link.save()
    if filters and isinstance(response, list):
        for filter_ in filters.split(','):
            if not filter_.strip():
                # empty fragment (doubled or trailing comma): nothing to filter
                continue
            # split on the first "=" only, the value itself can contain "="
            key, separator, value = filter_.partition('=')
            if not separator:
                raise APIError('invalid filter %r, expected key=value or key!=value' % filter_,
                               http_status=400)
            if key.endswith('!'):
                response = [item for item in response
                            if str(item.get(key[:-1])) != value]
            else:
                response = [item for item in response
                            if str(item.get(key)) == value]
    return {'data': response}
|
|
|
|
|
|
class SolisAPALink(models.Model):
    '''
    Link between a user (name_id) and a Solis APA user (user_id and its
    code) on a Solis resource.
    '''
    # explicit on_delete: the argument is required since Django 2.0, and
    # CASCADE is what was implicitly used before
    resource = models.ForeignKey(Solis, on_delete=models.CASCADE)
    name_id = models.CharField(blank=False, max_length=256)
    user_id = models.CharField(blank=False, max_length=64)
    code = models.CharField(blank=False, max_length=64)
    # text rendered from the Solis.text_template_name template
    text = models.CharField(blank=False, max_length=256)
|
|
|
|
|
|
class SolisRSALink(models.Model):
    '''
    Link between a user (name_id) and a Solis RSA user (user_id and its
    code) on a Solis resource.
    '''
    # explicit on_delete: the argument is required since Django 2.0, and
    # CASCADE is what was implicitly used before
    resource = models.ForeignKey(Solis, on_delete=models.CASCADE)
    name_id = models.CharField(blank=False, max_length=256)
    user_id = models.CharField(blank=False, max_length=64)
    code = models.CharField(blank=False, max_length=64)
    # text rendered from the Solis.text_template_name_rsa template
    text = models.CharField(blank=False, max_length=256)
|