passerelle/passerelle/apps/esup_signature/models.py

334 lines
11 KiB
Python

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import binascii
import collections
import io
import json
import urllib
import requests
from django.db import models
from django.http.response import HttpResponse
from django.utils.translation import gettext_lazy as _
from passerelle.base.models import BaseResource, HTTPResource
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
# JSON schema (draft-04) describing the payload accepted by the "new" endpoint:
# a base64-encoded file, the recipients' emails, the owner's EPPN and an
# optional title.
# NOTE(review): 'unflatten' presumably lets callers send flattened keys such as
# 'recipients_emails/0' -- confirm against passerelle's schema handling.
SIGN_REQUEST_SCHEMA = {
    '$schema': 'http://json-schema.org/draft-04/schema#',
    'title': '',
    'description': '',
    'type': 'object',
    'required': ['file', 'recipients_emails', 'eppn'],
    'unflatten': True,
    # Built from explicit pairs so the property order is spelled out.
    'properties': collections.OrderedDict(
        [
            (
                'file',
                {
                    'type': 'object',
                    'description': 'File object',
                    'required': ['filename', 'content_type', 'content'],
                    'properties': {
                        'filename': {'type': 'string'},
                        'content_type': {
                            'type': 'string',
                            'description': 'MIME content-type',
                        },
                        'content': {
                            'type': 'string',
                            'description': 'Content, base64 encoded',
                        },
                    },
                },
            ),
            (
                'recipients_emails',
                {
                    'type': 'array',
                    'description': 'Recipients emails',
                    'items': {'type': 'string'},
                },
            ),
            ('eppn', {'type': 'string', 'description': 'EPPN of the sign request owner'}),
            ('title', {'type': 'string', 'description': 'Title'}),
        ]
    ),
}
# JSON schema (draft-04) describing the payload accepted by the
# "new-with-workflow" endpoint: a base64-encoded file, the owner's EPPN and the
# workflow identifier are mandatory; every other property is optional.
# NOTE(review): 'unflatten' presumably lets callers send flattened keys such as
# 'recipients_emails/0' -- confirm against passerelle's schema handling.
SIGN_REQUEST_WITH_WORKFLOW_SCHEMA = {
    '$schema': 'http://json-schema.org/draft-04/schema#',
    'title': '',
    'description': '',
    'type': 'object',
    'required': ['file', 'eppn', 'workflow_id'],
    'unflatten': True,
    'properties': collections.OrderedDict(
        {
            'file': {
                'type': 'object',
                'description': 'File object',
                'required': ['filename', 'content_type', 'content'],
                'properties': {
                    'filename': {
                        'type': 'string',
                    },
                    'content_type': {
                        'type': 'string',
                        'description': 'MIME content-type',
                    },
                    'content': {
                        'type': 'string',
                        'description': 'Content, base64 encoded',
                    },
                },
            },
            'recipients_emails': {
                'type': 'array',
                'description': 'Recipients emails at each step',
                'items': {'type': 'string'},
            },
            'target_emails': {
                'type': 'array',
                'description': 'Target emails',
                'items': {'type': 'string'},
            },
            'all_sign_to_completes': {
                'type': 'array',
                # Fixed wording: the original read "Steps numbers were ...".
                'description': 'Step numbers where every recipient has to sign',
                'items': {'type': 'string'},
            },
            'eppn': {'type': 'string', 'description': 'EPPN of the sign request owner'},
            'workflow_id': {'type': 'string', 'description': 'Identifier of the workflow'},
            'title': {'type': 'string', 'description': 'Title'},
            'target_urls': {
                'type': 'array',
                'description': 'End locations',
                'items': {'type': 'string'},
            },
            'signrequest_params_jsonstring': {
                'type': 'string',
                'description': 'Signature parameters',
            },
        }
    ),
}
def clean_list(some_list):
    """Return a new list holding only the truthy elements of *some_list*.

    Empty strings, ``None`` and any other falsy values are dropped; the order
    of the remaining elements is preserved.
    """
    return list(filter(None, some_list))
