This commit is contained in:
parent
d43865b2a5
commit
7f897b7c9b
|
@ -1,6 +1,7 @@
|
|||
import os
|
||||
import re
|
||||
import urllib.parse
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
import responses
|
||||
|
@ -679,3 +680,74 @@ def test_file_auto_convert_heic(pub):
|
|||
resp = resp.follow()
|
||||
assert resp.click('image.jpeg').follow().content_type == 'image/jpeg'
|
||||
assert b'JFIF' in resp.click('image.jpeg').follow().body
|
||||
|
||||
|
||||
@pytest.mark.parametrize('enable_tracking_codes', [True, False])
|
||||
def test_form_file_field_no_clamd(pub, enable_tracking_codes):
|
||||
FormDef.wipe()
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test'
|
||||
formdef.enable_tracking_codes = enable_tracking_codes
|
||||
formdef.fields = [
|
||||
fields.FileField(
|
||||
id='0',
|
||||
label='file',
|
||||
)
|
||||
]
|
||||
formdef.store()
|
||||
formdef.data_class().wipe()
|
||||
|
||||
upload = Upload('test.txt', b'foobar', 'text/plain')
|
||||
resp = get_app(pub).get('/test/')
|
||||
resp.forms[0]['f0$file'] = upload
|
||||
|
||||
with mock.patch('wcs.clamd.subprocess') as subp:
|
||||
attrs = {'run.return_value': mock.Mock(returncode=0)}
|
||||
subp.configure_mock(**attrs)
|
||||
resp = resp.form.submit('submit') # -> validation
|
||||
subp.run.assert_not_called() # -> no scan
|
||||
resp = resp.form.submit('submit') # -> submit
|
||||
subp.run.assert_not_called() # -> no scan
|
||||
|
||||
formdata = formdef.data_class().select()[0]
|
||||
assert formdata.data['0'].clamd == {}
|
||||
|
||||
|
||||
@pytest.mark.parametrize('enable_tracking_codes', [True, False])
|
||||
def test_form_file_field_clamd(pub, enable_tracking_codes):
|
||||
pub.load_site_options()
|
||||
if not pub.site_options.has_section('options'):
|
||||
pub.site_options.add_section('options')
|
||||
pub.site_options.set('options', 'enable-clamd', 'true')
|
||||
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
|
||||
pub.site_options.write(fd)
|
||||
pub.load_site_options()
|
||||
|
||||
FormDef.wipe()
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test'
|
||||
formdef.enable_tracking_codes = enable_tracking_codes
|
||||
formdef.fields = [
|
||||
fields.FileField(
|
||||
id='0',
|
||||
label='file',
|
||||
)
|
||||
]
|
||||
formdef.store()
|
||||
formdef.data_class().wipe()
|
||||
|
||||
upload = Upload('test.txt', b'foobar', 'text/plain')
|
||||
resp = get_app(pub).get('/test/')
|
||||
resp.forms[0]['f0$file'] = upload
|
||||
|
||||
with mock.patch('wcs.clamd.subprocess') as subp:
|
||||
attrs = {'run.return_value': mock.Mock(returncode=0)}
|
||||
subp.configure_mock(**attrs)
|
||||
resp = resp.form.submit('submit') # -> validation
|
||||
subp.run.assert_not_called() # -> no scan
|
||||
resp = resp.form.submit('submit') # -> submit
|
||||
formdata = formdef.data_class().select()[0]
|
||||
subp.run.assert_called_once_with(
|
||||
['clamdscan', '--fdpass', formdata.data['0'].get_fs_filename()], check=False
|
||||
)
|
||||
assert formdata.data['0'].clamd == {'returncode': 0, 'waiting_scan': False}
|
||||
|
|
|
@ -411,6 +411,52 @@ def test_formdata_attachment_pick_from_portfolio(pub, fargo_url):
|
|||
assert 'use-file-from-fargo' in resp.text
|
||||
|
||||
|
||||
def test_formdata_attachment_clamd(pub):
|
||||
pub.load_site_options()
|
||||
if not pub.site_options.has_section('options'):
|
||||
pub.site_options.add_section('options')
|
||||
pub.site_options.set('options', 'enable-clamd', 'true')
|
||||
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
|
||||
pub.site_options.write(fd)
|
||||
pub.load_site_options()
|
||||
|
||||
create_user(pub)
|
||||
wf = Workflow(name='status')
|
||||
st1 = wf.add_status('Status1', 'st1')
|
||||
attach = st1.add_action('addattachment', id='_attach')
|
||||
attach.by = ['_submitter']
|
||||
wf.store()
|
||||
|
||||
FormDef.wipe()
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test'
|
||||
formdef.workflow_id = wf.id
|
||||
formdef.fields = []
|
||||
formdef.store()
|
||||
formdef.data_class().wipe()
|
||||
|
||||
resp = login(get_app(pub), username='foo', password='foo').get('/test/')
|
||||
resp = resp.forms[0].submit('submit')
|
||||
assert 'Check values then click submit.' in resp.text
|
||||
resp = resp.forms[0].submit('submit')
|
||||
assert resp.status_int == 302
|
||||
resp = resp.follow()
|
||||
assert 'The form has been recorded' in resp.text
|
||||
|
||||
resp.forms[0]['attachment_attach$file'] = Upload('test.txt', b'foobar', 'text/plain')
|
||||
with mock.patch('wcs.clamd.subprocess') as subp:
|
||||
attrs = {'run.return_value': mock.Mock(returncode=0)}
|
||||
subp.configure_mock(**attrs)
|
||||
|
||||
resp = resp.forms[0].submit('button_attach')
|
||||
assert formdef.data_class().count() == 1
|
||||
formdata = formdef.data_class().select()[0]
|
||||
assert formdata.evolution[-1].parts[0].__class__.__name__ == 'AttachmentEvolutionPart'
|
||||
attachment = formdata.evolution[-1].parts[0]
|
||||
subp.run.assert_called_once_with(['clamdscan', '--fdpass', attachment.get_file_path()], check=False)
|
||||
assert attachment.clamd == {'returncode': 0, 'waiting_scan': False}
|
||||
|
||||
|
||||
def test_formdata_generated_document_download(pub):
|
||||
create_user(pub)
|
||||
wf = Workflow(name='status')
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
# w.c.s. - web application for online forms
|
||||
# Copyright (C) 2005-2024 Entr'ouvert
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import subprocess
|
||||
|
||||
from quixote import get_publisher
|
||||
|
||||
from wcs.qommon import _
|
||||
from wcs.qommon.afterjobs import AfterJob
|
||||
|
||||
RETURNCODE_MAP = {
|
||||
0: _('No virus found'),
|
||||
1: _('Virus(es) found'),
|
||||
2: _('An error occurred'),
|
||||
}
|
||||
|
||||
|
||||
class PickableClamD:
|
||||
@property
|
||||
def clamd(self):
|
||||
if getattr(self, '_clamd', None) is None:
|
||||
self._clamd = {}
|
||||
return self._clamd
|
||||
|
||||
def set_waiting_scan(self):
|
||||
if get_publisher().has_site_option('enable-clamd') and 'waiting_scan' not in self.clamd:
|
||||
self.clamd['waiting_scan'] = True
|
||||
|
||||
|
||||
class ClamDScanJob(AfterJob):
|
||||
def __init__(self, formdef, formdata_id, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.formdef = formdef
|
||||
self.formdata_id = formdata_id
|
||||
|
||||
def run_clamd(self, obj, path):
|
||||
if obj.clamd.get('waiting_scan', False):
|
||||
clamd = subprocess.run(['clamdscan', '--fdpass', path], check=False)
|
||||
obj.clamd['returncode'] = clamd.returncode
|
||||
obj.clamd['waiting_scan'] = False
|
||||
return True
|
||||
return False
|
||||
|
||||
def execute(self):
|
||||
formdata = self.formdef.data_class().get(self.formdata_id)
|
||||
store = False
|
||||
|
||||
from wcs.fields import FileField
|
||||
|
||||
for field in self.formdef.iter_fields(include_block_fields=True):
|
||||
if isinstance(field, FileField):
|
||||
upload = formdata.data.get(field.id)
|
||||
store = self.run_clamd(upload, upload.get_fs_filename()) or store
|
||||
|
||||
from wcs.workflows import AttachmentEvolutionPart
|
||||
|
||||
for p in formdata.iter_evolution_parts(AttachmentEvolutionPart):
|
||||
store = self.run_clamd(p, p.get_file_path()) or store
|
||||
|
||||
if store:
|
||||
formdata.store()
|
||||
self.status = 'completed'
|
|
@ -38,6 +38,7 @@ from quixote.util import randbytes
|
|||
|
||||
from wcs.carddef import CardDef
|
||||
from wcs.categories import Category
|
||||
from wcs.clamd import ClamDScanJob
|
||||
from wcs.fields import MissingBlockFieldError, PageField, SetValueError
|
||||
from wcs.formdata import Evolution, FormData
|
||||
from wcs.formdef import FormDef
|
||||
|
@ -1969,6 +1970,11 @@ class FormPage(Directory, TempfileDirectoryMixin, FormTemplateMixin):
|
|||
url = filled.get_url(backoffice=True)
|
||||
else:
|
||||
url = filled.get_url(language=get_publisher().current_language)
|
||||
|
||||
if get_publisher().has_site_option('enable-clamd'):
|
||||
job = get_response().add_after_job(ClamDScanJob(formdef=self.formdef, formdata_id=filled.id))
|
||||
job.store()
|
||||
|
||||
return redirect(url)
|
||||
|
||||
def cancelled(self):
|
||||
|
|
|
@ -24,13 +24,16 @@ from django.utils.module_loading import import_string
|
|||
from quixote import get_publisher
|
||||
from quixote.http_request import Upload
|
||||
|
||||
from wcs.clamd import PickableClamD
|
||||
|
||||
from .errors import ConnectionError
|
||||
from .misc import Image, can_thumbnail, file_digest
|
||||
from .storage import atomic_write
|
||||
|
||||
|
||||
class PicklableUpload(Upload):
|
||||
class PicklableUpload(Upload, PickableClamD):
|
||||
def __getstate__(self):
|
||||
super().set_waiting_scan()
|
||||
odict = self.__dict__.copy()
|
||||
if 'fp' in odict:
|
||||
del odict['fp']
|
||||
|
|
|
@ -18,8 +18,9 @@ import os
|
|||
import urllib.parse
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
from quixote import get_request, redirect
|
||||
from quixote import get_publisher, get_request, get_response, redirect
|
||||
|
||||
from wcs.clamd import ClamDScanJob
|
||||
from wcs.forms.common import FileDirectory, FormStatusPage
|
||||
from wcs.portfolio import has_portfolio, push_document
|
||||
from wcs.workflows import AttachmentEvolutionPart, WorkflowStatusItem, register_item_class
|
||||
|
@ -172,6 +173,12 @@ class AddAttachmentWorkflowStatusItem(WorkflowStatusItem):
|
|||
evo_part.display_in_history = self.attach_to_history
|
||||
evo.add_part(evo_part)
|
||||
|
||||
if get_publisher().has_site_option('enable-clamd'):
|
||||
job = get_response().add_after_job(
|
||||
ClamDScanJob(formdef=formdata.formdef, formdata_id=formdata.id)
|
||||
)
|
||||
job.store()
|
||||
|
||||
def get_parameters(self):
|
||||
parameters = (
|
||||
'by',
|
||||
|
|
|
@ -35,6 +35,7 @@ from quixote import get_publisher, get_request, get_response, get_session
|
|||
from quixote.html import TemplateIO, htmlescape, htmltext
|
||||
|
||||
import wcs.qommon.storage as st
|
||||
from wcs.clamd import PickableClamD
|
||||
from wcs.qommon.storage import StorableObject, atomic_write
|
||||
from wcs.sql_criterias import Contains, LessOrEqual, Null, StatusReachedTimeoutCriteria, StrictNotEqual
|
||||
|
||||
|
@ -364,7 +365,7 @@ class EvolutionPart:
|
|||
return illegal_fts_chars.sub(' ', misc.html2text(self.view() or ''))
|
||||
|
||||
|
||||
class AttachmentEvolutionPart(EvolutionPart):
|
||||
class AttachmentEvolutionPart(EvolutionPart, PickableClamD):
|
||||
orig_filename = None
|
||||
base_filename = None
|
||||
content_type = None
|
||||
|
@ -423,6 +424,7 @@ class AttachmentEvolutionPart(EvolutionPart):
|
|||
return open(self.get_file_path(), 'rb') # pylint: disable=consider-using-with
|
||||
|
||||
def __getstate__(self):
|
||||
super().set_waiting_scan()
|
||||
odict = self.__dict__.copy()
|
||||
if not odict.get('fp') and 'filename' not in odict:
|
||||
# we need a filename as an identifier: create one from nothing
|
||||
|
|
Loading…
Reference in New Issue