passerelle/passerelle/utils/pdf.py

386 lines
13 KiB
Python

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import dataclasses
import functools
import hashlib
import io
import subprocess
import tempfile
import typing
import pdfrw
# Bit masks tested against the integer field flags (see FieldFlags).  The
# "bit N" comments count bits from 1, so bit 16 is the mask 2 ** 15.
RADIO_FLAG = 0x8000  # bit 16
PUSH_BUTTON_FLAG = 0x10000  # bit 17
# NOTE(review): for choice fields the PDF specification appears to name bit 18
# "Combo" (set means combo box); this module reads it as "list" - confirm.
LIST_FLAG = 0x20000  # bit 18
class Rect(typing.NamedTuple):
    '''Rectangle stored as the coordinates of two corners, (x1, y1) and
    (x2, y2).
    '''

    x1: float
    y1: float
    x2: float
    y2: float

    @classmethod
    def from_pdf_annotation(cls, annotation):
        '''Build a Rect from the ``Rect`` attribute of *annotation*.

        Each value found in ``annotation.Rect`` is converted with ``float``
        before being passed, in order, to the constructor.
        '''
        coordinates = (float(coordinate) for coordinate in annotation.Rect)
        return cls(*coordinates)
def rect_compare(rect1, rect2):
    '''Comparison function sorting rects top to bottom, then left to right.

    PDF origin is in the bottom-left corner, so a bigger ``y`` is higher on
    the page.  Rects whose vertical extents overlap (same horizontal band)
    are considered at the same height and are ordered by their ``x1`` only.

    Returns -1 if rect1 sorts before rect2, 1 if it sorts after, 0 otherwise;
    suitable for ``functools.cmp_to_key``.

    Assumes ``y1`` is the bottom edge and ``y2`` the top edge of each rect.
    '''
    # rect1 is entirely above rect2: its bottom edge is over rect2's top edge
    if rect1.y1 > rect2.y2:
        return -1
    # symmetric test: rect2 is entirely above rect1.  Comparing against
    # rect1.y2 (and not rect1.y1) keeps overlapping rects in the same band
    # and guarantees rect_compare(a, b) == -rect_compare(b, a).
    if rect2.y1 > rect1.y2:
        return 1
    # same band: leftmost first
    if rect1.x1 < rect2.x1:
        return -1
    if rect1.x1 > rect2.x1:
        return 1
    return 0
class FieldFlags(int):
    '''Integer holding field flags, with one accessor per mask of interest.

    Every accessor returns the flags masked with the matching module-level
    constant: zero when the bit is clear, the mask value when it is set.
    '''

    @property
    def is_radio(self):
        '''Flags masked with RADIO_FLAG.'''
        masked = self & RADIO_FLAG
        return masked

    @property
    def is_push_button(self):
        '''Flags masked with PUSH_BUTTON_FLAG.'''
        masked = self & PUSH_BUTTON_FLAG
        return masked

    @property
    def is_list(self):
        '''Flags masked with LIST_FLAG.'''
        masked = self & LIST_FLAG
        return masked
