forms: add direct download of files from workflow forms (#59672)
This commit is contained in:
parent
9e3c9ec3a9
commit
c8519aaa48
|
@ -9051,6 +9051,86 @@ def test_workflow_form_structured_data(pub):
|
|||
assert not formdata.workflow_data
|
||||
|
||||
|
||||
def test_workflow_form_file_access(pub):
|
||||
FormDef.wipe()
|
||||
Workflow.wipe()
|
||||
BlockDef.wipe()
|
||||
|
||||
user = create_user(pub)
|
||||
|
||||
block = BlockDef()
|
||||
block.name = 'foobar'
|
||||
block.fields = [
|
||||
fields.FileField(id='123', required=True, label='Test', type='file', varname='test'),
|
||||
]
|
||||
block.store()
|
||||
|
||||
wf = Workflow(name='test')
|
||||
status = wf.add_status('New', 'st1')
|
||||
next_status = wf.add_status('Next', 'st2')
|
||||
|
||||
status.items = []
|
||||
display_form = status.add_action('form', id='_display_form')
|
||||
display_form.by = ['_submitter']
|
||||
display_form.varname = 'blah'
|
||||
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
|
||||
display_form.formdef.fields = [
|
||||
fields.BlockField(id='1', label='test', type='block:foobar', varname='fooblock', max_items=3),
|
||||
fields.FileField(id='2', label='test2', type='file', varname='file'),
|
||||
]
|
||||
|
||||
jump = status.add_action('jumponsubmit', id='_jump')
|
||||
jump.status = next_status.id
|
||||
|
||||
register_comment = next_status.add_action('register-comment', id='_register')
|
||||
register_comment.comment = '''<p>
|
||||
<a href="{{ form_workflow_form_blah_var_fooblock_0_test_url }}" id="t1">1st file in block</a>
|
||||
<a href="{{ form_workflow_form_blah_var_fooblock_1_test_url }}" id="t2">2nd file in block</a>
|
||||
<a href="{{ form_workflow_form_blah_0_var_fooblock_0_test_url }}" id="t3">again 1st file in block</a>
|
||||
<a href="{{ form_workflow_form_blah_0_var_file_url }}" id="t4">file field</a>
|
||||
</p>'''
|
||||
|
||||
wf.store()
|
||||
|
||||
formdef = create_formdef()
|
||||
formdef.workflow_id = wf.id
|
||||
formdef.fields = []
|
||||
formdef.store()
|
||||
|
||||
formdef.data_class().wipe()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.user_id = user.id
|
||||
formdata.just_created()
|
||||
formdata.store()
|
||||
|
||||
app = login(get_app(pub), username='foo', password='foo')
|
||||
resp = app.get(formdata.get_url(backoffice=False))
|
||||
resp.form['fblah_1$element0$f123$file'] = Upload('test1.txt', b'foobar1', 'text/plain')
|
||||
resp = resp.form.submit('fblah_1$add_element')
|
||||
resp.form['fblah_1$element1$f123$file'] = Upload('test2.txt', b'foobar2', 'text/plain')
|
||||
resp.form['fblah_2$file'] = Upload('test3.txt', b'foobar3', 'text/plain')
|
||||
resp = resp.form.submit('submit').follow()
|
||||
assert app.get(resp.pyquery('#t1').attr.href).body == b'foobar1'
|
||||
assert app.get(resp.pyquery('#t2').attr.href).body == b'foobar2'
|
||||
assert app.get(resp.pyquery('#t3').attr.href).body == b'foobar1'
|
||||
assert app.get(resp.pyquery('#t4').attr.href).body == b'foobar3'
|
||||
app.get(resp.pyquery('#t4').attr.href + 'X', status=404) # wrong URL, unknown file
|
||||
|
||||
# unlogged user
|
||||
assert '/login' in get_app(pub).get(resp.pyquery('#t1').attr.href).location
|
||||
|
||||
# other user
|
||||
user = pub.user_class()
|
||||
user.name = 'Second user'
|
||||
user.store()
|
||||
account = PasswordAccount(id='foo2')
|
||||
account.set_password('foo2')
|
||||
account.user_id = user.id
|
||||
account.store()
|
||||
login(get_app(pub), username='foo2', password='foo2').get(resp.pyquery('#t1').attr.href, status=403)
|
||||
|
||||
|
||||
def test_rich_commentable_action(pub):
|
||||
create_user(pub)
|
||||
|
||||
|
|
|
@ -1485,6 +1485,8 @@ class FileField(WidgetField):
|
|||
return self.get_view_value(value, include_image_thumbnail=False, max_len=max_len, **kwargs)
|
||||
|
||||
def get_download_query_string(self, **kwargs):
|
||||
if kwargs.get('file_value'):
|
||||
return 'hash=%s' % kwargs.get('file_value').file_digest()
|
||||
if kwargs.get('parent_field'):
|
||||
return 'f=%s$%s$%s' % (kwargs['parent_field'].id, kwargs['parent_field_index'], self.id)
|
||||
return 'f=%s' % self.id
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
import collections
|
||||
import copy
|
||||
import datetime
|
||||
import itertools
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
|
@ -393,6 +394,26 @@ class FormData(StorableObject):
|
|||
empty &= self.data.get(key) is None
|
||||
return empty
|
||||
|
||||
def get_all_file_data(self):
|
||||
from wcs.wf.form import WorkflowFormEvolutionPart
|
||||
|
||||
for field_data in itertools.chain((self.data or {}).values(), (self.workflow_data or {}).values()):
|
||||
if misc.is_upload(field_data):
|
||||
yield field_data
|
||||
elif isinstance(field_data, dict) and isinstance(field_data.get('data'), list):
|
||||
for subfield_rowdata in field_data.get('data'):
|
||||
if isinstance(subfield_rowdata, dict):
|
||||
for block_field_data in subfield_rowdata.values():
|
||||
if misc.is_upload(block_field_data):
|
||||
yield block_field_data
|
||||
for part in self.iter_evolution_parts():
|
||||
if misc.is_attachment(part):
|
||||
yield part
|
||||
elif isinstance(part, WorkflowFormEvolutionPart):
|
||||
for field_data in (part.data or {}).values():
|
||||
if misc.is_upload(field_data):
|
||||
yield field_data
|
||||
|
||||
@classmethod
|
||||
def get_actionable_count(cls, user_roles):
|
||||
if get_publisher().is_using_postgresql():
|
||||
|
|
|
@ -42,7 +42,7 @@ from .qommon import PICKLE_KWARGS, _, force_str, get_cfg
|
|||
from .qommon.admin.emails import EmailsDirectory
|
||||
from .qommon.cron import CronJob
|
||||
from .qommon.form import Form, HtmlWidget, UploadedFile
|
||||
from .qommon.misc import JSONEncoder, get_as_datetime, simplify, xml_node_text
|
||||
from .qommon.misc import JSONEncoder, get_as_datetime, is_attachment, is_upload, simplify, xml_node_text
|
||||
from .qommon.publisher import get_publisher_class
|
||||
from .qommon.storage import Equal, StorableObject, StrictNotEqual, fix_key
|
||||
from .qommon.substitution import Substitutions
|
||||
|
@ -2069,41 +2069,19 @@ def clean_unused_files(publisher, **kwargs):
|
|||
known_filenames.update([x for x in glob.glob(os.path.join(publisher.app_dir, 'uploads/*'))])
|
||||
known_filenames.update([x for x in glob.glob(os.path.join(publisher.app_dir, 'attachments/*/*'))])
|
||||
|
||||
def is_upload(obj):
|
||||
# we can't use isinstance() because obj can be a
|
||||
# wcs.qommon.form.PicklableUpload or a qommon.form.PicklableUpload
|
||||
return obj.__class__.__name__ == 'PicklableUpload'
|
||||
|
||||
def is_attachment(obj):
|
||||
return obj.__class__.__name__ == 'AttachmentEvolutionPart'
|
||||
|
||||
def accumulate_filenames():
|
||||
from wcs.carddef import CardDef
|
||||
from wcs.wf.form import WorkflowFormEvolutionPart
|
||||
|
||||
for formdef in FormDef.select(ignore_migration=True) + CardDef.select(ignore_migration=True):
|
||||
for option_data in (formdef.workflow_options or {}).values():
|
||||
if is_upload(option_data):
|
||||
yield option_data.get_fs_filename()
|
||||
for formdata in formdef.data_class().select_iterator(ignore_errors=True, itersize=200):
|
||||
for field_data in itertools.chain(
|
||||
(formdata.data or {}).values(), (formdata.workflow_data or {}).values()
|
||||
):
|
||||
for field_data in formdata.get_all_file_data():
|
||||
if is_upload(field_data):
|
||||
yield field_data.get_fs_filename()
|
||||
elif isinstance(field_data, dict) and isinstance(field_data.get('data'), list):
|
||||
for subfield_rowdata in field_data.get('data'):
|
||||
if isinstance(subfield_rowdata, dict):
|
||||
for block_field_data in subfield_rowdata.values():
|
||||
if is_upload(block_field_data):
|
||||
yield block_field_data.get_fs_filename()
|
||||
for part in formdata.iter_evolution_parts():
|
||||
if is_attachment(part):
|
||||
yield part.filename
|
||||
elif isinstance(part, WorkflowFormEvolutionPart):
|
||||
for field_data in (part.data or {}).values():
|
||||
if is_upload(field_data):
|
||||
yield field_data.get_fs_filename()
|
||||
elif is_attachment(field_data):
|
||||
yield field_data.filename
|
||||
for user in publisher.user_class.select():
|
||||
for field_data in (user.form_data or {}).values():
|
||||
if is_upload(field_data):
|
||||
|
|
|
@ -77,14 +77,18 @@ class FileDirectory(Directory):
|
|||
redirect_url = sign_url_auto_orig(redirect_url)
|
||||
return redirect(redirect_url)
|
||||
|
||||
return self.serve_file(file, thumbnail=self.thumbnails)
|
||||
|
||||
@classmethod
|
||||
def serve_file(cls, file, thumbnail=False):
|
||||
response = get_response()
|
||||
|
||||
if self.thumbnails:
|
||||
if thumbnail:
|
||||
if file.can_thumbnail():
|
||||
try:
|
||||
thumbnail = misc.get_thumbnail(file.get_fs_filename(), content_type=file.content_type)
|
||||
content = misc.get_thumbnail(file.get_fs_filename(), content_type=file.content_type)
|
||||
response.set_content_type('image/png')
|
||||
return thumbnail
|
||||
return content
|
||||
except misc.ThumbnailError:
|
||||
raise errors.TraversalError()
|
||||
else:
|
||||
|
@ -698,16 +702,26 @@ class FormStatusPage(Directory, FormTemplateMixin):
|
|||
def download(self):
|
||||
if not is_url_signed():
|
||||
self.check_receiver()
|
||||
try:
|
||||
fn = get_request().form['f']
|
||||
if '$' in fn:
|
||||
# path to block field contents
|
||||
fn2, idx, sub = fn.split('$', 2)
|
||||
file = self.filled.data[fn2]['data'][int(idx)][sub]
|
||||
else:
|
||||
file = self.filled.data[fn]
|
||||
except (KeyError, ValueError):
|
||||
raise errors.TraversalError()
|
||||
file = None
|
||||
if get_request().form and get_request().form.get('hash'):
|
||||
# look in all known formdata files for file with given hash
|
||||
file_digest = get_request().form.get('hash')
|
||||
for field_data in self.filled.get_all_file_data():
|
||||
if not hasattr(field_data, 'file_digest'):
|
||||
continue
|
||||
if field_data.file_digest() == file_digest:
|
||||
return FileDirectory.serve_file(field_data)
|
||||
elif get_request().form and get_request().form.get('f'):
|
||||
try:
|
||||
fn = get_request().form['f']
|
||||
if '$' in fn:
|
||||
# path to block field contents
|
||||
fn2, idx, sub = fn.split('$', 2)
|
||||
file = self.filled.data[fn2]['data'][int(idx)][sub]
|
||||
else:
|
||||
file = self.filled.data[fn]
|
||||
except (KeyError, ValueError):
|
||||
pass
|
||||
|
||||
if not hasattr(file, 'content_type'):
|
||||
raise errors.TraversalError()
|
||||
|
|
|
@ -1036,6 +1036,17 @@ def get_order_by_or_400(value):
|
|||
return value
|
||||
|
||||
|
||||
def is_upload(obj):
|
||||
# we can't use isinstance() because obj can be a
|
||||
# wcs.qommon.form.PicklableUpload or a qommon.form.PicklableUpload
|
||||
return obj.__class__.__name__ == 'PicklableUpload'
|
||||
|
||||
|
||||
def is_attachment(obj):
|
||||
# ditto
|
||||
return obj.__class__.__name__ == 'AttachmentEvolutionPart'
|
||||
|
||||
|
||||
class QLookupRedirect:
|
||||
"""
|
||||
Class to use to interrupt a _q_lookup method and redirect.
|
||||
|
|
|
@ -53,6 +53,12 @@ class PicklableUpload(Upload):
|
|||
self.fp = io.BytesIO(self.data)
|
||||
del self.data
|
||||
|
||||
def file_digest(self):
|
||||
if getattr(self, 'qfilename', None):
|
||||
# last file part is created using misc.file_digest()
|
||||
return self.qfilename.split('/')[-1]
|
||||
return None
|
||||
|
||||
def get_file(self):
|
||||
# quack like UploadedFile
|
||||
return self.get_file_pointer()
|
||||
|
@ -180,6 +186,9 @@ class RemoteOpaqueUploadStorage:
|
|||
self.frontoffice_redirect = bool(frontoffice_redirect == 'true')
|
||||
self.backoffice_redirect = bool(backoffice_redirect == 'true')
|
||||
|
||||
def file_digest(self):
|
||||
return None
|
||||
|
||||
def save_tempfile(self, upload):
|
||||
if getattr(upload, 'storage_attrs', None):
|
||||
# upload is already a remote PicklableUpload, it does not
|
||||
|
|
|
@ -699,10 +699,11 @@ class LazyFormData(LazyFormDef):
|
|||
|
||||
|
||||
class LazyFormDataVar:
|
||||
def __init__(self, fields, data, formdata=None):
|
||||
def __init__(self, fields, data, formdata=None, base_formdata=None):
|
||||
self._fields = fields
|
||||
self._data = data or {}
|
||||
self._formdata = formdata
|
||||
self._base_formdata = base_formdata
|
||||
|
||||
def inspect_keys(self):
|
||||
return self.varnames.keys()
|
||||
|
@ -729,7 +730,12 @@ class LazyFormDataVar:
|
|||
return self._varnames
|
||||
|
||||
def get_field_kwargs(self, field):
|
||||
return {'data': self._data, 'field': field, 'formdata': self._formdata}
|
||||
return {
|
||||
'data': self._data,
|
||||
'field': field,
|
||||
'formdata': self._formdata,
|
||||
'base_formdata': self._base_formdata,
|
||||
}
|
||||
|
||||
def __getitem__(self, key):
|
||||
try:
|
||||
|
@ -789,10 +795,11 @@ class LazyFormDataVar:
|
|||
|
||||
|
||||
class LazyFieldVar:
|
||||
def __init__(self, data, field, formdata=None, **kwargs):
|
||||
def __init__(self, data, field, formdata=None, base_formdata=None, **kwargs):
|
||||
self._data = data
|
||||
self._field = field
|
||||
self._formdata = formdata
|
||||
self._base_formdata = base_formdata
|
||||
self._field_kwargs = kwargs
|
||||
|
||||
@property
|
||||
|
@ -1155,27 +1162,33 @@ class LazyFieldVarPassword(LazyFieldVar):
|
|||
class LazyFieldVarFile(LazyFieldVar):
|
||||
def inspect_keys(self):
|
||||
keys = ['raw']
|
||||
if hasattr(self._formdata, 'get_file_base_url'):
|
||||
if hasattr(self._formdata, 'get_file_base_url') or self._base_formdata:
|
||||
keys.append('url')
|
||||
return keys
|
||||
|
||||
@property
|
||||
def url(self):
|
||||
if not hasattr(self._formdata, 'get_file_base_url'):
|
||||
if 'url' not in self.inspect_keys():
|
||||
return None
|
||||
if self._base_formdata:
|
||||
return self._field.get_download_url(formdata=self._base_formdata, file_value=self.raw)
|
||||
return self._field.get_download_url(formdata=self._formdata, **self._field_kwargs)
|
||||
|
||||
|
||||
class LazyBlockDataVar(LazyFormDataVar):
|
||||
def __init__(self, fields, data, formdata=None, parent_field=None, parent_field_index=0):
|
||||
def __init__(
|
||||
self, fields, data, formdata=None, parent_field=None, parent_field_index=0, base_formdata=None
|
||||
):
|
||||
super().__init__(fields, data, formdata=formdata)
|
||||
self.parent_field = parent_field
|
||||
self.parent_field_index = parent_field_index
|
||||
self.base_formdata = base_formdata
|
||||
|
||||
def get_field_kwargs(self, field):
|
||||
kwargs = super().get_field_kwargs(field)
|
||||
kwargs['parent_field'] = self.parent_field
|
||||
kwargs['parent_field_index'] = self.parent_field_index
|
||||
kwargs['base_formdata'] = self.base_formdata
|
||||
return kwargs
|
||||
|
||||
|
||||
|
@ -1205,6 +1218,7 @@ class LazyFieldVarBlock(LazyFieldVar):
|
|||
formdata=self._formdata,
|
||||
parent_field=self._field,
|
||||
parent_field_index=int(key),
|
||||
base_formdata=self._base_formdata,
|
||||
)
|
||||
|
||||
def __len__(self):
|
||||
|
|
|
@ -331,7 +331,7 @@ class LazyFormDataWorkflowForms:
|
|||
if not isinstance(part, WorkflowFormEvolutionPart):
|
||||
continue
|
||||
if part.varname == varname and part.data:
|
||||
wfform_formdatas.append(LazyFormDataWorkflowFormsItem(part))
|
||||
wfform_formdatas.append(LazyFormDataWorkflowFormsItem(part, base_formdata=self._formdata))
|
||||
if wfform_formdatas:
|
||||
return LazyFormDataWorkflowFormsItems(wfform_formdatas)
|
||||
raise AttributeError(varname)
|
||||
|
@ -372,9 +372,10 @@ class LazyFormDataWorkflowFormsItems:
|
|||
|
||||
|
||||
class LazyFormDataWorkflowFormsItem:
|
||||
def __init__(self, part):
|
||||
def __init__(self, part, base_formdata):
|
||||
self._part = part
|
||||
self.data = part.data
|
||||
self.base_formdata = base_formdata
|
||||
|
||||
def inspect_keys(self):
|
||||
return ['var']
|
||||
|
@ -382,4 +383,9 @@ class LazyFormDataWorkflowFormsItem:
|
|||
@property
|
||||
def var(self):
|
||||
# pass self as formdata, it will be used to access self.data in LazyFieldVarBlock
|
||||
return LazyFormDataVar(self._part.formdef.get_all_fields(), self._part.data, formdata=self)
|
||||
return LazyFormDataVar(
|
||||
self._part.formdef.get_all_fields(),
|
||||
self._part.data,
|
||||
formdata=self,
|
||||
base_formdata=self.base_formdata,
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue