# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import base64
|
|
import os
|
|
import subprocess
|
|
import xml.etree.ElementTree as ET
|
|
from io import BytesIO
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
from django.core.exceptions import ValidationError
|
|
from django.core.files import File
|
|
from django.urls import reverse
|
|
from pdfrw import PdfReader
|
|
|
|
from passerelle.apps.pdf.models import Resource
|
|
from tests.test_manager import login
|
|
from tests.utils import generic_endpoint_url, setup_access_rights
|
|
|
|
# Sample documents shared by the tests of this module. They are read once, at
# import time, from the "data" directory that sits next to this file. The
# base64 text variants are what the tests put in the 'content' entries of the
# JSON payloads.
_data_dir = os.path.join(os.path.dirname(__file__), 'data')

# plain PDF sample
with open(os.path.join(_data_dir, 'minimal.pdf'), 'rb') as fd:
    pdf_content = fd.read()
pdf_b64content = base64.b64encode(pdf_content).decode()

# PDF form sample (the tests fill its 'fname' field)
with open(os.path.join(_data_dir, 'pdf-form.pdf'), 'rb') as fd:
    acroform_content = fd.read()
acroform_b64content = base64.b64encode(acroform_content).decode()
|
|
|
|
|
|
@pytest.fixture
def pdf(db):
    """Provide the PDF connector used by the tests.

    A ``Resource`` with slug, title and description all set to ``'test'`` is
    created, then passed to ``setup_access_rights``; the value returned by that
    helper is what the tests receive.  The ``db`` argument is only requested
    as a fixture dependency (presumably to enable database access).
    """
    connector = Resource.objects.create(slug='test', title='test', description='test')
    return setup_access_rights(connector)
|
|
|
|
|
|
@mock.patch('subprocess.check_output')
def test_pdf_assemble(mocked_check_output, app, pdf):
    """Check the ``assemble`` endpoint with ``subprocess.check_output`` mocked.

    Scenarios, in order:

    * one input file: response headers and the exact pdftk command line;
    * several inputs, some of them empty: only the non-empty ones are passed
      to pdftk;
    * pdftk failures (timeout, non-zero exit status) reported as JSON errors;
    * invalid requests (no JSON body, missing or badly typed properties)
      answered with HTTP 400, and GET answered with HTTP 405.
    """
    endpoint = generic_endpoint_url('pdf', 'assemble', slug=pdf.slug)

    # a single input file
    payload = {'filename': 'foo.pdf', 'files/0': {'content': pdf_b64content}}
    resp = app.post_json(endpoint, params=payload, status=200)
    assert resp.headers['content-type'] == 'application/pdf'
    assert resp.headers['content-disposition'] == 'attachment; filename="foo.pdf"'
    assert mocked_check_output.call_count == 1
    pdftk_call = mocked_check_output.call_args.args[0]
    assert len(pdftk_call) == 5
    assert pdftk_call[0] == '/usr/bin/pdftk'
    assert pdftk_call[1].endswith('/pdf-0.pdf')
    assert pdftk_call[2] == 'cat'
    assert pdftk_call[3] == 'output'
    assert pdftk_call[4] == '-'
    assert mocked_check_output.call_args.kwargs['timeout'] == 20

    # several inputs: the empty ones ('files/0', 'files/2' and 'files/4') must
    # be left out, so only 'files/1' and 'files/3' reach pdftk
    payload = {
        'filename': 'bar.pdf',
        'files/0': {'content': ''},
        'files/1': {'content': pdf_b64content},
        'files/2': None,
        'files/3': pdf_b64content,
        'files/4': '',
    }
    mocked_check_output.reset_mock()
    resp = app.post_json(endpoint, params=payload, status=200)
    assert resp.headers['content-type'] == 'application/pdf'
    assert resp.headers['content-disposition'] == 'attachment; filename="bar.pdf"'
    assert mocked_check_output.call_count == 1
    pdftk_call = mocked_check_output.call_args.args[0]
    assert len(pdftk_call) == 6
    assert pdftk_call[0] == '/usr/bin/pdftk'
    assert pdftk_call[1].endswith('/pdf-1.pdf')  # from 'files/1'
    assert pdftk_call[2].endswith('/pdf-3.pdf')  # from 'files/3'
    # the end of the command line is the same as with a single input file
    assert pdftk_call[3] == 'cat'
    assert pdftk_call[4] == 'output'
    assert pdftk_call[5] == '-'
    assert mocked_check_output.call_args.kwargs['timeout'] == 20

    # pdftk errors (faked)
    payload = {'filename': 'out.pdf', 'files/0': {'content': pdf_b64content}}
    mocked_check_output.reset_mock()
    mocked_check_output.side_effect = subprocess.TimeoutExpired(cmd=[], timeout=20)
    resp = app.post_json(endpoint, params=payload, status=200)
    assert mocked_check_output.call_count == 1
    assert resp.json['err'] == 1
    assert resp.json['err_desc'].startswith('pdftk timed out after 20 seconds')

    # reset_mock() keeps the configured side_effect: it is replaced explicitly
    mocked_check_output.reset_mock()
    mocked_check_output.side_effect = subprocess.CalledProcessError(cmd=[], returncode=42, output='ooops')
    resp = app.post_json(endpoint, params=payload, status=200)
    assert mocked_check_output.call_count == 1
    assert resp.json['err'] == 1
    assert resp.json['err_desc'].startswith('pdftk returned non-zero exit status 42')
    assert 'ooops' in resp.json['err_desc']

    # bad calls errors
    resp = app.post(endpoint, status=400)
    assert resp.headers['content-type'].startswith('application/json')
    assert resp.json['err'] == 1
    assert resp.json['err_desc'].startswith('could not decode body to json')

    payload = {}
    resp = app.post_json(endpoint, params=payload, status=400)
    assert resp.json['err'] == 1
    assert resp.json['err_desc'] == "'filename' is a required property"

    payload = {'filename': 'out.pdf'}
    resp = app.post_json(endpoint, params=payload, status=400)
    assert resp.json['err'] == 1
    assert resp.json['err_desc'] == "'files' is a required property"

    payload = {'filename': 'out.pdf', 'files/0': 42}
    resp = app.post_json(endpoint, params=payload, status=400)
    assert resp.json['err'] == 1
    assert resp.json['err_desc'] == "42 is not of type 'object'"

    # the endpoint only accepts POST
    app.get(endpoint, status=405)
