deprecations: forbid import of new python expressions (#72093) #1212

Merged
lguerin merged 6 commits from wip/72093-depreciation-python-import into main 2024-03-15 16:34:28 +01:00
21 changed files with 820 additions and 217 deletions

View File

@ -53,6 +53,7 @@ def teardown_module(module):
def test_empty_site(pub):
pub.user_class.wipe()
resp = get_app(pub).get('/backoffice/users/')
resp = resp.click('New User')
resp = get_app(pub).get('/backoffice/settings/')

View File

@ -220,6 +220,25 @@ def test_block_export_import(pub):
assert 'Invalid File (Unknown referenced objects)' in resp
assert '<ul><li>Unknown datasources: foobar</li></ul>' in resp
# python expression
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
block.fields = [
fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}),
]
block.store()
resp = app.get('/backoffice/forms/blocks/%s/' % block.id)
resp = resp.click(href=re.compile('^export$'))
xml_export = resp.text
resp = app.get('/backoffice/forms/blocks/')
resp = resp.click(href='import')
resp.form['file'] = Upload('block', xml_export.encode('utf-8'))
resp = resp.form.submit()
assert 'Python expression detected' in resp
def test_block_delete(pub):
create_superuser(pub)

View File

@ -1039,6 +1039,23 @@ def test_data_sources_import(pub):
resp = resp.form.submit()
assert 'Invalid File' in resp.text
# python expression
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
data_source.data_source = {'type': 'formula', 'value': repr([('1', 'un'), ('2', 'deux')])}
data_source.store()
resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
resp = resp.click(href=re.compile('^export$'))
xml_export = resp.text
resp = app.get('/backoffice/settings/data-sources/')
resp = resp.click(href='import')
resp.form['file'] = Upload('ds', xml_export.encode('utf-8'))
resp = resp.form.submit()
assert 'Python expression detected' in resp
def test_data_sources_edit_slug(pub):
create_superuser(pub)

View File

