utils: add a PDF class (#74797)
This commit is contained in:
parent
965a6015da
commit
687e05b28e
|
@ -13,7 +13,9 @@ Homepage: https://dev.entrouvert.org/projects/passerelle
|
||||||
|
|
||||||
Package: python3-passerelle
|
Package: python3-passerelle
|
||||||
Architecture: all
|
Architecture: all
|
||||||
Depends: pdftk,
|
Depends: ghostscript,
|
||||||
|
pdftk,
|
||||||
|
poppler-utils,
|
||||||
python3-cmislib,
|
python3-cmislib,
|
||||||
python3-dateutil,
|
python3-dateutil,
|
||||||
python3-distutils,
|
python3-distutils,
|
||||||
|
|
|
@ -36,6 +36,11 @@ LOGGING['loggers']['paramiko.transport'] = {
|
||||||
'propagate': True,
|
'propagate': True,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# silence pdfrw: records emitted by the 'pdfrw' logger are not handed over to
# the handlers of its ancestor loggers
LOGGING['loggers']['pdfrw'] = {'propagate': False}
|
||||||
|
|
||||||
exec(open('/etc/%s/settings.py' % PROJECT_NAME).read())
|
exec(open('/etc/%s/settings.py' % PROJECT_NAME).read())
|
||||||
|
|
||||||
# run additional settings snippets
|
# run additional settings snippets
|
||||||
|
|
|
@ -0,0 +1,247 @@
|
||||||
|
# passerelle - uniform access to multiple data sources and services
|
||||||
|
# Copyright (C) 2023 Entr'ouvert
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify it
|
||||||
|
# under the terms of the GNU Affero General Public License as published
|
||||||
|
# by the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import base64
|
||||||
|
import dataclasses
|
||||||
|
import functools
|
||||||
|
import hashlib
|
||||||
|
import io
|
||||||
|
import subprocess
|
||||||
|
import tempfile
|
||||||
|
import typing
|
||||||
|
|
||||||
|
import pdfrw
|
||||||
|
|
||||||
|
|
||||||
|
class Rect(typing.NamedTuple):
    '''Rectangle described by two corner points.

    Instances are plain 4-tuples ``(x1, y1, x2, y2)`` whose items can also be
    read through the attribute names below.
    '''

    # coordinates of the first corner
    x1: float
    y1: float
    # coordinates of the second corner
    x2: float
    y2: float
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass(frozen=True)
class Widget:
    '''A form field (text input or checkbox) located on a page.

    Only ``name`` takes part in equality and hashing: two widgets are the
    same field when they carry the same fully qualified name. The annotation
    is excluded on purpose, it is a dictionary (hence unhashable) and
    comparing it would mean a deep comparison of PDF objects.
    '''

    # owning Page object, used by set() to reach the document's AcroForm
    page: 'Page' = dataclasses.field(compare=False, repr=False)
    # fully qualified field name (parent names joined with dots)
    name: str
    # 'text' or 'checkbox'
    widget_type: str = dataclasses.field(compare=False)
    # position of the widget on the page, in PDF coordinates
    rect: Rect = dataclasses.field(compare=False)
    # PDF name written in /V and /AS when a checkbox is checked
    on_value: str = dataclasses.field(compare=False, default=pdfrw.PdfName.On)
    # underlying pdfrw annotation dictionary
    annotation: pdfrw.PdfDict = dataclasses.field(default=None, compare=False, repr=False)

    @property
    def digest_id(self):
        '''Return a short identifier derived from the name of the field.

        It is the unpadded base32 encoding of the MD5 digest of the name,
        which only contains upper case letters and digits; an empty name
        gives an empty identifier.
        '''
        if not self.name:
            return ''
        digest = hashlib.md5(self.name.encode()).digest()
        return base64.b32encode(digest).decode().strip('=').upper()

    @property
    def value(self):
        '''Return the current value of the field.

        A text field gives a string ('' when unset), a checkbox gives a
        boolean; any other widget type gives None.
        '''
        if self.widget_type == 'text':
            raw_value = self.annotation[pdfrw.PdfName.V]
            if not raw_value:
                return ''
            # values parsed from a file have a decode() method, but set()
            # stores a plain str which must be returned as is
            if hasattr(raw_value, 'decode'):
                return raw_value.decode()
            return str(raw_value)
        if self.widget_type == 'checkbox':
            return self.annotation[pdfrw.PdfName.V] == self.on_value
        return None

    def set(self, value):
        '''Store ``value`` in the field.

        Text fields receive ``str(value)``; checkboxes are checked when
        ``value`` is truthy and unchecked otherwise. Other widget types are
        left untouched.
        '''
        # allow rendering of values in Acrobat Reader
        acro_form = self.page.pdf._pdf_reader.Root.AcroForm
        acro_form.update(pdfrw.PdfDict(NeedAppearances=pdfrw.PdfObject('true')))
        if self.widget_type == 'text':
            new_state = str(value)
        elif self.widget_type == 'checkbox':
            new_state = self.on_value if value else pdfrw.PdfName.Off
        else:
            return
        self.annotation.update(pdfrw.PdfDict(V=new_state, AS=new_state))
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
class Page:
    '''A page of a PDF document, designated by its zero-based index.'''

    # owning PDF object, it provides `_pdf_reader` and `content`
    pdf: object
    # zero-based index of the page in the document
    page_number: object

    THUMBNAIL_DEFAULT_WIDTH = 800

    # Button field flags (/Ff). The PDF specification numbers bit positions
    # from 1, so "bit 16" (Radio) is 1 << 15 and "bit 17" (Pushbutton) is
    # 1 << 16.
    _RADIO_FLAG = 1 << 15
    _PUSH_BUTTON_FLAG = 1 << 16

    @property
    def page(self):
        '''Return the pdfrw page object behind this page.'''
        return self.pdf._pdf_reader.pages[self.page_number]

    @classmethod
    def _widget_type(cls, annotation):
        '''Return 'text', 'checkbox' or None for unsupported field types.'''
        field_type = annotation[pdfrw.PdfName.FT]
        if field_type == pdfrw.PdfName.Tx:
            return 'text'
        if field_type == pdfrw.PdfName.Btn:
            # pdfrw gives numbers back as string-like tokens, convert before
            # using bitwise operators
            flags = int(annotation[pdfrw.PdfName.Ff] or 0)
            if not flags & (cls._RADIO_FLAG | cls._PUSH_BUTTON_FLAG):
                return 'checkbox'
        return None

    @staticmethod
    def _checkbox_on_value(annotation):
        '''Return the name of the "checked" state of a checkbox.

        It is the first key of the normal appearance dictionary (/AP /N)
        which is not /Off; /On is used when it cannot be determined.
        '''
        try:
            states = list(annotation[pdfrw.PdfName.AP][pdfrw.PdfName.N].keys())
        except (KeyError, TypeError, AttributeError):
            # a missing /AP or /N entry is returned as None by pdfrw
            return pdfrw.PdfName.On
        states = [state for state in states if state != pdfrw.PdfName.Off]
        return states[0] if states else pdfrw.PdfName.On

    @property
    def fields(self):
        '''Return the text and checkbox widgets of the page.

        Widgets without a name (/T) or a field type (/FT) of their own, radio
        buttons, push buttons and other field types are ignored. The list is
        ordered from top to bottom then left to right, based on the first
        corner of each widget rectangle.
        '''
        widgets = []
        for annotation in self.page[pdfrw.PdfName.Annots] or ():
            if annotation[pdfrw.PdfName.Subtype] != pdfrw.PdfName.Widget:
                continue
            if not annotation[pdfrw.PdfName.T]:
                continue
            # build the qualified name by prepending the names of the parents
            name = annotation[pdfrw.PdfName.T].decode()
            parent = annotation[pdfrw.PdfName.Parent]
            while parent and parent[pdfrw.PdfName.T]:
                name = f'{parent[pdfrw.PdfName.T].decode()}.{name}'
                parent = parent[pdfrw.PdfName.Parent]
            if not annotation[pdfrw.PdfName.FT]:
                continue
            widget_type = self._widget_type(annotation)
            if widget_type is None:
                continue
            on_value = None
            if widget_type == 'checkbox':
                on_value = self._checkbox_on_value(annotation)
            widgets.append(
                Widget(
                    name=name,
                    widget_type=widget_type,
                    rect=Rect(*map(float, annotation[pdfrw.PdfName.Rect])),
                    on_value=on_value,
                    page=self,
                    annotation=annotation,
                )
            )
        # PDF ordinates grow upward, so the highest y1 comes first
        widgets.sort(key=lambda widget: (-widget.rect.y1, widget.rect.x1))
        return widgets

    @property
    def media_box(self):
        '''Return the /MediaBox of the page as a Rect of floats.'''
        return Rect(*map(float, self.page[pdfrw.PdfName.MediaBox]))

    def thumbnail_png(self, width=None):
        '''Render the page with pdftoppm and return the PNG image as bytes.

        ``width`` is the requested image width in pixels, it defaults to
        THUMBNAIL_DEFAULT_WIDTH; the height is left to pdftoppm (-1).
        Raises subprocess.CalledProcessError if pdftoppm fails.
        '''
        width = width or self.THUMBNAIL_DEFAULT_WIDTH
        # pdftoppm counts pages from 1
        pdftoppm_page = str(self.page_number + 1)
        return subprocess.check_output(
            [
                'pdftoppm',
                '-png',
                '-scale-to-x',
                str(width or '-1'),
                '-scale-to-y',
                '-1',
                '-f',
                pdftoppm_page,
                '-l',
                pdftoppm_page,
                '-',
            ],
            # the original content of the document is rendered
            input=self.pdf.content,
        )

    def thumbnail_field_rects(self, width=None):
        '''Transform coordinates of fields to coordinates in thumbnail image.

        Yield ``(field, rect)`` pairs where ``rect`` is expressed in integer
        pixels of a thumbnail of the given width.
        '''
        width = width or self.THUMBNAIL_DEFAULT_WIDTH
        media_box = self.media_box
        media_width = media_box.x2 - media_box.x1
        media_height = media_box.y2 - media_box.y1
        height = int(width / media_width * media_height)
        x_scale = width / media_width
        y_scale = height / media_height

        for field in self.fields:
            field_rect = field.rect
            yield field, Rect(
                # PDF coordinates origin is in the bottom-left corner but img
                # tag origin is in the top-left corner
                x1=int((field_rect.x1 - media_box.x1) * x_scale),
                y1=int((media_box.y2 - field_rect.y1) * y_scale),
                x2=int((field_rect.x2 - media_box.x1) * x_scale),
                y2=int((media_box.y2 - field_rect.y2) * y_scale),
            )

    def fields_image_map(self, width=None, sep='\n', id_prefix='', id_suffix=''):
        '''Return HTML <area> tags mapping thumbnail regions to the fields.

        Each tag links to the fragment made of ``id_prefix``, the digest_id
        of the field and ``id_suffix``; tags are joined with ``sep``.
        '''
        return sep.join(
            f'<area shape="rect" '
            f'href="#{id_prefix}{field.digest_id}{id_suffix}" '
            f'coords="{",".join(map(str, area_rect))}">'
            for field, area_rect in self.thumbnail_field_rects(width=width)
        )
|
||||||
|
|
||||||
|
|
||||||
|
class PDF:
    '''A PDF document whose form fields can be inspected and filled.

    ``content`` always holds the bytes given to the constructor; changes made
    to fields are kept in the parsed document and only come out through
    write().
    '''

    def __init__(self, content):
        '''Build the document from bytes or from an object with a read() method.'''
        if hasattr(content, 'read'):
            content = content.read()
        self.content = content

    @functools.cached_property
    def _pdf_reader(self):
        '''Parsed document, built on first access and then reused.'''
        return pdfrw.PdfReader(fdata=self.content)

    @property
    def number_of_pages(self):
        '''Number of pages of the document.'''
        return len(self._pdf_reader.pages)

    def page(self, page_number):
        '''Return the Page at the zero-based index ``page_number``.'''
        return Page(pdf=self, page_number=page_number)

    @property
    def pages(self):
        '''Iterate over the pages of the document, in order.'''
        for index in range(self.number_of_pages):
            yield self.page(index)

    def write(self, file_object, flatten=False):
        '''Serialize the document, modifications included, to ``file_object``.

        When ``flatten`` is true the serialized document is first passed
        through Ghostscript's pdfwrite device with -dPreserveAnnots=false.
        Raises Exception, chained to the CalledProcessError, when Ghostscript
        fails; the message holds its exit code and its output (stdout and
        stderr).
        '''
        assert hasattr(file_object, 'write')
        if not flatten:
            pdfrw.PdfWriter().write(file_object, self._pdf_reader)
            return

        with io.BytesIO() as buffer:
            pdfrw.PdfWriter().write(buffer, self._pdf_reader)
            original_content = buffer.getvalue()

        # Ghostscript reads the document on stdin and writes its result to
        # the temporary file, designated by its path
        with tempfile.NamedTemporaryFile() as output:
            command = [
                'gs',
                '-dSAFER',
                '-dBATCH',
                '-dNOPAUSE',
                '-dNOCACHE',
                '-sDEVICE=pdfwrite',
                '-dPreserveAnnots=false',
                f'-sOutputFile={output.name}',
                '-',
            ]
            try:
                # merge stderr in the captured output, diagnostics are then
                # part of the error message below
                subprocess.check_output(command, input=original_content, stderr=subprocess.STDOUT)
            except subprocess.CalledProcessError as exc:
                raise Exception(f'gs error={exc.returncode} output={exc.output}') from exc
            output.seek(0)
            file_object.write(output.read())
|
Binary file not shown.
|
@ -0,0 +1,95 @@
|
||||||
|
# passerelle - uniform access to multiple data sources and services
|
||||||
|
# Copyright (C) 2023 Entr'ouvert
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify it
|
||||||
|
# under the terms of the GNU Affero General Public License as published
|
||||||
|
# by the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import io
|
||||||
|
import re
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from PIL import Image
|
||||||
|
|
||||||
|
from passerelle.utils.pdf import PDF
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
def pdf():
    '''PDF object built from the sample form stored in tests/data.'''
    with open('tests/data/cerfa_10072-02.pdf', 'rb') as sample:
        # the content is read by the constructor, before the file is closed
        document = PDF(content=sample)
    return document
|
||||||
|
|
||||||
|
|
||||||
|
def test_number_of_pages(pdf):
    '''The sample form has five pages.'''
    page_count = pdf.number_of_pages
    assert page_count == 5
|
||||||
|
|
||||||
|
|
||||||
|
def test_page(pdf):
    '''The first page exists and reports the expected media box.'''
    first_page = pdf.page(0)
    assert first_page is not None
    assert first_page.media_box == (0, 0, 595.32, 841.92)
|
||||||
|
|
||||||
|
|
||||||
|
def test_page_len_fields(pdf):
    '''The first page of the sample form holds 53 supported fields.'''
    first_page_fields = list(pdf.page(0).fields)
    assert len(first_page_fields) == 53
|
||||||
|
|
||||||
|
|
||||||
|
def test_page_fields(pdf):
    '''Check attributes, digests and equality of the first page fields.'''
    page = pdf.page(0)
    first_field = page.fields[0]
    assert first_field.name == 'topmostSubform[0].Page1[0].Case_à_cocher1[2]'
    assert first_field.widget_type == 'checkbox'
    assert first_field.rect == (550.292, 691.02, 558.292, 699.02)

    for field in page.fields:
        assert field.digest_id == field.digest_id.upper()
    for field in page.fields:
        assert len(field.digest_id) >= 25
    # digests are unique
    digests = {field.digest_id for field in page.fields}
    assert len(page.fields) == len(digests)

    # fields are rebuilt on each access, equality holds between instances
    assert page.fields[0] != page.fields[1]
    assert page.fields[0] == page.fields[0]
|
||||||
|
|
||||||
|
|
||||||
|
def test_thumbnail_png(pdf):
    '''The thumbnail is a PNG image with the default width.'''
    png = pdf.page(0).thumbnail_png()
    # PNG signature followed by the first bytes of the first chunk
    assert png.startswith(b'\x89PNG\r\n\x1a\n\x00\x00')
    with io.BytesIO(png) as stream:
        image = Image.open(stream)
        size = (image.width, image.height)
    assert size == (800, 1132)
|
||||||
|
|
||||||
|
|
||||||
|
def test_fields_image_map(pdf):
    '''The image map holds one area tag per field of the page.'''
    image_map = pdf.page(0).fields_image_map()
    area_tags = re.findall('area', image_map)
    assert len(area_tags) == 53
|
||||||
|
|
||||||
|
|
||||||
|
def test_field_set(pdf):
    '''Values set on fields are found again after a write and a reload.'''
    text_name = 'topmostSubform[0].Page1[0].Champ_de_texte1[0]'
    checkbox_name = 'topmostSubform[0].Page1[0].Case_à_cocher1[0]'
    new_values = {text_name: 'coucou', checkbox_name: True}

    for field in pdf.page(0).fields:
        if field.name in new_values:
            field.set(new_values[field.name])

    with io.BytesIO() as buffer:
        pdf.write(buffer)
        reloaded_pdf = PDF(buffer.getvalue())

    # markers of the modified fields found in the reloaded document
    seen = set()
    for field in reloaded_pdf.page(0).fields:
        if field.name == text_name:
            seen.add(1)
            assert field.value == 'coucou'
        elif field.name == checkbox_name:
            seen.add(2)
            assert field.value is True
        elif field.widget_type == 'checkbox':
            assert field.value is False
        elif field.widget_type == 'text':
            assert field.value == ''
        else:
            raise NotImplementedError
    assert seen == {1, 2}
|
Loading…
Reference in New Issue