@dataclasses.dataclass(frozen=True)
class Widget:
    '''Form field found on a page, wrapping the dictionary describing it.

    ``page`` is the Page the field was found on; it is excluded from
    comparisons.  ``annotation`` is the field dictionary; its entries are read
    as attributes (``T``, ``FT``, ``Ff``, ``V``, ``Kids``...).
    '''

    page: 'Page' = dataclasses.field(compare=False, repr=False)
    annotation: 'pdfrw.PdfDict' = dataclasses.field(repr=False)

    @functools.cached_property
    def name(self):
        '''Dotted name made of the decoded ``T`` of the field preceded by the
        ones of its ancestors, outermost first.

        Ancestors without a ``T`` entry do not contribute to the name.
        '''
        annot = self.annotation
        parts = [annot.T.decode()]
        while annot.Parent:
            annot = annot.Parent
            # an unnamed intermediate node must not break name computation
            if annot.T:
                parts.append(annot.T.decode())
        return '.'.join(reversed(parts))

    @property
    def field_flags(self):
        '''Value of ``Ff`` as FieldFlags, 0 when the entry is absent.'''
        return FieldFlags(int(self.annotation.Ff or 0))

    @property
    def field_type(self):
        '''Raw ``FT`` entry of the field.'''
        return self.annotation.FT

    @property
    def widget_type(self):
        '''Kind of field: 'radio', 'checkbox', 'text', 'list' or 'combo'.

        Raises NotImplementedError for anything else, push buttons included.
        '''
        field_type = self.field_type
        if field_type == pdfrw.PdfName.Btn:
            flags = self.field_flags
            if not flags.is_push_button:
                return 'radio' if flags.is_radio else 'checkbox'
        elif field_type == pdfrw.PdfName.Tx:
            return 'text'
        elif field_type == pdfrw.PdfName.Ch:
            # NOTE(review): the PDF specification appears to define bit 18 of
            # choice fields as "Combo" (set means combo box), which would make
            # these two labels inverted - confirm before relying on them.
            return 'list' if self.field_flags.is_list else 'combo'
        raise NotImplementedError

    @property
    def rect(self):
        '''First rect of the field (see ``rects``).'''
        return self.rects[0]

    @property
    def rects(self):
        '''List of Rect occupied by the field: one per kid for radio fields
        (ordered as ``kids_ordered_by_rect``), a single one otherwise.
        '''
        if self.widget_type == 'radio':
            return [Rect.from_pdf_annotation(kid) for kid in self.kids_ordered_by_rect]
        return [Rect.from_pdf_annotation(self.annotation)]

    @property
    def digest_id(self):
        '''Identifier derived from the name: base32 of its MD5 digest without
        padding, or the empty string when the name is empty.
        '''
        if not self.name:
            return ''
        name_bytes = self.name.encode()
        digest = hashlib.md5(name_bytes).digest()
        b32_encoded = base64.b32encode(digest).decode()
        return b32_encoded.strip('=').upper()

    @property
    def checkbox_true_value(self):
        '''Name of the checked state of a checkbox.

        It is the first key of the ``AP.N`` dictionary which is not ``Off``;
        ``On`` is returned when no such key can be found.
        '''
        assert self.widget_type == 'checkbox'
        try:
            states = list(self.annotation.AP.N.keys())
        except (AttributeError, KeyError):
            # the rest of this class relies on absent entries reading as a
            # falsy value (``Kids or []``, ``Ff or 0``); a missing AP or N
            # then fails with AttributeError, not KeyError
            return pdfrw.PdfName.On
        on_states = [state for state in states if state != pdfrw.PdfName.Off]
        # only Off (or nothing) is declared: use the same default as above
        return on_states[0] if on_states else pdfrw.PdfName.On

    @property
    def kids_ordered_by_rect(self):
        '''Kids of a radio field sorted with ``rect_compare``.'''
        assert self.widget_type == 'radio'
        kids = list(self.annotation.Kids or [])

        def compare(kid1, kid2):
            return rect_compare(Rect.from_pdf_annotation(kid1), Rect.from_pdf_annotation(kid2))

        kids.sort(key=functools.cmp_to_key(compare))
        return kids

    @property
    def radio_possible_values(self):
        '''Values accepted by ``set`` for a radio field, in kid order.

        For each kid having an ``AP.N`` dictionary the value is the first key
        which is not ``Off``, without its leading character; if every key is
        ``Off`` the first key is used.
        '''
        assert self.widget_type == 'radio'
        values = []
        for kid in self.kids_ordered_by_rect:
            if not (kid.AP and kid.AP.N):
                continue
            states = list(kid.AP.N.keys())
            # the Off state may be listed first, it is not a selectable value
            on_states = [state for state in states if state != pdfrw.PdfName.Off] or states
            values.append(on_states[0][1:])
        return values

    @property
    def combo_possible_values(self):
        '''List of ``(export, label)`` pairs read from ``Opt``.

        An option given as a sequence provides both values; an option given as
        a single string is used for both.  A missing ``Opt`` gives an empty
        list.
        '''
        assert self.widget_type in ('list', 'combo')
        values = []
        for option in self.annotation.Opt or []:
            if isinstance(option, (list, tuple)):
                export, label = option[0].decode(), option[1].decode()
            else:
                export = label = option.decode()
            values.append((export, label))
        return values

    @property
    def value(self):
        '''Current value read from ``V``.

        text: decoded string, '' when empty; checkbox: whether ``V`` equals
        ``checkbox_true_value``; radio: ``V`` without leading '/', or None;
        list and combo: decoded string, or None when ``V`` is absent.
        '''
        kind = self.widget_type
        raw = self.annotation.V
        if kind == 'text':
            return raw.decode() if raw else ''
        if kind == 'checkbox':
            return raw == self.checkbox_true_value
        if kind == 'radio':
            return raw.lstrip('/') if raw else None
        if kind in ('list', 'combo'):
            return raw.decode() if raw is not None else None
        return None

    def set(self, value):
        '''Store *value* in the field.

        text: ``str(value)``; checkbox: truthiness of *value*; radio: one of
        ``radio_possible_values``, else ValueError is raised; list and combo:
        the label of an option, whose export value is stored - a value
        matching no label leaves the field unchanged.
        '''
        # allow rendering of values in Acrobat Reader
        self.page.pdf._pdf_reader.Root.AcroForm.update(pdfrw.PdfDict(NeedAppearances=pdfrw.PdfObject('true')))
        kind = self.widget_type
        if kind == 'text':
            str_value = str(value)
            self.annotation.update(pdfrw.PdfDict(V=str_value, AS=str_value))
        elif kind == 'checkbox':
            bool_value = self.checkbox_true_value if value else pdfrw.PdfName.Off
            self.annotation.update(pdfrw.PdfDict(V=bool_value, AS=bool_value))
        elif kind == 'radio':
            if value not in self.radio_possible_values:
                raise ValueError(f'"{value}" is not one of {self.radio_possible_values}')
            radio_value = pdfrw.PdfName(value)
            # the kid owning the state is switched on, all the others off
            for kid in self.annotation.Kids:
                if kid.AP and kid.AP.N and radio_value in kid.AP.N:
                    kid.update(pdfrw.PdfDict(AS=radio_value))
                else:
                    kid.update(pdfrw.PdfDict(AS=pdfrw.PdfName.Off))
            self.annotation.update(pdfrw.PdfDict(V=radio_value))
        elif kind in ('list', 'combo'):
            for export, combo_value in self.combo_possible_values:
                if combo_value == value:
                    self.annotation.update(
                        pdfrw.PdfDict(
                            V=pdfrw.PdfString.from_unicode(export), AS=pdfrw.PdfString.from_unicode(export)
                        )
                    )
                    break

    @classmethod
    def from_pdf_widget(cls, page, pdf_widget):
        '''Return a Widget for *pdf_widget*, or None if its type is not
        supported (``widget_type`` raises NotImplementedError).
        '''
        widget = cls(page=page, annotation=pdf_widget)
        try:
            widget.widget_type
        except NotImplementedError:
            return None
        return widget

    def __repr__(self):
        return f'<Widget {self.name!r} : {self.widget_type}>'