@ -7,11 +7,11 @@ import pytest
from quixote.http_request import Upload as QuixoteUpload
from wcs import fields
from wcs.backoffice.deprecations import DeprecationsScanAfterJob
from wcs.blocks import BlockDef
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
from wcs.blocks import BlockDef, BlockdefImportError
from wcs.carddef import CardDef
from wcs.data_sources import NamedDataSource
from wcs.formdef import FormDef
from wcs.data_sources import NamedDataSource, NamedDataSourceImportError
from wcs.formdef import FormDef, FormdefImportError
from wcs.mail_templates import MailTemplate
from wcs.qommon.form import UploadedFile
from wcs.qommon.http_request import HTTPRequest
@ -23,8 +23,8 @@ from wcs.wf.geolocate import GeolocateWorkflowStatusItem
from wcs.wf.jump import JumpWorkflowStatusItem
from wcs.wf.notification import SendNotificationWorkflowStatusItem
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef
from wcs.wscalls import NamedWsCall
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowImportError
from wcs.wscalls import NamedWsCall, NamedWsCallImportError
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login
from .test_all import create_superuser
@ -293,7 +293,7 @@ def test_deprecations_choice_label(pub):
accept = st0.add_action('choice', id='_choice')
accept.label = '[test] action'
job = DeprecationsScanAfterJob()
job = DeprecationsScan()
job.execute()
assert not job.report_lines
@ -305,7 +305,7 @@ def test_deprecations_skip_invalid_ezt(pub):
display = st0.add_action('displaymsg')
display.message = 'message with invalid [if-any] ezt'
job = DeprecationsScanAfterJob()
job = DeprecationsScan()
job.execute()
assert not job.report_lines
@ -316,19 +316,19 @@ def test_deprecations_ignore_ezt_looking_tag(pub):
sendmail = st0.add_action('sendmail')
sendmail.subject = '[REMINDER] your appointment'
workflow.store()
job = DeprecationsScanAfterJob()
job = DeprecationsScan()
job.execute()
assert not job.report_lines
sendmail.subject = '[reminder]'
workflow.store()
job = DeprecationsScanAfterJob()
job = DeprecationsScan()
job.execute()
assert job.report_lines
sendmail.subject = '[if-any plop]test[end]'
workflow.store()
job = DeprecationsScanAfterJob()
job = DeprecationsScan()
job.execute()
assert job.report_lines
@ -397,7 +397,7 @@ def test_deprecations_document_models(pub):
export_to2.by = ['_submitter']
workflow.store()
job = DeprecationsScanAfterJob()
job = DeprecationsScan()
job.execute()
assert job.report_lines == [
{
@ -446,7 +446,7 @@ def test_deprecations_inspect_pages(pub):
display.message = 'message with [ezt] info'
workflow.store()
job = DeprecationsScanAfterJob()
job = DeprecationsScan()
job.execute()
create_superuser(pub)
@ -485,7 +485,7 @@ def test_deprecations_inspect_pages(pub):
display.message = 'message with {{django}} info'
workflow.store()
job = DeprecationsScanAfterJob()
job = DeprecationsScan()
job.execute()
resp = app.get(formdef.get_admin_url() + 'inspect')
@ -506,7 +506,7 @@ def test_deprecations_inspect_pages_old_format(pub):
]
formdef.store()
job = DeprecationsScanAfterJob()
job = DeprecationsScan()
job.execute()
with open(os.path.join(pub.app_dir, 'deprecations.json')) as f:
@ -525,3 +525,124 @@ def test_deprecations_inspect_pages_old_format(pub):
resp = app.get('/backoffice/studio/deprecations/')
assert resp.pyquery('.section--python-condition li a')
def test_deprecations_on_import(pub):
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = [
fields.PageField(id='1', label='page1', condition={'type': 'python', 'value': 'True'}),
]
formdef.store()
blockdef = BlockDef()
blockdef.name = 'foobar'
blockdef.fields = [
fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}),
]
blockdef.store()
workflow = Workflow(name='test')
st0 = workflow.add_status('Status0', 'st0')
sendsms = st0.add_action('sendsms', id='_sendsms')
sendsms.to = 'xxx'
sendsms.condition = {'type': 'python', 'value': 'True'}
sendsms.parent = st0
st0.items.append(sendsms)
workflow.store()
data_source = NamedDataSource(name='ds_python')
data_source.data_source = {'type': 'formula', 'value': repr([('1', 'un'), ('2', 'deux')])}
data_source.store()
wscall = NamedWsCall()
wscall.name = 'Hello'
wscall.request = {'url': 'http://example.net', 'qs_data': {'a': '=1+2'}}
wscall.store()
mail_template = MailTemplate() # no python expression in mail templates
mail_template.name = 'Hello2'
mail_template.subject = 'plop'
mail_template.body = 'plop [ezt] plop'
mail_template.store()
job = DeprecationsScan()
job.check_deprecated_elements_in_object(formdef)
formdef_xml = formdef.export_to_xml()
FormDef.import_from_xml_tree(formdef_xml)
job = DeprecationsScan()
job.check_deprecated_elements_in_object(blockdef)
blockdef_xml = blockdef.export_to_xml()
BlockDef.import_from_xml_tree(blockdef_xml)
job = DeprecationsScan()
job.check_deprecated_elements_in_object(workflow)
workflow_xml = workflow.export_to_xml()
Workflow.import_from_xml_tree(workflow_xml)
job = DeprecationsScan()
job.check_deprecated_elements_in_object(data_source)
data_source_xml = data_source.export_to_xml()
NamedDataSource.import_from_xml_tree(data_source_xml)
job = DeprecationsScan()
job.check_deprecated_elements_in_object(wscall)
wscall_xml = wscall.export_to_xml()
NamedWsCall.import_from_xml_tree(wscall_xml)
job = DeprecationsScan()
job.check_deprecated_elements_in_object(mail_template)
mail_template_xml = mail_template.export_to_xml()
MailTemplate.import_from_xml_tree(mail_template_xml)
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
job = DeprecationsScan()
with pytest.raises(DeprecatedElementsDetected) as excinfo:
job.check_deprecated_elements_in_object(formdef)
assert str(excinfo.value) == 'Python expression detected'
with pytest.raises(FormdefImportError) as excinfo:
FormDef.import_from_xml_tree(formdef_xml)
assert str(excinfo.value) == 'Python expression detected'
job = DeprecationsScan()
with pytest.raises(DeprecatedElementsDetected) as excinfo:
job.check_deprecated_elements_in_object(blockdef)
assert str(excinfo.value) == 'Python expression detected'
with pytest.raises(BlockdefImportError) as excinfo:
BlockDef.import_from_xml_tree(blockdef_xml)
assert str(excinfo.value) == 'Python expression detected'
job = DeprecationsScan()
with pytest.raises(DeprecatedElementsDetected) as excinfo:
job.check_deprecated_elements_in_object(workflow)
assert str(excinfo.value) == 'Python expression detected'
with pytest.raises(WorkflowImportError) as excinfo:
Workflow.import_from_xml_tree(workflow_xml)
assert str(excinfo.value) == 'Python expression detected'
job = DeprecationsScan()
with pytest.raises(DeprecatedElementsDetected) as excinfo:
job.check_deprecated_elements_in_object(data_source)
assert str(excinfo.value) == 'Python expression detected'
with pytest.raises(NamedDataSourceImportError) as excinfo:
NamedDataSource.import_from_xml_tree(data_source_xml)
assert str(excinfo.value) == 'Python expression detected'
job = DeprecationsScan()
with pytest.raises(DeprecatedElementsDetected) as excinfo:
job.check_deprecated_elements_in_object(wscall)
assert str(excinfo.value) == 'Python expression detected'
with pytest.raises(NamedWsCallImportError) as excinfo:
NamedWsCall.import_from_xml_tree(wscall_xml)
assert str(excinfo.value) == 'Python expression detected'
# no python expressions
job = DeprecationsScan()
job.check_deprecated_elements_in_object(mail_template)
MailTemplate.import_from_xml_tree(mail_template_xml)

View File

@ -4077,6 +4077,39 @@ def test_form_overwrite(pub):
assert resp.pyquery('.error').text() == 'Invalid File'
def test_form_export_import_export(pub):
create_superuser(pub)
create_role(pub)
FormDef.wipe()
formdef = FormDef()
formdef.name = 'form title'
formdef.table_name = 'xxx'
formdef.fields = []
formdef.store()
app = login(get_app(pub))
# python expression
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
formdef.fields = [
fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}),
]
formdef.store()
resp = app.get('/backoffice/forms/%s/' % formdef.id)
resp = resp.click(href=re.compile('^export$'))
xml_export = resp.text
resp = app.get('/backoffice/forms/')
resp = resp.click(href='import')
resp.form['file'] = Upload('formdef', xml_export.encode('utf-8'))
resp = resp.form.submit()
assert 'Python expression detected' in resp
def test_form_export_import_export_overwrite(pub):
create_superuser(pub)
create_role(pub)
@ -4134,6 +4167,23 @@ def test_form_export_import_export_overwrite(pub):
field_ow = formdef_overwrited.fields[i]
assert (field.id, field.label, field.key) == (field_ow.id, field_ow.label, field_ow.key)
# python expression
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
formdef2.fields = [
fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}),
]
formdef2.store()
formdef2_xml = ET.tostring(formdef2.export_to_xml(include_id=True))
resp = app.get('/backoffice/forms/%s/' % formdef.id)
resp = resp.click(href='overwrite')
resp.forms[0]['file'] = Upload('formdef.wcs', formdef2_xml)
resp = resp.forms[0].submit()
assert 'Python expression detected' in resp
def test_form_overwrite_from_url(pub):
create_superuser(pub)

