esup_signature: start connector (#76994)
gitea/passerelle/pipeline/head This commit looks good Details

This commit is contained in:
Emmanuel Cazenave 2023-05-03 10:59:49 +02:00
parent 84cd51957e
commit fd09fb2fd7
6 changed files with 356 additions and 0 deletions

View File

@ -0,0 +1,69 @@
# Generated by Django 3.2.18 on 2023-05-03 13:41
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
('base', '0030_resourcelog_base_resour_appname_298cbc_idx'),
]
operations = [
migrations.CreateModel(
name='EsupSignature',
fields=[
(
'id',
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
),
('title', models.CharField(max_length=50, verbose_name='Title')),
('slug', models.SlugField(unique=True, verbose_name='Identifier')),
('description', models.TextField(verbose_name='Description')),
(
'basic_auth_username',
models.CharField(
blank=True, max_length=128, verbose_name='Basic authentication username'
),
),
(
'basic_auth_password',
models.CharField(
blank=True, max_length=128, verbose_name='Basic authentication password'
),
),
(
'client_certificate',
models.FileField(
blank=True, null=True, upload_to='', verbose_name='TLS client certificate'
),
),
(
'trusted_certificate_authorities',
models.FileField(blank=True, null=True, upload_to='', verbose_name='TLS trusted CAs'),
),
(
'verify_cert',
models.BooleanField(blank=True, default=True, verbose_name='TLS verify certificates'),
),
(
'http_proxy',
models.CharField(blank=True, max_length=128, verbose_name='HTTP and HTTPS proxy'),
),
('base_url', models.URLField(verbose_name='API URL')),
(
'users',
models.ManyToManyField(
blank=True,
related_name='_esup_signature_esupsignature_users_+',
related_query_name='+',
to='base.ApiUser',
),
),
],
options={
'verbose_name': 'Esup Signature',
},
),
]

View File

@ -0,0 +1,208 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import binascii
import collections
import io
import json
import urllib
import requests
from django.db import models
from django.http.response import HttpResponse
from django.utils.translation import gettext_lazy as _
from passerelle.base.models import BaseResource, HTTPResource
from passerelle.utils.api import endpoint
from passerelle.utils.jsonresponse import APIError
SIGN_REQUEST_SCHEMA = {
'$schema': 'http://json-schema.org/draft-04/schema#',
'title': '',
'description': '',
'type': 'object',
'required': ['file', 'recipients_emails', 'eppn'],
'unflatten': True,
'properties': collections.OrderedDict(
{
'file': {
'type': 'object',
'description': 'File object',
'required': ['filename', 'content_type', 'content'],
'properties': {
'filename': {
'type': 'string',
},
'content_type': {
'type': 'string',
'description': 'MIME content-type',
},
'content': {
'type': 'string',
'description': 'Content, base64 encoded',
},
},
},
'recipients_emails': {
'type': 'array',
'description': 'Recipients emails',
'items': {'type': 'string'},
},
'eppn': {'type': 'string', 'description': 'EPPN of the sign request owner'},
'title': {'type': 'string', 'description': 'Title'},
}
),
}
class EsupSignature(BaseResource, HTTPResource):
base_url = models.URLField(_('API URL'))
category = _('Business Process Connectors')
class Meta:
verbose_name = _('Esup Signature')
def _call(self, path, method='get', data=None, files=None, expect_json=True):
url = urllib.parse.urljoin(self.base_url, path)
kwargs = {}
if method == 'post':
kwargs['data'] = data
kwargs['files'] = files
try:
resp = self.requests.request(url=url, method=method, **kwargs)
except (requests.Timeout, requests.RequestException) as e:
raise APIError(str(e))
try:
resp.raise_for_status()
except requests.RequestException as main_exc:
try:
err_data = resp.json()
except (json.JSONDecodeError, requests.exceptions.RequestException):
err_data = {'response_text': resp.text}
raise APIError(str(main_exc), data=err_data)
if expect_json:
try:
return resp.json()
except (json.JSONDecodeError, requests.exceptions.JSONDecodeError) as e:
raise APIError(str(e))
return resp
@endpoint(
name='new',
description=_('Create a sign request'),
perm='can_access',
post={
'request_body': {
'schema': {
'application/json': SIGN_REQUEST_SCHEMA,
}
},
'input_example': {
'file': {
'filename': 'example-1.pdf',
'content_type': 'application/pdf',
'content': 'JVBERi0xL...(base64 PDF)...',
},
'recipients_emails/0': 'xx@foo.com',
'recipients_emails/1': 'yy@foo.com',
'recipients_emails/2': 'zz@foo.com',
'eppn': 'aa@foo.com',
'title': 'a title',
},
},
)
def new(self, request, post_data):
try:
file_bytes = io.BytesIO(base64.b64decode(post_data['file']['content']))
except (TypeError, binascii.Error):
raise APIError("Can't decode file")
files = {
'multipartFiles': (
post_data['file']['filename'],
file_bytes,
post_data['file']['content_type'],
)
}
recipients_emails = [email for email in post_data['recipients_emails'] if email]
data = {
'signType': 'pdfImageStamp',
'recipientsEmails': recipients_emails,
'eppn': post_data['eppn'],
'title': post_data.get('title', ''),
'pending': True,
}
return {'data': self._call('ws/signrequests/new', method='post', data=data, files=files)}
@endpoint(
methods=['get'],
name='status',
description=_('Get sign request status'),
perm='can_access',
parameters={
'signrequests_id': {
'example_value': '1',
}
},
)
def status(self, request, signrequests_id, **kwargs):
return {
'data': self._call(
'ws/signrequests/%s' % signrequests_id,
)
}
@endpoint(
methods=['get'],
name='audit-trail',
description=_('Get sign request audit trail'),
perm='can_access',
parameters={
'signrequests_id': {
'example_value': '1',
}
},
)
def audit_trail(self, request, signrequests_id, **kwargs):
return {
'data': self._call(
'ws/signrequests/audit-trail/%s' % signrequests_id,
)
}
@endpoint(
methods=['get'],
name='get-last-file',
description=_('Get the last signed file'),
perm='can_access',
parameters={
'signrequests_id': {
'example_value': '1',
}
},
)
def get_last_file(self, request, signrequests_id, **kwargs):
resp = self._call('ws/signrequests/get-last-file/%s' % signrequests_id, expect_json=False)
response = HttpResponse(resp.content, content_type=resp.headers['Content-Type'])
response['Content-Disposition'] = resp.headers['Content-Disposition']
return response