@dataclasses.dataclass
class Page:
    '''One page of a document, designated by its index.'''

    # owning document; ``_pdf_reader`` and ``content`` are read from it
    pdf: object
    # index of the page in ``pdf._pdf_reader.pages``
    page_number: object

    THUMBNAIL_DEFAULT_WIDTH = 800

    @property
    def page(self):
        '''Page object taken from the reader of the document.'''
        reader = self.pdf._pdf_reader
        return reader.pages[self.page_number]

    @property
    def fields(self):
        '''Supported form fields of the page as Widget objects, sorted with
        ``rect_compare`` on their first rect.

        Only annotations whose Subtype is Widget are considered; when such an
        annotation has no name (``T``) its ancestors are searched for one
        (case of radio fields).
        '''
        collected = []
        known_names = set()
        for annotation in self.page.Annots or ():
            if annotation.Subtype != pdfrw.PdfName.Widget:
                continue
            field = annotation
            while not field.T and field.Parent:
                field = field.Parent
            # skip fields without name, and fields already met: radio
            # checkboxes share the same parent.
            # NOTE(review): only the partial name is compared, fields named
            # alike under different parents would be merged - confirm.
            if not field.T or field.T in known_names:
                continue
            known_names.add(field.T)
            widget = Widget.from_pdf_widget(self, field)
            if widget is not None:
                collected.append(widget)

        by_rect = functools.cmp_to_key(lambda field1, field2: rect_compare(field1.rect, field2.rect))
        return sorted(collected, key=by_rect)

    @property
    def media_box(self):
        '''``MediaBox`` of the page as a Rect of floats.'''
        return Rect(*(float(value) for value in self.page.MediaBox))

    def thumbnail_png(self, width=None):
        '''Return PNG bytes of the page produced by the ``pdftoppm`` command.

        The document content is sent on standard input; *width* (default
        ``THUMBNAIL_DEFAULT_WIDTH``) is passed as ``-scale-to-x``.
        '''
        width = width or self.THUMBNAIL_DEFAULT_WIDTH
        # pdftoppm is given a page range starting at 1
        page_index = str(self.page_number + 1)
        command = [
            'pdftoppm',
            '-png',
            '-scale-to-x',
            str(width or '-1'),
            '-scale-to-y',
            '-1',
            '-f',
            page_index,
            '-l',
            page_index,
            '-',
        ]
        return subprocess.check_output(command, stderr=subprocess.DEVNULL, input=self.pdf.content)

    def thumbnail_field_rects(self, width=None):
        '''Transform coordinates of fields to coordinates in the thumbnail
        image.

        Yields ``(index, field, rect)`` for every rect of every field, index
        being the position of the field in ``fields``.
        '''
        width = width or self.THUMBNAIL_DEFAULT_WIDTH
        box = self.media_box
        box_width = box.x2 - box.x1
        box_height = box.y2 - box.y1
        height = int(width / box_width * box_height)

        def to_image_x(x):
            return int((x - box.x1) / box_width * width)

        def to_image_y(y):
            # PDF coordinates origin is in the bottom-left corner but img
            # tag origin is in the top-left corner
            return int((box.y2 - y) / box_height * height)

        for index, field in enumerate(self.fields):
            for rect in field.rects:
                yield index, field, Rect(
                    x1=to_image_x(rect.x1),
                    y1=to_image_y(rect.y1),
                    x2=to_image_x(rect.x2),
                    y2=to_image_y(rect.y2),
                )

    def fields_image_map(self, width=None, sep='\n', id_prefix='', id_suffix=''):
        '''Return ``<area>`` tags, joined with *sep*, one per field rect of
        the thumbnail; each links to ``#`` + *id_prefix* + digest id of the
        field + *id_suffix*.
        '''

        def area_tag(field, area_rect):
            coords = ','.join(str(coordinate) for coordinate in area_rect)
            return (
                f'<area shape="rect" '
                f'href="#{id_prefix}{field.digest_id}{id_suffix}" '
                f'coords="{coords}">'
            )

        return sep.join(
            area_tag(field, area_rect) for _, field, area_rect in self.thumbnail_field_rects(width=width)
        )