View File

@ -399,6 +399,93 @@ def test_settings_export_import(pub):
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
resp = resp.form.submit('submit').follow()
assert 'Unknown referenced objects [Unknown datasources: foobar]' in resp
BlockDef.wipe()
# python expressions
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = [
fields.PageField(id='1', label='page1', condition={'type': 'python', 'value': 'True'}),
]
formdef.store()
resp = app.get('/backoffice/settings/export')
resp = resp.form.submit('submit').follow()
resp = resp.click('Download Export')
zip_content = io.BytesIO(resp.body)
resp = app.get('/backoffice/settings/import')
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
resp.form['confirm'].checked = True
resp = resp.form.submit('submit').follow()
assert 'Python expression detected' in resp
FormDef.wipe()
blockdef = BlockDef()
blockdef.name = 'foobar'
blockdef.fields = [
fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}),
]
blockdef.store()
resp = app.get('/backoffice/settings/export')
resp = resp.form.submit('submit').follow()
resp = resp.click('Download Export')
zip_content = io.BytesIO(resp.body)
resp = app.get('/backoffice/settings/import')
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
resp = resp.form.submit('submit').follow()
assert 'Python expression detected' in resp
BlockDef.wipe()
workflow = Workflow(name='test')
st0 = workflow.add_status('Status0', 'st0')
sendsms = st0.add_action('sendsms', id='_sendsms')
sendsms.to = 'xxx'
sendsms.condition = {'type': 'python', 'value': 'True'}
sendsms.parent = st0
st0.items.append(sendsms)
workflow.store()
resp = app.get('/backoffice/settings/export')
resp = resp.form.submit('submit').follow()
resp = resp.click('Download Export')
zip_content = io.BytesIO(resp.body)
resp = app.get('/backoffice/settings/import')
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
resp.form['confirm'].checked = True
resp = resp.form.submit('submit').follow()
assert 'Python expression detected' in resp
Workflow.wipe()
data_source = NamedDataSource(name='ds_python')
data_source.data_source = {'type': 'formula', 'value': repr([('1', 'un'), ('2', 'deux')])}
data_source.store()
resp = app.get('/backoffice/settings/export')
resp = resp.form.submit('submit').follow()
resp = resp.click('Download Export')
zip_content = io.BytesIO(resp.body)
resp = app.get('/backoffice/settings/import')
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
resp = resp.form.submit('submit').follow()
assert 'Python expression detected' in resp
NamedDataSource.wipe()
wscall = NamedWsCall()
wscall.name = 'Hello'
wscall.request = {'url': 'http://example.net', 'qs_data': {'a': '=1+2'}}
wscall.store()
resp = app.get('/backoffice/settings/export')
resp = resp.form.submit('submit').follow()
resp = resp.click('Download Export')
zip_content = io.BytesIO(resp.body)
resp = app.get('/backoffice/settings/import')
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
resp = resp.form.submit('submit').follow()
assert 'Python expression detected' in resp
NamedWsCall.wipe()
# check a backup of settings has been created
assert [x for x in os.listdir(pub.app_dir) if x.startswith('config.pck.backup-')]

View File

@ -990,6 +990,28 @@ def test_workflows_export_import(pub):
assert 'Invalid File' in resp.text
assert Workflow.count() == 2
# python expression
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
st0 = workflow.add_status('Status0', 'st0')
sendsms = st0.add_action('sendsms', id='_sendsms')
sendsms.to = 'xxx'
sendsms.condition = {'type': 'python', 'value': 'True'}
sendsms.parent = st0
st0.items.append(sendsms)
workflow.store()
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
resp = resp.click(href=re.compile('^export$'))
xml_export = resp.text
resp = app.get('/backoffice/workflows/')
resp = resp.click('Import')
resp.form['file'] = Upload('wf.wcs', xml_export.encode('utf-8'))
resp = resp.form.submit('submit')
assert 'Python expression detected' in resp
def test_workflows_import_from_url(pub):
create_superuser(pub)

View File

@ -1,4 +1,6 @@
import io
import os
import re
import xml.etree.ElementTree as ET
import pytest
@ -207,6 +209,23 @@ def test_wscalls_import(pub, wscall):
resp = resp.form.submit()
assert 'Invalid File' in resp.text
# python expression
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
wscall.request = {'url': 'http://example.net', 'qs_data': {'a': '=1+2'}}
wscall.store()
resp = app.get('/backoffice/settings/wscalls/%s/' % wscall.id)
resp = resp.click(href=re.compile('^export$'))
xml_export = resp.text
resp = app.get('/backoffice/settings/wscalls/')
resp = resp.click(href='import')
resp.form['file'] = Upload('wscall', xml_export.encode('utf-8'))
resp = resp.form.submit()
assert 'Python expression detected' in resp
def test_wscalls_empty_param_values(pub):
create_superuser(pub)

View File

@ -2046,6 +2046,97 @@ def test_export_import_workflow_options(pub):
assert formdef.workflow_options == {'foo': 'bar2'}
def test_export_import_with_deprecated(pub):
pub.load_site_options()
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
formdef = FormDef()
formdef.name = 'foo'
formdef.fields = [
PageField(id='1', label='page1', condition={'type': 'python', 'value': 'True'}),
]
formdef.store()
bundle = create_bundle(
[
{'type': 'forms', 'slug': 'foo', 'name': 'foo'},
],
('forms/foo', formdef),
)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
blockdef = BlockDef()
blockdef.name = 'foo'
blockdef.fields = [
StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}),
]
blockdef.store()
bundle = create_bundle(
[
{'type': 'blocks', 'slug': 'foo', 'name': 'foo'},
],
('blocks/foo', blockdef),
)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
workflow = Workflow(name='foo')
st0 = workflow.add_status('Status0', 'st0')
sendsms = st0.add_action('sendsms', id='_sendsms')
sendsms.to = 'xxx'
sendsms.condition = {'type': 'python', 'value': 'True'}
sendsms.parent = st0
st0.items.append(sendsms)
workflow.store()
bundle = create_bundle(
[
{'type': 'workflows', 'slug': 'foo', 'name': 'foo'},
],
('workflows/foo', workflow),
)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
data_source = NamedDataSource(name='foo')
data_source.data_source = {'type': 'formula', 'value': repr([('1', 'un'), ('2', 'deux')])}
data_source.store()
bundle = create_bundle(
[
{'type': 'data-sources', 'slug': 'foo', 'name': 'foo'},
],
('data-sources/foo', data_source),
)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
wscall = NamedWsCall()
wscall.name = 'foo'
wscall.request = {'url': 'http://example.net', 'qs_data': {'a': '=1+2'}}
wscall.store()
bundle = create_bundle(
[
{'type': 'wscalls', 'slug': 'foo', 'name': 'foo'},
],
('wscalls/foo', wscall),
)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
def test_api_export_import_invalid_slug(pub):
pub.role_class.wipe()
role1 = pub.role_class(name='Test role 1')

View File

