WIP: misc: scan uploads with clamd (#87739) #1415

Draft
ecazenave wants to merge 2 commits from wip/87739-clamav-cli into main
12 changed files with 485 additions and 3 deletions

3
debian/control vendored
View File

@ -53,7 +53,8 @@ Recommends: graphicsmagick,
python3-magic,
python3-qrcode,
python3-workalendar,
Suggests: python3-libxml2,
Suggests: clamdscan,
python3-libxml2,
Description: web application to design and set up online forms
w.c.s. is a web application which allows to design and set up online forms.
.

View File

@ -1,6 +1,7 @@
import os
import re
import urllib.parse
from unittest import mock
import pytest
import responses
@ -704,3 +705,159 @@ def test_file_auto_convert_heic(pub):
resp = resp.follow()
assert resp.click('image.jpeg').follow().content_type == 'image/jpeg'
assert b'JFIF' in resp.click('image.jpeg').follow().body
@pytest.mark.parametrize('enable_tracking_codes', [True, False])
def test_form_file_field_no_clamd(pub, enable_tracking_codes):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.enable_tracking_codes = enable_tracking_codes
formdef.fields = [
fields.FileField(
id='0',
label='file',
)
]
formdef.store()
formdef.data_class().wipe()
upload = Upload('test.txt', b'foobar', 'text/plain')
resp = get_app(pub).get('/test/')
resp.forms[0]['f0$file'] = upload
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.form.submit('submit') # -> validation
subp.run.assert_not_called() # -> no scan
resp = resp.form.submit('submit') # -> submit
subp.run.assert_not_called() # -> no scan
formdata = formdef.data_class().select()[0]
assert formdata.data['0'].clamd == {}
@pytest.mark.parametrize('enable_tracking_codes', [True, False])
def test_form_block_file_field_no_clamd(pub, enable_tracking_codes):
BlockDef.wipe()
block = BlockDef()
block.name = 'foobar'
block.fields = [
fields.FileField(id='234', required=True, label='field label'),
]
block.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.enable_tracking_codes = enable_tracking_codes
formdef.fields = [fields.BlockField(id='1', label='test', block_slug='foobar', max_items=3)]
formdef.store()
formdef.data_class().wipe()
upload1 = Upload('test1.txt', b'foobar', 'text/plain')
upload2 = Upload('test2.txt', b'barfoo', 'text/plain')
resp = get_app(pub).get('/test/')
resp.forms[0]['f1$element0$f234$file'] = upload1
resp = resp.form.submit('f1$add_element')
resp.forms[0]['f1$element1$f234$file'] = upload2
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.form.submit('submit') # -> validation
subp.run.assert_not_called() # -> no scan
resp = resp.form.submit('submit') # -> submit
subp.run.assert_not_called() # -> no scan
formdata = formdef.data_class().select()[0]
assert formdata.data['1']['data'][0]['234'].clamd == {}
assert formdata.data['1']['data'][1]['234'].clamd == {}
@pytest.mark.parametrize('enable_tracking_codes', [True, False])
def test_form_file_field_clamd(pub, enable_tracking_codes):
pub.load_site_options()
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'enable-clamd', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
pub.load_site_options()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.enable_tracking_codes = enable_tracking_codes
formdef.fields = [
fields.FileField(
id='0',
label='file',
)
]
formdef.store()
formdef.data_class().wipe()
upload = Upload('test.txt', b'foobar', 'text/plain')
resp = get_app(pub).get('/test/')
resp.forms[0]['f0$file'] = upload
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.form.submit('submit') # -> validation
subp.run.assert_not_called() # -> no scan
resp = resp.form.submit('submit') # -> submit
formdata = formdef.data_class().select()[0]
subp.run.assert_called_once_with(
['clamdscan', '--fdpass', formdata.data['0'].get_fs_filename()], check=False
)
assert formdata.data['0'].clamd == {'returncode': 0, 'waiting_scan': False}
@pytest.mark.parametrize('enable_tracking_codes', [True, False])
def test_form_block_file_field_clamd(pub, enable_tracking_codes):
pub.load_site_options()
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'enable-clamd', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
pub.load_site_options()
BlockDef.wipe()
block = BlockDef()
block.name = 'foobar'
block.fields = [
fields.FileField(id='234', required=True, label='field label'),
]
block.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.enable_tracking_codes = enable_tracking_codes
formdef.fields = [fields.BlockField(id='1', label='test', block_slug='foobar', max_items=3)]
formdef.store()
formdef.data_class().wipe()
upload1 = Upload('test1.txt', b'foobar', 'text/plain')
upload2 = Upload('test2.txt', b'barfoo', 'text/plain')
resp = get_app(pub).get('/test/')
resp.forms[0]['f1$element0$f234$file'] = upload1
resp = resp.form.submit('f1$add_element')
resp.forms[0]['f1$element1$f234$file'] = upload2
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.form.submit('submit') # -> validation
subp.run.assert_not_called() # -> no scan
resp = resp.form.submit('submit') # -> submit
formdata = formdef.data_class().select()[0]
assert subp.run.call_count == 2
for file_data in formdata.get_all_file_data(with_history=False):
assert file_data.clamd['waiting_scan'] is False
assert file_data.clamd['returncode'] == 0
subp.run.assert_any_call(['clamdscan', '--fdpass', file_data.get_fs_filename()], check=False)

View File

@ -411,6 +411,122 @@ def test_formdata_attachment_pick_from_portfolio(pub, fargo_url):
assert 'use-file-from-fargo' in resp.text
def test_formdata_attachment_clamd(pub):
pub.load_site_options()
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'enable-clamd', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
pub.load_site_options()
create_user(pub)
wf = Workflow(name='status')
st1 = wf.add_status('Status1', 'st1')
attach = st1.add_action('addattachment', id='_attach')
attach.by = ['_submitter']
wf.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.workflow_id = wf.id
formdef.fields = []
formdef.store()
formdef.data_class().wipe()
resp = login(get_app(pub), username='foo', password='foo').get('/test/')
resp = resp.forms[0].submit('submit')
assert 'Check values then click submit.' in resp.text
resp = resp.forms[0].submit('submit')
assert resp.status_int == 302
resp = resp.follow()
assert 'The form has been recorded' in resp.text
resp.forms[0]['attachment_attach$file'] = Upload('test.txt', b'foobar', 'text/plain')
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.forms[0].submit('button_attach')
assert formdef.data_class().count() == 1
formdata = formdef.data_class().select()[0]
assert formdata.evolution[-1].parts[0].__class__.__name__ == 'AttachmentEvolutionPart'
attachment = formdata.evolution[-1].parts[0]
subp.run.assert_called_once_with(['clamdscan', '--fdpass', attachment.get_file_path()], check=False)
assert attachment.clamd == {'returncode': 0, 'waiting_scan': False}
def test_formdata_attachment_clamd_download(pub):
pub.load_site_options()
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'enable-clamd', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
pub.load_site_options()
create_user(pub)
wf = Workflow(name='status')
st1 = wf.add_status('Status1', 'st1')
attach = st1.add_action('addattachment', id='_attach')
attach.by = ['_submitter']
wf.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.workflow_id = wf.id
formdef.fields = []
formdef.store()
formdef.data_class().wipe()
resp = login(get_app(pub), username='foo', password='foo').get('/test/')
resp = resp.forms[0].submit('submit')
assert 'Check values then click submit.' in resp.text
resp = resp.forms[0].submit('submit')
assert resp.status_int == 302
resp = resp.follow()
assert 'The form has been recorded' in resp.text
resp.forms[0]['attachment_attach$file'] = Upload('test.txt', b'foobar', 'text/plain')
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=1)}
subp.configure_mock(**attrs)
resp = resp.forms[0].submit('button_attach')
assert formdef.data_class().count() == 1
formdata = formdef.data_class().select()[0]
assert formdata.evolution[-1].parts[0].__class__.__name__ == 'AttachmentEvolutionPart'
attachment = formdata.evolution[-1].parts[0]
subp.run.assert_called_once_with(['clamdscan', '--fdpass', attachment.get_file_path()], check=False)
assert attachment.clamd == {'returncode': 1, 'waiting_scan': False}
resp = resp.follow() # back to form page
malware_resp = resp.click('test.txt')
assert 'A malware was detected in this file.' in malware_resp.text
resp.forms[0]['attachment_attach$file'] = Upload('test1.txt', b'foobaz', 'text/plain')
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.forms[0].submit('button_attach')
assert formdef.data_class().count() == 1
formdata = formdef.data_class().select()[0]
assert formdata.evolution[-1].parts[0].__class__.__name__ == 'AttachmentEvolutionPart'
attachment = formdata.evolution[-1].parts[0]
subp.run.assert_called_once_with(['clamdscan', '--fdpass', attachment.get_file_path()], check=False)
assert attachment.clamd == {'returncode': 0, 'waiting_scan': False}
resp = resp.follow() # back to form page
resp = resp.click('test1.txt')
assert resp.location.endswith('/test1.txt')
resp = resp.follow()
assert resp.content_type == 'text/plain'
assert resp.text == 'foobaz'
def test_formdata_generated_document_download(pub):
create_user(pub)
wf = Workflow(name='status')