|
|
|
|
|
|
def test_pdf_real_pdftk_assemble(app, pdf, settings):
    """Call the ``assemble`` endpoint without mocking pdftk.

    The test is skipped when no file exists at ``settings.PDFTK_PATH``.  The
    same sample PDF is sent twice and the returned document must be a PDF of
    two pages.
    """
    pdftk_path = settings.PDFTK_PATH
    if not os.path.exists(pdftk_path):
        pytest.skip('pdftk (%s) not found' % pdftk_path)

    url = generic_endpoint_url('pdf', 'assemble', slug=pdf.slug)
    request_body = {
        'filename': 'twopages.pdf',
        'files/0': {'content': pdf_b64content},
        'files/1': {'content': pdf_b64content},
    }
    response = app.post_json(url, params=request_body, status=200)

    headers = response.headers
    assert headers['content-type'] == 'application/pdf'
    assert headers['content-disposition'] == 'attachment; filename="twopages.pdf"'

    # the body must start with the PDF magic bytes and parse as a 2-page file
    document = response.content
    assert document[:5] == b'%PDF-'
    assert PdfReader(fdata=document).numPages == 2
|
|
|
|
|
|
@mock.patch('subprocess.check_output')
def test_pdf_fill_form(mocked_check_output, app, pdf):
    """Check the ``fill-form`` endpoint with ``subprocess.check_output`` mocked.

    Scenarios, in order:

    * the PDF form is sent in the request ('input-form');
    * the PDF form is the connector's own ``fill_form_file``;
    * pdftk failures (timeout, non-zero exit status) reported as JSON errors;
    * invalid requests answered with HTTP 400, and GET answered with HTTP 405.
    """
    url = generic_endpoint_url('pdf', 'fill-form', slug=pdf.slug)

    def check_xml(args, **kwargs):
        # Used as side effect of the mock: "args" is the pdftk command line,
        # whose item 3 is the path of the XFDF file to inspect.
        root = ET.parse(args[3]).getroot()
        assert root.tag == '{http://ns.adobe.com/xfdf/}xfdf'
        href = root.find('{http://ns.adobe.com/xfdf/}f').attrib['href']
        assert href.endswith('.pdf')
        fields = root.find('{http://ns.adobe.com/xfdf/}fields')
        first_field = fields.find('{http://ns.adobe.com/xfdf/}field')
        assert first_field.attrib['name'] == 'fname'
        assert first_field.find('{http://ns.adobe.com/xfdf/}value').text == 'John'

    def assert_fill_form_command(input_suffix):
        # pdftk must have been run exactly once, as
        # <pdftk> <input pdf> fill_form <xfdf file> output -
        assert mocked_check_output.call_count == 1
        command = mocked_check_output.call_args.args[0]
        assert len(command) == 6
        binary, input_path, operation, xfdf_path, *output_args = command
        assert binary == '/usr/bin/pdftk'
        assert input_path.endswith(input_suffix)
        assert operation == 'fill_form'
        assert xfdf_path.endswith('/fields.xfdf')
        assert output_args == ['output', '-']
        assert mocked_check_output.call_args.kwargs['timeout'] == 20

    # PDF form provided in the request
    mocked_check_output.side_effect = check_xml
    request_body = {
        'filename': 'foo.pdf',
        'xfdf/fname': 'John',
        'input-form': {'content': acroform_b64content},
    }
    response = app.post_json(url, params=request_body, status=200)
    assert response.headers['content-type'] == 'application/pdf'
    assert response.headers['content-disposition'] == 'attachment; filename="foo.pdf"'
    assert_fill_form_command('/input-form.pdf')

    # PDF form stored on the connector, none in the request
    pdf.fill_form_file = File(BytesIO(acroform_content), 'default.pdf')
    pdf.save()
    mocked_check_output.reset_mock()
    request_body = {
        'filename': 'bar.pdf',
        'xfdf/fname': 'John',
    }
    response = app.post_json(url, params=request_body, status=200)
    assert response.headers['content-type'] == 'application/pdf'
    assert response.headers['content-disposition'] == 'attachment; filename="bar.pdf"'
    assert_fill_form_command('media/pdf/test/default.pdf')

    # pdftk errors (faked)
    request_body = {
        'filename': 'foo.pdf',
        'xfdf/fname': 'Bill',
        'input-form': {'content': acroform_b64content},
    }
    mocked_check_output.reset_mock()
    mocked_check_output.side_effect = subprocess.TimeoutExpired(cmd=[], timeout=20)
    response = app.post_json(url, params=request_body, status=200)
    assert mocked_check_output.call_count == 1
    error = response.json
    assert error['err'] == 1
    assert error['err_desc'].startswith('pdftk timed out after 20 seconds')

    mocked_check_output.reset_mock()
    mocked_check_output.side_effect = subprocess.CalledProcessError(cmd=[], returncode=42, output='ooops')
    response = app.post_json(url, params=request_body, status=200)
    assert mocked_check_output.call_count == 1
    error = response.json
    assert error['err'] == 1
    assert error['err_desc'].startswith('pdftk returned non-zero exit status 42')
    assert 'ooops' in error['err_desc']

    # bad calls errors
    response = app.post(url, status=400)
    assert response.headers['content-type'].startswith('application/json')
    assert response.json['err'] == 1
    assert response.json['err_desc'].startswith('could not decode body to json')

    rejected_payloads = (
        ({}, "'filename' is a required property"),
        ({'filename': 'out.pdf'}, "missing 'xfdf' property (no XFDF template)"),
        ({'filename': 'out.pdf', 'xfdf': 'not-a-dict'}, "xfdf: 'not-a-dict' is not of type 'object'"),
    )
    for request_body, expected_message in rejected_payloads:
        response = app.post_json(url, params=request_body, status=400)
        assert response.json['err'] == 1
        assert response.json['err_desc'] == expected_message

    pdf.fill_form_file = None  # no default PDF form
    pdf.save()
    request_body = {
        'filename': 'bar.pdf',
        'xfdf/fname': 'Alice',
    }
    response = app.post_json(url, params=request_body, status=400)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == "missing or bad 'input-form' property (no default input file)"

    # the endpoint only accepts POST
    app.get(url, status=405)
