360 lines
13 KiB
Python
360 lines
13 KiB
Python
# passerelle - uniform access to multiple data sources and services
|
|
# Copyright (C) 2023 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import base64
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
from collections import OrderedDict
|
|
|
|
from django.conf import settings
|
|
from django.core.exceptions import ValidationError
|
|
from django.db import models
|
|
from django.db.models import JSONField
|
|
from django.http.response import HttpResponse
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
from passerelle.base.models import BaseResource
|
|
from passerelle.utils.api import endpoint
|
|
from passerelle.utils.jsonresponse import APIError
|
|
from passerelle.utils.models import resource_file_upload_to
|
|
from passerelle.utils.pdf import PDF
|
|
from passerelle.utils.templates import evaluate_condition, evaluate_template
|
|
|
|
# JSON-schema fragment describing one PDF document embedded in a JSON payload.
# Only 'content' is mandatory; it is the only key the endpoints of this module
# read from such an object.
PDF_FILE_OBJECT = {
    'type': 'object',
    'description': _('PDF file'),
    'required': ['content'],
    'properties': {
        'filename': {'type': 'string', 'description': _('file name')},
        'content_type': {'type': 'string', 'description': _('MIME content-type')},
        # base64 text; the endpoints decode it before handing the file to pdftk
        'content': {'type': 'string', 'description': _('file content, base64 encoded')},
    },
}
|
|
|
|
# Request body schema of the "assemble" endpoint: an output file name plus the
# list of PDF documents to concatenate.
ASSEMBLE_SCHEMA = {
    '$schema': 'http://json-schema.org/draft-04/schema#',
    'title': '',
    'description': '',
    'type': 'object',
    'required': ['filename', 'files'],
    # NOTE(review): presumably lets flat keys such as 'files/0' (see the
    # endpoint's input example) be expanded into nested structures before
    # validation - confirm against passerelle's endpoint machinery.
    'unflatten': True,
    'properties': OrderedDict(
        [
            (
                'filename',
                {
                    'description': _('output PDF filename'),
                    'type': 'string',
                },
            ),
            (
                'files',
                {
                    'type': 'array',
                    'description': _('PDF files to catenate'),
                    'items': {
                        # Each entry is either a full file object, a bare
                        # base64 string, or null (ignored by the endpoint).
                        'oneOf': [
                            PDF_FILE_OBJECT,
                            {'type': 'string', 'description': _('PDF content, base64 encoded')},
                            {'type': 'null', 'description': _('empty file, do not consider')},
                        ]
                    },
                },
            ),
        ]
    ),
}
|
|
|
|
# Request body schema of the "watermark" endpoint: the PDF to mark, the PDF
# used as stamp, the output file name and an optional pdftk operation name.
WATERMARK_SCHEMA = {
    '$schema': 'http://json-schema.org/draft-04/schema#',
    'title': '',
    'description': '',
    'type': 'object',
    'required': ['filename', 'file', 'stamp'],
    # NOTE(review): presumably enables expansion of flat "a/b" keys into
    # nested structures - confirm against passerelle's endpoint machinery.
    'unflatten': True,
    'properties': OrderedDict(
        [
            (
                'filename',
                {
                    'description': _('output PDF filename'),
                    'type': 'string',
                },
            ),
            ('file', PDF_FILE_OBJECT),
            ('stamp', PDF_FILE_OBJECT),
            (
                'mode',
                {
                    'description': _('watermark mode (default is background)'),
                    'type': 'string',
                    # The chosen value is passed verbatim to pdftk as the
                    # operation name by the watermark endpoint.
                    'enum': ['background', 'multibackground', 'stamp', 'multistamp'],
                },
            ),
        ]
    ),
}
|
|
|
|
|
|
def validate_pdf(fieldfile):
    """Validate that *fieldfile* starts with the PDF magic bytes ``%PDF-``.

    The state of the file is left as it was found:

    * if the file was closed on entry, it is closed again after the check;
    * if it was already open, its read position is restored (best effort), so
      that a later consumer of the same file object does not start reading
      after the five bytes consumed here.

    Raises:
        ValidationError: if the first five bytes read are not ``%PDF-``.
    """
    was_closed = fieldfile.closed
    try:
        # Only remember the position of a file we did not open ourselves.
        start = None if was_closed else fieldfile.tell()
    except (AttributeError, OSError):
        # Not a seekable object: fall back to the check without rewinding.
        start = None

    try:
        header = fieldfile.read(5)
    finally:
        if was_closed:
            fieldfile.close()
        elif start is not None:
            fieldfile.seek(start)

    if header != b'%PDF-':
        raise ValidationError(
            _('%(value)s is not a PDF file'),
            params={'value': fieldfile},
        )
