deprecations: forbid import of new python expressions (#72093) #1212
|
@ -53,6 +53,7 @@ def teardown_module(module):
|
|||
|
||||
|
||||
def test_empty_site(pub):
|
||||
pub.user_class.wipe()
|
||||
resp = get_app(pub).get('/backoffice/users/')
|
||||
resp = resp.click('New User')
|
||||
resp = get_app(pub).get('/backoffice/settings/')
|
||||
|
|
|
@ -220,6 +220,25 @@ def test_block_export_import(pub):
|
|||
assert 'Invalid File (Unknown referenced objects)' in resp
|
||||
assert '<ul><li>Unknown datasources: foobar</li></ul>' in resp
|
||||
|
||||
# python expression
|
||||
if not pub.site_options.has_section('options'):
|
||||
pub.site_options.add_section('options')
|
||||
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
|
||||
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
|
||||
pub.site_options.write(fd)
|
||||
block.fields = [
|
||||
fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}),
|
||||
]
|
||||
block.store()
|
||||
resp = app.get('/backoffice/forms/blocks/%s/' % block.id)
|
||||
resp = resp.click(href=re.compile('^export$'))
|
||||
xml_export = resp.text
|
||||
resp = app.get('/backoffice/forms/blocks/')
|
||||
resp = resp.click(href='import')
|
||||
resp.form['file'] = Upload('block', xml_export.encode('utf-8'))
|
||||
resp = resp.form.submit()
|
||||
assert 'Python expression detected' in resp
|
||||
|
||||
|
||||
def test_block_delete(pub):
|
||||
create_superuser(pub)
|
||||
|
|
|
@ -1039,6 +1039,23 @@ def test_data_sources_import(pub):
|
|||
resp = resp.form.submit()
|
||||
assert 'Invalid File' in resp.text
|
||||
|
||||
# python expression
|
||||
if not pub.site_options.has_section('options'):
|
||||
pub.site_options.add_section('options')
|
||||
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
|
||||
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
|
||||
pub.site_options.write(fd)
|
||||
data_source.data_source = {'type': 'formula', 'value': repr([('1', 'un'), ('2', 'deux')])}
|
||||
data_source.store()
|
||||
resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
|
||||
resp = resp.click(href=re.compile('^export$'))
|
||||
xml_export = resp.text
|
||||
resp = app.get('/backoffice/settings/data-sources/')
|
||||
resp = resp.click(href='import')
|
||||
resp.form['file'] = Upload('ds', xml_export.encode('utf-8'))
|
||||
resp = resp.form.submit()
|
||||
assert 'Python expression detected' in resp
|
||||
|
||||
|
||||
def test_data_sources_edit_slug(pub):
|
||||
create_superuser(pub)
|
||||
|
|
|
@ -7,11 +7,11 @@ import pytest
|
|||
from quixote.http_request import Upload as QuixoteUpload
|
||||
|
||||
from wcs import fields
|
||||
from wcs.backoffice.deprecations import DeprecationsScanAfterJob
|
||||
from wcs.blocks import BlockDef
|
||||
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
|
||||
from wcs.blocks import BlockDef, BlockdefImportError
|
||||
from wcs.carddef import CardDef
|
||||
from wcs.data_sources import NamedDataSource
|
||||
from wcs.formdef import FormDef
|
||||
from wcs.data_sources import NamedDataSource, NamedDataSourceImportError
|
||||
from wcs.formdef import FormDef, FormdefImportError
|
||||
from wcs.mail_templates import MailTemplate
|
||||
from wcs.qommon.form import UploadedFile
|
||||
from wcs.qommon.http_request import HTTPRequest
|
||||
|
@ -23,8 +23,8 @@ from wcs.wf.geolocate import GeolocateWorkflowStatusItem
|
|||
from wcs.wf.jump import JumpWorkflowStatusItem
|
||||
from wcs.wf.notification import SendNotificationWorkflowStatusItem
|
||||
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
|
||||
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef
|
||||
from wcs.wscalls import NamedWsCall
|
||||
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowImportError
|
||||
from wcs.wscalls import NamedWsCall, NamedWsCallImportError
|
||||
|
||||
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login
|
||||
from .test_all import create_superuser
|
||||
|
@ -293,7 +293,7 @@ def test_deprecations_choice_label(pub):
|
|||
accept = st0.add_action('choice', id='_choice')
|
||||
accept.label = '[test] action'
|
||||
|
||||
job = DeprecationsScanAfterJob()
|
||||
job = DeprecationsScan()
|
||||
job.execute()
|
||||
assert not job.report_lines
|
||||
|
||||
|
@ -305,7 +305,7 @@ def test_deprecations_skip_invalid_ezt(pub):
|
|||
display = st0.add_action('displaymsg')
|
||||
display.message = 'message with invalid [if-any] ezt'
|
||||
|
||||
job = DeprecationsScanAfterJob()
|
||||
job = DeprecationsScan()
|
||||
job.execute()
|
||||
assert not job.report_lines
|
||||
|
||||
|
@ -316,19 +316,19 @@ def test_deprecations_ignore_ezt_looking_tag(pub):
|
|||
sendmail = st0.add_action('sendmail')
|
||||
sendmail.subject = '[REMINDER] your appointment'
|
||||
workflow.store()
|
||||
job = DeprecationsScanAfterJob()
|
||||
job = DeprecationsScan()
|
||||
job.execute()
|
||||
assert not job.report_lines
|
||||
|
||||
sendmail.subject = '[reminder]'
|
||||
workflow.store()
|
||||
job = DeprecationsScanAfterJob()
|
||||
job = DeprecationsScan()
|
||||
job.execute()
|
||||
assert job.report_lines
|
||||
|
||||
sendmail.subject = '[if-any plop]test[end]'
|
||||
workflow.store()
|
||||
job = DeprecationsScanAfterJob()
|
||||
job = DeprecationsScan()
|
||||
job.execute()
|
||||
assert job.report_lines
|
||||
|
||||
|
@ -397,7 +397,7 @@ def test_deprecations_document_models(pub):
|
|||
export_to2.by = ['_submitter']
|
||||
workflow.store()
|
||||
|
||||
job = DeprecationsScanAfterJob()
|
||||
job = DeprecationsScan()
|
||||
job.execute()
|
||||
assert job.report_lines == [
|
||||
{
|
||||
|
@ -446,7 +446,7 @@ def test_deprecations_inspect_pages(pub):
|
|||
display.message = 'message with [ezt] info'
|
||||
workflow.store()
|
||||
|
||||
job = DeprecationsScanAfterJob()
|
||||
job = DeprecationsScan()
|
||||
job.execute()
|
||||
|
||||
create_superuser(pub)
|
||||
|
@ -485,7 +485,7 @@ def test_deprecations_inspect_pages(pub):
|
|||
display.message = 'message with {{django}} info'
|
||||
workflow.store()
|
||||
|
||||
job = DeprecationsScanAfterJob()
|
||||
job = DeprecationsScan()
|
||||
job.execute()
|
||||
|
||||
resp = app.get(formdef.get_admin_url() + 'inspect')
|
||||
|
@ -506,7 +506,7 @@ def test_deprecations_inspect_pages_old_format(pub):
|
|||
]
|
||||
formdef.store()
|
||||
|
||||
job = DeprecationsScanAfterJob()
|
||||
job = DeprecationsScan()
|
||||
job.execute()
|
||||
|
||||
with open(os.path.join(pub.app_dir, 'deprecations.json')) as f:
|
||||
|
@ -525,3 +525,124 @@ def test_deprecations_inspect_pages_old_format(pub):
|
|||
|
||||
resp = app.get('/backoffice/studio/deprecations/')
|
||||
assert resp.pyquery('.section--python-condition li a')
|
||||
|
||||
|
||||
def test_deprecations_on_import(pub):
|
||||
formdef = FormDef()
|
||||
formdef.name = 'foobar'
|
||||
formdef.fields = [
|
||||
fields.PageField(id='1', label='page1', condition={'type': 'python', 'value': 'True'}),
|
||||
]
|
||||
formdef.store()
|
||||
|
||||
blockdef = BlockDef()
|
||||
blockdef.name = 'foobar'
|
||||
blockdef.fields = [
|
||||
fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}),
|
||||
]
|
||||
blockdef.store()
|
||||
|
||||
workflow = Workflow(name='test')
|
||||
st0 = workflow.add_status('Status0', 'st0')
|
||||
sendsms = st0.add_action('sendsms', id='_sendsms')
|
||||
sendsms.to = 'xxx'
|
||||
sendsms.condition = {'type': 'python', 'value': 'True'}
|
||||
sendsms.parent = st0
|
||||
st0.items.append(sendsms)
|
||||
workflow.store()
|
||||
|
||||
data_source = NamedDataSource(name='ds_python')
|
||||
data_source.data_source = {'type': 'formula', 'value': repr([('1', 'un'), ('2', 'deux')])}
|
||||
data_source.store()
|
||||
|
||||
wscall = NamedWsCall()
|
||||
wscall.name = 'Hello'
|
||||
wscall.request = {'url': 'http://example.net', 'qs_data': {'a': '=1+2'}}
|
||||
wscall.store()
|
||||
|
||||
mail_template = MailTemplate() # no python expression in mail templates
|
||||
mail_template.name = 'Hello2'
|
||||
mail_template.subject = 'plop'
|
||||
mail_template.body = 'plop [ezt] plop'
|
||||
mail_template.store()
|
||||
|
||||
job = DeprecationsScan()
|
||||
job.check_deprecated_elements_in_object(formdef)
|
||||
formdef_xml = formdef.export_to_xml()
|
||||
FormDef.import_from_xml_tree(formdef_xml)
|
||||
|
||||
job = DeprecationsScan()
|
||||
job.check_deprecated_elements_in_object(blockdef)
|
||||
blockdef_xml = blockdef.export_to_xml()
|
||||
BlockDef.import_from_xml_tree(blockdef_xml)
|
||||
|
||||
job = DeprecationsScan()
|
||||
job.check_deprecated_elements_in_object(workflow)
|
||||
workflow_xml = workflow.export_to_xml()
|
||||
Workflow.import_from_xml_tree(workflow_xml)
|
||||
|
||||
job = DeprecationsScan()
|
||||
job.check_deprecated_elements_in_object(data_source)
|
||||
data_source_xml = data_source.export_to_xml()
|
||||
NamedDataSource.import_from_xml_tree(data_source_xml)
|
||||
|
||||
job = DeprecationsScan()
|
||||
job.check_deprecated_elements_in_object(wscall)
|
||||
wscall_xml = wscall.export_to_xml()
|
||||
NamedWsCall.import_from_xml_tree(wscall_xml)
|
||||
|
||||
job = DeprecationsScan()
|
||||
job.check_deprecated_elements_in_object(mail_template)
|
||||
mail_template_xml = mail_template.export_to_xml()
|
||||
MailTemplate.import_from_xml_tree(mail_template_xml)
|
||||
|
||||
if not pub.site_options.has_section('options'):
|
||||
pub.site_options.add_section('options')
|
||||
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
|
||||
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
|
||||
pub.site_options.write(fd)
|
||||
|
||||
job = DeprecationsScan()
|
||||
with pytest.raises(DeprecatedElementsDetected) as excinfo:
|
||||
job.check_deprecated_elements_in_object(formdef)
|
||||
assert str(excinfo.value) == 'Python expression detected'
|
||||
with pytest.raises(FormdefImportError) as excinfo:
|
||||
FormDef.import_from_xml_tree(formdef_xml)
|
||||
assert str(excinfo.value) == 'Python expression detected'
|
||||
|
||||
job = DeprecationsScan()
|
||||
with pytest.raises(DeprecatedElementsDetected) as excinfo:
|
||||
job.check_deprecated_elements_in_object(blockdef)
|
||||
assert str(excinfo.value) == 'Python expression detected'
|
||||
with pytest.raises(BlockdefImportError) as excinfo:
|
||||
BlockDef.import_from_xml_tree(blockdef_xml)
|
||||
assert str(excinfo.value) == 'Python expression detected'
|
||||
|
||||
job = DeprecationsScan()
|
||||
with pytest.raises(DeprecatedElementsDetected) as excinfo:
|
||||
job.check_deprecated_elements_in_object(workflow)
|
||||
assert str(excinfo.value) == 'Python expression detected'
|
||||
with pytest.raises(WorkflowImportError) as excinfo:
|
||||
Workflow.import_from_xml_tree(workflow_xml)
|
||||
assert str(excinfo.value) == 'Python expression detected'
|
||||
|
||||
job = DeprecationsScan()
|
||||
with pytest.raises(DeprecatedElementsDetected) as excinfo:
|
||||
job.check_deprecated_elements_in_object(data_source)
|
||||
assert str(excinfo.value) == 'Python expression detected'
|
||||
with pytest.raises(NamedDataSourceImportError) as excinfo:
|
||||
NamedDataSource.import_from_xml_tree(data_source_xml)
|
||||
assert str(excinfo.value) == 'Python expression detected'
|
||||
|
||||
job = DeprecationsScan()
|
||||
with pytest.raises(DeprecatedElementsDetected) as excinfo:
|
||||
job.check_deprecated_elements_in_object(wscall)
|
||||
assert str(excinfo.value) == 'Python expression detected'
|
||||
with pytest.raises(NamedWsCallImportError) as excinfo:
|
||||
NamedWsCall.import_from_xml_tree(wscall_xml)
|
||||
assert str(excinfo.value) == 'Python expression detected'
|
||||
|
||||
# no python expressions
|
||||
job = DeprecationsScan()
|
||||
job.check_deprecated_elements_in_object(mail_template)
|
||||
MailTemplate.import_from_xml_tree(mail_template_xml)
|
||||
|
|
|
@ -4077,6 +4077,39 @@ def test_form_overwrite(pub):
|
|||
assert resp.pyquery('.error').text() == 'Invalid File'
|
||||
|
||||
|
||||
def test_form_export_import_export(pub):
|
||||
create_superuser(pub)
|
||||
create_role(pub)
|
||||
|
||||
FormDef.wipe()
|
||||
formdef = FormDef()
|
||||
formdef.name = 'form title'
|
||||
formdef.table_name = 'xxx'
|
||||
formdef.fields = []
|
||||
formdef.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
|
||||
# python expression
|
||||
if not pub.site_options.has_section('options'):
|
||||
pub.site_options.add_section('options')
|
||||
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
|
||||
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
|
||||
pub.site_options.write(fd)
|
||||
formdef.fields = [
|
||||
fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}),
|
||||
]
|
||||
formdef.store()
|
||||
resp = app.get('/backoffice/forms/%s/' % formdef.id)
|
||||
resp = resp.click(href=re.compile('^export$'))
|
||||
xml_export = resp.text
|
||||
resp = app.get('/backoffice/forms/')
|
||||
resp = resp.click(href='import')
|
||||
resp.form['file'] = Upload('formdef', xml_export.encode('utf-8'))
|
||||
resp = resp.form.submit()
|
||||
assert 'Python expression detected' in resp
|
||||
|
||||
|
||||
def test_form_export_import_export_overwrite(pub):
|
||||
create_superuser(pub)
|
||||
create_role(pub)
|
||||
|
@ -4134,6 +4167,23 @@ def test_form_export_import_export_overwrite(pub):
|
|||
field_ow = formdef_overwrited.fields[i]
|
||||
assert (field.id, field.label, field.key) == (field_ow.id, field_ow.label, field_ow.key)
|
||||
|
||||
# python expression
|
||||
if not pub.site_options.has_section('options'):
|
||||
pub.site_options.add_section('options')
|
||||
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
|
||||
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
|
||||
pub.site_options.write(fd)
|
||||
formdef2.fields = [
|
||||
fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}),
|
||||
]
|
||||
formdef2.store()
|
||||
formdef2_xml = ET.tostring(formdef2.export_to_xml(include_id=True))
|
||||
resp = app.get('/backoffice/forms/%s/' % formdef.id)
|
||||
resp = resp.click(href='overwrite')
|
||||
resp.forms[0]['file'] = Upload('formdef.wcs', formdef2_xml)
|
||||
resp = resp.forms[0].submit()
|
||||
assert 'Python expression detected' in resp
|
||||
|
||||
|
||||
def test_form_overwrite_from_url(pub):
|
||||
create_superuser(pub)
|
||||
|
|
|
@ -399,6 +399,93 @@ def test_settings_export_import(pub):
|
|||
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
|
||||
resp = resp.form.submit('submit').follow()
|
||||
assert 'Unknown referenced objects [Unknown datasources: foobar]' in resp
|
||||
BlockDef.wipe()
|
||||
|
||||
# python expressions
|
||||
if not pub.site_options.has_section('options'):
|
||||
pub.site_options.add_section('options')
|
||||
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
|
||||
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
|
||||
pub.site_options.write(fd)
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'foobar'
|
||||
formdef.fields = [
|
||||
fields.PageField(id='1', label='page1', condition={'type': 'python', 'value': 'True'}),
|
||||
]
|
||||
formdef.store()
|
||||
resp = app.get('/backoffice/settings/export')
|
||||
resp = resp.form.submit('submit').follow()
|
||||
resp = resp.click('Download Export')
|
||||
zip_content = io.BytesIO(resp.body)
|
||||
resp = app.get('/backoffice/settings/import')
|
||||
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
|
||||
resp.form['confirm'].checked = True
|
||||
resp = resp.form.submit('submit').follow()
|
||||
assert 'Python expression detected' in resp
|
||||
FormDef.wipe()
|
||||
|
||||
blockdef = BlockDef()
|
||||
blockdef.name = 'foobar'
|
||||
blockdef.fields = [
|
||||
fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}),
|
||||
]
|
||||
blockdef.store()
|
||||
resp = app.get('/backoffice/settings/export')
|
||||
resp = resp.form.submit('submit').follow()
|
||||
resp = resp.click('Download Export')
|
||||
zip_content = io.BytesIO(resp.body)
|
||||
resp = app.get('/backoffice/settings/import')
|
||||
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
|
||||
resp = resp.form.submit('submit').follow()
|
||||
assert 'Python expression detected' in resp
|
||||
BlockDef.wipe()
|
||||
|
||||
workflow = Workflow(name='test')
|
||||
st0 = workflow.add_status('Status0', 'st0')
|
||||
sendsms = st0.add_action('sendsms', id='_sendsms')
|
||||
sendsms.to = 'xxx'
|
||||
sendsms.condition = {'type': 'python', 'value': 'True'}
|
||||
sendsms.parent = st0
|
||||
st0.items.append(sendsms)
|
||||
workflow.store()
|
||||
resp = app.get('/backoffice/settings/export')
|
||||
resp = resp.form.submit('submit').follow()
|
||||
resp = resp.click('Download Export')
|
||||
zip_content = io.BytesIO(resp.body)
|
||||
resp = app.get('/backoffice/settings/import')
|
||||
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
|
||||
resp.form['confirm'].checked = True
|
||||
resp = resp.form.submit('submit').follow()
|
||||
assert 'Python expression detected' in resp
|
||||
Workflow.wipe()
|
||||
|
||||
data_source = NamedDataSource(name='ds_python')
|
||||
data_source.data_source = {'type': 'formula', 'value': repr([('1', 'un'), ('2', 'deux')])}
|
||||
data_source.store()
|
||||
resp = app.get('/backoffice/settings/export')
|
||||
resp = resp.form.submit('submit').follow()
|
||||
resp = resp.click('Download Export')
|
||||
zip_content = io.BytesIO(resp.body)
|
||||
resp = app.get('/backoffice/settings/import')
|
||||
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
|
||||
resp = resp.form.submit('submit').follow()
|
||||
assert 'Python expression detected' in resp
|
||||
NamedDataSource.wipe()
|
||||
|
||||
wscall = NamedWsCall()
|
||||
wscall.name = 'Hello'
|
||||
wscall.request = {'url': 'http://example.net', 'qs_data': {'a': '=1+2'}}
|
||||
wscall.store()
|
||||
resp = app.get('/backoffice/settings/export')
|
||||
resp = resp.form.submit('submit').follow()
|
||||
resp = resp.click('Download Export')
|
||||
zip_content = io.BytesIO(resp.body)
|
||||
resp = app.get('/backoffice/settings/import')
|
||||
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
|
||||
resp = resp.form.submit('submit').follow()
|
||||
assert 'Python expression detected' in resp
|
||||
NamedWsCall.wipe()
|
||||
|
||||
# check a backup of settings has been created
|
||||
assert [x for x in os.listdir(pub.app_dir) if x.startswith('config.pck.backup-')]
|
||||
|
|
|
@ -990,6 +990,28 @@ def test_workflows_export_import(pub):
|
|||
assert 'Invalid File' in resp.text
|
||||
assert Workflow.count() == 2
|
||||
|
||||
# python expression
|
||||
if not pub.site_options.has_section('options'):
|
||||
pub.site_options.add_section('options')
|
||||
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
|
||||
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
|
||||
pub.site_options.write(fd)
|
||||
st0 = workflow.add_status('Status0', 'st0')
|
||||
sendsms = st0.add_action('sendsms', id='_sendsms')
|
||||
sendsms.to = 'xxx'
|
||||
sendsms.condition = {'type': 'python', 'value': 'True'}
|
||||
sendsms.parent = st0
|
||||
st0.items.append(sendsms)
|
||||
workflow.store()
|
||||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||||
resp = resp.click(href=re.compile('^export$'))
|
||||
xml_export = resp.text
|
||||
resp = app.get('/backoffice/workflows/')
|
||||
resp = resp.click('Import')
|
||||
resp.form['file'] = Upload('wf.wcs', xml_export.encode('utf-8'))
|
||||
resp = resp.form.submit('submit')
|
||||
assert 'Python expression detected' in resp
|
||||
|
||||
|
||||
def test_workflows_import_from_url(pub):
|
||||
create_superuser(pub)
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
import io
|
||||
import os
|
||||
import re
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
import pytest
|
||||
|
@ -207,6 +209,23 @@ def test_wscalls_import(pub, wscall):
|
|||
resp = resp.form.submit()
|
||||
assert 'Invalid File' in resp.text
|
||||
|
||||
# python expression
|
||||
if not pub.site_options.has_section('options'):
|
||||
pub.site_options.add_section('options')
|
||||
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
|
||||
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
|
||||
pub.site_options.write(fd)
|
||||
wscall.request = {'url': 'http://example.net', 'qs_data': {'a': '=1+2'}}
|
||||
wscall.store()
|
||||
resp = app.get('/backoffice/settings/wscalls/%s/' % wscall.id)
|
||||
resp = resp.click(href=re.compile('^export$'))
|
||||
xml_export = resp.text
|
||||
resp = app.get('/backoffice/settings/wscalls/')
|
||||
resp = resp.click(href='import')
|
||||
resp.form['file'] = Upload('wscall', xml_export.encode('utf-8'))
|
||||
resp = resp.form.submit()
|
||||
assert 'Python expression detected' in resp
|
||||
|
||||
|
||||
def test_wscalls_empty_param_values(pub):
|
||||
create_superuser(pub)
|
||||
|
|
|
@ -2046,6 +2046,97 @@ def test_export_import_workflow_options(pub):
|
|||
assert formdef.workflow_options == {'foo': 'bar2'}
|
||||
|
||||
|
||||
def test_export_import_with_deprecated(pub):
|
||||
pub.load_site_options()
|
||||
if not pub.site_options.has_section('options'):
|
||||
pub.site_options.add_section('options')
|
||||
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
|
||||
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
|
||||
pub.site_options.write(fd)
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'foo'
|
||||
formdef.fields = [
|
||||
PageField(id='1', label='page1', condition={'type': 'python', 'value': 'True'}),
|
||||
]
|
||||
formdef.store()
|
||||
bundle = create_bundle(
|
||||
[
|
||||
{'type': 'forms', 'slug': 'foo', 'name': 'foo'},
|
||||
],
|
||||
('forms/foo', formdef),
|
||||
)
|
||||
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
|
||||
afterjob_url = resp.json['url']
|
||||
resp = get_app(pub).put(sign_uri(afterjob_url))
|
||||
assert resp.json['data']['status'] == 'failed'
|
||||
|
||||
blockdef = BlockDef()
|
||||
blockdef.name = 'foo'
|
||||
blockdef.fields = [
|
||||
StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}),
|
||||
]
|
||||
blockdef.store()
|
||||
bundle = create_bundle(
|
||||
[
|
||||
{'type': 'blocks', 'slug': 'foo', 'name': 'foo'},
|
||||
],
|
||||
('blocks/foo', blockdef),
|
||||
)
|
||||
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
|
||||
afterjob_url = resp.json['url']
|
||||
resp = get_app(pub).put(sign_uri(afterjob_url))
|
||||
assert resp.json['data']['status'] == 'failed'
|
||||
|
||||
workflow = Workflow(name='foo')
|
||||
st0 = workflow.add_status('Status0', 'st0')
|
||||
sendsms = st0.add_action('sendsms', id='_sendsms')
|
||||
sendsms.to = 'xxx'
|
||||
sendsms.condition = {'type': 'python', 'value': 'True'}
|
||||
sendsms.parent = st0
|
||||
st0.items.append(sendsms)
|
||||
workflow.store()
|
||||
bundle = create_bundle(
|
||||
[
|
||||
{'type': 'workflows', 'slug': 'foo', 'name': 'foo'},
|
||||
],
|
||||
('workflows/foo', workflow),
|
||||
)
|
||||
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
|
||||
afterjob_url = resp.json['url']
|
||||
resp = get_app(pub).put(sign_uri(afterjob_url))
|
||||
assert resp.json['data']['status'] == 'failed'
|
||||
|
||||
data_source = NamedDataSource(name='foo')
|
||||
data_source.data_source = {'type': 'formula', 'value': repr([('1', 'un'), ('2', 'deux')])}
|
||||
data_source.store()
|
||||
bundle = create_bundle(
|
||||
[
|
||||
{'type': 'data-sources', 'slug': 'foo', 'name': 'foo'},
|
||||
],
|
||||
('data-sources/foo', data_source),
|
||||
)
|
||||
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
|
||||
afterjob_url = resp.json['url']
|
||||
resp = get_app(pub).put(sign_uri(afterjob_url))
|
||||
assert resp.json['data']['status'] == 'failed'
|
||||
|
||||
wscall = NamedWsCall()
|
||||
wscall.name = 'foo'
|
||||
wscall.request = {'url': 'http://example.net', 'qs_data': {'a': '=1+2'}}
|
||||
wscall.store()
|
||||
bundle = create_bundle(
|
||||
[
|
||||
{'type': 'wscalls', 'slug': 'foo', 'name': 'foo'},
|
||||
],
|
||||
('wscalls/foo', wscall),
|
||||
)
|
||||
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
|
||||
afterjob_url = resp.json['url']
|
||||
resp = get_app(pub).put(sign_uri(afterjob_url))
|
||||
assert resp.json['data']['status'] == 'failed'
|
||||
|
||||
|
||||
def test_api_export_import_invalid_slug(pub):
|
||||
pub.role_class.wipe()
|
||||
role1 = pub.role_class(name='Test role 1')
|
||||
|
|
|
@ -27,6 +27,7 @@ from wcs.categories import CardDefCategory, DataSourceCategory
|
|||
from wcs.data_sources import (
|
||||
DataSourceSelectionWidget,
|
||||
NamedDataSource,
|
||||
NamedDataSourceImportError,
|
||||
RefreshAgendas,
|
||||
get_structured_items,
|
||||
has_chrono,
|
||||
|
@ -667,15 +668,22 @@ class NamedDataSourcesDirectory(Directory):
|
|||
def import_submit(self, form):
|
||||
fp = form.get_widget('file').parse().fp
|
||||
|
||||
error = False
|
||||
error, reason = False, None
|
||||
try:
|
||||
datasource = NamedDataSource.import_from_xml(fp)
|
||||
get_session().message = ('info', _('This datasource has been successfully imported.'))
|
||||
except NamedDataSourceImportError as e:
|
||||
error = True
|
||||
reason = str(e)
|
||||
except ValueError:
|
||||
error = True
|
||||
|
||||
if error:
|
||||
form.set_error('file', _('Invalid File'))
|
||||
if reason:
|
||||
msg = _('Invalid File (%s)') % reason
|
||||
else:
|
||||
msg = _('Invalid File')
|
||||
form.set_error('file', msg)
|
||||
raise ValueError()
|
||||
|
||||
try:
|
||||
|
|
|
@ -36,7 +36,7 @@ from quixote.html import TemplateIO, htmltext
|
|||
from wcs.api_access import ApiAccess
|
||||
from wcs.blocks import BlockDef, BlockdefImportError
|
||||
from wcs.carddef import CardDef
|
||||
from wcs.data_sources import NamedDataSource
|
||||
from wcs.data_sources import NamedDataSource, NamedDataSourceImportError
|
||||
from wcs.fields.map import MapOptionsMixin
|
||||
from wcs.formdef import FormDef, FormdefImportError, get_formdefs_of_all_kinds
|
||||
from wcs.qommon import _, audit, errors, get_cfg, ident, misc, pgettext_lazy, template
|
||||
|
@ -61,6 +61,7 @@ from wcs.qommon.form import (
|
|||
TextWidget,
|
||||
)
|
||||
from wcs.workflows import Workflow, WorkflowImportError
|
||||
from wcs.wscalls import NamedWsCallImportError
|
||||
|
||||
from .api_access import ApiAccessDirectory
|
||||
from .data_sources import NamedDataSourcesDirectory
|
||||
|
@ -1512,7 +1513,10 @@ class SiteImportAfterJob(AfterJob):
|
|||
msg = _(e.msg) % e.msg_args
|
||||
if e.details:
|
||||
msg += ' [%s]' % e.details
|
||||
error = _('Failed to import a workflow (%s); site import did not complete.') % msg
|
||||
error = _('Failed to import objects (%s); site import did not complete.') % msg
|
||||
except (NamedDataSourceImportError, NamedWsCallImportError) as e:
|
||||
results = None
|
||||
error = _('Failed to import objects (%s); site import did not complete.') % str(e)
|
||||
|
||||
self.results = results
|
||||
if error:
|
||||
|
|
|
@ -26,7 +26,7 @@ from wcs.backoffice.snapshots import SnapshotsDirectory
|
|||
from wcs.qommon import _, errors, misc, template
|
||||
from wcs.qommon.form import CheckboxWidget, FileWidget, Form, HtmlWidget, SlugWidget, StringWidget, TextWidget
|
||||
from wcs.utils import grep_strings
|
||||
from wcs.wscalls import NamedWsCall, WsCallRequestWidget
|
||||
from wcs.wscalls import NamedWsCall, NamedWsCallImportError, WsCallRequestWidget
|
||||
|
||||
|
||||
class NamedWsCallUI:
|
||||
|
@ -317,15 +317,22 @@ class NamedWsCallsDirectory(Directory):
|
|||
def import_submit(self, form):
|
||||
fp = form.get_widget('file').parse().fp
|
||||
|
||||
error = False
|
||||
error, reason = False, None
|
||||
try:
|
||||
wscall = NamedWsCall.import_from_xml(fp)
|
||||
get_session().message = ('info', _('This webservice call has been successfully imported.'))
|
||||
except NamedWsCallImportError as e:
|
||||
error = True
|
||||
reason = str(e)
|
||||
except ValueError:
|
||||
error = True
|
||||
|
||||
if error:
|
||||
form.set_error('file', _('Invalid File'))
|
||||
if reason:
|
||||
msg = _('Invalid File (%s)') % reason
|
||||
else:
|
||||
msg = _('Invalid File')
|
||||
form.set_error('file', msg)
|
||||
raise ValueError()
|
||||
|
||||
try:
|
||||
|
|
|
@ -26,7 +26,7 @@ from quixote import get_publisher, get_response
|
|||
|
||||
from wcs.api_utils import is_url_signed
|
||||
from wcs.applications import Application, ApplicationElement
|
||||
from wcs.blocks import BlockDef
|
||||
from wcs.blocks import BlockDef, BlockdefImportError
|
||||
from wcs.carddef import CardDef
|
||||
from wcs.categories import (
|
||||
BlockCategory,
|
||||
|
@ -38,12 +38,12 @@ from wcs.categories import (
|
|||
WorkflowCategory,
|
||||
)
|
||||
from wcs.comment_templates import CommentTemplate
|
||||
from wcs.data_sources import NamedDataSource
|
||||
from wcs.formdef import FormDef
|
||||
from wcs.data_sources import NamedDataSource, NamedDataSourceImportError
|
||||
from wcs.formdef import FormDef, FormdefImportError
|
||||
from wcs.mail_templates import MailTemplate
|
||||
from wcs.sql import Equal, Role
|
||||
from wcs.workflows import Workflow
|
||||
from wcs.wscalls import NamedWsCall
|
||||
from wcs.workflows import Workflow, WorkflowImportError
|
||||
from wcs.wscalls import NamedWsCall, NamedWsCallImportError
|
||||
|
||||
from .qommon import _
|
||||
from .qommon.afterjobs import AfterJob
|
||||
|
@ -454,6 +454,15 @@ class BundleImportJob(AfterJob):
|
|||
|
||||
# remove obsolete application elements
|
||||
self.unlink_obsolete_objects()
|
||||
|
||||
except (
|
||||
BlockdefImportError,
|
||||
FormdefImportError,
|
||||
WorkflowImportError,
|
||||
NamedDataSourceImportError,
|
||||
NamedWsCallImportError,
|
||||
) as e:
|
||||
error = str(e)
|
||||
except tarfile.TarError:
|
||||
error = _('Invalid tar file.')
|
||||
except BundleKeyError as e:
|
||||
|
|
|
@ -22,17 +22,24 @@ import re
|
|||
from quixote import get_publisher, get_request, get_response, redirect
|
||||
from quixote.directory import Directory
|
||||
|
||||
from wcs.blocks import BlockDef
|
||||
from wcs.carddef import CardDef
|
||||
from wcs.data_sources import NamedDataSource
|
||||
from wcs.formdef import get_formdefs_of_all_kinds
|
||||
from wcs.formdef import FormDef, get_formdefs_of_all_kinds
|
||||
from wcs.mail_templates import MailTemplate
|
||||
from wcs.qommon import _, ezt, template
|
||||
from wcs.qommon.afterjobs import AfterJob
|
||||
from wcs.qommon.template import Template
|
||||
from wcs.wf.export_to_model import UploadValidationError
|
||||
from wcs.wf.form import FormWorkflowStatusItem
|
||||
from wcs.workflows import Workflow
|
||||
from wcs.wscalls import NamedWsCall
|
||||
|
||||
|
||||
class DeprecatedElementsDetected(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class DeprecationsDirectory(Directory):
|
||||
do_not_call_in_templates = True
|
||||
_q_exports = ['', 'scan']
|
||||
|
@ -66,7 +73,7 @@ class DeprecationsDirectory(Directory):
|
|||
|
||||
def scan(self):
|
||||
job = get_response().add_after_job(
|
||||
DeprecationsScanAfterJob(
|
||||
DeprecationsScan(
|
||||
label=_('Scanning for deprecations'),
|
||||
user_id=get_request().user.id,
|
||||
return_url='/backoffice/studio/deprecations/',
|
||||
|
@ -127,7 +134,7 @@ class DeprecationsDirectory(Directory):
|
|||
}
|
||||
|
||||
|
||||
class DeprecationsScanAfterJob(AfterJob):
|
||||
class DeprecationsScan(AfterJob):
|
||||
def done_action_url(self):
|
||||
return self.kwargs['return_url']
|
||||
|
||||
|
@ -155,181 +162,25 @@ class DeprecationsScanAfterJob(AfterJob):
|
|||
)
|
||||
self.store()
|
||||
|
||||
for formdef in formdefs:
|
||||
if formdef.id:
|
||||
source = f'{formdef.xml_root_node}:{formdef.id}' if formdef.id else ''
|
||||
elif formdef.get_workflow():
|
||||
source = f'workflow:{formdef.get_workflow().id}'
|
||||
else:
|
||||
source = '-'
|
||||
for field in formdef.fields or []:
|
||||
location_label = _('%(name)s / Field "%(label)s"') % {
|
||||
'name': formdef.name,
|
||||
'label': field.ellipsized_label,
|
||||
}
|
||||
url = formdef.get_field_admin_url(field)
|
||||
self.check_data_source(
|
||||
getattr(field, 'data_source', None),
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
source=source,
|
||||
)
|
||||
prefill = getattr(field, 'prefill', None)
|
||||
if prefill:
|
||||
if prefill.get('type') == 'formula':
|
||||
self.add_report_line(
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
category='python-prefill',
|
||||
source=source,
|
||||
)
|
||||
else:
|
||||
self.check_string(
|
||||
prefill.get('value'),
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
python_check=False,
|
||||
source=source,
|
||||
)
|
||||
if field.key == 'page':
|
||||
for condition in field.get_conditions():
|
||||
if condition and condition.get('type') == 'python':
|
||||
self.add_report_line(
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
category='python-condition',
|
||||
source=source,
|
||||
)
|
||||
break
|
||||
if field.key in ('title', 'subtitle', 'comment'):
|
||||
self.check_string(
|
||||
field.label,
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
python_check=False,
|
||||
source=source,
|
||||
)
|
||||
if field.key in ('table', 'table-select', 'tablerows', 'ranked-items'):
|
||||
self.add_report_line(
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
category='fields',
|
||||
source=source,
|
||||
)
|
||||
|
||||
self.increment_count()
|
||||
|
||||
for workflow in workflows:
|
||||
source = f'workflow:{workflow.id}'
|
||||
for action in workflow.get_all_items():
|
||||
location_label = '%s / %s' % (workflow.name, action.description)
|
||||
url = action.get_admin_url()
|
||||
for string in action.get_computed_strings():
|
||||
self.check_string(string, location_label=location_label, url=url, source=source)
|
||||
if getattr(action, 'condition', None):
|
||||
if action.condition.get('type') == 'python':
|
||||
self.add_report_line(
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
category='python-condition',
|
||||
css_class='important' if (action.key == 'jump' and action.timeout) else '',
|
||||
source=source,
|
||||
)
|
||||
if action.key == 'export_to_model':
|
||||
try:
|
||||
kind = action.model_file_validation(action.model_file, allow_rtf=True)
|
||||
except UploadValidationError:
|
||||
pass
|
||||
else:
|
||||
if kind == 'rtf':
|
||||
self.add_report_line(
|
||||
location_label=location_label, url=url, category='rtf', source=source
|
||||
)
|
||||
if action.key in ('aggregationemail', 'resubmit'):
|
||||
self.add_report_line(
|
||||
location_label=location_label, url=url, category='actions', source=source
|
||||
)
|
||||
if action.key in ('register-comment', 'sendmail'):
|
||||
for attachment in getattr(action, 'attachments', None) or []:
|
||||
if attachment and not ('{%' in attachment or '{{' in attachment):
|
||||
self.add_report_line(
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
category='python-expression',
|
||||
source=source,
|
||||
)
|
||||
break
|
||||
if action.key == 'webservice_call':
|
||||
self.check_remote_call_url(
|
||||
action.url, location_label=location_label, url=url, source=source
|
||||
)
|
||||
|
||||
for global_action in workflow.global_actions or []:
|
||||
location_label = '%s / %s' % (workflow.name, _('trigger in %s') % global_action.name)
|
||||
for trigger in global_action.triggers or []:
|
||||
url = '%striggers/%s/' % (global_action.get_admin_url(), trigger.id)
|
||||
if trigger.key == 'timeout' and trigger.anchor == 'python':
|
||||
self.add_report_line(
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
category='python-expression',
|
||||
source=source,
|
||||
)
|
||||
break
|
||||
|
||||
self.increment_count()
|
||||
|
||||
for named_data_source in named_data_sources:
|
||||
source = f'datasource:{named_data_source.id}'
|
||||
location_label = _('%(title)s "%(name)s"') % {
|
||||
'title': _('Data source'),
|
||||
'name': named_data_source.name,
|
||||
}
|
||||
url = named_data_source.get_admin_url()
|
||||
|
||||
self.check_data_source(
|
||||
getattr(named_data_source, 'data_source', None),
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
source=source,
|
||||
)
|
||||
self.increment_count()
|
||||
|
||||
for named_ws_call in named_ws_calls:
|
||||
source = f'wscall:{named_ws_call.id}'
|
||||
location_label = _('%(title)s "%(name)s"') % {
|
||||
'title': _('Webservice'),
|
||||
'name': named_ws_call.name,
|
||||
}
|
||||
url = named_ws_call.get_admin_url()
|
||||
for string in named_ws_call.get_computed_strings():
|
||||
self.check_string(string, location_label=location_label, url=url, source=source)
|
||||
if named_ws_call.request and named_ws_call.request.get('url'):
|
||||
self.check_remote_call_url(
|
||||
named_ws_call.request['url'], location_label=location_label, url=url, source=source
|
||||
)
|
||||
self.increment_count()
|
||||
|
||||
for mail_template in mail_templates:
|
||||
source = f'mail_template:{mail_template.id}'
|
||||
location_label = _('%(title)s "%(name)s"') % {
|
||||
'title': _('Mail Template'),
|
||||
'name': mail_template.name,
|
||||
}
|
||||
url = mail_template.get_admin_url()
|
||||
for string in mail_template.get_computed_strings():
|
||||
self.check_string(string, location_label=location_label, url=url, source=source)
|
||||
for string in mail_template.attachments or []:
|
||||
# legacy was to have straight python expressions (not prefixed by "=").
|
||||
if not Template.is_template_string(string):
|
||||
self.add_report_line(
|
||||
location_label=location_label, url=url, category='python-expression', source=source
|
||||
)
|
||||
self.increment_count()
|
||||
self.check_objects(formdefs + workflows + named_data_sources + named_ws_calls + mail_templates)
|
||||
|
||||
self.build_report_file()
|
||||
self.increment_count()
|
||||
|
||||
def check_objects(self, objects):
|
||||
for obj in objects:
|
||||
if isinstance(obj, (FormDef, CardDef, BlockDef)):
|
||||
self.check_formdef(obj)
|
||||
elif isinstance(obj, Workflow):
|
||||
self.check_workflow(obj)
|
||||
elif isinstance(obj, NamedDataSource):
|
||||
self.check_named_data_source(obj)
|
||||
elif isinstance(obj, NamedWsCall):
|
||||
self.check_named_ws_call(obj)
|
||||
elif isinstance(obj, MailTemplate):
|
||||
self.check_mail_template(obj)
|
||||
self.increment_count()
|
||||
|
||||
def check_data_source(self, data_source, location_label, url, source):
|
||||
if not data_source:
|
||||
return
|
||||
|
@ -386,6 +237,169 @@ class DeprecationsScanAfterJob(AfterJob):
|
|||
location_label=location_label, url=url, category='json-data-store', source=source
|
||||
)
|
||||
|
||||
def check_formdef(self, formdef):
|
||||
if formdef.id:
|
||||
source = f'{formdef.xml_root_node}:{formdef.id}' if formdef.id else ''
|
||||
elif hasattr(formdef, 'get_workflow') and formdef.get_workflow():
|
||||
source = f'workflow:{formdef.get_workflow().id}'
|
||||
else:
|
||||
source = '-'
|
||||
for field in formdef.fields or []:
|
||||
location_label = _('%(name)s / Field "%(label)s"') % {
|
||||
'name': formdef.name,
|
||||
'label': field.ellipsized_label,
|
||||
}
|
||||
url = formdef.get_field_admin_url(field)
|
||||
self.check_data_source(
|
||||
getattr(field, 'data_source', None),
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
source=source,
|
||||
)
|
||||
prefill = getattr(field, 'prefill', None)
|
||||
if prefill:
|
||||
if prefill.get('type') == 'formula':
|
||||
self.add_report_line(
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
category='python-prefill',
|
||||
source=source,
|
||||
)
|
||||
else:
|
||||
self.check_string(
|
||||
prefill.get('value'),
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
python_check=False,
|
||||
source=source,
|
||||
)
|
||||
if field.key == 'page':
|
||||
for condition in field.get_conditions():
|
||||
if condition and condition.get('type') == 'python':
|
||||
self.add_report_line(
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
category='python-condition',
|
||||
source=source,
|
||||
)
|
||||
break
|
||||
if field.key in ('title', 'subtitle', 'comment'):
|
||||
self.check_string(
|
||||
field.label,
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
python_check=False,
|
||||
source=source,
|
||||
)
|
||||
if field.key in ('table', 'table-select', 'tablerows', 'ranked-items'):
|
||||
self.add_report_line(
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
category='fields',
|
||||
source=source,
|
||||
)
|
||||
|
||||
def check_workflow(self, workflow):
|
||||
source = f'workflow:{workflow.id}'
|
||||
for action in workflow.get_all_items():
|
||||
location_label = '%s / %s' % (workflow.name, action.description)
|
||||
url = action.get_admin_url()
|
||||
for string in action.get_computed_strings():
|
||||
self.check_string(string, location_label=location_label, url=url, source=source)
|
||||
if getattr(action, 'condition', None):
|
||||
if action.condition.get('type') == 'python':
|
||||
self.add_report_line(
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
category='python-condition',
|
||||
css_class='important' if (action.key == 'jump' and action.timeout) else '',
|
||||
source=source,
|
||||
)
|
||||
if action.key == 'export_to_model':
|
||||
try:
|
||||
kind = action.model_file_validation(action.model_file, allow_rtf=True)
|
||||
except UploadValidationError:
|
||||
pass
|
||||
else:
|
||||
if kind == 'rtf':
|
||||
self.add_report_line(
|
||||
location_label=location_label, url=url, category='rtf', source=source
|
||||
)
|
||||
if action.key in ('aggregationemail', 'resubmit'):
|
||||
self.add_report_line(
|
||||
location_label=location_label, url=url, category='actions', source=source
|
||||
)
|
||||
if action.key in ('register-comment', 'sendmail'):
|
||||
for attachment in getattr(action, 'attachments', None) or []:
|
||||
if attachment and not ('{%' in attachment or '{{' in attachment):
|
||||
self.add_report_line(
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
category='python-expression',
|
||||
source=source,
|
||||
)
|
||||
break
|
||||
if action.key == 'webservice_call':
|
||||
self.check_remote_call_url(action.url, location_label=location_label, url=url, source=source)
|
||||
|
||||
for global_action in workflow.global_actions or []:
|
||||
location_label = '%s / %s' % (workflow.name, _('trigger in %s') % global_action.name)
|
||||
for trigger in global_action.triggers or []:
|
||||
url = '%striggers/%s/' % (global_action.get_admin_url(), trigger.id)
|
||||
if trigger.key == 'timeout' and trigger.anchor == 'python':
|
||||
self.add_report_line(
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
category='python-expression',
|
||||
source=source,
|
||||
)
|
||||
break
|
||||
|
||||
def check_named_data_source(self, named_data_source):
|
||||
source = f'datasource:{named_data_source.id}'
|
||||
location_label = _('%(title)s "%(name)s"') % {
|
||||
'title': _('Data source'),
|
||||
'name': named_data_source.name,
|
||||
}
|
||||
url = named_data_source.get_admin_url()
|
||||
|
||||
self.check_data_source(
|
||||
getattr(named_data_source, 'data_source', None),
|
||||
location_label=location_label,
|
||||
url=url,
|
||||
source=source,
|
||||
)
|
||||
|
||||
def check_named_ws_call(self, named_ws_call):
|
||||
source = f'wscall:{named_ws_call.id}'
|
||||
location_label = _('%(title)s "%(name)s"') % {
|
||||
'title': _('Webservice'),
|
||||
'name': named_ws_call.name,
|
||||
}
|
||||
url = named_ws_call.get_admin_url()
|
||||
for string in named_ws_call.get_computed_strings():
|
||||
self.check_string(string, location_label=location_label, url=url, source=source)
|
||||
if named_ws_call.request and named_ws_call.request.get('url'):
|
||||
self.check_remote_call_url(
|
||||
named_ws_call.request['url'], location_label=location_label, url=url, source=source
|
||||
)
|
||||
|
||||
def check_mail_template(self, mail_template):
|
||||
source = f'mail_template:{mail_template.id}'
|
||||
location_label = _('%(title)s "%(name)s"') % {
|
||||
'title': _('Mail Template'),
|
||||
'name': mail_template.name,
|
||||
}
|
||||
url = mail_template.get_admin_url()
|
||||
for string in mail_template.get_computed_strings():
|
||||
self.check_string(string, location_label=location_label, url=url, source=source)
|
||||
for string in mail_template.attachments or []:
|
||||
# legacy was to have straight python expressions (not prefixed by "=").
|
||||
if not Template.is_template_string(string):
|
||||
self.add_report_line(
|
||||
location_label=location_label, url=url, category='python-expression', source=source
|
||||
)
|
||||
|
||||
def add_report_line(self, **kwargs):
|
||||
if kwargs not in self.report_lines:
|
||||
self.report_lines.append(kwargs)
|
||||
|
@ -400,3 +414,28 @@ class DeprecationsScanAfterJob(AfterJob):
|
|||
fd,
|
||||
indent=2,
|
||||
)
|
||||
|
||||
def check_deprecated_elements_in_object(self, obj):
|
||||
if not get_publisher().has_site_option('forbid-new-python-expressions'):
|
||||
|
||||
# for perfs, don't check object if nothing is forbidden
|
||||
return
|
||||
|
||||
self.report_lines = []
|
||||
objects = [obj]
|
||||
if isinstance(obj, Workflow):
|
||||
for status in obj.possible_status:
|
||||
for item in status.items:
|
||||
if isinstance(item, FormWorkflowStatusItem) and item.formdef:
|
||||
objects.append(item.formdef)
|
||||
if obj.variables_formdef:
|
||||
objects.append(obj.variables_formdef)
|
||||
if obj.backoffice_fields_formdef:
|
||||
objects.append(obj.backoffice_fields_formdef)
|
||||
|
||||
self.check_objects(objects)
|
||||
|
||||
for report_line in self.report_lines:
|
||||
if 'python' in report_line['category'] and get_publisher().has_site_option(
|
||||
'forbid-new-python-expressions'
|
||||
):
|
||||
raise DeprecatedElementsDetected(_('Python expression detected'))
|
||||
|
|
|
@ -179,12 +179,17 @@ class BlockDef(StorableObject):
|
|||
return root
|
||||
|
||||
@classmethod
|
||||
def import_from_xml(cls, fd, include_id=False, check_datasources=True):
|
||||
def import_from_xml(cls, fd, include_id=False, check_datasources=True, check_deprecated=True):
|
||||
try:
|
||||
tree = ET.parse(fd)
|
||||
except Exception:
|
||||
raise ValueError()
|
||||
blockdef = cls.import_from_xml_tree(tree, include_id=include_id, check_datasources=check_datasources)
|
||||
blockdef = cls.import_from_xml_tree(
|
||||
tree,
|
||||
include_id=include_id,
|
||||
check_datasources=check_datasources,
|
||||
check_deprecated=check_deprecated,
|
||||
)
|
||||
|
||||
if blockdef.slug:
|
||||
try:
|
||||
|
@ -197,7 +202,11 @@ class BlockDef(StorableObject):
|
|||
return blockdef
|
||||
|
||||
@classmethod
|
||||
def import_from_xml_tree(cls, tree, include_id=False, check_datasources=True, **kwargs):
|
||||
def import_from_xml_tree(
|
||||
cls, tree, include_id=False, check_datasources=True, check_deprecated=True, **kwargs
|
||||
):
|
||||
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
|
||||
|
||||
blockdef = cls()
|
||||
if tree.find('name') is None or not tree.find('name').text:
|
||||
raise BlockdefImportError(_('Missing name'))
|
||||
|
@ -279,6 +288,14 @@ class BlockDef(StorableObject):
|
|||
details[_('Unknown datasources')].update(unknown_datasources)
|
||||
raise BlockdefImportUnknownReferencedError(_('Unknown referenced objects'), details=details)
|
||||
|
||||
if check_deprecated:
|
||||
# check for deprecated elements
|
||||
job = DeprecationsScan()
|
||||
try:
|
||||
job.check_deprecated_elements_in_object(blockdef)
|
||||
except DeprecatedElementsDetected as e:
|
||||
raise BlockdefImportError(str(e))
|
||||
|
||||
return blockdef
|
||||
|
||||
def get_usage_fields(self):
|
||||
|
|
|
@ -51,6 +51,10 @@ from .qommon.xml_storage import XmlStorableObject
|
|||
data_source_functions = {}
|
||||
|
||||
|
||||
class NamedDataSourceImportError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class DataSourceError(Exception):
|
||||
pass
|
||||
|
||||
|
@ -913,9 +917,22 @@ class NamedDataSource(XmlStorableObject):
|
|||
return root
|
||||
|
||||
@classmethod
|
||||
def import_from_xml_tree(cls, tree, include_id=False, **kwargs):
|
||||
data_source = super().import_from_xml_tree(tree, include_id=include_id, **kwargs)
|
||||
def import_from_xml_tree(cls, tree, include_id=False, check_deprecated=True, **kwargs):
|
||||
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
|
||||
|
||||
data_source = super().import_from_xml_tree(
|
||||
tree, include_id=include_id, check_deprecated=check_deprecated, **kwargs
|
||||
)
|
||||
DataSourceCategory.object_category_xml_import(data_source, tree, include_id=include_id)
|
||||
|
||||
if check_deprecated:
|
||||
# check for deprecated elements
|
||||
job = DeprecationsScan()
|
||||
try:
|
||||
job.check_deprecated_elements_in_object(data_source)
|
||||
except DeprecatedElementsDetected as e:
|
||||
raise NamedDataSourceImportError(str(e))
|
||||
|
||||
return data_source
|
||||
|
||||
@classmethod
|
||||
|
|
|
@ -1465,7 +1465,9 @@ class FormDef(StorableObject):
|
|||
return root
|
||||
|
||||
@classmethod
|
||||
def import_from_xml(cls, fd, include_id=False, fix_on_error=False, check_datasources=True):
|
||||
def import_from_xml(
|
||||
cls, fd, include_id=False, fix_on_error=False, check_datasources=True, check_deprecated=True
|
||||
):
|
||||
try:
|
||||
tree = ET.parse(fd)
|
||||
except Exception:
|
||||
|
@ -1475,6 +1477,7 @@ class FormDef(StorableObject):
|
|||
include_id=include_id,
|
||||
fix_on_error=fix_on_error,
|
||||
check_datasources=check_datasources,
|
||||
check_deprecated=check_deprecated,
|
||||
)
|
||||
|
||||
if formdef.url_name:
|
||||
|
@ -1497,8 +1500,15 @@ class FormDef(StorableObject):
|
|||
|
||||
@classmethod
|
||||
def import_from_xml_tree(
|
||||
cls, tree, include_id=False, fix_on_error=False, snapshot=False, check_datasources=True
|
||||
cls,
|
||||
tree,
|
||||
include_id=False,
|
||||
fix_on_error=False,
|
||||
snapshot=False,
|
||||
check_datasources=True,
|
||||
check_deprecated=True,
|
||||
):
|
||||
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
|
||||
from wcs.carddef import CardDef
|
||||
|
||||
formdef = cls()
|
||||
|
@ -1714,6 +1724,14 @@ class FormDef(StorableObject):
|
|||
details[_('Unknown datasources')].update(unknown_datasources)
|
||||
raise FormdefImportUnknownReferencedError(_('Unknown referenced objects'), details=details)
|
||||
|
||||
if check_deprecated:
|
||||
# check for deprecated elements
|
||||
job = DeprecationsScan()
|
||||
try:
|
||||
job.check_deprecated_elements_in_object(formdef)
|
||||
except DeprecatedElementsDetected as e:
|
||||
raise FormdefImportError(str(e))
|
||||
|
||||
return formdef
|
||||
|
||||
def finish_tests_xml_import(self):
|
||||
|
|
|
@ -183,9 +183,9 @@ class WcsPublisher(QommonPublisher):
|
|||
formdef.register_cronjobs()
|
||||
|
||||
def update_deprecations_report(self, **kwargs):
|
||||
from .backoffice.deprecations import DeprecationsScanAfterJob
|
||||
from .backoffice.deprecations import DeprecationsScan
|
||||
|
||||
DeprecationsScanAfterJob().execute()
|
||||
DeprecationsScan().execute()
|
||||
|
||||
def has_postgresql_config(self):
|
||||
return bool(self.cfg.get('postgresql', {}))
|
||||
|
@ -292,6 +292,8 @@ class WcsPublisher(QommonPublisher):
|
|||
'workflows_xml',
|
||||
'blockdefs_xml',
|
||||
'roles_xml',
|
||||
'datasources',
|
||||
'wscalls',
|
||||
):
|
||||
continue
|
||||
path = os.path.join(self.app_dir, f)
|
||||
|
@ -346,6 +348,22 @@ class WcsPublisher(QommonPublisher):
|
|||
if os.path.split(f)[0] in results:
|
||||
results[os.path.split(f)[0]] += 1
|
||||
|
||||
# import datasources and wscalls
|
||||
from wcs.data_sources import NamedDataSource
|
||||
from wcs.wscalls import NamedWsCall
|
||||
lguerin
commented
pour passer sur les dépréciations, sinon on faisait juste une bête copie du fichier pour passer sur les dépréciations, sinon on faisait juste une bête copie du fichier
|
||||
|
||||
for f in z.namelist():
|
||||
if os.path.dirname(f) == 'datasources' and os.path.basename(f):
|
||||
with z.open(f) as fd:
|
||||
data_source = NamedDataSource.import_from_xml(fd, include_id=True)
|
||||
data_source.store()
|
||||
results['datasources'] += 1
|
||||
if os.path.dirname(f) == 'wscalls' and os.path.basename(f):
|
||||
with z.open(f) as fd:
|
||||
wscall = NamedWsCall.import_from_xml(fd, include_id=True)
|
||||
wscall.store()
|
||||
results['wscalls'] += 1
|
||||
|
||||
# second pass, fields blocks
|
||||
from wcs.blocks import BlockDef
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ class XmlStorableObject(StorableObject):
|
|||
first_byte = fd.read(1)
|
||||
fd.seek(0)
|
||||
if first_byte == b'<':
|
||||
return cls.import_from_xml(fd, include_id=True)
|
||||
return cls.import_from_xml(fd, include_id=True, check_deprecated=False)
|
||||
lguerin
commented
ne pas checker les dépréciations lorsqu'on charge un objet depuis sa définition xml stockée sur l'instance ne pas checker les dépréciations lorsqu'on charge un objet depuis sa définition xml stockée sur l'instance
|
||||
else:
|
||||
obj = StorableObject.storage_load(fd)
|
||||
obj._upgrade_must_store = True
|
||||
|
@ -84,15 +84,15 @@ class XmlStorableObject(StorableObject):
|
|||
sub.text = role.name
|
||||
|
||||
@classmethod
|
||||
def import_from_xml(cls, fd, include_id=False):
|
||||
def import_from_xml(cls, fd, include_id=False, check_deprecated=True):
|
||||
try:
|
||||
tree = ET.parse(fd)
|
||||
except Exception:
|
||||
raise ValueError()
|
||||
return cls.import_from_xml_tree(tree, include_id=include_id)
|
||||
return cls.import_from_xml_tree(tree, include_id=include_id, check_deprecated=check_deprecated)
|
||||
|
||||
@classmethod
|
||||
def import_from_xml_tree(cls, tree, include_id=False, **kwargs):
|
||||
def import_from_xml_tree(cls, tree, include_id=False, check_deprecated=True, **kwargs):
|
||||
obj = cls()
|
||||
|
||||
# if the tree we get is actually a ElementTree for real, we get its
|
||||
|
|
|
@ -1233,12 +1233,17 @@ class Workflow(StorableObject):
|
|||
return root
|
||||
|
||||
@classmethod
|
||||
def import_from_xml(cls, fd, include_id=False, check_datasources=True):
|
||||
def import_from_xml(cls, fd, include_id=False, check_datasources=True, check_deprecated=True):
|
||||
try:
|
||||
tree = ET.parse(fd)
|
||||
except Exception:
|
||||
raise ValueError()
|
||||
workflow = cls.import_from_xml_tree(tree, include_id=include_id, check_datasources=check_datasources)
|
||||
workflow = cls.import_from_xml_tree(
|
||||
tree,
|
||||
include_id=include_id,
|
||||
check_datasources=check_datasources,
|
||||
check_deprecated=check_deprecated,
|
||||
)
|
||||
|
||||
if workflow.slug and cls.get_by_slug(workflow.slug):
|
||||
# slug already in use, reset so a new one will be generated on store()
|
||||
|
@ -1247,7 +1252,11 @@ class Workflow(StorableObject):
|
|||
return workflow
|
||||
|
||||
@classmethod
|
||||
def import_from_xml_tree(cls, tree, include_id=False, snapshot=False, check_datasources=True):
|
||||
def import_from_xml_tree(
|
||||
cls, tree, include_id=False, snapshot=False, check_datasources=True, check_deprecated=True
|
||||
):
|
||||
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
|
||||
|
||||
workflow = cls()
|
||||
if tree.find('name') is None or not tree.find('name').text:
|
||||
raise WorkflowImportError(_('Missing name'))
|
||||
|
@ -1359,6 +1368,14 @@ class Workflow(StorableObject):
|
|||
_('Unknown referenced objects'), details=unknown_referenced_objects_details
|
||||
)
|
||||
|
||||
if check_deprecated:
|
||||
# check for deprecated elements
|
||||
job = DeprecationsScan()
|
||||
try:
|
||||
job.check_deprecated_elements_in_object(workflow)
|
||||
except DeprecatedElementsDetected as e:
|
||||
raise WorkflowImportError(str(e))
|
||||
|
||||
return workflow
|
||||
|
||||
def get_list_of_roles(
|
||||
|
|
|
@ -44,6 +44,10 @@ from .qommon.template import Template
|
|||
from .qommon.xml_storage import XmlStorableObject
|
||||
|
||||
|
||||
class NamedWsCallImportError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class PayloadError(Exception):
|
||||
pass
|
||||
|
||||
|
@ -280,6 +284,24 @@ class NamedWsCall(XmlStorableObject):
|
|||
if self.request.get('post_data'):
|
||||
yield from self.request.get('post_data').values()
|
||||
|
||||
@classmethod
|
||||
def import_from_xml_tree(cls, tree, include_id=False, check_deprecated=True, **kwargs):
|
||||
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
|
||||
|
||||
wscall = super().import_from_xml_tree(
|
||||
tree, include_id=include_id, check_deprecated=check_deprecated, **kwargs
|
||||
)
|
||||
|
||||
if check_deprecated:
|
||||
# check for deprecated elements
|
||||
job = DeprecationsScan()
|
||||
try:
|
||||
job.check_deprecated_elements_in_object(wscall)
|
||||
except DeprecatedElementsDetected as e:
|
||||
raise NamedWsCallImportError(str(e))
|
||||
|
||||
return wscall
|
||||
|
||||
def export_request_to_xml(self, element, attribute_name, **kwargs):
|
||||
request = getattr(self, attribute_name)
|
||||
for attr in ('url', 'request_signature_key', 'method', 'timeout', 'cache_duration'):
|
||||
|
|
Loading…
Reference in New Issue
si pas d'option, return, pas la peine de perdre du temps à parcourir l'objet