@ -27,6 +27,7 @@ from wcs.categories import CardDefCategory, DataSourceCategory
from wcs.data_sources import (
DataSourceSelectionWidget,
NamedDataSource,
NamedDataSourceImportError,
RefreshAgendas,
get_structured_items,
has_chrono,
@ -667,15 +668,22 @@ class NamedDataSourcesDirectory(Directory):
def import_submit(self, form):
fp = form.get_widget('file').parse().fp
error = False
error, reason = False, None
try:
datasource = NamedDataSource.import_from_xml(fp)
get_session().message = ('info', _('This datasource has been successfully imported.'))
except NamedDataSourceImportError as e:
error = True
reason = str(e)
except ValueError:
error = True
if error:
form.set_error('file', _('Invalid File'))
if reason:
msg = _('Invalid File (%s)') % reason
else:
msg = _('Invalid File')
form.set_error('file', msg)
raise ValueError()
try:

View File

@ -36,7 +36,7 @@ from quixote.html import TemplateIO, htmltext
from wcs.api_access import ApiAccess
from wcs.blocks import BlockDef, BlockdefImportError
from wcs.carddef import CardDef
from wcs.data_sources import NamedDataSource
from wcs.data_sources import NamedDataSource, NamedDataSourceImportError
from wcs.fields.map import MapOptionsMixin
from wcs.formdef import FormDef, FormdefImportError, get_formdefs_of_all_kinds
from wcs.qommon import _, audit, errors, get_cfg, ident, misc, pgettext_lazy, template
@ -61,6 +61,7 @@ from wcs.qommon.form import (
TextWidget,
)
from wcs.workflows import Workflow, WorkflowImportError
from wcs.wscalls import NamedWsCallImportError
from .api_access import ApiAccessDirectory
from .data_sources import NamedDataSourcesDirectory
@ -1512,7 +1513,10 @@ class SiteImportAfterJob(AfterJob):
msg = _(e.msg) % e.msg_args
if e.details:
msg += ' [%s]' % e.details
error = _('Failed to import a workflow (%s); site import did not complete.') % msg
error = _('Failed to import objects (%s); site import did not complete.') % msg
except (NamedDataSourceImportError, NamedWsCallImportError) as e:
results = None
error = _('Failed to import objects (%s); site import did not complete.') % str(e)
self.results = results
if error:

View File

@ -26,7 +26,7 @@ from wcs.backoffice.snapshots import SnapshotsDirectory
from wcs.qommon import _, errors, misc, template
from wcs.qommon.form import CheckboxWidget, FileWidget, Form, HtmlWidget, SlugWidget, StringWidget, TextWidget
from wcs.utils import grep_strings
from wcs.wscalls import NamedWsCall, WsCallRequestWidget
from wcs.wscalls import NamedWsCall, NamedWsCallImportError, WsCallRequestWidget
class NamedWsCallUI:
@ -317,15 +317,22 @@ class NamedWsCallsDirectory(Directory):
def import_submit(self, form):
fp = form.get_widget('file').parse().fp
error = False
error, reason = False, None
try:
wscall = NamedWsCall.import_from_xml(fp)
get_session().message = ('info', _('This webservice call has been successfully imported.'))
except NamedWsCallImportError as e:
error = True
reason = str(e)
except ValueError:
error = True
if error:
form.set_error('file', _('Invalid File'))
if reason:
msg = _('Invalid File (%s)') % reason
else:
msg = _('Invalid File')
form.set_error('file', msg)
raise ValueError()
try:

View File

@ -26,7 +26,7 @@ from quixote import get_publisher, get_response
from wcs.api_utils import is_url_signed
from wcs.applications import Application, ApplicationElement
from wcs.blocks import BlockDef
from wcs.blocks import BlockDef, BlockdefImportError
from wcs.carddef import CardDef
from wcs.categories import (
BlockCategory,
@ -38,12 +38,12 @@ from wcs.categories import (
WorkflowCategory,
)
from wcs.comment_templates import CommentTemplate
from wcs.data_sources import NamedDataSource
from wcs.formdef import FormDef
from wcs.data_sources import NamedDataSource, NamedDataSourceImportError
from wcs.formdef import FormDef, FormdefImportError
from wcs.mail_templates import MailTemplate
from wcs.sql import Equal, Role
from wcs.workflows import Workflow
from wcs.wscalls import NamedWsCall
from wcs.workflows import Workflow, WorkflowImportError
from wcs.wscalls import NamedWsCall, NamedWsCallImportError
from .qommon import _
from .qommon.afterjobs import AfterJob
@ -454,6 +454,15 @@ class BundleImportJob(AfterJob):
# remove obsolete application elements
self.unlink_obsolete_objects()
except (
BlockdefImportError,
FormdefImportError,
WorkflowImportError,
NamedDataSourceImportError,
NamedWsCallImportError,
) as e:
error = str(e)
except tarfile.TarError:
error = _('Invalid tar file.')
except BundleKeyError as e:

View File

@ -22,17 +22,24 @@ import re
from quixote import get_publisher, get_request, get_response, redirect
from quixote.directory import Directory
from wcs.blocks import BlockDef
from wcs.carddef import CardDef
from wcs.data_sources import NamedDataSource
from wcs.formdef import get_formdefs_of_all_kinds
from wcs.formdef import FormDef, get_formdefs_of_all_kinds
from wcs.mail_templates import MailTemplate
from wcs.qommon import _, ezt, template
from wcs.qommon.afterjobs import AfterJob
from wcs.qommon.template import Template
from wcs.wf.export_to_model import UploadValidationError
from wcs.wf.form import FormWorkflowStatusItem
from wcs.workflows import Workflow
from wcs.wscalls import NamedWsCall
class DeprecatedElementsDetected(Exception):
pass
class DeprecationsDirectory(Directory):
do_not_call_in_templates = True
_q_exports = ['', 'scan']
@ -66,7 +73,7 @@ class DeprecationsDirectory(Directory):
def scan(self):
job = get_response().add_after_job(
DeprecationsScanAfterJob(
DeprecationsScan(
label=_('Scanning for deprecations'),
user_id=get_request().user.id,
return_url='/backoffice/studio/deprecations/',
@ -127,7 +134,7 @@ class DeprecationsDirectory(Directory):
}
class DeprecationsScanAfterJob(AfterJob):
class DeprecationsScan(AfterJob):
def done_action_url(self):
return self.kwargs['return_url']
@ -155,181 +162,25 @@ class DeprecationsScanAfterJob(AfterJob):
)
self.store()
for formdef in formdefs:
if formdef.id:
source = f'{formdef.xml_root_node}:{formdef.id}' if formdef.id else ''
elif formdef.get_workflow():
source = f'workflow:{formdef.get_workflow().id}'
else:
source = '-'
for field in formdef.fields or []:
location_label = _('%(name)s / Field "%(label)s"') % {
'name': formdef.name,
'label': field.ellipsized_label,
}
url = formdef.get_field_admin_url(field)
self.check_data_source(
getattr(field, 'data_source', None),
location_label=location_label,
url=url,
source=source,
)
prefill = getattr(field, 'prefill', None)
if prefill:
if prefill.get('type') == 'formula':
self.add_report_line(
location_label=location_label,
url=url,
category='python-prefill',
source=source,
)
else:
self.check_string(
prefill.get('value'),
location_label=location_label,
url=url,
python_check=False,
source=source,
)
if field.key == 'page':
for condition in field.get_conditions():
if condition and condition.get('type') == 'python':
self.add_report_line(
location_label=location_label,
url=url,
category='python-condition',
source=source,
)
break
if field.key in ('title', 'subtitle', 'comment'):
self.check_string(
field.label,
location_label=location_label,
url=url,
python_check=False,
source=source,
)
if field.key in ('table', 'table-select', 'tablerows', 'ranked-items'):
self.add_report_line(
location_label=location_label,
url=url,
category='fields',
source=source,
)
self.increment_count()
for workflow in workflows:
source = f'workflow:{workflow.id}'
for action in workflow.get_all_items():
location_label = '%s / %s' % (workflow.name, action.description)
url = action.get_admin_url()
for string in action.get_computed_strings():
self.check_string(string, location_label=location_label, url=url, source=source)
if getattr(action, 'condition', None):
if action.condition.get('type') == 'python':
self.add_report_line(
location_label=location_label,
url=url,
category='python-condition',
css_class='important' if (action.key == 'jump' and action.timeout) else '',
source=source,
)
if action.key == 'export_to_model':
try:
kind = action.model_file_validation(action.model_file, allow_rtf=True)
except UploadValidationError:
pass
else:
if kind == 'rtf':
self.add_report_line(
location_label=location_label, url=url, category='rtf', source=source
)
if action.key in ('aggregationemail', 'resubmit'):
self.add_report_line(
location_label=location_label, url=url, category='actions', source=source
)
if action.key in ('register-comment', 'sendmail'):
for attachment in getattr(action, 'attachments', None) or []:
if attachment and not ('{%' in attachment or '{{' in attachment):
self.add_report_line(
location_label=location_label,
url=url,
category='python-expression',
source=source,
)
break
if action.key == 'webservice_call':
self.check_remote_call_url(
action.url, location_label=location_label, url=url, source=source
)
for global_action in workflow.global_actions or []:
location_label = '%s / %s' % (workflow.name, _('trigger in %s') % global_action.name)
for trigger in global_action.triggers or []:
url = '%striggers/%s/' % (global_action.get_admin_url(), trigger.id)
if trigger.key == 'timeout' and trigger.anchor == 'python':
self.add_report_line(
location_label=location_label,
url=url,
category='python-expression',
source=source,
)
break
self.increment_count()
for named_data_source in named_data_sources:
source = f'datasource:{named_data_source.id}'
location_label = _('%(title)s "%(name)s"') % {
'title': _('Data source'),
'name': named_data_source.name,
}
url = named_data_source.get_admin_url()
self.check_data_source(
getattr(named_data_source, 'data_source', None),
location_label=location_label,
url=url,
source=source,
)
self.increment_count()
for named_ws_call in named_ws_calls:
source = f'wscall:{named_ws_call.id}'
location_label = _('%(title)s "%(name)s"') % {
'title': _('Webservice'),
'name': named_ws_call.name,
}
url = named_ws_call.get_admin_url()
for string in named_ws_call.get_computed_strings():
self.check_string(string, location_label=location_label, url=url, source=source)
if named_ws_call.request and named_ws_call.request.get('url'):
self.check_remote_call_url(
named_ws_call.request['url'], location_label=location_label, url=url, source=source
)
self.increment_count()
for mail_template in mail_templates:
source = f'mail_template:{mail_template.id}'
location_label = _('%(title)s "%(name)s"') % {
'title': _('Mail Template'),
'name': mail_template.name,
}
url = mail_template.get_admin_url()
for string in mail_template.get_computed_strings():
self.check_string(string, location_label=location_label, url=url, source=source)
for string in mail_template.attachments or []:
# legacy was to have straight python expressions (not prefixed by "=").
if not Template.is_template_string(string):
self.add_report_line(
location_label=location_label, url=url, category='python-expression', source=source
)
self.increment_count()
self.check_objects(formdefs + workflows + named_data_sources + named_ws_calls + mail_templates)
self.build_report_file()
self.increment_count()
def check_objects(self, objects):
for obj in objects:
if isinstance(obj, (FormDef, CardDef, BlockDef)):
self.check_formdef(obj)
elif isinstance(obj, Workflow):
self.check_workflow(obj)
elif isinstance(obj, NamedDataSource):
self.check_named_data_source(obj)
elif isinstance(obj, NamedWsCall):
self.check_named_ws_call(obj)
elif isinstance(obj, MailTemplate):
self.check_mail_template(obj)
self.increment_count()
def check_data_source(self, data_source, location_label, url, source):
if not data_source:
return
@ -386,6 +237,169 @@ class DeprecationsScanAfterJob(AfterJob):
location_label=location_label, url=url, category='json-data-store', source=source
)
def check_formdef(self, formdef):
if formdef.id:
source = f'{formdef.xml_root_node}:{formdef.id}' if formdef.id else ''
elif hasattr(formdef, 'get_workflow') and formdef.get_workflow():
source = f'workflow:{formdef.get_workflow().id}'
else:
source = '-'
for field in formdef.fields or []:
location_label = _('%(name)s / Field "%(label)s"') % {
'name': formdef.name,
'label': field.ellipsized_label,
}
url = formdef.get_field_admin_url(field)
self.check_data_source(
getattr(field, 'data_source', None),
location_label=location_label,
url=url,
source=source,
)
prefill = getattr(field, 'prefill', None)
if prefill:
if prefill.get('type') == 'formula':
self.add_report_line(
location_label=location_label,
url=url,
category='python-prefill',
source=source,
)
else:
self.check_string(
prefill.get('value'),
location_label=location_label,
url=url,
python_check=False,
source=source,
)
if field.key == 'page':
for condition in field.get_conditions():
if condition and condition.get('type') == 'python':
self.add_report_line(
location_label=location_label,
url=url,
category='python-condition',
source=source,
)
break
if field.key in ('title', 'subtitle', 'comment'):
self.check_string(
field.label,
location_label=location_label,
url=url,
python_check=False,
source=source,
)
if field.key in ('table', 'table-select', 'tablerows', 'ranked-items'):
self.add_report_line(
location_label=location_label,
url=url,
category='fields',
source=source,
)
def check_workflow(self, workflow):
source = f'workflow:{workflow.id}'
for action in workflow.get_all_items():
location_label = '%s / %s' % (workflow.name, action.description)
url = action.get_admin_url()
for string in action.get_computed_strings():
self.check_string(string, location_label=location_label, url=url, source=source)
if getattr(action, 'condition', None):
if action.condition.get('type') == 'python':
self.add_report_line(
location_label=location_label,
url=url,
category='python-condition',
css_class='important' if (action.key == 'jump' and action.timeout) else '',
source=source,
)
if action.key == 'export_to_model':
try:
kind = action.model_file_validation(action.model_file, allow_rtf=True)
except UploadValidationError:
pass
else:
if kind == 'rtf':
self.add_report_line(
location_label=location_label, url=url, category='rtf', source=source
)
if action.key in ('aggregationemail', 'resubmit'):
self.add_report_line(
location_label=location_label, url=url, category='actions', source=source
)
if action.key in ('register-comment', 'sendmail'):
for attachment in getattr(action, 'attachments', None) or []:
if attachment and not ('{%' in attachment or '{{' in attachment):
self.add_report_line(
location_label=location_label,
url=url,
category='python-expression',
source=source,
)
break
if action.key == 'webservice_call':
self.check_remote_call_url(action.url, location_label=location_label, url=url, source=source)
for global_action in workflow.global_actions or []:
location_label = '%s / %s' % (workflow.name, _('trigger in %s') % global_action.name)
for trigger in global_action.triggers or []:
url = '%striggers/%s/' % (global_action.get_admin_url(), trigger.id)
if trigger.key == 'timeout' and trigger.anchor == 'python':
self.add_report_line(
location_label=location_label,
url=url,
category='python-expression',
source=source,
)
break
def check_named_data_source(self, named_data_source):
source = f'datasource:{named_data_source.id}'
location_label = _('%(title)s "%(name)s"') % {
'title': _('Data source'),
'name': named_data_source.name,
}
url = named_data_source.get_admin_url()
self.check_data_source(
getattr(named_data_source, 'data_source', None),
location_label=location_label,
url=url,
source=source,
)
def check_named_ws_call(self, named_ws_call):
source = f'wscall:{named_ws_call.id}'
location_label = _('%(title)s "%(name)s"') % {
'title': _('Webservice'),
'name': named_ws_call.name,
}
url = named_ws_call.get_admin_url()
for string in named_ws_call.get_computed_strings():
self.check_string(string, location_label=location_label, url=url, source=source)
if named_ws_call.request and named_ws_call.request.get('url'):
self.check_remote_call_url(
named_ws_call.request['url'], location_label=location_label, url=url, source=source
)
def check_mail_template(self, mail_template):
source = f'mail_template:{mail_template.id}'
location_label = _('%(title)s "%(name)s"') % {
'title': _('Mail Template'),
'name': mail_template.name,
}
url = mail_template.get_admin_url()
for string in mail_template.get_computed_strings():
self.check_string(string, location_label=location_label, url=url, source=source)
for string in mail_template.attachments or []:
# legacy was to have straight python expressions (not prefixed by "=").
if not Template.is_template_string(string):
self.add_report_line(
location_label=location_label, url=url, category='python-expression', source=source
)
def add_report_line(self, **kwargs):
if kwargs not in self.report_lines:
self.report_lines.append(kwargs)
@ -400,3 +414,28 @@ class DeprecationsScanAfterJob(AfterJob):
fd,
indent=2,
)
def check_deprecated_elements_in_object(self, obj):
if not get_publisher().has_site_option('forbid-new-python-expressions'):

si pas d'option, return, pas la peine de perdre du temps à parcourir l'objet

si pas d'option, return, pas la peine de perdre du temps à parcourir l'objet
# for perfs, don't check object if nothing is forbidden
return
self.report_lines = []
objects = [obj]
if isinstance(obj, Workflow):
for status in obj.possible_status:
for item in status.items:
if isinstance(item, FormWorkflowStatusItem) and item.formdef:
objects.append(item.formdef)
if obj.variables_formdef:
objects.append(obj.variables_formdef)
if obj.backoffice_fields_formdef:
objects.append(obj.backoffice_fields_formdef)
self.check_objects(objects)
for report_line in self.report_lines:
if 'python' in report_line['category'] and get_publisher().has_site_option(
'forbid-new-python-expressions'
):
raise DeprecatedElementsDetected(_('Python expression detected'))

View File

@ -179,12 +179,17 @@ class BlockDef(StorableObject):
return root
@classmethod
def import_from_xml(cls, fd, include_id=False, check_datasources=True):
def import_from_xml(cls, fd, include_id=False, check_datasources=True, check_deprecated=True):
try:
tree = ET.parse(fd)
except Exception:
raise ValueError()
blockdef = cls.import_from_xml_tree(tree, include_id=include_id, check_datasources=check_datasources)
blockdef = cls.import_from_xml_tree(
tree,
include_id=include_id,
check_datasources=check_datasources,
check_deprecated=check_deprecated,
)
if blockdef.slug:
try:
@ -197,7 +202,11 @@ class BlockDef(StorableObject):
return blockdef
@classmethod
def import_from_xml_tree(cls, tree, include_id=False, check_datasources=True, **kwargs):
def import_from_xml_tree(
cls, tree, include_id=False, check_datasources=True, check_deprecated=True, **kwargs
):
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
blockdef = cls()
if tree.find('name') is None or not tree.find('name').text:
raise BlockdefImportError(_('Missing name'))
@ -279,6 +288,14 @@ class BlockDef(StorableObject):
details[_('Unknown datasources')].update(unknown_datasources)
raise BlockdefImportUnknownReferencedError(_('Unknown referenced objects'), details=details)
if check_deprecated:
# check for deprecated elements
job = DeprecationsScan()
try:
job.check_deprecated_elements_in_object(blockdef)
except DeprecatedElementsDetected as e:
raise BlockdefImportError(str(e))
return blockdef
def get_usage_fields(self):

View File

@ -51,6 +51,10 @@ from .qommon.xml_storage import XmlStorableObject
data_source_functions = {}
class NamedDataSourceImportError(Exception):
pass
class DataSourceError(Exception):
pass
@ -913,9 +917,22 @@ class NamedDataSource(XmlStorableObject):
return root
@classmethod
def import_from_xml_tree(cls, tree, include_id=False, **kwargs):
data_source = super().import_from_xml_tree(tree, include_id=include_id, **kwargs)
def import_from_xml_tree(cls, tree, include_id=False, check_deprecated=True, **kwargs):
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
data_source = super().import_from_xml_tree(
tree, include_id=include_id, check_deprecated=check_deprecated, **kwargs
)
DataSourceCategory.object_category_xml_import(data_source, tree, include_id=include_id)
if check_deprecated:
# check for deprecated elements
job = DeprecationsScan()
try:
job.check_deprecated_elements_in_object(data_source)
except DeprecatedElementsDetected as e:
raise NamedDataSourceImportError(str(e))
return data_source
@classmethod

View File

@ -1465,7 +1465,9 @@ class FormDef(StorableObject):
return root
@classmethod
def import_from_xml(cls, fd, include_id=False, fix_on_error=False, check_datasources=True):
def import_from_xml(
cls, fd, include_id=False, fix_on_error=False, check_datasources=True, check_deprecated=True
):
try:
tree = ET.parse(fd)
except Exception:
@ -1475,6 +1477,7 @@ class FormDef(StorableObject):
include_id=include_id,
fix_on_error=fix_on_error,
check_datasources=check_datasources,
check_deprecated=check_deprecated,
)
if formdef.url_name:
@ -1497,8 +1500,15 @@ class FormDef(StorableObject):
@classmethod
def import_from_xml_tree(
cls, tree, include_id=False, fix_on_error=False, snapshot=False, check_datasources=True
cls,
tree,
include_id=False,
fix_on_error=False,
snapshot=False,
check_datasources=True,
check_deprecated=True,
):
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
from wcs.carddef import CardDef
formdef = cls()
@ -1714,6 +1724,14 @@ class FormDef(StorableObject):
details[_('Unknown datasources')].update(unknown_datasources)
raise FormdefImportUnknownReferencedError(_('Unknown referenced objects'), details=details)
if check_deprecated:
# check for deprecated elements
job = DeprecationsScan()
try:
job.check_deprecated_elements_in_object(formdef)
except DeprecatedElementsDetected as e:
raise FormdefImportError(str(e))
return formdef
def finish_tests_xml_import(self):