class PDF:
    '''PDF document kept in memory as raw content, parsed on first use.'''

    def __init__(self, content):
        '''*content* is the raw document, or an object with a ``read``
        method returning it.
        '''
        self.content = content.read() if hasattr(content, 'read') else content

    @functools.cached_property
    def _pdf_reader(self):
        '''pdfrw reader built from the content, created once.'''
        return pdfrw.PdfReader(fdata=self.content)

    @property
    def number_of_pages(self):
        '''Number of pages known by the reader.'''
        return len(self._pdf_reader.pages)

    def page(self, page_number):
        '''Return the Page of index *page_number*.'''
        return Page(pdf=self, page_number=page_number)

    @property
    def pages(self):
        '''Iterate over every page of the document, in order.'''
        yield from (self.page(index) for index in range(self.number_of_pages))

    def write(self, file_object, flatten=False):
        '''Write the document, with changes made through the reader, to
        *file_object*.

        With *flatten* the document is first piped through the ``gs`` command
        (``pdfwrite`` device, ``-dPreserveAnnots=false``) and the file it
        produces is written instead.  An Exception is raised if ``gs`` exits
        with an error.
        '''
        assert hasattr(file_object, 'write')

        if not flatten:
            pdfrw.PdfWriter().write(file_object, self._pdf_reader)
            return

        with io.BytesIO() as buffer:
            pdfrw.PdfWriter().write(buffer, self._pdf_reader)
            original_content = buffer.getvalue()

        with tempfile.NamedTemporaryFile() as output:
            # gs writes to the temporary file by name, the document is
            # given on standard input
            command = [
                'gs',
                '-dSAFER',
                '-dBATCH',
                '-dNOPAUSE',
                '-dNOCACHE',
                '-sDEVICE=pdfwrite',
                '-dPreserveAnnots=false',
                f'-sOutputFile={output.name}',
                '-',
            ]
            try:
                subprocess.check_output(command, stderr=subprocess.DEVNULL, input=original_content)
            except subprocess.CalledProcessError as e:
                raise Exception(f'gs error={e.returncode} output={e.output}')
            output.seek(0)
            new_content = output.read()
        file_object.write(new_content)