View File

@ -1,6 +1,7 @@
import io
import json
import os
from unittest import mock
import pytest
import responses
@ -917,3 +918,76 @@ def test_workflow_form_block_condition(pub):
assert live_resp.json['result'][f'blah_{display_form.id}_1-234-0']['visible'] is True
assert live_resp.json['result'][f'blah_{display_form.id}_1-234-1']['visible'] is False
assert live_resp.json['result'][f'blah_{display_form.id}_1-234-2']['visible'] is True
def test_workflow_form_file_clamd(pub):
pub.load_site_options()
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'enable-clamd', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
pub.load_site_options()
FormDef.wipe()
Workflow.wipe()
BlockDef.wipe()
user = create_user(pub)
block = BlockDef()
block.name = 'foobar'
block.fields = [
fields.FileField(id='123', required=True, label='Test', varname='test'),
]
block.store()
wf = Workflow(name='test')
status = wf.add_status('New', 'st1')
status.items = []
display_form = status.add_action('form', id='_display_form')
display_form.by = ['_submitter']
display_form.varname = 'blah'
display_form.hide_submit_button = False
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
display_form.formdef.fields = [
fields.BlockField(id='1', label='test', block_slug='foobar', varname='fooblock', max_items=3),
fields.FileField(id='2', label='test2', varname='file'),
]
jump = status.add_action('jumponsubmit', id='_jump')
jump.status = status.id
wf.store()
formdef = create_formdef()
formdef.workflow_id = wf.id
formdef.fields = []
formdef.store()
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.user_id = user.id
formdata.just_created()
formdata.store()
app = login(get_app(pub), username='foo', password='foo')
resp = app.get(formdata.get_url(backoffice=False))
resp.form[f'fblah_{display_form.id}_1$element0$f123$file'] = Upload('test1.txt', b'foobar1', 'text/plain')
resp = resp.form.submit(f'fblah_{display_form.id}_1$add_element')
resp.form[f'fblah_{display_form.id}_1$element1$f123$file'] = Upload('test2.txt', b'foobar2', 'text/plain')
resp.form[f'fblah_{display_form.id}_2$file'] = Upload('test3.txt', b'foobar3', 'text/plain')
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.form.submit('submit').follow()
assert subp.run.call_count == 6 # 3 files but each file is stored in a part and in workflow_data
formdata = formdef.data_class().select()[0]
for file_data in formdata.get_all_file_data(with_history=False):
assert file_data.clamd['waiting_scan'] is False
assert file_data.clamd['returncode'] == 0
subp.run.assert_any_call(['clamdscan', '--fdpass', file_data.get_fs_filename()], check=False)