View File

@ -183,9 +183,9 @@ class WcsPublisher(QommonPublisher):
formdef.register_cronjobs()
def update_deprecations_report(self, **kwargs):
from .backoffice.deprecations import DeprecationsScanAfterJob
from .backoffice.deprecations import DeprecationsScan
DeprecationsScanAfterJob().execute()
DeprecationsScan().execute()
def has_postgresql_config(self):
return bool(self.cfg.get('postgresql', {}))
@ -292,6 +292,8 @@ class WcsPublisher(QommonPublisher):
'workflows_xml',
'blockdefs_xml',
'roles_xml',
'datasources',
'wscalls',
):
continue
path = os.path.join(self.app_dir, f)
@ -346,6 +348,22 @@ class WcsPublisher(QommonPublisher):
if os.path.split(f)[0] in results:
results[os.path.split(f)[0]] += 1
# import datasources and wscalls
from wcs.data_sources import NamedDataSource
from wcs.wscalls import NamedWsCall

pour passer sur les dépréciations, sinon on faisait juste une bête copie du fichier

pour passer sur les dépréciations, sinon on faisait juste une bête copie du fichier
for f in z.namelist():
if os.path.dirname(f) == 'datasources' and os.path.basename(f):
with z.open(f) as fd:
data_source = NamedDataSource.import_from_xml(fd, include_id=True)
data_source.store()
results['datasources'] += 1
if os.path.dirname(f) == 'wscalls' and os.path.basename(f):
with z.open(f) as fd:
wscall = NamedWsCall.import_from_xml(fd, include_id=True)
wscall.store()
results['wscalls'] += 1
# second pass, fields blocks
from wcs.blocks import BlockDef

