diff --git a/debian/control b/debian/control index b09dfa7a..768e48d8 100644 --- a/debian/control +++ b/debian/control @@ -13,7 +13,9 @@ Homepage: https://dev.entrouvert.org/projects/passerelle Package: python3-passerelle Architecture: all -Depends: pdftk, +Depends: ghostscript, + pdftk, + poppler-utils, python3-cmislib, python3-dateutil, python3-distutils, diff --git a/debian/debian_config.py b/debian/debian_config.py index 1c377432..12206985 100644 --- a/debian/debian_config.py +++ b/debian/debian_config.py @@ -36,6 +36,11 @@ LOGGING['loggers']['paramiko.transport'] = { 'propagate': True, } +# silence pdfrw +LOGGING['loggers']['pdfrw'] = { + 'propagate': False, +} + exec(open('/etc/%s/settings.py' % PROJECT_NAME).read()) # run additional settings snippets diff --git a/passerelle/utils/pdf.py b/passerelle/utils/pdf.py new file mode 100644 index 00000000..78356a22 --- /dev/null +++ b/passerelle/utils/pdf.py @@ -0,0 +1,247 @@ +# passerelle - uniform access to multiple data sources and services +# Copyright (C) 2023 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. 
"""Helpers to inspect, fill and render the form fields of a PDF document."""

import base64
import dataclasses
import functools
import hashlib
import io
import subprocess
import tempfile
import typing

import pdfrw


class Rect(typing.NamedTuple):
    """Rectangle described by two opposite corners."""

    x1: float
    y1: float
    x2: float
    y2: float


@dataclasses.dataclass(frozen=True)
class Widget:
    """A form widget (text input or checkbox) attached to a page.

    Only ``name`` and ``annotation`` take part in comparisons; the other
    attributes are declared with ``compare=False``.
    """

    page: 'Page' = dataclasses.field(compare=False, repr=False)
    name: str
    widget_type: str = dataclasses.field(compare=False)
    rect: Rect = dataclasses.field(compare=False)
    on_value: str = dataclasses.field(compare=False, default=pdfrw.PdfName.On)
    annotation: pdfrw.PdfDict = dataclasses.field(default=None, repr=False)

    @property
    def digest_id(self):
        """Return an upper-case base32 MD5 digest of the field name.

        The padding characters are removed; an empty name gives ``''``.
        """
        if not self.name:
            return ''
        name_bytes = self.name.encode()
        digest_algo = hashlib.md5(name_bytes)
        digest = digest_algo.digest()
        b32_encoded = base64.b32encode(digest).decode()
        return b32_encoded.strip('=').upper()

    @property
    def value(self):
        """Return the current value: a string for text widgets, a boolean
        for checkboxes (``None`` for any other widget type)."""
        if self.widget_type == 'text':
            if self.annotation[pdfrw.PdfName.V]:
                return self.annotation[pdfrw.PdfName.V].decode()
            return ''
        elif self.widget_type == 'checkbox':
            return self.annotation[pdfrw.PdfName.V] == self.on_value

    def set(self, value):
        """Store ``value`` in the underlying annotation.

        Text widgets receive ``str(value)``; checkboxes receive their "on"
        state when ``value`` is truthy and ``Off`` otherwise.
        """
        # allow rendering of values in Acrobat Reader
        self.page.pdf._pdf_reader.Root.AcroForm.update(pdfrw.PdfDict(NeedAppearances=pdfrw.PdfObject('true')))
        if self.widget_type == 'text':
            str_value = str(value)
            self.annotation.update(pdfrw.PdfDict(V=str_value, AS=str_value))
        elif self.widget_type == 'checkbox':
            bool_value = self.on_value if value else pdfrw.PdfName.Off
            self.annotation.update(pdfrw.PdfDict(V=bool_value, AS=bool_value))