|
|
|
|
|
|
def test_pdf_real_pdftk_fillform(admin_user, app, pdf, settings):
    """Call the ``fill-form`` endpoint without mocking pdftk.

    The test is skipped when no file exists at ``settings.PDFTK_PATH``.  It
    first fills the sample PDF form through the endpoint, then checks that the
    connector's management page lists the form fields only once logged in.
    ``admin_user`` is requested as a fixture only (presumably the account used
    by ``login`` -- to be confirmed).
    """
    pdftk_path = settings.PDFTK_PATH
    if not os.path.exists(pdftk_path):
        pytest.skip('pdftk (%s) not found' % pdftk_path)

    url = generic_endpoint_url('pdf', 'fill-form', slug=pdf.slug)
    request_body = {
        'filename': 'filled.pdf',
        'xfdf/fname': 'ThisIsMyFirstName',
        'input-form': {'content': acroform_b64content},
    }
    response = app.post_json(url, params=request_body, status=200)
    assert response.headers['content-type'] == 'application/pdf'
    assert response.headers['content-disposition'] == 'attachment; filename="filled.pdf"'
    filled_document = response.content
    assert PdfReader(fdata=filled_document).numPages == 1
    assert filled_document[:5] == b'%PDF-'
    # TODO: find an easy way to verify that 'ThisIsMyFirstName' is in the
    # returned content

    # dump fields in manager view
    pdf.fill_form_file = File(BytesIO(acroform_content), 'pdf-form.pdf')
    pdf.save()
    manage_url = reverse('view-connector', kwargs={'connector': 'pdf', 'slug': pdf.slug})

    # before login: the fields panel must not be displayed
    page = app.get(manage_url)
    assert 'panel-dumpfields' not in page.text
    assert '<b>xfdf/fname</b>' not in page.text

    # after login: the panel lists the form field
    logged_app = login(app)
    page = logged_app.get(manage_url)
    assert 'panel-dumpfields' in page.text
    assert '<b>xfdf/fname</b>' in page.text
|
|
|
|
|
|
def test_pdf_validator(pdf):
    """Check the validation of the connector's ``fill_form_file``.

    Both sample PDFs must pass ``full_clean()``; content that is not a PDF
    must make it raise ``ValidationError``.
    """
    # accepted: the two sample PDFs
    for valid_content in (pdf_content, acroform_content):
        pdf.fill_form_file = File(BytesIO(valid_content), 'default.pdf')
        pdf.save()
        pdf.full_clean()

    # rejected: not a PDF
    pdf.fill_form_file = File(BytesIO(b'not a pdf'), 'test.txt')
    pdf.save()
    with pytest.raises(ValidationError):
        pdf.full_clean()
|