misc: scan uploads with clamd (#87739)
gitea/wcs/pipeline/head This commit looks good Details

This commit is contained in:
Emmanuel Cazenave 2024-04-02 18:01:19 +02:00
parent cc0f8dda1c
commit b28cf47c7e
9 changed files with 381 additions and 2 deletions

View File

@ -1,6 +1,7 @@
import os
import re
import urllib.parse
from unittest import mock
import pytest
import responses
@ -679,3 +680,166 @@ def test_file_auto_convert_heic(pub):
resp = resp.follow()
assert resp.click('image.jpeg').follow().content_type == 'image/jpeg'
assert b'JFIF' in resp.click('image.jpeg').follow().body
@pytest.mark.parametrize('enable_tracking_codes', [True, False])
def test_form_file_field_no_clamd(pub, enable_tracking_codes):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.enable_tracking_codes = enable_tracking_codes
formdef.fields = [
fields.FileField(
id='0',
label='file',
)
]
formdef.store()
formdef.data_class().wipe()
upload = Upload('test.txt', b'foobar', 'text/plain')
resp = get_app(pub).get('/test/')
resp.forms[0]['f0$file'] = upload
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.form.submit('submit') # -> validation
subp.run.assert_not_called() # -> no scan
resp = resp.form.submit('submit') # -> submit
subp.run.assert_not_called() # -> no scan
formdata = formdef.data_class().select()[0]
assert formdata.data['0'].clamd == {}
@pytest.mark.parametrize('enable_tracking_codes', [True, False])
def test_form_block_file_field_no_clamd(pub, enable_tracking_codes):
BlockDef.wipe()
block = BlockDef()
block.name = 'foobar'
block.fields = [
fields.FileField(id='234', required=True, label='field label'),
]
block.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.enable_tracking_codes = enable_tracking_codes
formdef.fields = [fields.BlockField(id='1', label='test', block_slug='foobar', max_items=3)]
formdef.store()
formdef.data_class().wipe()
upload1 = Upload('test1.txt', b'foobar', 'text/plain')
upload2 = Upload('test2.txt', b'barfoo', 'text/plain')
resp = get_app(pub).get('/test/')
resp.forms[0]['f1$element0$f234$file'] = upload1
resp = resp.form.submit('f1$add_element')
resp.forms[0]['f1$element1$f234$file'] = upload2
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.form.submit('submit') # -> validation
subp.run.assert_not_called() # -> no scan
resp = resp.form.submit('submit') # -> submit
subp.run.assert_not_called() # -> no scan
formdata = formdef.data_class().select()[0]
assert formdata.data['1']['data'][0]['234'].clamd == {}
assert formdata.data['1']['data'][1]['234'].clamd == {}
@pytest.mark.parametrize('enable_tracking_codes', [True, False])
def test_form_file_field_clamd(pub, enable_tracking_codes):
pub.load_site_options()
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'enable-clamd', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
pub.load_site_options()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.enable_tracking_codes = enable_tracking_codes
formdef.fields = [
fields.FileField(
id='0',
label='file',
)
]
formdef.store()
formdef.data_class().wipe()
upload = Upload('test.txt', b'foobar', 'text/plain')
resp = get_app(pub).get('/test/')
resp.forms[0]['f0$file'] = upload
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.form.submit('submit') # -> validation
subp.run.assert_not_called() # -> no scan
resp = resp.form.submit('submit') # -> submit
formdata = formdef.data_class().select()[0]
subp.run.assert_called_once_with(
['clamdscan', '--fdpass', formdata.data['0'].get_fs_filename()], check=False
)
assert formdata.data['0'].clamd == {'returncode': 0, 'waiting_scan': False}
@pytest.mark.parametrize('enable_tracking_codes', [True, False])
def test_form_block_file_field_clamd(pub, enable_tracking_codes):
pub.load_site_options()
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'enable-clamd', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
pub.load_site_options()
BlockDef.wipe()
block = BlockDef()
block.name = 'foobar'
block.fields = [
fields.FileField(id='234', required=True, label='field label'),
]
block.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.enable_tracking_codes = enable_tracking_codes
formdef.fields = [fields.BlockField(id='1', label='test', block_slug='foobar', max_items=3)]
formdef.store()
formdef.data_class().wipe()
upload1 = Upload('test1.txt', b'foobar', 'text/plain')
upload2 = Upload('test2.txt', b'barfoo', 'text/plain')
resp = get_app(pub).get('/test/')
resp.forms[0]['f1$element0$f234$file'] = upload1
resp = resp.form.submit('f1$add_element')
resp.forms[0]['f1$element1$f234$file'] = upload2
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.form.submit('submit') # -> validation
subp.run.assert_not_called() # -> no scan
resp = resp.form.submit('submit') # -> submit
formdata = formdef.data_class().select()[0]
assert subp.run.call_count == 2
calls = [
mock.call(
['clamdscan', '--fdpass', formdata.data['1']['data'][0]['234'].get_fs_filename()], check=False
),
mock.call(
['clamdscan', '--fdpass', formdata.data['1']['data'][1]['234'].get_fs_filename()], check=False
),
]
subp.run.assert_has_calls(calls)
assert formdata.data['1']['data'][0]['234'].clamd == {'returncode': 0, 'waiting_scan': False}
assert formdata.data['1']['data'][1]['234'].clamd == {'returncode': 0, 'waiting_scan': False}

View File

@ -411,6 +411,52 @@ def test_formdata_attachment_pick_from_portfolio(pub, fargo_url):
assert 'use-file-from-fargo' in resp.text
def test_formdata_attachment_clamd(pub):
pub.load_site_options()
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'enable-clamd', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
pub.load_site_options()
create_user(pub)
wf = Workflow(name='status')
st1 = wf.add_status('Status1', 'st1')
attach = st1.add_action('addattachment', id='_attach')
attach.by = ['_submitter']
wf.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.workflow_id = wf.id
formdef.fields = []
formdef.store()
formdef.data_class().wipe()
resp = login(get_app(pub), username='foo', password='foo').get('/test/')
resp = resp.forms[0].submit('submit')
assert 'Check values then click submit.' in resp.text
resp = resp.forms[0].submit('submit')
assert resp.status_int == 302
resp = resp.follow()
assert 'The form has been recorded' in resp.text
resp.forms[0]['attachment_attach$file'] = Upload('test.txt', b'foobar', 'text/plain')
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.forms[0].submit('button_attach')
assert formdef.data_class().count() == 1
formdata = formdef.data_class().select()[0]
assert formdata.evolution[-1].parts[0].__class__.__name__ == 'AttachmentEvolutionPart'
attachment = formdata.evolution[-1].parts[0]
subp.run.assert_called_once_with(['clamdscan', '--fdpass', attachment.get_file_path()], check=False)
assert attachment.clamd == {'returncode': 0, 'waiting_scan': False}
def test_formdata_generated_document_download(pub):
create_user(pub)
wf = Workflow(name='status')

View File

@ -1,6 +1,7 @@
import io
import json
import os
from unittest import mock
import pytest
import responses
@ -903,3 +904,85 @@ def test_workflow_form_block_condition(pub):
assert live_resp.json['result']['blah_1-234-0']['visible'] is True
assert live_resp.json['result']['blah_1-234-1']['visible'] is False
assert live_resp.json['result']['blah_1-234-2']['visible'] is True
def test_workflow_form_file_clamd(pub):
pub.load_site_options()
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'enable-clamd', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
pub.load_site_options()
FormDef.wipe()
Workflow.wipe()
BlockDef.wipe()
user = create_user(pub)
block = BlockDef()
block.name = 'foobar'
block.fields = [
fields.FileField(id='123', required=True, label='Test', varname='test'),
]
block.store()
wf = Workflow(name='test')
status = wf.add_status('New', 'st1')
status.items = []
display_form = status.add_action('form', id='_display_form')
display_form.by = ['_submitter']
display_form.varname = 'blah'
display_form.hide_submit_button = False
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
display_form.formdef.fields = [
fields.BlockField(id='1', label='test', block_slug='foobar', varname='fooblock', max_items=3),
fields.FileField(id='2', label='test2', varname='file'),
]
jump = status.add_action('jumponsubmit', id='_jump')
jump.status = status.id
wf.store()
formdef = create_formdef()
formdef.workflow_id = wf.id
formdef.fields = []
formdef.store()
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.user_id = user.id
formdata.just_created()
formdata.store()
app = login(get_app(pub), username='foo', password='foo')
resp = app.get(formdata.get_url(backoffice=False))
resp.form['fblah_1$element0$f123$file'] = Upload('test1.txt', b'foobar1', 'text/plain')
resp = resp.form.submit('fblah_1$add_element')
resp.form['fblah_1$element1$f123$file'] = Upload('test2.txt', b'foobar2', 'text/plain')
resp.form['fblah_2$file'] = Upload('test3.txt', b'foobar3', 'text/plain')
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.form.submit('submit').follow()
assert subp.run.call_count == 3
formdata = formdef.data_class().select()[0]
data_part = formdata.evolution[0].parts[-1].data
calls = [
mock.call(
['clamdscan', '--fdpass', data_part['blah_1']['data'][0]['123'].get_fs_filename()],
check=False,
),
mock.call(
['clamdscan', '--fdpass', data_part['blah_1']['data'][1]['123'].get_fs_filename()],
check=False,
),
mock.call(['clamdscan', '--fdpass', data_part['blah_2'].get_fs_filename()], check=False),
]
subp.run.assert_has_calls(calls)

72
wcs/clamd.py Normal file
View File

@ -0,0 +1,72 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2024 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import subprocess
from quixote import get_publisher, get_response
from wcs.qommon import _
from wcs.qommon.afterjobs import AfterJob
RETURNCODE_MAP = {
0: _('No virus found'),
1: _('Virus(es) found'),
2: _('An error occurred'),
}
class PickableClamD:
@property
def clamd(self):
if getattr(self, '_clamd', None) is None:
self._clamd = {}
return self._clamd
def init_clamd(self):
if get_publisher().has_site_option('enable-clamd') and 'waiting_scan' not in self.clamd:
self.clamd['waiting_scan'] = True
class ClamDScanJob(AfterJob):
def __init__(self, formdata, **kwargs):
super().__init__(**kwargs)
self.formdata = formdata
def run_clamd(self, obj):
path = None
if hasattr(obj, 'get_fs_filename'):
path = obj.get_fs_filename()
elif hasattr(obj, 'get_file_path'):
path = obj.get_file_path()
clamd = subprocess.run(['clamdscan', '--fdpass', path], check=False)
obj.clamd['returncode'] = clamd.returncode
obj.clamd['waiting_scan'] = False
def execute(self):
store = False
for field_data in self.formdata.get_all_file_data(with_history=False):
if field_data.clamd.get('waiting_scan', False):
self.run_clamd(field_data)
store = True
if store:
self.formdata.store()
self.status = 'completed'
def add_clamd_scan_job(formdata):
if get_publisher().has_site_option('enable-clamd'):
job = get_response().add_after_job(ClamDScanJob(formdata=formdata))
job.store()

View File

@ -38,6 +38,7 @@ from quixote.util import randbytes
from wcs.carddef import CardDef
from wcs.categories import Category
from wcs.clamd import add_clamd_scan_job
from wcs.fields import MissingBlockFieldError, PageField, SetValueError
from wcs.formdata import Evolution, FormData
from wcs.formdef import FormDef
@ -1971,6 +1972,9 @@ class FormPage(Directory, TempfileDirectoryMixin, FormTemplateMixin):
url = filled.get_url(backoffice=True)
else:
url = filled.get_url(language=get_publisher().current_language)
add_clamd_scan_job(filled)
return redirect(url)
def cancelled(self):

View File

@ -24,13 +24,16 @@ from django.utils.module_loading import import_string
from quixote import get_publisher
from quixote.http_request import Upload
from wcs.clamd import PickableClamD
from .errors import ConnectionError
from .misc import Image, can_thumbnail, file_digest
from .storage import atomic_write
class PicklableUpload(Upload):
class PicklableUpload(Upload, PickableClamD):
def __getstate__(self):
self.init_clamd()
odict = self.__dict__.copy()
if 'fp' in odict:
del odict['fp']

View File

@ -20,6 +20,7 @@ import xml.etree.ElementTree as ET
from quixote import get_request, redirect
from wcs.clamd import add_clamd_scan_job
from wcs.forms.common import FileDirectory, FormStatusPage
from wcs.portfolio import has_portfolio, push_document
from wcs.workflows import AttachmentEvolutionPart, WorkflowStatusItem, register_item_class
@ -171,6 +172,7 @@ class AddAttachmentWorkflowStatusItem(WorkflowStatusItem):
evo_part = AttachmentEvolutionPart.from_upload(f, varname=self.varname)
evo_part.display_in_history = self.attach_to_history
evo.add_part(evo_part)
add_clamd_scan_job(formdata)
def get_parameters(self):
parameters = (

View File

@ -20,6 +20,7 @@ from quixote import get_publisher
from quixote.html import TemplateIO, htmltext
from wcs.admin.fields import FieldDefPage, FieldsDirectory
from wcs.clamd import add_clamd_scan_job
from wcs.fields import SetValueError
from wcs.formdata import get_dict_with_varnames
from wcs.formdef import FormDef
@ -384,6 +385,8 @@ class FormWorkflowStatusItem(WorkflowStatusItem):
if button and not getattr(button, 'ignore_form_errors', False):
self.evaluate_live_form(form, formdata, user, submit=True)
formdata.store()
add_clamd_scan_job(formdata)
get_publisher().substitutions.unfeed(lambda x: x.__class__.__name__ == 'ConditionVars')
def get_parameters_view(self):

View File

@ -36,6 +36,7 @@ from quixote.html import TemplateIO, htmlescape, htmltext
import wcs.qommon.storage as st
from wcs.api_utils import is_url_signed
from wcs.clamd import PickableClamD
from wcs.qommon.storage import StorableObject, atomic_write
from wcs.sql_criterias import Contains, LessOrEqual, Null, StatusReachedTimeoutCriteria, StrictNotEqual
@ -365,7 +366,7 @@ class EvolutionPart:
return illegal_fts_chars.sub(' ', misc.html2text(self.view() or ''))
class AttachmentEvolutionPart(EvolutionPart):
class AttachmentEvolutionPart(EvolutionPart, PickableClamD):
orig_filename = None
base_filename = None
content_type = None
@ -424,6 +425,7 @@ class AttachmentEvolutionPart(EvolutionPart):
return open(self.get_file_path(), 'rb') # pylint: disable=consider-using-with
def __getstate__(self):
self.init_clamd()
odict = self.__dict__.copy()
if not odict.get('fp') and 'filename' not in odict:
# we need a filename as an identifier: create one from nothing