class EsupSignature(BaseResource, HTTPResource):
    """Connector exposing Esup Signature web services as passerelle endpoints.

    Every endpoint forwards to a ``ws/...`` path resolved against ``base_url``
    and wraps the JSON answer in a ``{'data': ...}`` dictionary, except
    ``get-last-file`` which relays the remote file as an HTTP response.
    """

    base_url = models.URLField(_('API URL'))

    category = _('Business Process Connectors')

    class Meta:
        verbose_name = _('Esup Signature')

    @staticmethod
    def _path_segment(value):
        """Percent-encode *value* for use as a single URL path segment.

        Characters such as ``/`` or ``?`` are escaped so that a caller-supplied
        identifier cannot add path components or a query string to the URL.
        """
        return urllib.parse.quote(str(value), safe='')

    @staticmethod
    def _build_files(file_data):
        """Build the multipart ``files`` mapping from a schema "file" object.

        :param file_data: dict with ``filename``, ``content_type`` and a
            base64-encoded ``content``.
        :raises APIError: if ``content`` cannot be base64-decoded.
        """
        try:
            raw = base64.b64decode(file_data['content'])
        except (TypeError, ValueError) as e:
            # binascii.Error (bad padding) is a ValueError subclass; a plain
            # ValueError is raised for text holding non-ASCII characters.
            raise APIError("Can't decode file") from e
        return {
            'multipartFiles': (
                file_data['filename'],
                io.BytesIO(raw),
                file_data['content_type'],
            )
        }

    def _call(self, path, method='get', params=None, files=None, expect_json=True):
        """Send a request to the remote API and return its answer.

        :param path: path joined to ``base_url`` with ``urljoin``; keep it
            relative (no leading slash) so a path prefix in ``base_url`` is
            preserved.
        :param method: HTTP method name.
        :param params: query parameters, only forwarded for ``post``.
        :param files: multipart files, only forwarded for ``post``.
        :param expect_json: when true return the decoded JSON body, otherwise
            return the response object itself.
        :raises APIError: on transport failure, on an HTTP error status (the
            error body is attached as ``data``) or on an undecodable JSON body.
        """
        url = urllib.parse.urljoin(self.base_url, path)

        kwargs = {}
        if method == 'post':
            kwargs['params'] = params
            kwargs['files'] = files

        try:
            resp = self.requests.request(url=url, method=method, **kwargs)
        except requests.RequestException as e:
            # requests.Timeout is a RequestException subclass.
            raise APIError(str(e)) from e

        try:
            resp.raise_for_status()
        except requests.RequestException as main_exc:
            try:
                err_data = resp.json()
            except ValueError:
                # Every JSON decoding error raised by resp.json() (stdlib json,
                # simplejson or requests' own class) derives from ValueError.
                err_data = {'response_text': resp.text}
            raise APIError(str(main_exc), data=err_data) from main_exc

        if not expect_json:
            return resp

        try:
            return resp.json()
        except ValueError as e:
            raise APIError(str(e)) from e

    @endpoint(
        name='new',
        description=_('Create a sign request'),
        perm='can_access',
        post={
            'request_body': {
                'schema': {
                    'application/json': SIGN_REQUEST_SCHEMA,
                }
            },
            'input_example': {
                'file': {
                    'filename': 'example-1.pdf',
                    'content_type': 'application/pdf',
                    'content': 'JVBERi0xL...(base64 PDF)...',
                },
                'recipients_emails/0': 'xx@foo.com',
                'recipients_emails/1': 'yy@foo.com',
                'recipients_emails/2': 'zz@foo.com',
                'eppn': 'aa@foo.com',
                'title': 'a title',
            },
        },
    )
    def new(self, request, post_data):
        """Create a sign request from the posted file and recipients.

        :raises APIError: if the file content is not valid base64 or the
            remote call fails.
        """
        files = self._build_files(post_data['file'])
        params = {
            'signType': 'pdfImageStamp',
            'recipientsEmails': clean_list(post_data['recipients_emails']),
            'eppn': post_data['eppn'],
            'title': post_data.get('title', ''),
            'pending': True,
        }
        return {'data': self._call('ws/signrequests/new', method='post', params=params, files=files)}

    @endpoint(
        name='new-with-workflow',
        description=_('Create a sign request'),
        perm='can_access',
        post={
            'request_body': {
                'schema': {
                    'application/json': SIGN_REQUEST_WITH_WORKFLOW_SCHEMA,
                }
            },
            'input_example': {
                'file': {
                    'filename': 'example-1.pdf',
                    'content_type': 'application/pdf',
                    'content': 'JVBERi0xL...(base64 PDF)...',
                },
                'workflow_id': '99',
                'eppn': 'aa@foo.com',
                'title': 'a title',
                'recipients_emails/0': '0*xx@foo.com',
                'recipients_emails/1': '0*yy@foo.com',
                'recipients_emails/2': '1*zz@foo.com',
                'all_sign_to_completes/0': '12',
                'all_sign_to_completes/1': '13',
                'target_emails/0': 'xx@foo.com',
                'target_emails/1': 'yy@foo.com',
                'target_emails/2': 'zz@foo.com',
                'signrequest_params_jsonstring': 'List [ OrderedMap { "xPos": 100, "yPos": 100, "signPageNumber": 1 }, '
                'OrderedMap { "xPos": 200, "yPos": 200, "signPageNumber": 1 } ]',
                'target_urls/0': 'smb://foo.bar/location-1/',
                'target_urls/1': 'smb://foo.bar/location-2/',
            },
        },
    )
    def new_with_workflow(self, request, post_data):
        """Create a sign request attached to the workflow ``workflow_id``.

        :raises APIError: if the file content is not valid base64 or the
            remote call fails.
        """
        files = self._build_files(post_data['file'])
        params = {
            'createByEppn': post_data['eppn'],
            'title': post_data.get('title', ''),
            'recipientsEmails': clean_list(post_data.get('recipients_emails', [])),
            'allSignToCompletes': clean_list(post_data.get('all_sign_to_completes', [])),
            'targetEmails': clean_list(post_data.get('target_emails', [])),
            'signRequestParamsJsonString': post_data.get('signrequest_params_jsonstring', ''),
            'targetUrls': clean_list(post_data.get('target_urls', [])),
        }
        # Relative path, like every other call: a leading slash would make
        # urljoin() discard any path prefix present in base_url.
        path = 'ws/workflows/%s/new' % self._path_segment(post_data['workflow_id'])
        return {'data': self._call(path, method='post', params=params, files=files)}

    @endpoint(
        methods=['get'],
        name='status',
        description=_('Get sign request status'),
        perm='can_access',
        parameters={
            'signrequests_id': {
                'example_value': '1',
            }
        },
    )
    def status(self, request, signrequests_id, **kwargs):
        """Return the remote JSON description of a sign request."""
        return {
            'data': self._call(
                'ws/signrequests/%s' % self._path_segment(signrequests_id),
            )
        }

    @endpoint(
        methods=['get'],
        name='audit-trail',
        description=_('Get sign request audit trail'),
        perm='can_access',
        parameters={
            'signrequests_id': {
                'example_value': '1',
            }
        },
    )
    def audit_trail(self, request, signrequests_id, **kwargs):
        """Return the remote JSON audit trail of a sign request."""
        return {
            'data': self._call(
                'ws/signrequests/audit-trail/%s' % self._path_segment(signrequests_id),
            )
        }

    @endpoint(
        methods=['get'],
        name='get-last-file',
        description=_('Get the last signed file'),
        perm='can_access',
        parameters={
            'signrequests_id': {
                'example_value': '1',
            }
        },
    )
    def get_last_file(self, request, signrequests_id, **kwargs):
        """Relay the last file of a sign request as an HTTP response.

        The remote ``Content-Type`` and ``Content-Disposition`` headers are
        copied when present; a missing header no longer raises ``KeyError``.
        """
        resp = self._call(
            'ws/signrequests/get-last-file/%s' % self._path_segment(signrequests_id),
            expect_json=False,
        )
        response = HttpResponse(
            resp.content,
            # Generic binary type when the remote server does not announce one.
            content_type=resp.headers.get('Content-Type', 'application/octet-stream'),
        )
        disposition = resp.headers.get('Content-Disposition')
        if disposition:
            response['Content-Disposition'] = disposition
        return response