105
wcs/clamd.py Normal file
View File

@ -0,0 +1,105 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2024 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import subprocess
from quixote import get_publisher, get_response
from wcs.qommon import _, template
from wcs.qommon.afterjobs import AfterJob
RETURNCODE_MAP = {
0: _('No virus found'),
1: _('Virus(es) found'),
2: _('An error occurred'),
}
class PickableClamD:
@property
def clamd(self):
if getattr(self, '_clamd', None) is None:
self._clamd = {}
return self._clamd
def init_clamd(self):
if get_publisher().has_site_option('enable-clamd') and 'waiting_scan' not in self.clamd:
self.clamd['waiting_scan'] = True
def is_waiting_for_scan(self):
if not get_publisher().has_site_option('enable-clamd'):
return False
return self.clamd.get('waiting_scan', False)
def has_malware(self):
if not get_publisher().has_site_option('enable-clamd'):
return False
if 'waiting_scan' not in self.clamd:
# files uploaded before clamd was enabled
return False
if self.clamd['waiting_scan']:
# file not scanned yet
return False
if self.clamd['returncode'] == 0:
return False
# scan dit not run properly or detected a virus
return True
def allow_download(self):
return not (self.is_waiting_for_scan() or self.has_malware())
class ClamDScanJob(AfterJob):
def __init__(self, formdata, **kwargs):
super().__init__(
formdef_class=formdata.formdef.__class__, formdef_id=formdata.formdef.id, formdata_id=formdata.id
)
def execute(self):
formdef = self.kwargs['formdef_class'].get(self.kwargs['formdef_id'])
formdata = formdef.data_class().get(self.kwargs['formdata_id'])
store = False
for file_data in formdata.get_all_file_data(with_history=False):
if file_data.clamd.get('waiting_scan', False):
path = (
file_data.get_fs_filename()
if hasattr(file_data, 'get_fs_filename')
else file_data.get_file_path()
)
clamd = subprocess.run(['clamdscan', '--fdpass', path], check=False)
file_data.clamd['returncode'] = clamd.returncode
file_data.clamd['waiting_scan'] = False
store = True
if store:
formdata._store_all_evolution = True
formdata.store()
def add_clamd_scan_job(formdata):
if get_publisher().has_site_option('enable-clamd'):
job = get_response().add_after_job(ClamDScanJob(formdata=formdata))
job.store()
def malware_response(file_data):
return template.QommonTemplateResponse(
templates=['wcs/malware.html'],
context={
'is_waiting_for_scan': file_data.is_waiting_for_scan(),
'has_malware': file_data.has_malware(),
},
is_django_native=True,
)