|
|
|
|
|
|
class Resource(BaseResource):
    # Connector exposing PDF operations as endpoints:
    #  - "assemble" and "watermark" shell out to pdftk on files received
    #    base64-encoded in the JSON payload;
    #  - "fill-form" and "field-values" work on the PDF form stored in
    #    ``fill_form_file``, using the templates stored in ``fields_mapping``.
    category = _('Misc')

    # Optional PDF form template; checked with validate_pdf on validation.
    fill_form_file = models.FileField(
        _('Fill Form input file'),
        upload_to=resource_file_upload_to,
        help_text=_('PDF file'),
        validators=[validate_pdf],
        null=True,
        blank=True,
    )

    # Maps 'field_<digest_id>' keys to the template (or condition, for
    # checkboxes) evaluated against the payload in fill_form().
    fields_mapping = JSONField(verbose_name=_('Field mapping'), null=True, blank=True)

    hide_description_fields = ['fields_mapping']

    class Meta:
        verbose_name = _('PDF')

    @classmethod
    def get_manager_form_class(cls, **kwargs):
        """Return the parent's manager form class with ``fields_mapping`` excluded.

        Any ``exclude`` value supplied by the caller is preserved and extended.
        """
        kwargs['exclude'] = tuple(kwargs.get('exclude') or ()) + ('fields_mapping',)
        return super().get_manager_form_class(**kwargs)

    def run_pdftk(self, args):
        """Run pdftk with *args* and return the bytes it writes on stdout.

        ``output -`` is appended to *args* so that pdftk writes its result on
        standard output. The executable path and the timeout come from
        ``settings.PDFTK_PATH`` and ``settings.PDFTK_TIMEOUT``.

        Raises:
            APIError: if pdftk times out or exits with a non-zero status.
        """
        command = [settings.PDFTK_PATH] + args + ['output', '-']
        try:
            # stderr is merged into stdout so that pdftk's diagnostics are
            # available in the error message below.
            # NOTE(review): as a consequence anything pdftk prints on stderr
            # during a successful run is also part of the returned bytes.
            return subprocess.check_output(command, timeout=settings.PDFTK_TIMEOUT, stderr=subprocess.STDOUT)
        except subprocess.TimeoutExpired as e:
            raise APIError('pdftk timed out after %s seconds' % e.timeout) from e
        except subprocess.CalledProcessError as e:
            raise APIError('pdftk returned non-zero exit status %s (%r)' % (e.returncode, e.output)) from e

    @staticmethod
    def _write_base64_pdf(path, b64content, label):
        """Decode *b64content* and write the result to *path*; return *path*.

        *label* names the payload entry in the error message.

        Raises:
            APIError: (HTTP 400) if *b64content* is not valid base64.
        """
        try:
            # binascii.Error, raised on malformed input, is a ValueError.
            content = base64.b64decode(b64content)
        except (ValueError, TypeError) as e:
            raise APIError('invalid base64 content for %r: %s' % (label, e), http_status=400) from e
        with open(path, mode='wb') as fd:
            fd.write(content)
        return path

    @staticmethod
    def _pdf_attachment_response(filename, content=b''):
        """Build an ``application/pdf`` attachment response named *filename*.

        Backslashes and double quotes of *filename* are escaped so that a
        client-supplied name cannot break out of the quoted-string of the
        Content-Disposition header.
        """
        response = HttpResponse(content, content_type='application/pdf')
        quoted = str(filename).replace('\\', '\\\\').replace('"', '\\"')
        response['Content-Disposition'] = 'attachment; filename="%s"' % quoted
        return response

    @endpoint(
        description=_('Returns the assembly of received PDF files'),
        perm='can_access',
        methods=['post'],
        display_order=0,
        post={
            'request_body': {'schema': {'application/json': ASSEMBLE_SCHEMA}},
            'input_example': {
                'filename': 'output.pdf',
                'files/0': {
                    'filename': 'example-1.pdf',
                    'content_type': 'application/pdf',
                    'content': 'JVBERi0xL...(base64 PDF)...',
                },
                'files/1': {
                    'filename': 'example-2.pdf',
                    'content_type': 'application/pdf',
                    'content': '//4lUERGL...(base64 PDF)...',
                },
                'files/2': '//4lUERGL...(base64 PDF)',
            },
        },
    )
    def assemble(self, request, post_data):
        """Concatenate the PDF files of ``post_data['files']`` with ``pdftk cat``.

        Entries may be file objects (dict with a ``content`` key) or bare
        base64 strings; empty or null entries are skipped.

        Returns:
            An ``application/pdf`` attachment named after ``post_data['filename']``.

        Raises:
            APIError: (HTTP 400) if no usable entry is found or if an entry
                is not valid base64; also whatever run_pdftk() raises.
        """
        filename = post_data.pop('filename')

        with tempfile.TemporaryDirectory(prefix='passerelle-pdftk-%s-assemble-' % self.id) as tmpdir:
            infiles = []
            for i, infile in enumerate(post_data['files']):
                if isinstance(infile, dict) and infile.get('content'):
                    b64content = infile['content']
                elif isinstance(infile, str) and infile:
                    b64content = infile
                else:
                    # null, empty string or object without content: ignore
                    continue
                path = os.path.join(tmpdir, 'pdf-%d.pdf' % i)
                infiles.append(self._write_base64_pdf(path, b64content, 'files/%d' % i))
            if not infiles:
                raise APIError("no valid file found in 'files' property", http_status=400)
            # pdftk must run while the temporary directory still exists
            pdf_content = self.run_pdftk(args=infiles + ['cat'])

        return self._pdf_attachment_response(filename, pdf_content)

    @endpoint(
        description=_('Applies a PDF watermark (stamp) to a PDF file'),
        perm='can_access',
        methods=['post'],
        display_order=0,
        post={
            'request_body': {'schema': {'application/json': WATERMARK_SCHEMA}},
            'input_example': {
                'filename': 'output.pdf',
                'file': {
                    'filename': 'example-1.pdf',
                    'content_type': 'application/pdf',
                    'content': 'JVBERi0xL...(base64 PDF)...',
                },
                'stamp': {
                    'filename': 'example-2.pdf',
                    'content_type': 'application/pdf',
                    'content': '//4lUERGL...(base64 PDF)...',
                },
            },
        },
    )
    def watermark(self, request, post_data):
        """Apply ``post_data['stamp']`` onto ``post_data['file']`` using pdftk.

        ``post_data['mode']`` is used as the pdftk operation name and defaults
        to ``background`` when missing or empty.

        Returns:
            An ``application/pdf`` attachment named after ``post_data['filename']``.

        Raises:
            APIError: (HTTP 400) if a file content is not valid base64; also
                whatever run_pdftk() raises.
        """
        filename = post_data.pop('filename')
        mode = post_data.get('mode') or 'background'

        with tempfile.TemporaryDirectory(prefix='passerelle-pdftk-%s-watermark-' % self.id) as tmpdir:
            file_path = self._write_base64_pdf(
                os.path.join(tmpdir, 'pdf-%s.pdf' % 'file'), post_data['file']['content'], 'file'
            )
            stamp_path = self._write_base64_pdf(
                os.path.join(tmpdir, 'pdf-%s.pdf' % 'stamp'), post_data['stamp']['content'], 'stamp'
            )
            pdf_content = self.run_pdftk(args=[file_path, mode, stamp_path])

        return self._pdf_attachment_response(filename, pdf_content)

    # Request body schema of the "fill-form" endpoint. Only the optional
    # 'extra' object is described; fill_form() hands the rest of the payload
    # to the mapping templates.
    FILL_FORM_SCHEMA = {
        '$schema': 'http://json-schema.org/draft-04/schema#',
        'title': '',
        'description': _('content of the form to map on PDF fields'),
        'unflatten': True,
        'type': 'object',
        'properties': OrderedDict(
            {
                'extra': {
                    'type': 'object',
                    'properties': OrderedDict(
                        {
                            'filename': {
                                'type': 'string',
                                'description': _('file name'),
                            },
                            'flatten': {
                                'description': _('remove PDF fields, keep only the drawed values'),
                                'type': 'boolean',
                            },
                        }
                    ),
                }
            }
        ),
    }

    @endpoint(
        name='fill-form',
        description=_('Fills the input PDF form with fields applying mappings to the received payload'),
        perm='can_access',
        methods=['post'],
        display_order=1,
        parameters={
            'filename': {'description': _('file name')},
            'flatten': {'description': _('remove PDF fields, keep only the drawed values')},
        },
        post={
            'request_body': {'schema': {'application/json': FILL_FORM_SCHEMA}},
            'input_example': {
                'extra': {
                    'filename': 'filled.pdf',
                    'flatten': True,
                },
                'prenom': 'Jean',
                'nom': 'Dupont',
            },
        },
    )
    def fill_form(self, request, post_data, flatten=None, filename=None):
        """Fill the configured PDF form from *post_data* and return the result.

        For every field of the PDF having an entry ``field_<digest_id>`` in
        ``fields_mapping``, the entry is evaluated against *post_data* (as a
        condition for checkboxes, as a template for text, radio, combo and
        list fields) and the field is set when the result is not None.

        The output file name and the flatten flag are looked up, in order, in
        the endpoint parameters, in ``post_data['extra']`` and in *post_data*
        itself; the file name defaults to ``form.pdf``. The flatten flag is
        true when its string form is one of 1, on, yes, true (any case).

        Raises:
            APIError: if no PDF file or no fields mapping is configured.
            NotImplementedError: if a mapped field has an unsupported widget type.
        """
        extra = post_data.pop('extra', {})
        filename = filename or extra.get('filename') or post_data.get('filename') or 'form.pdf'
        flatten_value = flatten or extra.get('flatten') or post_data.get('flatten')
        flatten_pdf = str(flatten_value).lower() in ('1', 'on', 'yes', 'true')

        if not self.fill_form_file:
            raise APIError('not PDF file configured')
        fields_mapping = self.fields_mapping
        if not fields_mapping:
            raise APIError('no fields mapping configured')

        with self.fill_form_file.open() as fd:
            pdf = PDF(fd)
            for page in pdf.pages:
                for field in page.fields:
                    mapping_template = fields_mapping.get(f'field_{field.digest_id}')
                    if not mapping_template:
                        # field without mapping: left untouched
                        continue
                    if field.widget_type == 'checkbox':
                        value = evaluate_condition(mapping_template, post_data)
                    elif field.widget_type in ('text', 'radio'):
                        value = evaluate_template(mapping_template, post_data)
                    elif field.widget_type in ('combo', 'list'):
                        value = evaluate_template(mapping_template, post_data)
                        self.logger.info('field=%r value=%r', field, value)
                    else:
                        raise NotImplementedError
                    if value is not None:
                        field.set(value)
            # The PDF is written while the source file is still open, in case
            # the PDF helper still needs to read from it.
            response = self._pdf_attachment_response(filename)
            pdf.write(response, flatten=flatten_pdf)
            return response

    @endpoint(
        name='field-values',
        description=_('Return possible values for PDF\'s combo or list form fields'),
        perm='can_access',
        parameters={
            'digest_id': {'description': _('Identifier of the field')},
        },
    )
    def field_values(self, request, digest_id):
        """Return the possible values of the combo or list field *digest_id*.

        Returns:
            ``{'data': [{'id': value, 'text': value}, ...]}`` built from the
            second item of each pair of the field's ``combo_possible_values``.

        Raises:
            APIError: if no PDF file is configured, if no field of the PDF has
                this digest id, or if the field is neither a list nor a combo.
        """
        if not self.fill_form_file:
            raise APIError('not PDF file configured')

        with self.fill_form_file.open() as fd:
            pdf_content = fd.read()

        pdf = PDF(pdf_content)
        fields = [field for page in pdf.pages for field in page.fields if field.digest_id == digest_id]
        if not fields:
            raise APIError(f'unknown digest-id {digest_id!r}')
        # when several fields share the digest id, the first one is used
        field = fields[0]
        if field.widget_type not in ('list', 'combo'):
            raise APIError(f'wrong field type for digest-id {digest_id!r}: {field.widget_type}')

        # the first item of each pair is not used; it is deliberately not named
        # "_" to avoid shadowing the gettext alias
        return {'data': [{'id': value, 'text': value} for _unused, value in field.combo_possible_values]}
|