misc: check template syntax in model files (#14304) #968
|
@ -1,4 +1,3 @@
|
|||
import io
|
||||
import os
|
||||
import re
|
||||
import uuid
|
||||
|
@ -7,8 +6,7 @@ import xml.etree.ElementTree as ET
|
|||
import pytest
|
||||
import responses
|
||||
from pyquery import PyQuery
|
||||
from quixote.http_request import Upload as QuixoteUpload
|
||||
from webtest import Radio, Upload
|
||||
from webtest import Upload
|
||||
|
||||
from wcs import fields
|
||||
from wcs.blocks import BlockDef
|
||||
|
@ -18,7 +16,6 @@ from wcs.formdef import FormDef
|
|||
from wcs.mail_templates import MailTemplate
|
||||
from wcs.qommon.afterjobs import AfterJob
|
||||
from wcs.qommon.errors import ConnectionError
|
||||
from wcs.qommon.form import UploadedFile
|
||||
from wcs.qommon.http_request import HTTPRequest
|
||||
from wcs.wf.create_formdata import CreateFormdataWorkflowStatusItem, Mapping
|
||||
from wcs.wf.form import WorkflowFormFieldsFormDef
|
||||
|
@ -1879,55 +1876,6 @@ def test_workflows_choice_action_line_details_markup(pub):
|
|||
assert resp.pyquery('svg a text')[1].text == 'hello 🦁'
|
||||
|
||||
|
||||
def test_workflows_edit_export_to_model_action(pub):
|
||||
create_superuser(pub)
|
||||
Workflow.wipe()
|
||||
workflow = Workflow(name='foo')
|
||||
workflow.add_status(name='baz')
|
||||
workflow.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
resp = app.get('/backoffice/workflows/1/')
|
||||
resp = resp.click('baz')
|
||||
|
||||
resp.forms[0]['action-interaction'] = 'Document Creation'
|
||||
resp = resp.forms[0].submit()
|
||||
resp = resp.follow()
|
||||
|
||||
resp = resp.click('Document Creation')
|
||||
with open(os.path.join(os.path.dirname(__file__), '../template.odt'), 'rb') as fd:
|
||||
model_content = fd.read()
|
||||
resp.form['model_file'] = Upload('test.odt', model_content)
|
||||
resp = resp.form.submit('submit')
|
||||
resp = resp.follow()
|
||||
resp = resp.follow()
|
||||
resp = resp.click('Document Creation')
|
||||
resp_model_content = resp.click('test.odt')
|
||||
assert resp_model_content.body == model_content
|
||||
resp = resp.form.submit('submit').follow().follow()
|
||||
# check file model is still there
|
||||
resp = resp.click('Document Creation')
|
||||
resp_model_content = resp.click('test.odt')
|
||||
assert resp_model_content.body == model_content
|
||||
|
||||
# check with RTF, disallowed by default
|
||||
resp.form['model_file'] = Upload('test.rtf', b'{\\rtf...')
|
||||
resp = resp.form.submit('submit')
|
||||
assert resp.pyquery('.widget-with-error .error').text() == 'Only OpenDocument and XML files can be used.'
|
||||
|
||||
# allow RTF
|
||||
pub.site_options.set('options', 'disable-rtf-support', 'false')
|
||||
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
|
||||
pub.site_options.write(fd)
|
||||
resp.form['model_file'] = Upload('test.rtf', b'{\\rtf...')
|
||||
resp = resp.form.submit('submit').follow().follow()
|
||||
assert (
|
||||
resp.pyquery('.biglistitem--content')
|
||||
.text()
|
||||
.startswith('Document Creation (with model named test.rtf')
|
||||
)
|
||||
|
||||
|
||||
def test_workflows_action_subpath(pub):
|
||||
create_superuser(pub)
|
||||
Workflow.wipe()
|
||||
|
@ -2163,47 +2111,6 @@ def test_workflows_variables_delete(pub):
|
|||
assert Workflow.get(workflow.id).variables_formdef is None
|
||||
|
||||
|
||||
def test_workflows_export_to_model_action_display(pub):
|
||||
create_superuser(pub)
|
||||
|
||||
Workflow.wipe()
|
||||
workflow = Workflow(name='foo')
|
||||
baz_status = workflow.add_status(name='baz')
|
||||
export_to = baz_status.add_action('export_to_model')
|
||||
export_to.label = 'create doc'
|
||||
workflow.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
resp = app.get('/backoffice/workflows/1/status/1/')
|
||||
assert 'Document Creation (no model set)' in resp
|
||||
|
||||
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(b'HELLO WORLD')
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
export_to.id = '_export_to'
|
||||
export_to.by = ['_submitter']
|
||||
workflow.store()
|
||||
|
||||
resp = app.get('/backoffice/workflows/1/status/1/')
|
||||
assert 'Document Creation (with model named test.rtf of 11 bytes)' in resp
|
||||
|
||||
upload.fp.write(b'HELLO WORLD' * 4242)
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
workflow.store()
|
||||
|
||||
resp = app.get('/backoffice/workflows/1/status/1/')
|
||||
assert 'Document Creation (with model named test.rtf of 45.6 KB)' in resp
|
||||
|
||||
resp = app.get(export_to.get_admin_url())
|
||||
resp.form['method'] = 'Non interactive'
|
||||
resp = resp.form.submit('submit')
|
||||
workflow.refresh_from_storage()
|
||||
assert not workflow.possible_status[0].items[0].by
|
||||
|
||||
|
||||
def test_workflows_variables_with_export_to_model_action(pub):
|
||||
test_workflows_variables(pub)
|
||||
|
||||
|
@ -2218,45 +2125,6 @@ def test_workflows_variables_with_export_to_model_action(pub):
|
|||
resp = resp.click('Edit', href='%s/' % workflow.variables_formdef.fields[0].id)
|
||||
|
||||
|
||||
def test_workflows_export_to_model_in_status(pub):
|
||||
create_superuser(pub)
|
||||
|
||||
Workflow.wipe()
|
||||
workflow = Workflow(name='foo')
|
||||
baz_status = workflow.add_status(name='baz')
|
||||
export_to = baz_status.add_action('export_to_model')
|
||||
export_to.label = 'create doc'
|
||||
workflow.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
resp = app.get(export_to.get_admin_url())
|
||||
assert isinstance(resp.form['method'], Radio)
|
||||
resp.form['label'] = 'export label'
|
||||
resp = resp.form.submit('submit')
|
||||
workflow.refresh_from_storage()
|
||||
assert workflow.possible_status[0].items[0].method == 'interactive'
|
||||
assert workflow.possible_status[0].items[0].label == 'export label'
|
||||
|
||||
|
||||
def test_workflows_export_to_model_in_global_action(pub):
|
||||
create_superuser(pub)
|
||||
|
||||
Workflow.wipe()
|
||||
workflow = Workflow(name='foo')
|
||||
ac1 = workflow.add_global_action('Action', 'ac1')
|
||||
export_to = ac1.add_action('export_to_model')
|
||||
export_to.label = 'create doc'
|
||||
workflow.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
resp = app.get(export_to.get_admin_url())
|
||||
assert not isinstance(resp.form['method'], Radio)
|
||||
assert 'label' not in resp.form.fields
|
||||
resp = resp.form.submit('submit')
|
||||
workflow.refresh_from_storage()
|
||||
assert workflow.global_actions[0].items[0].method == 'non-interactive'
|
||||
|
||||
|
||||
def test_workflows_variables_replacement(pub):
|
||||
create_superuser(pub)
|
||||
pub.site_options.set('options', 'enable-workflow-variable-parameter', 'true')
|
||||
|
|
|
@ -10,6 +10,7 @@ from pyzbar.pyzbar import ZBarSymbol
|
|||
from pyzbar.pyzbar import decode as zbar_decode_qrcode
|
||||
from quixote import cleanup
|
||||
from quixote.http_request import Upload as QuixoteUpload
|
||||
from webtest import Radio, Upload
|
||||
|
||||
from wcs import sessions
|
||||
from wcs.blocks import BlockDef
|
||||
|
@ -37,7 +38,8 @@ from wcs.qommon.upload_storage import PicklableUpload
|
|||
from wcs.wf.export_to_model import ExportToModel, UploadValidationError, transform_to_pdf
|
||||
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef
|
||||
|
||||
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app
|
||||
from ..admin_pages.test_all import create_superuser
|
||||
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login
|
||||
|
||||
|
||||
def setup_module(module):
|
||||
|
@ -508,3 +510,167 @@ def test_interactive_create_doc_and_jump_on_submit(pub):
|
|||
assert resp.content_type != 'text/html'
|
||||
assert resp.body.startswith(b'PK') # odt
|
||||
assert formdef.data_class().select()[0].status == f'wf-{st1.id}' # no change
|
||||
|
||||
|
||||
def test_workflows_edit_export_to_model_action(pub):
|
||||
create_superuser(pub)
|
||||
Workflow.wipe()
|
||||
workflow = Workflow(name='foo')
|
||||
workflow.add_status(name='baz')
|
||||
workflow.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
resp = app.get('/backoffice/workflows/1/')
|
||||
resp = resp.click('baz')
|
||||
|
||||
resp.forms[0]['action-interaction'] = 'Document Creation'
|
||||
resp = resp.forms[0].submit()
|
||||
resp = resp.follow()
|
||||
|
||||
resp = resp.click('Document Creation')
|
||||
with open(os.path.join(os.path.dirname(__file__), '../template.odt'), 'rb') as fd:
|
||||
model_content = fd.read()
|
||||
resp.form['model_file'] = Upload('test.odt', model_content)
|
||||
resp = resp.form.submit('submit')
|
||||
resp = resp.follow()
|
||||
resp = resp.follow()
|
||||
resp = resp.click('Document Creation')
|
||||
resp_model_content = resp.click('test.odt')
|
||||
assert resp_model_content.body == model_content
|
||||
resp = resp.form.submit('submit').follow().follow()
|
||||
# check file model is still there
|
||||
resp = resp.click('Document Creation')
|
||||
resp_model_content = resp.click('test.odt')
|
||||
assert resp_model_content.body == model_content
|
||||
|
||||
# check with RTF, disallowed by default
|
||||
resp.form['model_file'] = Upload('test.rtf', b'{\\rtf...')
|
||||
resp = resp.form.submit('submit')
|
||||
assert resp.pyquery('.widget-with-error .error').text() == 'Only OpenDocument and XML files can be used.'
|
||||
|
||||
# allow RTF
|
||||
pub.site_options.set('options', 'disable-rtf-support', 'false')
|
||||
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
|
||||
pub.site_options.write(fd)
|
||||
resp.form['model_file'] = Upload('test.rtf', b'{\\rtf...')
|
||||
resp = resp.form.submit('submit').follow().follow()
|
||||
assert (
|
||||
resp.pyquery('.biglistitem--content')
|
||||
.text()
|
||||
.startswith('Document Creation (with model named test.rtf')
|
||||
)
|
||||
|
||||
|
||||
def test_workflows_export_to_model_action_display(pub):
|
||||
create_superuser(pub)
|
||||
|
||||
Workflow.wipe()
|
||||
workflow = Workflow(name='foo')
|
||||
baz_status = workflow.add_status(name='baz')
|
||||
export_to = baz_status.add_action('export_to_model')
|
||||
export_to.label = 'create doc'
|
||||
workflow.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
resp = app.get('/backoffice/workflows/1/status/1/')
|
||||
assert 'Document Creation (no model set)' in resp
|
||||
|
||||
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(b'HELLO WORLD')
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
export_to.id = '_export_to'
|
||||
export_to.by = ['_submitter']
|
||||
workflow.store()
|
||||
|
||||
resp = app.get('/backoffice/workflows/1/status/1/')
|
||||
assert 'Document Creation (with model named test.rtf of 11 bytes)' in resp
|
||||
|
||||
upload.fp.write(b'HELLO WORLD' * 4242)
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
workflow.store()
|
||||
|
||||
resp = app.get('/backoffice/workflows/1/status/1/')
|
||||
assert 'Document Creation (with model named test.rtf of 45.6 KB)' in resp
|
||||
|
||||
resp = app.get(export_to.get_admin_url())
|
||||
resp.form['method'] = 'Non interactive'
|
||||
resp = resp.form.submit('submit')
|
||||
workflow.refresh_from_storage()
|
||||
assert not workflow.possible_status[0].items[0].by
|
||||
|
||||
|
||||
def test_workflows_export_to_model_in_status(pub):
|
||||
create_superuser(pub)
|
||||
|
||||
Workflow.wipe()
|
||||
workflow = Workflow(name='foo')
|
||||
baz_status = workflow.add_status(name='baz')
|
||||
export_to = baz_status.add_action('export_to_model')
|
||||
export_to.label = 'create doc'
|
||||
workflow.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
resp = app.get(export_to.get_admin_url())
|
||||
assert isinstance(resp.form['method'], Radio)
|
||||
resp.form['label'] = 'export label'
|
||||
resp = resp.form.submit('submit')
|
||||
workflow.refresh_from_storage()
|
||||
assert workflow.possible_status[0].items[0].method == 'interactive'
|
||||
assert workflow.possible_status[0].items[0].label == 'export label'
|
||||
|
||||
|
||||
def test_workflows_export_to_model_in_global_action(pub):
|
||||
create_superuser(pub)
|
||||
|
||||
Workflow.wipe()
|
||||
workflow = Workflow(name='foo')
|
||||
ac1 = workflow.add_global_action('Action', 'ac1')
|
||||
export_to = ac1.add_action('export_to_model')
|
||||
export_to.label = 'create doc'
|
||||
workflow.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
resp = app.get(export_to.get_admin_url())
|
||||
assert not isinstance(resp.form['method'], Radio)
|
||||
assert 'label' not in resp.form.fields
|
||||
resp = resp.form.submit('submit')
|
||||
workflow.refresh_from_storage()
|
||||
assert workflow.global_actions[0].items[0].method == 'non-interactive'
|
||||
|
||||
|
||||
def test_workflows_edit_export_to_model_action_check_template(pub):
|
||||
create_superuser(pub)
|
||||
Workflow.wipe()
|
||||
workflow = Workflow(name='foo')
|
||||
workflow.add_status(name='baz')
|
||||
workflow.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
resp = app.get('/backoffice/workflows/1/')
|
||||
resp = resp.click('baz')
|
||||
|
||||
resp.forms[0]['action-interaction'] = 'Document Creation'
|
||||
resp = resp.forms[0].submit()
|
||||
resp = resp.follow()
|
||||
|
||||
resp = resp.click('Document Creation')
|
||||
zip_out_fp = io.BytesIO()
|
||||
with open(os.path.join(os.path.dirname(__file__), '../template.odt'), 'rb') as fd:
|
||||
with zipfile.ZipFile(fd, mode='r') as zip_in, zipfile.ZipFile(zip_out_fp, mode='w') as zip_out:
|
||||
for filename in zip_in.namelist():
|
||||
content = zip_in.read(filename)
|
||||
if filename == 'content.xml':
|
||||
assert b'>[form_name]<' in content
|
||||
content = content.replace(b'>[form_name]<', b'>{% if foo %}{{ foo }}{% end %}<')
|
||||
zip_out.writestr(filename, content)
|
||||
model_content = zip_out_fp.getvalue()
|
||||
resp.form['model_file'] = Upload('test.odt', model_content)
|
||||
resp = resp.form.submit('submit')
|
||||
assert (
|
||||
resp.pyquery('#form_error_model_file')
|
||||
.text()
|
||||
.startswith('syntax error in Django template: Invalid block')
|
||||
)
|
||||
|
|
|
@ -858,6 +858,9 @@ class UploadedFile:
|
|||
def get_file(self):
|
||||
return open(self.build_file_path(), 'rb') # pylint: disable=consider-using-with
|
||||
|
||||
def get_file_pointer(self):
|
||||
return self.get_file()
|
||||
|
||||
def get_content(self):
|
||||
return self.get_file().read()
|
||||
|
||||
|
|
|
@ -59,7 +59,7 @@ from ..qommon.form import (
|
|||
WidgetList,
|
||||
WysiwygTextWidget,
|
||||
)
|
||||
from ..qommon.template import TemplateError
|
||||
from ..qommon.template import Template, TemplateError
|
||||
|
||||
OO_TEXT_NS = 'urn:oasis:names:tc:opendocument:xmlns:text:1.0'
|
||||
OO_OFFICE_NS = 'urn:oasis:names:tc:opendocument:xmlns:office:1.0'
|
||||
|
@ -376,6 +376,15 @@ class ExportToModel(WorkflowStatusItem):
|
|||
return 'xml'
|
||||
raise UploadValidationError(_('Only OpenDocument and XML files can be used.'))
|
||||
|
||||
def model_file_content_validation(self, upload):
|
||||
for string in self.get_model_file_template_strings(upload):
|
||||
if string:
|
||||
try:
|
||||
Template(string, raises=True)
|
||||
except TemplateError as e:
|
||||
raise UploadValidationError(str(e))
|
||||
upload.fp.seek(0)
|
||||
|
||||
def get_parameters(self):
|
||||
parameters = ('model_file',)
|
||||
if transform_to_pdf is not None:
|
||||
|
@ -438,7 +447,7 @@ class ExportToModel(WorkflowStatusItem):
|
|||
filename=filename,
|
||||
title=_('Model'),
|
||||
hint=hint,
|
||||
validation=self.model_file_validation,
|
||||
validation=self.model_file_content_validation,
|
||||
value=value,
|
||||
)
|
||||
if 'convert_to_pdf' in parameters:
|
||||
|
@ -843,23 +852,32 @@ class ExportToModel(WorkflowStatusItem):
|
|||
yield from super().get_computed_strings()
|
||||
yield self.filename
|
||||
if self.model_file:
|
||||
try:
|
||||
kind = self.model_file_validation(self.model_file, allow_rtf=True)
|
||||
except FileNotFoundError:
|
||||
kind = None
|
||||
if kind in ('rtf', 'xml'):
|
||||
yield self.model_file.get_file().read().decode(errors='surrogateescape')
|
||||
elif kind == 'opendocument':
|
||||
with zipfile.ZipFile(self.model_file.get_file(), mode='r') as zin:
|
||||
content = zin.read('content.xml')
|
||||
root = ET.fromstring(content)
|
||||
for node in root.iter():
|
||||
if node.tag == DRAW_FRAME:
|
||||
yield node.attrib.get(DRAW_NAME)
|
||||
elif node.tag == USER_FIELD_DECL and STRING_VALUE in node.attrib:
|
||||
yield node.attrib[STRING_VALUE]
|
||||
yield getattr(node, 'text', None)
|
||||
yield getattr(node, 'tail', None)
|
||||
yield from self.get_model_file_template_strings(model_file=self.model_file, allow_rtf=True)
|
||||
|
||||
def get_model_file_template_strings(self, model_file, allow_rtf=False):
|
||||
try:
|
||||
kind = self.model_file_validation(model_file, allow_rtf=allow_rtf)
|
||||
except FileNotFoundError:
|
||||
return
|
||||
|
||||
if hasattr(model_file, 'get_file_pointer'):
|
||||
model_file_fp = model_file.get_file_pointer()
|
||||
else:
|
||||
model_file_fp = model_file.fp
|
||||
|
||||
if kind in ('rtf', 'xml'):
|
||||
yield model_file_fp.read().decode(errors='surrogateescape')
|
||||
elif kind == 'opendocument':
|
||||
with zipfile.ZipFile(model_file_fp, mode='r') as zin:
|
||||
content = zin.read('content.xml')
|
||||
root = ET.fromstring(content)
|
||||
for node in root.iter():
|
||||
if node.tag == DRAW_FRAME:
|
||||
yield node.attrib.get(DRAW_NAME)
|
||||
elif node.tag == USER_FIELD_DECL and STRING_VALUE in node.attrib:
|
||||
yield node.attrib[STRING_VALUE]
|
||||
yield getattr(node, 'text', None)
|
||||
yield getattr(node, 'tail', None)
|
||||
|
||||
def perform(self, formdata):
|
||||
if self.method == 'interactive':
|
||||
|
|
Loading…
Reference in New Issue