View File

@ -149,6 +149,7 @@ INSTALLED_APPS = (
'passerelle.apps.csvdatasource',
'passerelle.apps.esabora',
'passerelle.apps.esirius',
'passerelle.apps.esup_signature',
'passerelle.apps.family',
'passerelle.apps.feeds',
'passerelle.apps.filr_rest',

View File

@ -0,0 +1,78 @@
import base64
import pytest
import responses
from django.contrib.contenttypes.models import ContentType
from passerelle.apps.esup_signature.models import EsupSignature
from passerelle.base.models import AccessRight, ApiUser
@pytest.fixture()
def connector(db):
api = ApiUser.objects.create(username='all', keytype='', key='')
connector = EsupSignature.objects.create(
base_url='https://esup-signature.invalid/',
slug='esup-signature',
)
obj_type = ContentType.objects.get_for_model(connector)
AccessRight.objects.create(
codename='can_access', apiuser=api, resource_type=obj_type, resource_pk=connector.pk
)
return connector
def test_new(app, connector):
params = {
'file': {
'filename': 'bla',
'content': base64.b64encode(b'who what').decode(),
'content_type': 'text/plain',
},
'recipients_emails/0': 'foo@invalid',
'recipients_emails/1': 'bar@invalid',
'eppn': 'baz@invalid',
'title': 'a title',
}
with responses.RequestsMock() as rsps:
rsps.post('https://esup-signature.invalid/ws/signrequests/new', status=200, json=9)
resp = app.post_json('/esup-signature/esup-signature/new', params=params)
assert len(rsps.calls) == 1
assert rsps.calls[0].request.headers['Content-Type'].startswith('multipart/form-data')
json_resp = resp.json
assert json_resp['err'] == 0
assert json_resp['data'] == 9
def test_status(app, connector):
with responses.RequestsMock() as rsps:
rsps.get('https://esup-signature.invalid/ws/signrequests/1', status=200, json={'status': 'completed'})
resp = app.get('/esup-signature/esup-signature/status?signrequests_id=1')
assert resp.json['err'] == 0
assert resp.json['data']['status'] == 'completed'
def test_audit_trail(app, connector):
with responses.RequestsMock() as rsps:
rsps.get(
'https://esup-signature.invalid/ws/signrequests/audit-trail/1',
status=200,
json={'id': 1, 'documentCheckSum': 'abc'},
)
resp = app.get('/esup-signature/esup-signature/audit-trail?signrequests_id=1')
assert resp.json['err'] == 0
assert resp.json['data']['id'] == 1
assert resp.json['data']['documentCheckSum'] == 'abc'
def test_get_last_file(app, connector):
with responses.RequestsMock() as rsps:
headers = {'Content-Type': 'text/plain', 'Content-Disposition': 'attachment; filename=foo.txt'}
rsps.get(
'https://esup-signature.invalid/ws/signrequests/get-last-file/1',
status=200,
body='who hwat',
headers=headers,
)
resp = app.get('/esup-signature/esup-signature/get-last-file?signrequests_id=1')
assert resp.text == 'who hwat'