View File

@ -31,7 +31,7 @@ class XmlStorableObject(StorableObject):
first_byte = fd.read(1)
fd.seek(0)
if first_byte == b'<':
return cls.import_from_xml(fd, include_id=True)
return cls.import_from_xml(fd, include_id=True, check_deprecated=False)

ne pas checker les dépréciations lorsqu'on charge un objet depuis sa définition xml stockée sur l'instance

ne pas checker les dépréciations lorsqu'on charge un objet depuis sa définition xml stockée sur l'instance
else:
obj = StorableObject.storage_load(fd)
obj._upgrade_must_store = True
@ -84,15 +84,15 @@ class XmlStorableObject(StorableObject):
sub.text = role.name
@classmethod
def import_from_xml(cls, fd, include_id=False):
def import_from_xml(cls, fd, include_id=False, check_deprecated=True):
try:
tree = ET.parse(fd)
except Exception:
raise ValueError()
return cls.import_from_xml_tree(tree, include_id=include_id)
return cls.import_from_xml_tree(tree, include_id=include_id, check_deprecated=check_deprecated)
@classmethod
def import_from_xml_tree(cls, tree, include_id=False, **kwargs):
def import_from_xml_tree(cls, tree, include_id=False, check_deprecated=True, **kwargs):
obj = cls()
# if the tree we get is actually a ElementTree for real, we get its

