misc: accept non-local storage as attachments in evolution (#47704)

This commit is contained in:
Thomas NOËL 2020-10-15 15:45:18 +02:00
parent fd9afcc57c
commit 015af97502
3 changed files with 113 additions and 22 deletions

View File

@ -11,6 +11,7 @@ from wcs.qommon.ident.password_accounts import PasswordAccount
from wcs.formdef import FormDef
from wcs.categories import Category
from wcs import fields
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
from utilities import get_app, login, create_temporary_pub, clean_temporary_pub
@ -221,3 +222,67 @@ def test_form_file_field_upload_storage(wscall, pub):
'redirect_url': 'https://crypto.example.net/',
'file_size': 1834
}
@mock.patch('wcs.wscalls.call_webservice')
def test_remoteopaque_in_attachmentevolutionpart(wscall, pub):
create_user_and_admin(pub)
formdef = create_formdef()
formdef.fields[1].storage = 'remote-bo'
formdef.store()
formdef.data_class().wipe()
wscall.return_value = None, 200, json.dumps(
{"err": 0, "data": {"redirect_url": "https://crypto.example.net/"}})
image_content = open(os.path.join(os.path.dirname(__file__),
'image-with-gps-data.jpeg'), 'rb').read()
upload_0 = Upload('local-file.jpg', image_content, 'image/jpeg')
upload_1 = Upload('remote-file.jpg', image_content, 'image/jpeg')
user_app = login(get_app(pub), username='foo', password='foo')
admin_app = login(get_app(pub), username='admin', password='admin')
resp = user_app.get('/test/')
resp.forms[0]['f0$file'] = upload_0
resp.forms[0]['f1$file'] = upload_1
resp = resp.forms[0].submit('submit')
assert 'Check values then click submit.' in resp.text
resp = resp.forms[0].submit('submit')
assert resp.status_int == 302
resp = resp.follow()
assert 'The form has been recorded' in resp.text
# register a comment = create a AttachmentEvolutionPart
formdata = formdef.data_class().select()[0]
item = RegisterCommenterWorkflowStatusItem()
item.attachments = ['form_var_file_raw', 'form_var_remote_file_raw']
item.comment = 'text in form history'
item.perform(formdata)
# links on frontoffice: no link to remote file
resp = user_app.get('/test/%s/' % formdata.id)
assert resp.text.count('<p class="wf-attachment"><a href="attachment?f=') == 1
assert resp.text.count('<p class="wf-attachment"><a href="attachment?f=uuid-') == 0
# links on backoffice: links to local and remote file
resp = admin_app.get('/backoffice/management/test/%s/' % formdata.id)
assert resp.text.count('<p class="wf-attachment"><a href="attachment?f=') == 2
assert resp.text.count('<p class="wf-attachment"><a href="attachment?f=uuid-') == 1
local_file = formdata.evolution[-1].parts[0]
local_file_id = os.path.basename(local_file.filename)
remote_file = formdata.evolution[-1].parts[1]
remote_file_id = remote_file.filename
assert not local_file_id.startswith('uuid-')
assert remote_file_id.startswith('uuid-')
# clic on remote file in frontoffice: redirect... but forbidden
resp = user_app.get('/test/%s/attachment?f=%s' % (formdata.id, remote_file_id))
assert resp.status_int == 302
resp = resp.follow(status=404)
# clic in backoffice, redirect to decryption system
resp = admin_app.get('/backoffice/management/test/%s/attachment?f=%s' % (formdata.id, remote_file_id))
assert resp.status_int == 302
resp = resp.follow()
assert resp.location.startswith('https://crypto.example.net/')
assert '&signature=' in resp.location

View File

@ -41,7 +41,7 @@ class PicklableUpload(Upload):
def get_file_pointer(self):
if 'fp' in self.__dict__ and self.__dict__.get('fp') is not None:
return self.__dict__.get('fp')
elif hasattr(self, 'qfilename'):
elif getattr(self, 'qfilename', None):
basedir = os.path.join(get_publisher().app_dir, 'uploads')
self.fp = open(os.path.join(basedir, self.qfilename), 'rb')
return self.fp

View File