View File

@ -31,6 +31,7 @@ from quixote.util import randbytes
from wcs import data_sources
from wcs.api_utils import get_query_flag, get_user_from_api_query_string, is_url_signed, sign_url_auto_orig
from wcs.blocks import BlockSubWidget, BlockWidget
from wcs.clamd import malware_response
from wcs.fields import FileField
from wcs.qommon.admin.texts import TextsDirectory
from wcs.qommon.upload_storage import get_storage_object
@ -807,6 +808,8 @@ class FormStatusPage(Directory, FormTemplateMixin):
extra_label=str(field_data),
file_digest=file_digest,
)
if not field_data.allow_download():
return malware_response(field_data)
return FileDirectory.serve_file(field_data, thumbnail=thumbnail)
elif get_request().form and get_request().form.get('f'):
try:
@ -823,6 +826,9 @@ class FormStatusPage(Directory, FormTemplateMixin):
if not hasattr(file, 'content_type'):
raise errors.TraversalError()
if not file.allow_download():
return malware_response(file)
if file.has_redirect_url():
redirect_url = file.get_redirect_url(backoffice=get_request().is_in_backoffice())
if not redirect_url:

View File

@ -38,6 +38,7 @@ from quixote.util import randbytes
from wcs.carddef import CardDef
from wcs.categories import Category
from wcs.clamd import add_clamd_scan_job
from wcs.fields import MissingBlockFieldError, PageField, SetValueError
from wcs.formdata import Evolution, FormData
from wcs.formdef import FormDef
@ -1971,6 +1972,9 @@ class FormPage(Directory, TempfileDirectoryMixin, FormTemplateMixin):
url = filled.get_url(backoffice=True)
else:
url = filled.get_url(language=get_publisher().current_language)
add_clamd_scan_job(filled)
return redirect(url)
def cancelled(self):

