From eba79fdc7767d1274215befdaa2774edeead9b9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laur=C3=A9line=20Gu=C3=A9rin?= Date: Mon, 4 Mar 2024 15:02:16 +0100 Subject: [PATCH] depreciations: errors from deprecated elements on import (#72093) --- tests/admin_pages/test_block.py | 19 ++++++ tests/admin_pages/test_datasource.py | 17 ++++++ tests/admin_pages/test_form.py | 50 +++++++++++++++ tests/admin_pages/test_settings.py | 87 ++++++++++++++++++++++++++ tests/admin_pages/test_workflow.py | 22 +++++++ tests/admin_pages/test_wscall.py | 19 ++++++ tests/api/test_export_import.py | 91 ++++++++++++++++++++++++++++ wcs/admin/data_sources.py | 12 +++- wcs/admin/settings.py | 8 ++- wcs/admin/wscalls.py | 13 +++- wcs/api_export_import.py | 19 ++++-- wcs/publisher.py | 18 ++++++ wcs/qommon/xml_storage.py | 2 +- 13 files changed, 364 insertions(+), 13 deletions(-) diff --git a/tests/admin_pages/test_block.py b/tests/admin_pages/test_block.py index c0246e468..fef9e777e 100644 --- a/tests/admin_pages/test_block.py +++ b/tests/admin_pages/test_block.py @@ -220,6 +220,25 @@ def test_block_export_import(pub): assert 'Invalid File (Unknown referenced objects)' in resp assert '' in resp + # python expression + if not pub.site_options.has_section('options'): + pub.site_options.add_section('options') + pub.site_options.set('options', 'forbid-new-python-expressions', 'true') + with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd: + pub.site_options.write(fd) + block.fields = [ + fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}), + ] + block.store() + resp = app.get('/backoffice/forms/blocks/%s/' % block.id) + resp = resp.click(href=re.compile('^export$')) + xml_export = resp.text + resp = app.get('/backoffice/forms/blocks/') + resp = resp.click(href='import') + resp.form['file'] = Upload('block', xml_export.encode('utf-8')) + resp = resp.form.submit() + assert 'Python expression detected' in resp + def test_block_delete(pub): create_superuser(pub) diff --git a/tests/admin_pages/test_datasource.py b/tests/admin_pages/test_datasource.py index e37c8b66a..be4b851ca 100644 --- a/tests/admin_pages/test_datasource.py +++ b/tests/admin_pages/test_datasource.py @@ -1039,6 +1039,23 @@ def test_data_sources_import(pub): resp = resp.form.submit() assert 'Invalid File' in resp.text + # python expression + if not pub.site_options.has_section('options'): + pub.site_options.add_section('options') + pub.site_options.set('options', 'forbid-new-python-expressions', 'true') + with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd: + pub.site_options.write(fd) + data_source.data_source = {'type': 'formula', 'value': repr([('1', 'un'), ('2', 'deux')])} + data_source.store() + resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id) + resp = resp.click(href=re.compile('^export$')) + xml_export = resp.text + resp = app.get('/backoffice/settings/data-sources/') + resp = resp.click(href='import') + resp.form['file'] = Upload('ds', xml_export.encode('utf-8')) + resp = resp.form.submit() + assert 'Python expression detected' in resp + def test_data_sources_edit_slug(pub): create_superuser(pub) diff --git a/tests/admin_pages/test_form.py b/tests/admin_pages/test_form.py index f4ac3c92a..fd0713334 100644 --- a/tests/admin_pages/test_form.py +++ b/tests/admin_pages/test_form.py @@ -4077,6 +4077,39 @@ def test_form_overwrite(pub): assert resp.pyquery('.error').text() == 'Invalid File' +def test_form_export_import_export(pub): + create_superuser(pub) + create_role(pub) + + FormDef.wipe() + formdef = FormDef() + formdef.name = 'form title' + formdef.table_name = 'xxx' + formdef.fields = [] + formdef.store() + + app = login(get_app(pub)) + + # python expression + if not pub.site_options.has_section('options'): + pub.site_options.add_section('options') + pub.site_options.set('options', 'forbid-new-python-expressions', 'true') + with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd: + pub.site_options.write(fd) + formdef.fields = [ + fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}), + ] + formdef.store() + resp = app.get('/backoffice/forms/%s/' % formdef.id) + resp = resp.click(href=re.compile('^export$')) + xml_export = resp.text + resp = app.get('/backoffice/forms/') + resp = resp.click(href='import') + resp.form['file'] = Upload('formdef', xml_export.encode('utf-8')) + resp = resp.form.submit() + assert 'Python expression detected' in resp + + def test_form_export_import_export_overwrite(pub): create_superuser(pub) create_role(pub) @@ -4134,6 +4167,23 @@ def test_form_export_import_export_overwrite(pub): field_ow = formdef_overwrited.fields[i] assert (field.id, field.label, field.key) == (field_ow.id, field_ow.label, field_ow.key) + # python expression + if not pub.site_options.has_section('options'): + pub.site_options.add_section('options') + pub.site_options.set('options', 'forbid-new-python-expressions', 'true') + with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd: + pub.site_options.write(fd) + formdef2.fields = [ + fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}), + ] + formdef2.store() + formdef2_xml = ET.tostring(formdef2.export_to_xml(include_id=True)) + resp = app.get('/backoffice/forms/%s/' % formdef.id) + resp = resp.click(href='overwrite') + resp.forms[0]['file'] = Upload('formdef.wcs', formdef2_xml) + resp = resp.forms[0].submit() + assert 'Python expression detected' in resp + def test_form_overwrite_from_url(pub): create_superuser(pub) diff --git a/tests/admin_pages/test_settings.py b/tests/admin_pages/test_settings.py index 79eb6ce67..a749585f4 100644 --- a/tests/admin_pages/test_settings.py +++ b/tests/admin_pages/test_settings.py @@ -399,6 +399,93 @@ def test_settings_export_import(pub): resp.form['file'] = Upload('export.wcs', zip_content.getvalue()) resp = resp.form.submit('submit').follow() assert 'Unknown referenced objects [Unknown datasources: foobar]' in resp + BlockDef.wipe() + + # python expressions + if not pub.site_options.has_section('options'): + pub.site_options.add_section('options') + pub.site_options.set('options', 'forbid-new-python-expressions', 'true') + with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd: + pub.site_options.write(fd) + + formdef = FormDef() + formdef.name = 'foobar' + formdef.fields = [ + fields.PageField(id='1', label='page1', condition={'type': 'python', 'value': 'True'}), + ] + formdef.store() + resp = app.get('/backoffice/settings/export') + resp = resp.form.submit('submit').follow() + resp = resp.click('Download Export') + zip_content = io.BytesIO(resp.body) + resp = app.get('/backoffice/settings/import') + resp.form['file'] = Upload('export.wcs', zip_content.getvalue()) + resp.form['confirm'].checked = True + resp = resp.form.submit('submit').follow() + assert 'Python expression detected' in resp + FormDef.wipe() + + blockdef = BlockDef() + blockdef.name = 'foobar' + blockdef.fields = [ + fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}), + ] + blockdef.store() + resp = app.get('/backoffice/settings/export') + resp = resp.form.submit('submit').follow() + resp = resp.click('Download Export') + zip_content = io.BytesIO(resp.body) + resp = app.get('/backoffice/settings/import') + resp.form['file'] = Upload('export.wcs', zip_content.getvalue()) + resp = resp.form.submit('submit').follow() + assert 'Python expression detected' in resp + BlockDef.wipe() + + workflow = Workflow(name='test') + st0 = workflow.add_status('Status0', 'st0') + sendsms = st0.add_action('sendsms', id='_sendsms') + sendsms.to = 'xxx' + sendsms.condition = {'type': 'python', 'value': 'True'} + sendsms.parent = st0 + st0.items.append(sendsms) + workflow.store() + resp = app.get('/backoffice/settings/export') + resp = resp.form.submit('submit').follow() + resp = resp.click('Download Export') + zip_content = io.BytesIO(resp.body) + resp = app.get('/backoffice/settings/import') + resp.form['file'] = Upload('export.wcs', zip_content.getvalue()) + resp.form['confirm'].checked = True + resp = resp.form.submit('submit').follow() + assert 'Python expression detected' in resp + Workflow.wipe() + + data_source = NamedDataSource(name='ds_python') + data_source.data_source = {'type': 'formula', 'value': repr([('1', 'un'), ('2', 'deux')])} + data_source.store() + resp = app.get('/backoffice/settings/export') + resp = resp.form.submit('submit').follow() + resp = resp.click('Download Export') + zip_content = io.BytesIO(resp.body) + resp = app.get('/backoffice/settings/import') + resp.form['file'] = Upload('export.wcs', zip_content.getvalue()) + resp = resp.form.submit('submit').follow() + assert 'Python expression detected' in resp + NamedDataSource.wipe() + + wscall = NamedWsCall() + wscall.name = 'Hello' + wscall.request = {'url': 'http://example.net', 'qs_data': {'a': '=1+2'}} + wscall.store() + resp = app.get('/backoffice/settings/export') + resp = resp.form.submit('submit').follow() + resp = resp.click('Download Export') + zip_content = io.BytesIO(resp.body) + resp = app.get('/backoffice/settings/import') + resp.form['file'] = Upload('export.wcs', zip_content.getvalue()) + resp = resp.form.submit('submit').follow() + assert 'Python expression detected' in resp + NamedWsCall.wipe() # check a backup of settings has been created assert [x for x in os.listdir(pub.app_dir) if x.startswith('config.pck.backup-')] diff --git a/tests/admin_pages/test_workflow.py b/tests/admin_pages/test_workflow.py index 0c1d2ec2c..e0b0e8986 100644 --- a/tests/admin_pages/test_workflow.py +++ b/tests/admin_pages/test_workflow.py @@ -990,6 +990,28 @@ def test_workflows_export_import(pub): assert 'Invalid File' in resp.text assert Workflow.count() == 2 + # python expression + if not pub.site_options.has_section('options'): + pub.site_options.add_section('options') + pub.site_options.set('options', 'forbid-new-python-expressions', 'true') + with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd: + pub.site_options.write(fd) + st0 = workflow.add_status('Status0', 'st0') + sendsms = st0.add_action('sendsms', id='_sendsms') + sendsms.to = 'xxx' + sendsms.condition = {'type': 'python', 'value': 'True'} + sendsms.parent = st0 + st0.items.append(sendsms) + workflow.store() + resp = app.get('/backoffice/workflows/%s/' % workflow.id) + resp = resp.click(href=re.compile('^export$')) + xml_export = resp.text + resp = app.get('/backoffice/workflows/') + resp = resp.click('Import') + resp.form['file'] = Upload('wf.wcs', xml_export.encode('utf-8')) + resp = resp.form.submit('submit') + assert 'Python expression detected' in resp + def test_workflows_import_from_url(pub): create_superuser(pub) diff --git a/tests/admin_pages/test_wscall.py b/tests/admin_pages/test_wscall.py index 1d01ada1c..c7fc2efcd 100644 --- a/tests/admin_pages/test_wscall.py +++ b/tests/admin_pages/test_wscall.py @@ -1,4 +1,6 @@ import io +import os +import re import xml.etree.ElementTree as ET import pytest @@ -207,6 +209,23 @@ def test_wscalls_import(pub, wscall): resp = resp.form.submit() assert 'Invalid File' in resp.text + # python expression + if not pub.site_options.has_section('options'): + pub.site_options.add_section('options') + pub.site_options.set('options', 'forbid-new-python-expressions', 'true') + with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd: + pub.site_options.write(fd) + wscall.request = {'url': 'http://example.net', 'qs_data': {'a': '=1+2'}} + wscall.store() + resp = app.get('/backoffice/settings/wscalls/%s/' % wscall.id) + resp = resp.click(href=re.compile('^export$')) + xml_export = resp.text + resp = app.get('/backoffice/settings/wscalls/') + resp = resp.click(href='import') + resp.form['file'] = Upload('wscall', xml_export.encode('utf-8')) + resp = resp.form.submit() + assert 'Python expression detected' in resp + def test_wscalls_empty_param_values(pub): create_superuser(pub) diff --git a/tests/api/test_export_import.py b/tests/api/test_export_import.py index 7fc568221..b9e0ff663 100644 --- a/tests/api/test_export_import.py +++ b/tests/api/test_export_import.py @@ -2046,6 +2046,97 @@ def test_export_import_workflow_options(pub): assert formdef.workflow_options == {'foo': 'bar2'} +def test_export_import_with_deprecated(pub): + pub.load_site_options() + if not pub.site_options.has_section('options'): + pub.site_options.add_section('options') + pub.site_options.set('options', 'forbid-new-python-expressions', 'true') + with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd: + pub.site_options.write(fd) + + formdef = FormDef() + formdef.name = 'foo' + formdef.fields = [ + PageField(id='1', label='page1', condition={'type': 'python', 'value': 'True'}), + ] + formdef.store() + bundle = create_bundle( + [ + {'type': 'forms', 'slug': 'foo', 'name': 'foo'}, + ], + ('forms/foo', formdef), + ) + resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle) + afterjob_url = resp.json['url'] + resp = get_app(pub).put(sign_uri(afterjob_url)) + assert resp.json['data']['status'] == 'failed' + + blockdef = BlockDef() + blockdef.name = 'foo' + blockdef.fields = [ + StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}), + ] + blockdef.store() + bundle = create_bundle( + [ + {'type': 'blocks', 'slug': 'foo', 'name': 'foo'}, + ], + ('blocks/foo', blockdef), + ) + resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle) + afterjob_url = resp.json['url'] + resp = get_app(pub).put(sign_uri(afterjob_url)) + assert resp.json['data']['status'] == 'failed' + + workflow = Workflow(name='foo') + st0 = workflow.add_status('Status0', 'st0') + sendsms = st0.add_action('sendsms', id='_sendsms') + sendsms.to = 'xxx' + sendsms.condition = {'type': 'python', 'value': 'True'} + sendsms.parent = st0 + st0.items.append(sendsms) + workflow.store() + bundle = create_bundle( + [ + {'type': 'workflows', 'slug': 'foo', 'name': 'foo'}, + ], + ('workflows/foo', workflow), + ) + resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle) + afterjob_url = resp.json['url'] + resp = get_app(pub).put(sign_uri(afterjob_url)) + assert resp.json['data']['status'] == 'failed' + + data_source = NamedDataSource(name='foo') + data_source.data_source = {'type': 'formula', 'value': repr([('1', 'un'), ('2', 'deux')])} + data_source.store() + bundle = create_bundle( + [ + {'type': 'data-sources', 'slug': 'foo', 'name': 'foo'}, + ], + ('data-sources/foo', data_source), + ) + resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle) + afterjob_url = resp.json['url'] + resp = get_app(pub).put(sign_uri(afterjob_url)) + assert resp.json['data']['status'] == 'failed' + + wscall = NamedWsCall() + wscall.name = 'foo' + wscall.request = {'url': 'http://example.net', 'qs_data': {'a': '=1+2'}} + wscall.store() + bundle = create_bundle( + [ + {'type': 'wscalls', 'slug': 'foo', 'name': 'foo'}, + ], + ('wscalls/foo', wscall), + ) + resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle) + afterjob_url = resp.json['url'] + resp = get_app(pub).put(sign_uri(afterjob_url)) + assert resp.json['data']['status'] == 'failed' + + def test_api_export_import_invalid_slug(pub): pub.role_class.wipe() role1 = pub.role_class(name='Test role 1') diff --git a/wcs/admin/data_sources.py b/wcs/admin/data_sources.py index 9523f045f..2a42f919c 100644 --- a/wcs/admin/data_sources.py +++ b/wcs/admin/data_sources.py @@ -27,6 +27,7 @@ from wcs.categories import CardDefCategory, DataSourceCategory from wcs.data_sources import ( DataSourceSelectionWidget, NamedDataSource, + NamedDataSourceImportError, RefreshAgendas, get_structured_items, has_chrono, @@ -667,15 +668,22 @@ class NamedDataSourcesDirectory(Directory): def import_submit(self, form): fp = form.get_widget('file').parse().fp - error = False + error, reason = False, None try: datasource = NamedDataSource.import_from_xml(fp) get_session().message = ('info', _('This datasource has been successfully imported.')) + except NamedDataSourceImportError as e: + error = True + reason = str(e) except ValueError: error = True if error: - form.set_error('file', _('Invalid File')) + if reason: + msg = _('Invalid File (%s)') % reason + else: + msg = _('Invalid File') + form.set_error('file', msg) raise ValueError() try: diff --git a/wcs/admin/settings.py b/wcs/admin/settings.py index 50ea7fde8..101fbe041 100644 --- a/wcs/admin/settings.py +++ b/wcs/admin/settings.py @@ -36,7 +36,7 @@ from quixote.html import TemplateIO, htmltext from wcs.api_access import ApiAccess from wcs.blocks import BlockDef, BlockdefImportError from wcs.carddef import CardDef -from wcs.data_sources import NamedDataSource +from wcs.data_sources import NamedDataSource, NamedDataSourceImportError from wcs.fields.map import MapOptionsMixin from wcs.formdef import FormDef, FormdefImportError, get_formdefs_of_all_kinds from wcs.qommon import _, audit, errors, get_cfg, ident, misc, pgettext_lazy, template @@ -61,6 +61,7 @@ from wcs.qommon.form import ( TextWidget, ) from wcs.workflows import Workflow, WorkflowImportError +from wcs.wscalls import NamedWsCallImportError from .api_access import ApiAccessDirectory from .data_sources import NamedDataSourcesDirectory @@ -1512,7 +1513,10 @@ class SiteImportAfterJob(AfterJob): msg = _(e.msg) % e.msg_args if e.details: msg += ' [%s]' % e.details - error = _('Failed to import a workflow (%s); site import did not complete.') % msg + error = _('Failed to import objects (%s); site import did not complete.') % msg + except (NamedDataSourceImportError, NamedWsCallImportError) as e: + results = None + error = _('Failed to import objects (%s); site import did not complete.') % str(e) self.results = results if error: diff --git a/wcs/admin/wscalls.py b/wcs/admin/wscalls.py index 0bd49d99b..5d9b009c9 100644 --- a/wcs/admin/wscalls.py +++ b/wcs/admin/wscalls.py @@ -26,7 +26,7 @@ from wcs.backoffice.snapshots import SnapshotsDirectory from wcs.qommon import _, errors, misc, template from wcs.qommon.form import CheckboxWidget, FileWidget, Form, HtmlWidget, SlugWidget, StringWidget, TextWidget from wcs.utils import grep_strings -from wcs.wscalls import NamedWsCall, WsCallRequestWidget +from wcs.wscalls import NamedWsCall, NamedWsCallImportError, WsCallRequestWidget class NamedWsCallUI: @@ -317,15 +317,22 @@ class NamedWsCallsDirectory(Directory): def import_submit(self, form): fp = form.get_widget('file').parse().fp - error = False + error, reason = False, None try: wscall = NamedWsCall.import_from_xml(fp) get_session().message = ('info', _('This webservice call has been successfully imported.')) + except NamedWsCallImportError as e: + error = True + reason = str(e) except ValueError: error = True if error: - form.set_error('file', _('Invalid File')) + if reason: + msg = _('Invalid File (%s)') % reason + else: + msg = _('Invalid File') + form.set_error('file', msg) raise ValueError() try: diff --git a/wcs/api_export_import.py b/wcs/api_export_import.py index 2b705c0f8..d1fc722aa 100644 --- a/wcs/api_export_import.py +++ b/wcs/api_export_import.py @@ -26,7 +26,7 @@ from quixote import get_publisher, get_response from wcs.api_utils import is_url_signed from wcs.applications import Application, ApplicationElement -from wcs.blocks import BlockDef +from wcs.blocks import BlockDef, BlockdefImportError from wcs.carddef import CardDef from wcs.categories import ( BlockCategory, @@ -38,12 +38,12 @@ from wcs.categories import ( WorkflowCategory, ) from wcs.comment_templates import CommentTemplate -from wcs.data_sources import NamedDataSource -from wcs.formdef import FormDef +from wcs.data_sources import NamedDataSource, NamedDataSourceImportError +from wcs.formdef import FormDef, FormdefImportError from wcs.mail_templates import MailTemplate from wcs.sql import Equal, Role -from wcs.workflows import Workflow -from wcs.wscalls import NamedWsCall +from wcs.workflows import Workflow, WorkflowImportError +from wcs.wscalls import NamedWsCall, NamedWsCallImportError from .qommon import _ from .qommon.afterjobs import AfterJob @@ -454,6 +454,15 @@ class BundleImportJob(AfterJob): # remove obsolete application elements self.unlink_obsolete_objects() + + except ( + BlockdefImportError, + FormdefImportError, + WorkflowImportError, + NamedDataSourceImportError, + NamedWsCallImportError, + ) as e: + error = str(e) except tarfile.TarError: error = _('Invalid tar file.') except BundleKeyError as e: diff --git a/wcs/publisher.py b/wcs/publisher.py index 167167bf4..48c74cdf2 100644 --- a/wcs/publisher.py +++ b/wcs/publisher.py @@ -292,6 +292,8 @@ class WcsPublisher(QommonPublisher): 'workflows_xml', 'blockdefs_xml', 'roles_xml', + 'datasources', + 'wscalls', ): continue path = os.path.join(self.app_dir, f) @@ -346,6 +348,22 @@ class WcsPublisher(QommonPublisher): if os.path.split(f)[0] in results: results[os.path.split(f)[0]] += 1 + # import datasources and wscalls + from wcs.data_sources import NamedDataSource + from wcs.wscalls import NamedWsCall + + for f in z.namelist(): + if os.path.dirname(f) == 'datasources' and os.path.basename(f): + with z.open(f) as fd: + data_source = NamedDataSource.import_from_xml(fd, include_id=True) + data_source.store() + results['datasources'] += 1 + if os.path.dirname(f) == 'wscalls' and os.path.basename(f): + with z.open(f) as fd: + wscall = NamedWsCall.import_from_xml(fd, include_id=True) + wscall.store() + results['wscalls'] += 1 + # second pass, fields blocks from wcs.blocks import BlockDef diff --git a/wcs/qommon/xml_storage.py b/wcs/qommon/xml_storage.py index 073546633..427c74562 100644 --- a/wcs/qommon/xml_storage.py +++ b/wcs/qommon/xml_storage.py @@ -92,7 +92,7 @@ class XmlStorableObject(StorableObject): return cls.import_from_xml_tree(tree, include_id=include_id, check_deprecated=check_deprecated) @classmethod - def import_from_xml_tree(cls, tree, include_id=False, **kwargs): + def import_from_xml_tree(cls, tree, include_id=False, check_deprecated=True, **kwargs): obj = cls() # if the tree we get is actually a ElementTree for real, we get its