@ -39,6 +39,7 @@ from .qommon import emails, get_cfg, get_logger
from quixote.html import htmltext
from .qommon import errors
from .qommon.template import Template, TemplateError
from .qommon.upload_storage import PicklableUpload, get_storage_object
from .conditions import Condition
from .roles import Role, logged_users_role, get_user_roles
@ -117,7 +118,10 @@ class AttachmentSubstitutionProxy(object):
@property
def content(self):
return self.attachment_evolution_part.get_file_pointer().read()
fp = self.attachment_evolution_part.get_file_pointer()
if fp:
return fp.read()
return b''
@property
def b64_content(self):
@ -168,32 +172,45 @@ class AttachmentEvolutionPart: #pylint: disable=C1001
content_type = None
charset = None
varname = None
storage = None
storage_attrs = None
def __init__(self, base_filename, fp, orig_filename=None, content_type=None,
charset=None, varname=None):
charset=None, varname=None, storage=None, storage_attrs=None):
self.base_filename = base_filename
self.orig_filename = orig_filename or base_filename
self.content_type = content_type
self.charset = charset
self.fp = fp
self.varname = varname
self.storage = storage
self.storage_attrs = storage_attrs
@classmethod
def from_upload(cls, upload, varname=None):
return AttachmentEvolutionPart(
upload.base_filename,
upload.fp,
getattr(upload, 'fp', None),
upload.orig_filename,
upload.content_type,
upload.charset,
varname=varname)
varname=varname,
storage=getattr(upload, 'storage', None),
storage_attrs=getattr(upload, 'storage_attrs', None))
def get_file_pointer(self):
if self.filename.startswith('uuid-'):
return None
return open(self.filename, 'rb')
def __getstate__(self):
odict = self.__dict__.copy()
if not 'fp' in odict:
if not odict.get('fp'):
if 'filename' not in odict:
# we need a filename as an identifier: create one from nothing
# instead of file_digest(self.fp) (see below)
odict['filename'] = 'uuid-%s' % uuid.uuid4()
self.filename = odict['filename']
return odict
del odict['fp']
@ -201,7 +218,8 @@ class AttachmentEvolutionPart: #pylint: disable=C1001
if not os.path.exists(dirname):
os.mkdir(dirname)
if not 'filename' in odict:
# there is not filename, or it was a temporary one: create it
if not 'filename' in odict or odict['filename'].startswith('uuid-'):
filename = file_digest(self.fp)
dirname = os.path.join(dirname, filename[:4])
if not os.path.exists(dirname):
@ -213,8 +231,15 @@ class AttachmentEvolutionPart: #pylint: disable=C1001
return odict
def view(self):
return htmltext('<p class="wf-attachment"><a href="attachment?f=%s">%s</a>' % (
os.path.basename(self.filename), self.orig_filename))
show_link = True
if self.has_redirect_url():
is_in_backoffice = bool(get_request() and get_request().is_in_backoffice())
show_link = bool(self.get_redirect_url(backoffice=is_in_backoffice))
if show_link:
return htmltext('<p class="wf-attachment"><a href="attachment?f=%s">%s</a></p>' % (
os.path.basename(self.filename), self.orig_filename))
else:
return htmltext('<p class="wf-attachment">%s</p>' % self.orig_filename)
@classmethod
def get_substitution_variables(cls, formdata):
@ -224,14 +249,14 @@ class AttachmentEvolutionPart: #pylint: disable=C1001
# mimic PicklableUpload methods:
def can_thumbnail(self):
return True
return get_storage_object(getattr(self, 'storage', None)).can_thumbnail(self)
def has_redirect_url(self):
return False
return get_storage_object(getattr(self, 'storage', None)).has_redirect_url(self)
def get_redirect_url(self, upload, backoffice=False):
# should never be called, has_redirect_url is False
raise AssertionError('no get_redirect_url on AttachmentEvolutionPart object')
def get_redirect_url(self, backoffice=False):
return get_storage_object(getattr(self, 'storage', None)).get_redirect_url(self,
backoffice=backoffice)
class DuplicateGlobalActionNameError(Exception):
@ -2179,14 +2204,15 @@ class WorkflowStatusItem(XmlSerialisable):
if not picklableupload:
continue
try:
# convert any value to a PicklableUpload; it will ususally
# be a dict like one provided by qommon/evalutils:attachment()
picklableupload = FileField.convert_value_from_anything(picklableupload)
except ValueError:
get_publisher().notify_of_exception(sys.exc_info(),
context='[workflow/attachments]')
continue
if not isinstance(picklableupload, PicklableUpload):
try:
# convert any value to a PicklableUpload; it will ususally
# be a dict like one provided by qommon/evalutils:attachment()
picklableupload = FileField.convert_value_from_anything(picklableupload)
except ValueError:
get_publisher().notify_of_exception(sys.exc_info(),
context='[workflow/attachments]')
continue
uploads.append(picklableupload)