misc: audit redirects to files on remote storage (#73481)
gitea/wcs/pipeline/head This commit looks good
Details
gitea/wcs/pipeline/head This commit looks good
Details
This commit is contained in:
parent
8dd2436885
commit
60c5618065
|
@ -1,4 +1,5 @@
|
||||||
import datetime
|
import datetime
|
||||||
|
import os
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
@ -184,6 +185,24 @@ def test_audit_journal(pub, superuser):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_audit_journal_remote_access(pub, superuser):
|
||||||
|
app = login(get_app(pub))
|
||||||
|
resp = app.get('/backoffice/journal/')
|
||||||
|
assert 'Redirect to remote stored file' not in [x[2] for x in resp.form['action'].options]
|
||||||
|
|
||||||
|
if not pub.site_options.has_section('options'):
|
||||||
|
pub.site_options.add_section('options')
|
||||||
|
pub.site_options.add_section('storage-remote')
|
||||||
|
pub.site_options.set('storage-remote', 'label', 'remote')
|
||||||
|
pub.site_options.set('storage-remote', 'class', 'wcs.qommon.upload_storage.RemoteOpaqueUploadStorage')
|
||||||
|
pub.site_options.set('storage-remote', 'ws', 'https://crypto.example.net/')
|
||||||
|
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
|
||||||
|
pub.site_options.write(fd)
|
||||||
|
|
||||||
|
resp = app.get('/backoffice/journal/')
|
||||||
|
assert 'Redirect to remote stored file' in [x[2] for x in resp.form['action'].options]
|
||||||
|
|
||||||
|
|
||||||
def test_audit_journal_access(pub, superuser):
|
def test_audit_journal_access(pub, superuser):
|
||||||
role = pub.role_class(name='foobar')
|
role = pub.role_class(name='foobar')
|
||||||
role.allows_backoffice_access = True
|
role.allows_backoffice_access = True
|
||||||
|
|
|
@ -8,8 +8,10 @@ from django.utils.encoding import force_bytes
|
||||||
from webtest import Upload
|
from webtest import Upload
|
||||||
|
|
||||||
from wcs import fields
|
from wcs import fields
|
||||||
|
from wcs.audit import Audit
|
||||||
from wcs.formdef import FormDef
|
from wcs.formdef import FormDef
|
||||||
from wcs.qommon.ident.password_accounts import PasswordAccount
|
from wcs.qommon.ident.password_accounts import PasswordAccount
|
||||||
|
from wcs.sql import Equal
|
||||||
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
|
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
|
||||||
|
|
||||||
from .utilities import clean_temporary_pub, create_temporary_pub, get_app, login
|
from .utilities import clean_temporary_pub, create_temporary_pub, get_app, login
|
||||||
|
@ -200,6 +202,12 @@ def test_form_file_field_upload_storage(wscall, pub):
|
||||||
assert 'href="download?f=0"' in resp.text
|
assert 'href="download?f=0"' in resp.text
|
||||||
assert 'href="download?f=1"' in resp.text # link is present on backoffice
|
assert 'href="download?f=1"' in resp.text # link is present on backoffice
|
||||||
|
|
||||||
|
# check access is recorded
|
||||||
|
Audit.wipe()
|
||||||
|
resp = resp.click('remote.jpg')
|
||||||
|
assert resp.status_code == 302
|
||||||
|
assert Audit.count([Equal('action', 'redirect remote stored file')]) == 1
|
||||||
|
|
||||||
# file size limit verification
|
# file size limit verification
|
||||||
formdef.fields[1].max_file_size = '1ko'
|
formdef.fields[1].max_file_size = '1ko'
|
||||||
formdef.store()
|
formdef.store()
|
||||||
|
@ -343,9 +351,12 @@ def test_remoteopaque_in_attachmentevolutionpart(wscall, pub):
|
||||||
resp = user_app.get('/test/%s/attachment?f=%s' % (formdata.id, remote_file_id))
|
resp = user_app.get('/test/%s/attachment?f=%s' % (formdata.id, remote_file_id))
|
||||||
assert resp.status_int == 302
|
assert resp.status_int == 302
|
||||||
resp = resp.follow(status=404)
|
resp = resp.follow(status=404)
|
||||||
# clic in backoffice, redirect to decryption system
|
# click in backoffice, redirect to decryption system
|
||||||
|
Audit.wipe()
|
||||||
resp = admin_app.get('/backoffice/management/test/%s/attachment?f=%s' % (formdata.id, remote_file_id))
|
resp = admin_app.get('/backoffice/management/test/%s/attachment?f=%s' % (formdata.id, remote_file_id))
|
||||||
assert resp.status_int == 302
|
assert resp.status_int == 302
|
||||||
resp = resp.follow()
|
resp = resp.follow()
|
||||||
assert resp.location.startswith('https://crypto.example.net/')
|
assert resp.location.startswith('https://crypto.example.net/')
|
||||||
assert '&signature=' in resp.location
|
assert '&signature=' in resp.location
|
||||||
|
# check access is recorded
|
||||||
|
assert Audit.count([Equal('action', 'redirect remote stored file')]) == 1
|
||||||
|
|
|
@ -77,6 +77,7 @@ class Audit(sql.Audit):
|
||||||
'export.ods': _('ODS Export'),
|
'export.ods': _('ODS Export'),
|
||||||
'download file': _('Download of attached file'),
|
'download file': _('Download of attached file'),
|
||||||
'download files': _('Download of attached files (bundle)'),
|
'download files': _('Download of attached files (bundle)'),
|
||||||
|
'redirect to remote stored file': _('Redirect to remote stored file'),
|
||||||
'view': _('View Data'),
|
'view': _('View Data'),
|
||||||
'settings': _('Change to global settings'),
|
'settings': _('Change to global settings'),
|
||||||
}
|
}
|
||||||
|
|
|
@ -148,11 +148,14 @@ class JournalDirectory(Directory):
|
||||||
widget = form.add(StringWidget, 'object_id', title=_('Form/Card Identifier'))
|
widget = form.add(StringWidget, 'object_id', title=_('Form/Card Identifier'))
|
||||||
if not form.get_widget('object').parse():
|
if not form.get_widget('object').parse():
|
||||||
widget.is_hidden = True
|
widget.is_hidden = True
|
||||||
|
options = Audit.get_action_labels().items()
|
||||||
|
if not get_publisher().get_site_storages():
|
||||||
|
options = [x for x in options if x[0] != 'redirect to remote stored file']
|
||||||
form.add(
|
form.add(
|
||||||
SingleSelectWidget,
|
SingleSelectWidget,
|
||||||
'action',
|
'action',
|
||||||
title=_('Action'),
|
title=_('Action'),
|
||||||
options=[('', '', '')] + [(x[0], x[1], x[0]) for x in Audit.get_action_labels().items()],
|
options=[('', '', '')] + [(x[0], x[1], x[0]) for x in options],
|
||||||
)
|
)
|
||||||
form.add_submit('submit', _('Search'))
|
form.add_submit('submit', _('Search'))
|
||||||
return form
|
return form
|
||||||
|
|
|
@ -82,6 +82,7 @@ class FileDirectory(Directory):
|
||||||
if not redirect_url:
|
if not redirect_url:
|
||||||
raise errors.TraversalError()
|
raise errors.TraversalError()
|
||||||
redirect_url = sign_url_auto_orig(redirect_url)
|
redirect_url = sign_url_auto_orig(redirect_url)
|
||||||
|
audit('redirect remote stored file', obj=self.formdata, extra_label=component)
|
||||||
return redirect(redirect_url)
|
return redirect(redirect_url)
|
||||||
|
|
||||||
if not self.thumbnails:
|
if not self.thumbnails:
|
||||||
|
@ -788,6 +789,7 @@ class FormStatusPage(Directory, FormTemplateMixin):
|
||||||
if not redirect_url:
|
if not redirect_url:
|
||||||
raise errors.TraversalError()
|
raise errors.TraversalError()
|
||||||
redirect_url = sign_url_auto_orig(redirect_url)
|
redirect_url = sign_url_auto_orig(redirect_url)
|
||||||
|
audit('redirect remote stored file', obj=self.filled)
|
||||||
return redirect(redirect_url)
|
return redirect(redirect_url)
|
||||||
|
|
||||||
file_url = 'files/%s/' % fn
|
file_url = 'files/%s/' % fn
|
||||||
|
|
Loading…
Reference in New Issue