@dataclasses.dataclass
class Page:
    """One page of a :class:`PDF`, addressed by its 0-based index."""

    pdf: object
    page_number: object

    THUMBNAIL_DEFAULT_WIDTH = 800

    # Button field flags (/Ff). The PDF specification numbers bit positions
    # from 1, so "bit 16" (Radio) is 1 << 15 and "bit 17" (Pushbutton) is
    # 1 << 16.
    RADIO_FLAG = 1 << 15
    PUSH_BUTTON_FLAG = 1 << 16

    @property
    def page(self):
        """Return the pdfrw page object."""
        return self.pdf._pdf_reader.pages[self.page_number]

    @staticmethod
    def _checkbox_on_value(annotation):
        """Return the name of the checked state of a checkbox annotation.

        It is the first key of the normal appearance dictionary (/AP /N)
        which is not ``Off``; ``On`` is used when none can be found.
        """
        # PdfDict lookups return None for missing keys instead of raising
        # KeyError, so every level must be checked explicitly.
        appearances = annotation[pdfrw.PdfName.AP]
        normal = appearances[pdfrw.PdfName.N] if appearances else None
        states = [key for key in (normal.keys() if normal else ()) if key != pdfrw.PdfName.Off]
        return states[0] if states else pdfrw.PdfName.On

    @property
    def fields(self):
        """Return the text and checkbox widgets of the page.

        Widgets are sorted from top to bottom, then from left to right.
        Annotations which are not widgets, have no name or no field type,
        and radio or push buttons are ignored.
        """
        fields = []
        for annotation in self.page[pdfrw.PdfName.Annots] or ():
            if annotation[pdfrw.PdfName.Subtype] != pdfrw.PdfName.Widget:
                continue
            if not annotation[pdfrw.PdfName.T]:
                continue
            # fully qualified name: partial names of the ancestors joined by dots
            name = annotation[pdfrw.PdfName.T].decode()
            parent = annotation[pdfrw.PdfName.Parent]
            while parent and parent[pdfrw.PdfName.T]:
                name = f'{parent[pdfrw.PdfName.T].decode()}.{name}'
                parent = parent[pdfrw.PdfName.Parent]
            if not annotation[pdfrw.PdfName.FT]:
                continue
            pdf_field_type = annotation[pdfrw.PdfName.FT]
            # pdfrw exposes numbers as string-like objects, convert before
            # doing bitwise tests
            pdf_field_flags = int(annotation[pdfrw.PdfName.Ff] or 0)
            if (
                pdf_field_type == pdfrw.PdfName.Btn
                and not (pdf_field_flags & self.RADIO_FLAG)
                and not (pdf_field_flags & self.PUSH_BUTTON_FLAG)
            ):
                widget_type = 'checkbox'
            elif pdf_field_type == pdfrw.PdfName.Tx:
                widget_type = 'text'
            else:
                continue
            on_value = None
            if widget_type == 'checkbox':
                on_value = self._checkbox_on_value(annotation)
            fields.append(
                Widget(
                    name=name,
                    widget_type=widget_type,
                    rect=Rect(*map(float, annotation[pdfrw.PdfName.Rect])),
                    on_value=on_value,
                    page=self,
                    annotation=annotation,
                )
            )
        fields.sort(key=lambda field: (-field.rect[1], field.rect[0]))
        return fields

    @property
    def media_box(self):
        """Return the /MediaBox of the page as a :class:`Rect` of floats."""
        return Rect(*map(float, self.page[pdfrw.PdfName.MediaBox]))

    def thumbnail_png(self, width=None):
        """Render the page with ``pdftoppm`` and return the PNG bytes.

        ``width`` is the width of the image in pixels (defaults to
        ``THUMBNAIL_DEFAULT_WIDTH``); the height keeps the aspect ratio.
        Raises ``subprocess.CalledProcessError`` if ``pdftoppm`` fails.
        """
        width = width or self.THUMBNAIL_DEFAULT_WIDTH
        # pdftoppm page numbers are 1-based
        pdftoppm_page = str(self.page_number + 1)
        return subprocess.check_output(
            [
                'pdftoppm',
                '-png',
                '-scale-to-x',
                str(width),
                '-scale-to-y',
                '-1',
                '-f',
                pdftoppm_page,
                '-l',
                pdftoppm_page,
                '-',
            ],
            input=self.pdf.content,
        )

    def thumbnail_field_rects(self, width=None):
        '''Transform coordinates of fields to coordinates in thumbnail image.

        Yield ``(field, Rect)`` pairs, the rectangle being expressed in
        integer pixels of a thumbnail of the given ``width``.
        '''
        width = width or self.THUMBNAIL_DEFAULT_WIDTH
        media_box = self.media_box
        media_width = media_box.x2 - media_box.x1
        media_height = media_box.y2 - media_box.y1
        height = int(width / media_width * media_height)

        for field in self.fields:
            field_rect = field.rect
            yield field, Rect(
                # PDF coordinates origin is in the bottom-left corner but img
                # tag origin is in the top-left corner
                x1=int((field_rect.x1 - media_box.x1) / media_width * width),
                y1=int((media_box.y2 - field_rect.y1) / media_height * height),
                x2=int((field_rect.x2 - media_box.x1) / media_width * width),
                y2=int((media_box.y2 - field_rect.y2) / media_height * height),
            )

    def fields_image_map(self, width=None, sep='\n', id_prefix='', id_suffix=''):
        """Return one HTML ``<area>`` tag per field, joined with ``sep``.

        Each area covers the field in a thumbnail of the given ``width`` and
        links to the fragment ``#{id_prefix}{digest_id}{id_suffix}``.
        """
        tags = []
        for field, area_rect in self.thumbnail_field_rects(width=width):
            coords = ','.join(map(str, area_rect))
            tags.append(
                f'<area shape="rect" '
                f'href="#{id_prefix}{field.digest_id}{id_suffix}" '
                f'coords="{coords}">'
            )
        return sep.join(tags)