View File

@ -24,18 +24,21 @@ from django.utils.module_loading import import_string
from quixote import get_publisher
from quixote.http_request import Upload
from wcs.clamd import PickableClamD
from .errors import ConnectionError
from .misc import Image, can_thumbnail, file_digest
from .storage import atomic_write
class PicklableUpload(Upload):
class PicklableUpload(Upload, PickableClamD):
def __init__(self, orig_filename, content_type=None, charset=None):
if orig_filename:
orig_filename = orig_filename.strip()
super().__init__(orig_filename, content_type=content_type, charset=charset)
def __getstate__(self):
self.init_clamd()
odict = self.__dict__.copy()
if 'fp' in odict:
del odict['fp']

View File

@ -0,0 +1,7 @@
{% load i18n %}
<html>
<body>
<h1>{% if is_waiting_for_scan %}{% trans "The file is waiting to be checked for malware." %}{% elif has_malware %}{% trans "A malware was detected in this file." %}{% endif %}</h1>
</body>
</html>

View File

@ -20,6 +20,7 @@ import xml.etree.ElementTree as ET
from quixote import get_request, redirect
from wcs.clamd import add_clamd_scan_job, malware_response
from wcs.forms.common import FileDirectory, FormStatusPage
from wcs.portfolio import has_portfolio, push_document
from wcs.workflows import AttachmentEvolutionPart, WorkflowStatusItem, register_item_class
@ -71,6 +72,8 @@ def form_attachment(self):
if not isinstance(p, AttachmentEvolutionPart):
continue
if os.path.basename(p.filename) == fn:
if not p.allow_download():
return malware_response(p)
is_in_backoffice = bool(get_request() and get_request().is_in_backoffice())
return redirect(
'%sfiles/attachment-%s/%s'
@ -171,6 +174,7 @@ class AddAttachmentWorkflowStatusItem(WorkflowStatusItem):
evo_part = AttachmentEvolutionPart.from_upload(f, varname=self.varname)
evo_part.display_in_history = self.attach_to_history
evo.add_part(evo_part)
add_clamd_scan_job(formdata)
def get_parameters(self):
parameters = (

View File

@ -20,6 +20,7 @@ from quixote import get_publisher
from quixote.html import TemplateIO, htmltext
from wcs.admin.fields import FieldDefPage, FieldsDirectory
from wcs.clamd import add_clamd_scan_job
from wcs.fields import SetValueError
from wcs.formdata import get_dict_with_varnames
from wcs.formdef import FormDef
@ -384,6 +385,8 @@ class FormWorkflowStatusItem(WorkflowStatusItem):
if button and not getattr(button, 'ignore_form_errors', False):
self.evaluate_live_form(form, formdata, user, submit=True)
formdata.store()
add_clamd_scan_job(formdata)
get_publisher().substitutions.unfeed(lambda x: x.__class__.__name__ == 'ConditionVars')
def get_parameters_view(self):

View File

@ -36,6 +36,7 @@ from quixote.html import TemplateIO, htmlescape, htmltext
import wcs.qommon.storage as st
from wcs.api_utils import is_url_signed
from wcs.clamd import PickableClamD
from wcs.qommon.storage import StorableObject, atomic_write
from wcs.sql_criterias import Contains, LessOrEqual, Null, StatusReachedTimeoutCriteria, StrictNotEqual
@ -365,7 +366,7 @@ class EvolutionPart:
return illegal_fts_chars.sub(' ', misc.html2text(self.view() or ''))
class AttachmentEvolutionPart(EvolutionPart):
class AttachmentEvolutionPart(EvolutionPart, PickableClamD):
orig_filename = None
base_filename = None
content_type = None
@ -424,6 +425,7 @@ class AttachmentEvolutionPart(EvolutionPart):
return open(self.get_file_path(), 'rb') # pylint: disable=consider-using-with
def __getstate__(self):
self.init_clamd()
odict = self.__dict__.copy()
if not odict.get('fp') and 'filename' not in odict:
# we need a filename as an identifier: create one from nothing