View File

@ -1233,12 +1233,17 @@ class Workflow(StorableObject):
return root
@classmethod
def import_from_xml(cls, fd, include_id=False, check_datasources=True):
def import_from_xml(cls, fd, include_id=False, check_datasources=True, check_deprecated=True):
try:
tree = ET.parse(fd)
except Exception:
raise ValueError()
workflow = cls.import_from_xml_tree(tree, include_id=include_id, check_datasources=check_datasources)
workflow = cls.import_from_xml_tree(
tree,
include_id=include_id,
check_datasources=check_datasources,
check_deprecated=check_deprecated,
)
if workflow.slug and cls.get_by_slug(workflow.slug):
# slug already in use, reset so a new one will be generated on store()
@ -1247,7 +1252,11 @@ class Workflow(StorableObject):
return workflow
@classmethod
def import_from_xml_tree(cls, tree, include_id=False, snapshot=False, check_datasources=True):
def import_from_xml_tree(
cls, tree, include_id=False, snapshot=False, check_datasources=True, check_deprecated=True
):
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
workflow = cls()
if tree.find('name') is None or not tree.find('name').text:
raise WorkflowImportError(_('Missing name'))
@ -1359,6 +1368,14 @@ class Workflow(StorableObject):
_('Unknown referenced objects'), details=unknown_referenced_objects_details
)
if check_deprecated:
# check for deprecated elements
job = DeprecationsScan()
try:
job.check_deprecated_elements_in_object(workflow)
except DeprecatedElementsDetected as e:
raise WorkflowImportError(str(e))
return workflow
def get_list_of_roles(

View File

@ -44,6 +44,10 @@ from .qommon.template import Template
from .qommon.xml_storage import XmlStorableObject
class NamedWsCallImportError(Exception):
pass
class PayloadError(Exception):
pass
@ -280,6 +284,24 @@ class NamedWsCall(XmlStorableObject):
if self.request.get('post_data'):
yield from self.request.get('post_data').values()
@classmethod
def import_from_xml_tree(cls, tree, include_id=False, check_deprecated=True, **kwargs):
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
wscall = super().import_from_xml_tree(
tree, include_id=include_id, check_deprecated=check_deprecated, **kwargs
)
if check_deprecated:
# check for deprecated elements
job = DeprecationsScan()
try:
job.check_deprecated_elements_in_object(wscall)
except DeprecatedElementsDetected as e:
raise NamedWsCallImportError(str(e))
return wscall
def export_request_to_xml(self, element, attribute_name, **kwargs):
request = getattr(self, attribute_name)
for attr in ('url', 'request_signature_key', 'method', 'timeout', 'cache_duration'):