class PDF:
    """A PDF document held in memory and lazily parsed with pdfrw."""

    def __init__(self, content):
        """Keep ``content``; file-like objects are read immediately."""
        if hasattr(content, 'read'):
            content = content.read()
        self.content = content

    @functools.cached_property
    def _pdf_reader(self):
        # parsed once, on first access
        return pdfrw.PdfReader(fdata=self.content)

    @property
    def number_of_pages(self):
        """Return the number of pages of the document."""
        return len(self._pdf_reader.pages)

    def page(self, page_number):
        """Return the :class:`Page` at the 0-based index ``page_number``."""
        return Page(pdf=self, page_number=page_number)

    @property
    def pages(self):
        """Iterate over the pages of the document, in order."""
        for i in range(self.number_of_pages):
            yield self.page(i)

    def write(self, file_object, flatten=False):
        """Serialize the document, with any modified fields, to ``file_object``.

        When ``flatten`` is true the serialized document is passed through
        Ghostscript (``gs``) with ``-dPreserveAnnots=false`` and the output
        of Ghostscript is written instead. An ``Exception`` is raised if
        ``gs`` exits with an error.
        """
        assert hasattr(file_object, 'write')
        if not flatten:
            pdfrw.PdfWriter().write(file_object, self._pdf_reader)
        else:
            with io.BytesIO() as fd:
                pdfrw.PdfWriter().write(fd, self._pdf_reader)
                original_content = fd.getvalue()

            with tempfile.NamedTemporaryFile() as output:
                try:
                    subprocess.check_output(
                        [
                            'gs',
                            '-dSAFER',
                            '-dBATCH',
                            '-dNOPAUSE',
                            '-dNOCACHE',
                            '-sDEVICE=pdfwrite',
                            '-dPreserveAnnots=false',
                            f'-sOutputFile={output.name}',
                            '-',
                        ],
                        input=original_content,
                    )
                except subprocess.CalledProcessError as e:
                    raise Exception(f'gs error={e.returncode} output={e.output}') from e
                output.seek(0)
                new_content = output.read()
            file_object.write(new_content)


