215 lines
6.5 KiB
Python
215 lines
6.5 KiB
Python
# passerelle - uniform access to multiple data sources and services
|
|
# Copyright (C) 2022 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import base64
|
|
import binascii
|
|
import json
|
|
import urllib
|
|
|
|
import requests
|
|
from django.db import models
|
|
from django.utils import dateparse
|
|
from django.utils.timezone import make_aware
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
from passerelle.base.models import BaseResource, HTTPResource
|
|
from passerelle.utils.api import endpoint
|
|
from passerelle.utils.jsonresponse import APIError
|
|
|
|
from . import schemas
|
|
|
|
|
|
def parse_datetime(datetime_str):
|
|
try:
|
|
obj = dateparse.parse_datetime(datetime_str)
|
|
except ValueError:
|
|
raise APIError("Invalid datetime: %s" % datetime_str)
|
|
if obj is None:
|
|
raise APIError("Invalid datetime format: %s" % datetime_str)
|
|
return obj
|
|
|
|
|
|
class Litteralis(BaseResource, HTTPResource):
|
|
base_url = models.URLField(_('API URL'))
|
|
|
|
category = _('Business Process Connectors')
|
|
|
|
class Meta:
|
|
verbose_name = _('Litteralis')
|
|
|
|
def _call(self, path, method='get', data=None, files=None):
|
|
url = urllib.parse.urljoin(self.base_url, path)
|
|
kwargs = {}
|
|
|
|
if method == 'post':
|
|
if not data:
|
|
data = {}
|
|
kwargs['json'] = data
|
|
if files:
|
|
kwargs['files'] = files
|
|
|
|
try:
|
|
resp = getattr(self.requests, method)(url, **kwargs)
|
|
except (requests.Timeout, requests.RequestException) as e:
|
|
raise APIError(str(e))
|
|
try:
|
|
resp.raise_for_status()
|
|
except requests.RequestException as main_exc:
|
|
try:
|
|
err_data = resp.json()
|
|
except (json.JSONDecodeError, requests.exceptions.RequestException):
|
|
err_data = {'response_text': resp.text}
|
|
raise APIError(str(main_exc), data=err_data)
|
|
|
|
content_type = resp.headers.get('Content-Type')
|
|
if content_type and content_type.startswith('application/json'):
|
|
try:
|
|
return resp.json()
|
|
except (json.JSONDecodeError, requests.exceptions.JSONDecodeError) as e:
|
|
raise APIError(str(e))
|
|
|
|
return resp.text
|
|
|
|
def _upload(self, url, post_data):
|
|
try:
|
|
file_byte_content = base64.b64decode(post_data['file']['content'])
|
|
except (TypeError, binascii.Error):
|
|
raise APIError("Can't decode file")
|
|
|
|
files = {
|
|
'file': (post_data['file']['filename'], file_byte_content, post_data['file']['content_type'])
|
|
}
|
|
return {
|
|
'data': self._call(
|
|
url,
|
|
method='post',
|
|
files=files,
|
|
)
|
|
}
|
|
|
|
@endpoint(
|
|
name='demandes-recues',
|
|
description=_('Create submission'),
|
|
perm='can_access',
|
|
post={
|
|
'request_body': {
|
|
'schema': {
|
|
'application/json': schemas.DEMANDES_RECUES,
|
|
}
|
|
}
|
|
},
|
|
)
|
|
def demandes_recues(self, request, post_data):
|
|
if post_data['demandeur'].get('raisonSociale'):
|
|
pass
|
|
else:
|
|
for field in ('nom', 'prenom'):
|
|
if not post_data['demandeur'].get(field):
|
|
raise APIError('Missing <%s> in demandeur' % field)
|
|
|
|
def clean_payload(data):
|
|
if not isinstance(data, dict):
|
|
return data
|
|
|
|
res = {}
|
|
for k, v in data.items():
|
|
if v:
|
|
if k.startswith('date'):
|
|
res[k] = make_aware(parse_datetime(v)).isoformat()
|
|
else:
|
|
cleaned_payload = clean_payload(v)
|
|
if cleaned_payload:
|
|
res[k] = cleaned_payload
|
|
return res
|
|
|
|
data = clean_payload(post_data)
|
|
coord = data.get('geom', {}).get('coordinates', [])
|
|
if coord:
|
|
new_coord = []
|
|
for c in coord:
|
|
new_coord.append(float(c))
|
|
data['geom']['coordinates'] = new_coord
|
|
|
|
return {'data': self._call('demandes-recues', method='post', data=data)}
|
|
|
|
@endpoint(
|
|
name='upload',
|
|
description=_('Upload summary file'),
|
|
perm='can_access',
|
|
post={
|
|
'request_body': {
|
|
'schema': {
|
|
'application/json': schemas.UPLOAD_ANNEXES,
|
|
}
|
|
}
|
|
},
|
|
)
|
|
def upload(self, request, post_data):
|
|
url = 'demandes-recues/%s/upload' % post_data['id_demande']
|
|
return self._upload(url, post_data)
|
|
|
|
@endpoint(
|
|
name='annexes',
|
|
description=_('Upload appendix file'),
|
|
perm='can_access',
|
|
post={
|
|
'request_body': {
|
|
'schema': {
|
|
'application/json': schemas.UPLOAD_ANNEXES,
|
|
}
|
|
}
|
|
},
|
|
)
|
|
def annexes(self, request, post_data):
|
|
url = 'demandes-recues/%s/annexes' % post_data['id_demande']
|
|
return self._upload(url, post_data)
|
|
|
|
@endpoint(
|
|
methods=['get'],
|
|
name='demandes-recues-reponses',
|
|
description=_('Get submission status'),
|
|
perm='can_access',
|
|
parameters={
|
|
'id_demande': {
|
|
'example_value': '1',
|
|
}
|
|
},
|
|
)
|
|
def demandes_recues_reponses(self, request, id_demande, **kwargs):
|
|
return {
|
|
'data': self._call(
|
|
'demandes-recues/%s/reponses' % id_demande,
|
|
)
|
|
}
|
|
|
|
@endpoint(
|
|
methods=['get'],
|
|
name='demandes-recues-arrete',
|
|
description=_('Get submission decree'),
|
|
perm='can_access',
|
|
parameters={
|
|
'id_demande': {
|
|
'example_value': '1',
|
|
}
|
|
},
|
|
)
|
|
def demandes_recues_arrete(self, request, id_demande):
|
|
return {
|
|
'data': self._call(
|
|
'demandes-recues/%s/arrete' % id_demande,
|
|
)
|
|
}
|