# Remaining metadata of the patch, kept for reference:
# diff --git a/tests/data/cerfa_10072-02.pdf b/tests/data/cerfa_10072-02.pdf
# new file mode 100644
# index 00000000..eb0ab173
# Binary files /dev/null and b/tests/data/cerfa_10072-02.pdf differ
# diff --git a/tests/test_utils_pdf.py b/tests/test_utils_pdf.py
# new file mode 100644
# index 00000000..c9b6ce58
# --- /dev/null
# +++ b/tests/test_utils_pdf.py
# @@ -0,0 +1,95 @@

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import io
import re

import pytest
from PIL import Image

from passerelle.utils.pdf import PDF

# sample form and the two fields modified by test_field_set
CERFA_PATH = 'tests/data/cerfa_10072-02.pdf'
TEXT_FIELD_NAME = 'topmostSubform[0].Page1[0].Champ_de_texte1[0]'
CHECKBOX_FIELD_NAME = 'topmostSubform[0].Page1[0].Case_à_cocher1[0]'


@pytest.fixture
def pdf():
    """PDF object loaded from the sample CERFA form."""
    with open(CERFA_PATH, 'rb') as stream:
        document = PDF(content=stream)
    return document


def test_number_of_pages(pdf):
    """The sample document has five pages."""
    assert pdf.number_of_pages == 5


def test_page(pdf):
    """The first page exists and reports its media box."""
    first_page = pdf.page(0)
    assert first_page is not None
    assert pdf.page(0).media_box == (0, 0, 595.32, 841.92)


def test_page_len_fields(pdf):
    """The first page exposes 53 widgets."""
    widgets = list(pdf.page(0).fields)
    assert len(widgets) == 53


def test_page_fields(pdf):
    """Check the first widget, the digest identifiers and widget equality."""
    page = pdf.page(0)
    field = page.fields[0]
    assert field.name == 'topmostSubform[0].Page1[0].Case_à_cocher1[2]'
    assert field.widget_type == 'checkbox'
    assert field.rect == (550.292, 691.02, 558.292, 699.02)
    for widget in page.fields:
        assert widget.digest_id == widget.digest_id.upper()
    for widget in page.fields:
        assert len(widget.digest_id) >= 25
    # digests are unique
    distinct_digests = {widget.digest_id for widget in page.fields}
    assert len(page.fields) == len(distinct_digests)
    # widgets built by separate accesses compare by name and annotation
    assert page.fields[0] != page.fields[1]
    assert page.fields[0] == page.fields[0]


def test_thumbnail_png(pdf):
    """The thumbnail is a PNG image, 800 pixels wide by default."""
    png = pdf.page(0).thumbnail_png()
    assert png[:10] == b'\x89PNG\r\n\x1a\n\x00\x00'
    image = Image.open(io.BytesIO(png))
    assert (image.width, image.height) == (800, 1132)


def test_fields_image_map(pdf):
    """The image map mentions one area per widget."""
    image_map = pdf.page(0).fields_image_map()
    matches = re.findall('area', image_map)
    assert len(matches) == 53


def test_field_set(pdf):
    """Values set on widgets are found again after a write/read cycle."""
    new_values = {TEXT_FIELD_NAME: 'coucou', CHECKBOX_FIELD_NAME: True}
    for widget in pdf.page(0).fields:
        if widget.name in new_values:
            widget.set(new_values[widget.name])

    with io.BytesIO() as buffer:
        pdf.write(buffer)
        new_pdf = PDF(buffer.getvalue())
    new_page = new_pdf.page(0)

    check = set()
    for widget in new_page.fields:
        if widget.name == TEXT_FIELD_NAME:
            check.add(1)
            assert widget.value == 'coucou'
            continue
        if widget.name == CHECKBOX_FIELD_NAME:
            check.add(2)
            assert widget.value is True
            continue
        # every other widget must still have its empty value
        if widget.widget_type == 'checkbox':
            assert widget.value is False
        elif widget.widget_type == 'text':
            assert widget.value == ''
        else:
            raise NotImplementedError
    assert check == {1, 2}