Compare commits

..

32 Commits

Author SHA1 Message Date
Emmanuel Cazenave 78144fe00b backoffice: display drafts stats (#72542)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-18 15:22:02 +01:00
Frédéric Péters df546eb981 translation update
gitea/wcs/pipeline/head There was a failure building this commit Details
2024-03-18 14:23:07 +01:00
Frédéric Péters 0294c31667 misc: add theme variables to maintenance page context (#88262)
gitea/wcs/pipeline/head There was a failure building this commit Details
2024-03-18 11:51:52 +01:00
Frédéric Péters 0667660357 misc: add legacy declaration of DeprecationsScanAfterJob (#88266)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-18 11:51:41 +01:00
Frédéric Péters bd8d750953 misc: skip empty/none filetypes (#88269)
gitea/wcs/pipeline/head Build queued... Details
2024-03-18 11:51:33 +01:00
Valentin Deniaud 6838b2a135 workflow_tests: allow testing sms workflow action (#87541)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-18 11:04:20 +01:00
Valentin Deniaud 17ae2751a2 testdef: stop ignoring item selection error when no data source (#88176)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-18 10:59:12 +01:00
Valentin Deniaud fdc8154527 testdef: set formdata backoffice submission flag (#88191) 2024-03-18 10:58:05 +01:00
Valentin Deniaud 2c62dd8196 admin: allow running tests from result page (#88201)
gitea/wcs/pipeline/head Build queued... Details
2024-03-18 10:57:40 +01:00
Valentin Deniaud 783f3a8bb4 admin: get user list from ajax in workflow tests click action (#87543)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-18 10:26:13 +01:00
Valentin Deniaud a11facd293 qommon: move add_js_behaviours function to module scope (#87543) 2024-03-18 10:25:43 +01:00
Valentin Deniaud 46610bb775 workflow_tests: allow different users in button click action (#87543) 2024-03-18 10:25:43 +01:00
Valentin Deniaud ea21213f93 workflow_tests: use agent user only where necessary (#87543) 2024-03-18 10:25:43 +01:00
Valentin Deniaud b51d025422 workflow_tests: rework access to parent object from action (#87543) 2024-03-18 10:25:43 +01:00
Frédéric Péters e9a20e4de9 misc: add site option for contact email for nominatim queries (#6862)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-17 09:38:22 +01:00
Frédéric Péters 3c0e04afe7 api: ignore invalid base64 data when receiving file fields (#88248)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-17 07:47:30 +01:00
Frédéric Péters 2399c72d27 backoffice: check back value of lookup form (#88247)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-15 20:16:59 +01:00
Frédéric Péters 75030a2bd7 translation update
gitea/wcs/pipeline/head This commit looks good Details
2024-03-15 17:49:28 +01:00
Frédéric Péters 5bfc33eb62 misc: do not fail temporary_access_url if formdata got removed (#88232)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-15 17:05:53 +01:00
Lauréline Guérin 3477ee2f29 depreciation: rename DeprecationsScanAfterJob (#72093)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-15 16:34:28 +01:00
Lauréline Guérin b4c4181cde misc: fix test (#72093) 2024-03-15 16:34:28 +01:00
Lauréline Guérin eba79fdc77 depreciations: errors from deprecated elements on import (#72093) 2024-03-15 16:34:28 +01:00
Lauréline Guérin 86f28b8037 deprecations: forbid import of new python expressions (#72093) 2024-03-15 16:34:28 +01:00
Lauréline Guérin 78f2796266 depreciations: new method check_objects (#72093) 2024-03-15 16:34:28 +01:00
Lauréline Guérin aa917a59c4 depreciations: method per object type (#72093) 2024-03-15 16:34:28 +01:00
Corentin Sechet cf0ee0ca29 js: fix map and address synchronization (#88205)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-15 16:21:19 +01:00
Frédéric Péters 7b45d83bd2 misc: disable beforeunload when submitting the form (#88233)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-15 15:59:03 +01:00
Frédéric Péters 8efea827a1 misc: warn user if closing tab on an unsaved form (#6116)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-15 14:58:53 +01:00
Frédéric Péters 68712c8cd0 translation update
gitea/wcs/pipeline/head This commit looks good Details
2024-03-15 14:41:18 +01:00
Frédéric Péters e30798deb5 workflows: record an error when using a RTF model with disabled support (#88124)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-15 14:26:29 +01:00
Lauréline Guérin bb73f23502 export_import: malformed bundle (#88130)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-15 14:21:51 +01:00
Frédéric Péters 1a4fdc71cf carddata: skip empty blocks when updating related items (#88224)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-15 12:49:20 +01:00
47 changed files with 2059 additions and 643 deletions

View File

@ -53,6 +53,7 @@ def teardown_module(module):
def test_empty_site(pub):
pub.user_class.wipe()
resp = get_app(pub).get('/backoffice/users/')
resp = resp.click('New User')
resp = get_app(pub).get('/backoffice/settings/')

View File

@ -220,6 +220,25 @@ def test_block_export_import(pub):
assert 'Invalid File (Unknown referenced objects)' in resp
assert '<ul><li>Unknown datasources: foobar</li></ul>' in resp
# python expression
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
block.fields = [
fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}),
]
block.store()
resp = app.get('/backoffice/forms/blocks/%s/' % block.id)
resp = resp.click(href=re.compile('^export$'))
xml_export = resp.text
resp = app.get('/backoffice/forms/blocks/')
resp = resp.click(href='import')
resp.form['file'] = Upload('block', xml_export.encode('utf-8'))
resp = resp.form.submit()
assert 'Python expression detected' in resp
def test_block_delete(pub):
create_superuser(pub)

View File

@ -1039,6 +1039,23 @@ def test_data_sources_import(pub):
resp = resp.form.submit()
assert 'Invalid File' in resp.text
# python expression
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
data_source.data_source = {'type': 'formula', 'value': repr([('1', 'un'), ('2', 'deux')])}
data_source.store()
resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
resp = resp.click(href=re.compile('^export$'))
xml_export = resp.text
resp = app.get('/backoffice/settings/data-sources/')
resp = resp.click(href='import')
resp.form['file'] = Upload('ds', xml_export.encode('utf-8'))
resp = resp.form.submit()
assert 'Python expression detected' in resp
def test_data_sources_edit_slug(pub):
create_superuser(pub)

View File

@ -7,11 +7,11 @@ import pytest
from quixote.http_request import Upload as QuixoteUpload
from wcs import fields
from wcs.backoffice.deprecations import DeprecationsScanAfterJob
from wcs.blocks import BlockDef
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
from wcs.blocks import BlockDef, BlockdefImportError
from wcs.carddef import CardDef
from wcs.data_sources import NamedDataSource
from wcs.formdef import FormDef
from wcs.data_sources import NamedDataSource, NamedDataSourceImportError
from wcs.formdef import FormDef, FormdefImportError
from wcs.mail_templates import MailTemplate
from wcs.qommon.form import UploadedFile
from wcs.qommon.http_request import HTTPRequest
@ -23,8 +23,8 @@ from wcs.wf.geolocate import GeolocateWorkflowStatusItem
from wcs.wf.jump import JumpWorkflowStatusItem
from wcs.wf.notification import SendNotificationWorkflowStatusItem
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef
from wcs.wscalls import NamedWsCall
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowImportError
from wcs.wscalls import NamedWsCall, NamedWsCallImportError
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login
from .test_all import create_superuser
@ -293,7 +293,7 @@ def test_deprecations_choice_label(pub):
accept = st0.add_action('choice', id='_choice')
accept.label = '[test] action'
job = DeprecationsScanAfterJob()
job = DeprecationsScan()
job.execute()
assert not job.report_lines
@ -305,7 +305,7 @@ def test_deprecations_skip_invalid_ezt(pub):
display = st0.add_action('displaymsg')
display.message = 'message with invalid [if-any] ezt'
job = DeprecationsScanAfterJob()
job = DeprecationsScan()
job.execute()
assert not job.report_lines
@ -316,19 +316,19 @@ def test_deprecations_ignore_ezt_looking_tag(pub):
sendmail = st0.add_action('sendmail')
sendmail.subject = '[REMINDER] your appointment'
workflow.store()
job = DeprecationsScanAfterJob()
job = DeprecationsScan()
job.execute()
assert not job.report_lines
sendmail.subject = '[reminder]'
workflow.store()
job = DeprecationsScanAfterJob()
job = DeprecationsScan()
job.execute()
assert job.report_lines
sendmail.subject = '[if-any plop]test[end]'
workflow.store()
job = DeprecationsScanAfterJob()
job = DeprecationsScan()
job.execute()
assert job.report_lines
@ -397,7 +397,7 @@ def test_deprecations_document_models(pub):
export_to2.by = ['_submitter']
workflow.store()
job = DeprecationsScanAfterJob()
job = DeprecationsScan()
job.execute()
assert job.report_lines == [
{
@ -446,7 +446,7 @@ def test_deprecations_inspect_pages(pub):
display.message = 'message with [ezt] info'
workflow.store()
job = DeprecationsScanAfterJob()
job = DeprecationsScan()
job.execute()
create_superuser(pub)
@ -485,7 +485,7 @@ def test_deprecations_inspect_pages(pub):
display.message = 'message with {{django}} info'
workflow.store()
job = DeprecationsScanAfterJob()
job = DeprecationsScan()
job.execute()
resp = app.get(formdef.get_admin_url() + 'inspect')
@ -506,7 +506,7 @@ def test_deprecations_inspect_pages_old_format(pub):
]
formdef.store()
job = DeprecationsScanAfterJob()
job = DeprecationsScan()
job.execute()
with open(os.path.join(pub.app_dir, 'deprecations.json')) as f:
@ -525,3 +525,124 @@ def test_deprecations_inspect_pages_old_format(pub):
resp = app.get('/backoffice/studio/deprecations/')
assert resp.pyquery('.section--python-condition li a')
def test_deprecations_on_import(pub):
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = [
fields.PageField(id='1', label='page1', condition={'type': 'python', 'value': 'True'}),
]
formdef.store()
blockdef = BlockDef()
blockdef.name = 'foobar'
blockdef.fields = [
fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}),
]
blockdef.store()
workflow = Workflow(name='test')
st0 = workflow.add_status('Status0', 'st0')
sendsms = st0.add_action('sendsms', id='_sendsms')
sendsms.to = 'xxx'
sendsms.condition = {'type': 'python', 'value': 'True'}
sendsms.parent = st0
st0.items.append(sendsms)
workflow.store()
data_source = NamedDataSource(name='ds_python')
data_source.data_source = {'type': 'formula', 'value': repr([('1', 'un'), ('2', 'deux')])}
data_source.store()
wscall = NamedWsCall()
wscall.name = 'Hello'
wscall.request = {'url': 'http://example.net', 'qs_data': {'a': '=1+2'}}
wscall.store()
mail_template = MailTemplate() # no python expression in mail templates
mail_template.name = 'Hello2'
mail_template.subject = 'plop'
mail_template.body = 'plop [ezt] plop'
mail_template.store()
job = DeprecationsScan()
job.check_deprecated_elements_in_object(formdef)
formdef_xml = formdef.export_to_xml()
FormDef.import_from_xml_tree(formdef_xml)
job = DeprecationsScan()
job.check_deprecated_elements_in_object(blockdef)
blockdef_xml = blockdef.export_to_xml()
BlockDef.import_from_xml_tree(blockdef_xml)
job = DeprecationsScan()
job.check_deprecated_elements_in_object(workflow)
workflow_xml = workflow.export_to_xml()
Workflow.import_from_xml_tree(workflow_xml)
job = DeprecationsScan()
job.check_deprecated_elements_in_object(data_source)
data_source_xml = data_source.export_to_xml()
NamedDataSource.import_from_xml_tree(data_source_xml)
job = DeprecationsScan()
job.check_deprecated_elements_in_object(wscall)
wscall_xml = wscall.export_to_xml()
NamedWsCall.import_from_xml_tree(wscall_xml)
job = DeprecationsScan()
job.check_deprecated_elements_in_object(mail_template)
mail_template_xml = mail_template.export_to_xml()
MailTemplate.import_from_xml_tree(mail_template_xml)
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
job = DeprecationsScan()
with pytest.raises(DeprecatedElementsDetected) as excinfo:
job.check_deprecated_elements_in_object(formdef)
assert str(excinfo.value) == 'Python expression detected'
with pytest.raises(FormdefImportError) as excinfo:
FormDef.import_from_xml_tree(formdef_xml)
assert str(excinfo.value) == 'Python expression detected'
job = DeprecationsScan()
with pytest.raises(DeprecatedElementsDetected) as excinfo:
job.check_deprecated_elements_in_object(blockdef)
assert str(excinfo.value) == 'Python expression detected'
with pytest.raises(BlockdefImportError) as excinfo:
BlockDef.import_from_xml_tree(blockdef_xml)
assert str(excinfo.value) == 'Python expression detected'
job = DeprecationsScan()
with pytest.raises(DeprecatedElementsDetected) as excinfo:
job.check_deprecated_elements_in_object(workflow)
assert str(excinfo.value) == 'Python expression detected'
with pytest.raises(WorkflowImportError) as excinfo:
Workflow.import_from_xml_tree(workflow_xml)
assert str(excinfo.value) == 'Python expression detected'
job = DeprecationsScan()
with pytest.raises(DeprecatedElementsDetected) as excinfo:
job.check_deprecated_elements_in_object(data_source)
assert str(excinfo.value) == 'Python expression detected'
with pytest.raises(NamedDataSourceImportError) as excinfo:
NamedDataSource.import_from_xml_tree(data_source_xml)
assert str(excinfo.value) == 'Python expression detected'
job = DeprecationsScan()
with pytest.raises(DeprecatedElementsDetected) as excinfo:
job.check_deprecated_elements_in_object(wscall)
assert str(excinfo.value) == 'Python expression detected'
with pytest.raises(NamedWsCallImportError) as excinfo:
NamedWsCall.import_from_xml_tree(wscall_xml)
assert str(excinfo.value) == 'Python expression detected'
# no python expressions
job = DeprecationsScan()
job.check_deprecated_elements_in_object(mail_template)
MailTemplate.import_from_xml_tree(mail_template_xml)

View File

@ -4078,6 +4078,39 @@ def test_form_overwrite(pub):
assert resp.pyquery('.error').text() == 'Invalid File'
def test_form_export_import_export(pub):
create_superuser(pub)
create_role(pub)
FormDef.wipe()
formdef = FormDef()
formdef.name = 'form title'
formdef.table_name = 'xxx'
formdef.fields = []
formdef.store()
app = login(get_app(pub))
# python expression
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
formdef.fields = [
fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}),
]
formdef.store()
resp = app.get('/backoffice/forms/%s/' % formdef.id)
resp = resp.click(href=re.compile('^export$'))
xml_export = resp.text
resp = app.get('/backoffice/forms/')
resp = resp.click(href='import')
resp.form['file'] = Upload('formdef', xml_export.encode('utf-8'))
resp = resp.form.submit()
assert 'Python expression detected' in resp
def test_form_export_import_export_overwrite(pub):
create_superuser(pub)
create_role(pub)
@ -4135,6 +4168,23 @@ def test_form_export_import_export_overwrite(pub):
field_ow = formdef_overwrited.fields[i]
assert (field.id, field.label, field.key) == (field_ow.id, field_ow.label, field_ow.key)
# python expression
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
formdef2.fields = [
fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}),
]
formdef2.store()
formdef2_xml = ET.tostring(formdef2.export_to_xml(include_id=True))
resp = app.get('/backoffice/forms/%s/' % formdef.id)
resp = resp.click(href='overwrite')
resp.forms[0]['file'] = Upload('formdef.wcs', formdef2_xml)
resp = resp.forms[0].submit()
assert 'Python expression detected' in resp
def test_form_overwrite_from_url(pub):
create_superuser(pub)

View File

@ -399,6 +399,93 @@ def test_settings_export_import(pub):
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
resp = resp.form.submit('submit').follow()
assert 'Unknown referenced objects [Unknown datasources: foobar]' in resp
BlockDef.wipe()
# python expressions
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = [
fields.PageField(id='1', label='page1', condition={'type': 'python', 'value': 'True'}),
]
formdef.store()
resp = app.get('/backoffice/settings/export')
resp = resp.form.submit('submit').follow()
resp = resp.click('Download Export')
zip_content = io.BytesIO(resp.body)
resp = app.get('/backoffice/settings/import')
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
resp.form['confirm'].checked = True
resp = resp.form.submit('submit').follow()
assert 'Python expression detected' in resp
FormDef.wipe()
blockdef = BlockDef()
blockdef.name = 'foobar'
blockdef.fields = [
fields.StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}),
]
blockdef.store()
resp = app.get('/backoffice/settings/export')
resp = resp.form.submit('submit').follow()
resp = resp.click('Download Export')
zip_content = io.BytesIO(resp.body)
resp = app.get('/backoffice/settings/import')
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
resp = resp.form.submit('submit').follow()
assert 'Python expression detected' in resp
BlockDef.wipe()
workflow = Workflow(name='test')
st0 = workflow.add_status('Status0', 'st0')
sendsms = st0.add_action('sendsms', id='_sendsms')
sendsms.to = 'xxx'
sendsms.condition = {'type': 'python', 'value': 'True'}
sendsms.parent = st0
st0.items.append(sendsms)
workflow.store()
resp = app.get('/backoffice/settings/export')
resp = resp.form.submit('submit').follow()
resp = resp.click('Download Export')
zip_content = io.BytesIO(resp.body)
resp = app.get('/backoffice/settings/import')
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
resp.form['confirm'].checked = True
resp = resp.form.submit('submit').follow()
assert 'Python expression detected' in resp
Workflow.wipe()
data_source = NamedDataSource(name='ds_python')
data_source.data_source = {'type': 'formula', 'value': repr([('1', 'un'), ('2', 'deux')])}
data_source.store()
resp = app.get('/backoffice/settings/export')
resp = resp.form.submit('submit').follow()
resp = resp.click('Download Export')
zip_content = io.BytesIO(resp.body)
resp = app.get('/backoffice/settings/import')
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
resp = resp.form.submit('submit').follow()
assert 'Python expression detected' in resp
NamedDataSource.wipe()
wscall = NamedWsCall()
wscall.name = 'Hello'
wscall.request = {'url': 'http://example.net', 'qs_data': {'a': '=1+2'}}
wscall.store()
resp = app.get('/backoffice/settings/export')
resp = resp.form.submit('submit').follow()
resp = resp.click('Download Export')
zip_content = io.BytesIO(resp.body)
resp = app.get('/backoffice/settings/import')
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
resp = resp.form.submit('submit').follow()
assert 'Python expression detected' in resp
NamedWsCall.wipe()
# check a backup of settings has been created
assert [x for x in os.listdir(pub.app_dir) if x.startswith('config.pck.backup-')]

View File

@ -814,6 +814,10 @@ def test_tests_manual_run(pub):
assert 'You should enter digits only, for example: 123.' in resp.text
assert 'disabled' not in resp.text
resp = resp.click('Run tests again')
resp = app.get('/backoffice/forms/1/tests/results/')
assert len(resp.pyquery('tr')) == 4
TestDef.remove_object(testdef.id)
resp = app.get('/backoffice/forms/1/tests/results/%s/' % result.id)
assert 'disabled' in resp.text
@ -1046,6 +1050,7 @@ def test_tests_result_inspect(pub):
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.name = 'First test'
testdef.agent_id = user.id
testdef.is_in_backoffice = True
testdef.workflow_tests.actions = [
workflow_tests.ButtonClick(id='1', button_name='Loop on status'),
]
@ -1087,6 +1092,11 @@ def test_tests_result_inspect(pub):
assert 'Condition result' in resp.text
assert 'result-true' in resp.text
resp.form['django-condition'] = 'form_submission_backoffice'
resp = resp.form.submit()
assert 'Condition result' in resp.text
assert 'result-true' in resp.text
# check inspect is not accessible for old results
light_test_result = TestResult.select()[-1]
test_result = TestResult.get(light_test_result.id)

View File

@ -990,6 +990,28 @@ def test_workflows_export_import(pub):
assert 'Invalid File' in resp.text
assert Workflow.count() == 2
# python expression
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
st0 = workflow.add_status('Status0', 'st0')
sendsms = st0.add_action('sendsms', id='_sendsms')
sendsms.to = 'xxx'
sendsms.condition = {'type': 'python', 'value': 'True'}
sendsms.parent = st0
st0.items.append(sendsms)
workflow.store()
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
resp = resp.click(href=re.compile('^export$'))
xml_export = resp.text
resp = app.get('/backoffice/workflows/')
resp = resp.click('Import')
resp.form['file'] = Upload('wf.wcs', xml_export.encode('utf-8'))
resp = resp.form.submit('submit')
assert 'Python expression detected' in resp
def test_workflows_import_from_url(pub):
create_superuser(pub)

View File

@ -197,6 +197,8 @@ def test_workflow_tests_edit_actions(pub):
def test_workflow_tests_action_button_click(pub):
create_superuser(pub)
user = pub.user_class(name='test user')
user.store()
workflow = Workflow(name='Workflow One')
new_status = workflow.add_status(name='New status')
@ -217,6 +219,9 @@ def test_workflow_tests_action_button_click(pub):
testdef.store()
app = login(get_app(pub))
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
assert escape('Click on "Button 4" by backoffice user') in resp.text
resp = app.get('/backoffice/forms/1/tests/%s/workflow/1/' % testdef.id)
assert 'Workflow has no action that displays a button.' in resp.text
@ -240,6 +245,23 @@ def test_workflow_tests_action_button_click(pub):
('Button 4 (not available)', True, 'Button 4 (not available)'),
]
resp.form['button_name'] = 'Button 1'
resp.form['who'] = 'submitter'
resp = resp.form.submit().follow()
assert escape('Click on "Button 1" by submitter') in resp.text
resp = app.get('/backoffice/forms/1/tests/%s/workflow/1/' % testdef.id)
resp.form['who'] = 'other'
resp.form['who_id'].force_value(user.id)
resp = resp.form.submit().follow()
assert escape('Click on "Button 1" by test user') in resp.text
user.remove_self()
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
assert escape('Click on "Button 1" by missing user') in resp.text
def test_workflow_tests_action_assert_status(pub):
create_superuser(pub)
@ -321,6 +343,7 @@ def test_workflow_tests_action_assert_email(pub):
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
assert 'not configured' not in resp.text
assert 'Email to' not in resp.text
# empty configuration is allowed
resp = resp.click('Edit')
@ -331,10 +354,68 @@ def test_workflow_tests_action_assert_email(pub):
resp.form['body_strings$element0'] = 'def'
resp = resp.form.submit().follow()
assert 'Email to' not in resp.text
assert_email = TestDef.get(testdef.id).workflow_tests.actions[0]
assert assert_email.subject_strings == ['abc']
assert assert_email.body_strings == ['def']
resp = resp.click('Edit')
resp.form['addresses$element0'] = 'test@entrouvert.com'
resp = resp.form.submit().follow()
assert escape('Email to "test@entrouvert.com"') in resp.text
assert_email.addresses = ['a@entrouvert.com', 'b@entrouvert.com', 'c@entrouvert.com']
assert_email.parent.store()
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
assert escape('Email to "a@entrouvert.com" (+2)') in resp.text
def test_workflow_tests_action_assert_sms(pub):
create_superuser(pub)
formdef = FormDef()
formdef.name = 'test title'
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.workflow_tests.actions = [
workflow_tests.AssertSMS(id='1'),
]
testdef.store()
app = login(get_app(pub))
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
assert 'not configured' not in resp.text
assert 'SMS to' not in resp.text
# empty configuration is allowed
resp = resp.click('Edit')
resp = resp.form.submit().follow()
resp = resp.click('Edit')
resp.form['phone_numbers$element0'] = '0123456789'
resp.form['body'] = 'Hello'
resp = resp.form.submit().follow()
assert 'SMS to 0123456789' in resp.text
assert_sms = TestDef.get(testdef.id).workflow_tests.actions[0]
assert assert_sms.phone_numbers == ['0123456789']
assert assert_sms.body == 'Hello'
assert_sms.phone_numbers = ['0123456789', '0123456781', '0123456782']
assert_sms.parent.store()
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
assert escape('SMS to 0123456789 (+2)') in resp.text
def test_workflow_tests_action_assert_backoffice_field(pub):
create_superuser(pub)

View File

@ -1,4 +1,6 @@
import io
import os
import re
import xml.etree.ElementTree as ET
import pytest
@ -207,6 +209,23 @@ def test_wscalls_import(pub, wscall):
resp = resp.form.submit()
assert 'Invalid File' in resp.text
# python expression
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
wscall.request = {'url': 'http://example.net', 'qs_data': {'a': '=1+2'}}
wscall.store()
resp = app.get('/backoffice/settings/wscalls/%s/' % wscall.id)
resp = resp.click(href=re.compile('^export$'))
xml_export = resp.text
resp = app.get('/backoffice/settings/wscalls/')
resp = resp.click(href='import')
resp.form['file'] = Upload('wscall', xml_export.encode('utf-8'))
resp = resp.form.submit()
assert 'Python expression detected' in resp
def test_wscalls_empty_param_values(pub):
create_superuser(pub)

View File

@ -177,6 +177,16 @@ def test_reverse_geocoding(pub):
== 'https://nominatim.entrouvert.org/reverse?zoom=16&key=KEY&format=json&addressdetails=1&lat=0&lon=0&accept-language=en'
)
pub.site_options.set('options', 'nominatim_contact_email', 'test@example.net')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
assert (
rsps.calls[-1].request.url
== 'https://nominatim.entrouvert.org/reverse?zoom=16&key=KEY&email=test%40example.net&'
'format=json&addressdetails=1&lat=0&lon=0&accept-language=en'
)
pub.site_options.set(
'options', 'reverse_geocoding_service_url', 'http://reverse.example.net/?param=value'
)
@ -209,7 +219,7 @@ def test_geocoding(pub):
pub.site_options.write(fd)
resp = get_app(pub).get('/api/geocoding?q=test')
assert rsps.calls[-1].request.url == (
'https://nominatim.entrouvert.org/search?viewbox=2.34,1.23,3.45,2.34&bounded=1&'
'https://nominatim.entrouvert.org/search?viewbox=2.34%2C1.23%2C3.45%2C2.34&bounded=1&'
'format=json&q=test&accept-language=en'
)
@ -230,7 +240,7 @@ def test_geocoding(pub):
pub.site_options.write(fd)
resp = get_app(pub).get('/api/geocoding?q=test')
assert rsps.calls[-1].request.url == (
'https://nominatim.entrouvert.org/search?key=KEY&viewbox=2.34,1.23,3.45,2.34&bounded=1&'
'https://nominatim.entrouvert.org/search?key=KEY&viewbox=2.34%2C1.23%2C3.45%2C2.34&bounded=1&'
'format=json&q=test&accept-language=en'
)

View File

@ -7,7 +7,7 @@ import xml.etree.ElementTree as ET
import pytest
from wcs.api_export_import import klass_to_slug
from wcs.api_export_import import BundleDeclareJob, BundleImportJob, klass_to_slug
from wcs.applications import Application, ApplicationElement
from wcs.blocks import BlockDef
from wcs.carddef import CardDef
@ -940,6 +940,50 @@ def test_export_import_bundle_import(pub):
assert formdef.disabled is True
assert formdef.workflow_roles == {'_receiver': extra_role.id}
# bad file format
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), b'garbage')
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
job = BundleImportJob.get(afterjob_url.split('/')[-2])
assert job.status == 'failed'
assert job.failure_label == 'Error: Invalid tar file.'
# missing manifest
tar_io = io.BytesIO()
with tarfile.open(mode='w', fileobj=tar_io) as tar:
foo_fd = io.BytesIO(json.dumps({'foo': 'bar'}, indent=2).encode())
tarinfo = tarfile.TarInfo('foo.json')
tarinfo.size = len(foo_fd.getvalue())
tar.addfile(tarinfo, fileobj=foo_fd)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), tar_io.getvalue())
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
job = BundleImportJob.get(afterjob_url.split('/')[-2])
assert job.status == 'failed'
assert job.failure_label == 'Error: Invalid tar file, missing manifest.'
# missing component
tar_io = io.BytesIO()
with tarfile.open(mode='w', fileobj=tar_io) as tar:
manifest_json = {
'application': 'Test',
'slug': 'test',
'elements': [{'type': 'forms', 'slug': 'foo', 'name': 'foo'}],
}
manifest_fd = io.BytesIO(json.dumps(manifest_json, indent=2).encode())
tarinfo = tarfile.TarInfo('manifest.json')
tarinfo.size = len(manifest_fd.getvalue())
tar.addfile(tarinfo, fileobj=manifest_fd)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), tar_io.getvalue())
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
job = BundleImportJob.get(afterjob_url.split('/')[-2])
assert job.status == 'failed'
assert job.failure_label == 'Error: Invalid tar file, missing component forms/foo.'
@pytest.mark.parametrize(
'category_class',
@ -1321,6 +1365,30 @@ def test_export_import_bundle_declare(pub):
== []
)
# bad file format
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-declare/'), b'garbage')
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
job = BundleDeclareJob.get(afterjob_url.split('/')[-2])
assert job.status == 'failed'
assert job.failure_label == 'Error: Invalid tar file.'
# missing manifest
tar_io = io.BytesIO()
with tarfile.open(mode='w', fileobj=tar_io) as tar:
foo_fd = io.BytesIO(json.dumps({'foo': 'bar'}, indent=2).encode())
tarinfo = tarfile.TarInfo('foo.json')
tarinfo.size = len(foo_fd.getvalue())
tar.addfile(tarinfo, fileobj=foo_fd)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-declare/'), tar_io.getvalue())
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
job = BundleDeclareJob.get(afterjob_url.split('/')[-2])
assert job.status == 'failed'
assert job.failure_label == 'Error: Invalid tar file, missing manifest.'
def test_export_import_bundle_unlink(pub):
application = Application()
@ -1896,6 +1964,38 @@ def test_export_import_bundle_check(pub):
},
}
# bad file format
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-check/'), b'garbage')
assert resp.json['err_desc'] == 'Invalid tar file'
# missing manifest
tar_io = io.BytesIO()
with tarfile.open(mode='w', fileobj=tar_io) as tar:
foo_fd = io.BytesIO(json.dumps({'foo': 'bar'}, indent=2).encode())
tarinfo = tarfile.TarInfo('foo.json')
tarinfo.size = len(foo_fd.getvalue())
tar.addfile(tarinfo, fileobj=foo_fd)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-check/'), tar_io.getvalue())
assert resp.json['err']
assert resp.json['err_desc'] == 'Invalid tar file, missing manifest'
# missing component
tar_io = io.BytesIO()
with tarfile.open(mode='w', fileobj=tar_io) as tar:
manifest_json = {
'application': 'Test',
'slug': 'test',
'version_number': '42',
'elements': [{'type': 'forms', 'slug': 'foo', 'name': 'foo'}],
}
manifest_fd = io.BytesIO(json.dumps(manifest_json, indent=2).encode())
tarinfo = tarfile.TarInfo('manifest.json')
tarinfo.size = len(manifest_fd.getvalue())
tar.addfile(tarinfo, fileobj=manifest_fd)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-check/'), tar_io.getvalue())
assert resp.json['err']
assert resp.json['err_desc'] == 'Invalid tar file, missing component forms/foo'
def test_export_import_workflow_options(pub):
FormDef.wipe()
@ -1946,6 +2046,97 @@ def test_export_import_workflow_options(pub):
assert formdef.workflow_options == {'foo': 'bar2'}
def test_export_import_with_deprecated(pub):
pub.load_site_options()
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'forbid-new-python-expressions', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
formdef = FormDef()
formdef.name = 'foo'
formdef.fields = [
PageField(id='1', label='page1', condition={'type': 'python', 'value': 'True'}),
]
formdef.store()
bundle = create_bundle(
[
{'type': 'forms', 'slug': 'foo', 'name': 'foo'},
],
('forms/foo', formdef),
)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
blockdef = BlockDef()
blockdef.name = 'foo'
blockdef.fields = [
StringField(id='2', label='python_prefill', prefill={'type': 'formula', 'value': '1 + 2'}),
]
blockdef.store()
bundle = create_bundle(
[
{'type': 'blocks', 'slug': 'foo', 'name': 'foo'},
],
('blocks/foo', blockdef),
)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
workflow = Workflow(name='foo')
st0 = workflow.add_status('Status0', 'st0')
sendsms = st0.add_action('sendsms', id='_sendsms')
sendsms.to = 'xxx'
sendsms.condition = {'type': 'python', 'value': 'True'}
sendsms.parent = st0
st0.items.append(sendsms)
workflow.store()
bundle = create_bundle(
[
{'type': 'workflows', 'slug': 'foo', 'name': 'foo'},
],
('workflows/foo', workflow),
)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
data_source = NamedDataSource(name='foo')
data_source.data_source = {'type': 'formula', 'value': repr([('1', 'un'), ('2', 'deux')])}
data_source.store()
bundle = create_bundle(
[
{'type': 'data-sources', 'slug': 'foo', 'name': 'foo'},
],
('data-sources/foo', data_source),
)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
wscall = NamedWsCall()
wscall.name = 'foo'
wscall.request = {'url': 'http://example.net', 'qs_data': {'a': '=1+2'}}
wscall.store()
bundle = create_bundle(
[
{'type': 'wscalls', 'slug': 'foo', 'name': 'foo'},
],
('wscalls/foo', wscall),
)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
def test_api_export_import_invalid_slug(pub):
pub.role_class.wipe()
role1 = pub.role_class(name='Test role 1')

View File

@ -3889,6 +3889,13 @@ def test_formdata_lookup(pub):
resp = resp.follow()
assert 'No such tracking code or identifier.' in resp.text
# check it's not possible to replace back value with anything else
for invalid_value in ('http://example.invalid/', 'xxx'):
resp = app.get('/backoffice/management/listing')
resp.forms[0]['back'] = invalid_value
resp = resp.forms[0].submit()
assert resp.location == 'http://example.net/backoffice/management/'
def test_backoffice_sidebar_user_context(pub):
user = create_user(pub)

View File

@ -1416,6 +1416,7 @@ def test_card_update_related(pub):
formdef.name = 'foo2'
formdef.fields = [
BlockField(id='1', label='Test', block_slug=blockdef.slug),
BlockField(id='2', label='Test2', block_slug=blockdef.slug), # left empty
]
formdef.store()

View File

@ -915,6 +915,17 @@ def test_file_convert_from_anything():
assert value.get_file_pointer().read() == b'hello'
def test_file_from_json_value(pub):
value = fields.FileField().from_json_value({'content': 'aGVsbG8=', 'filename': 'test.txt'})
assert value.base_filename == 'test.txt'
assert value.get_file_pointer().read() == b'hello'
value = fields.FileField().from_json_value(
{'content': 'aGVsbG8', 'filename': 'test.txt'} # invalid padding
)
assert value is None
def test_new_field_type_options(pub):
pub.load_site_options()
if not pub.site_options.has_section('options'):

View File

@ -1795,3 +1795,27 @@ def test_convert_image_format_errors(pub):
assert pub.get_cached_complex_data(img) is None
assert pub.loggederror_class.count() == 1
assert pub.loggederror_class.select()[0].summary == '|convert_image_format: conversion error (xxx)'
def test_temporary_access_url(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = [fields.StringField(id='1', label='Test', varname='foo')]
formdef.store()
# no formdata
context = pub.substitutions.get_context_variables(mode='lazy')
assert Template('{% temporary_access_url %}').render(context) == ''
# formdata
formdata = formdef.data_class()()
formdata.data = {'1': 'Foo Bar'}
formdata.store()
pub.substitutions.feed(formdata)
context = pub.substitutions.get_context_variables(mode='lazy')
assert Template('{% temporary_access_url %}').render(context).startswith('http://example.net/code/')
# removed formdata
formdata.remove_self()
assert Template('{% temporary_access_url %}').render(context) == ''

View File

@ -501,8 +501,16 @@ def test_validation_item_field(pub):
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.run(formdef)
# no check on invalid value
formdata.data['1'] = 'xxx'
testdef = TestDef.create_from_formdata(formdef, formdata)
with pytest.raises(TestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Invalid value "xxx" for field "Test": invalid value selected'
# no check on invalid value for field with data source
formdef.fields[0].data_source = {'type': 'jsonvalue', 'value': json.dumps({})}
formdef.store()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.run(formdef)
@ -533,8 +541,19 @@ def test_validation_item_field_inside_block(pub):
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.run(formdef)
# no check on invalid value
formdata.data['1'] = {'data': [{'1': 'xxx'}]}
testdef = TestDef.create_from_formdata(formdef, formdata)
with pytest.raises(TestError) as excinfo:
testdef.run(formdef)
assert (
str(excinfo.value) == 'Empty value for field "Test" (of field "Block Data"): invalid value selected'
)
# no check on invalid value for field with data source
block.fields[0].data_source = {'type': 'jsonvalue', 'value': json.dumps({})}
block.store()
formdef.refresh_from_storage()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.run(formdef)

View File

@ -204,6 +204,110 @@ def test_workflow_tests_button_click(pub):
assert str(excinfo.value) == 'Button "Go to end status" is not displayed.'
def test_workflow_tests_button_click_who(pub):
role = pub.role_class(name='test role')
role.store()
agent_user = pub.user_class(name='agent user')
agent_user.roles = [role.id]
agent_user.store()
other_role = pub.role_class(name='other test role')
other_role.store()
other_user = pub.user_class(name='other user')
other_user.roles = [other_role.id]
other_user.store()
workflow = Workflow(name='Workflow One')
new_status = workflow.add_status(name='New status')
jump_by_unknown = workflow.add_status(name='Jump by unknown')
jump_by_receiver = workflow.add_status(name='Jump by receiver')
jump_by_submitter = workflow.add_status(name='Jump by submitter')
jump_by_other_user = workflow.add_status(name='Jump by other user')
jump = new_status.add_action('choice')
jump.label = 'Go to next status'
jump.status = jump_by_unknown.id
jump.by = ['unknown']
receiver_jump = new_status.add_action('choice')
receiver_jump.label = 'Go to next status'
receiver_jump.status = jump_by_receiver.id
receiver_jump.by = ['_receiver']
submitter_jump = new_status.add_action('choice')
submitter_jump.label = 'Go to next status'
submitter_jump.status = jump_by_submitter.id
submitter_jump.by = ['_submitter']
other_user_jump = new_status.add_action('choice')
other_user_jump.label = 'Go to next status'
other_user_jump.status = jump_by_other_user.id
other_user_jump.by = [other_role.id]
workflow.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.workflow_id = workflow.id
formdef.workflow_roles = {'_receiver': role.id}
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = agent_user.id
testdef.workflow_tests.actions = [
workflow_tests.ButtonClick(button_name='Go to next status', who='receiver'),
workflow_tests.AssertStatus(status_name='Jump by receiver'),
]
testdef.run(formdef)
testdef.workflow_tests.actions = [
workflow_tests.ButtonClick(button_name='Go to next status', who='submitter'),
workflow_tests.AssertStatus(status_name='Jump by submitter'),
]
testdef.run(formdef)
testdef.workflow_tests.actions = [
workflow_tests.ButtonClick(button_name='Go to next status', who='other', who_id=other_user.id),
workflow_tests.AssertStatus(status_name='Jump by other user'),
]
testdef.run(formdef)
other_user.remove_self()
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Broken, missing user'
# submitter is anonymous
submitter_jump.by = ['logged-users']
workflow.store()
formdef.refresh_from_storage()
testdef.workflow_tests.actions = [
workflow_tests.ButtonClick(button_name='Go to next status', who='submitter'),
workflow_tests.AssertStatus(status_name='Jump by submitter'),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Button "Go to next status" is not displayed.'
# not anonymous submitter
submitter_user = pub.user_class(name='submitter user')
submitter_user.email = 'test@example.com'
submitter_user.store()
formdata.user = submitter_user
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = agent_user.id
testdef.workflow_tests.actions = [
workflow_tests.ButtonClick(button_name='Go to next status', who='submitter'),
workflow_tests.AssertStatus(status_name='Jump by submitter'),
]
testdef.run(formdef)
def test_workflow_tests_automatic_jump(pub):
user = pub.user_class(name='test user')
user.store()
@ -443,6 +547,62 @@ def test_workflow_tests_sendmail(mocked_send_email, pub):
assert str(excinfo.value) == 'Email subject does not contain "In new status".'
def test_workflow_tests_sms(pub):
pub.cfg['sms'] = {'sender': 'xxx', 'passerelle_url': 'http://passerelle.invalid/'}
user = pub.user_class(name='test user')
user.store()
workflow = Workflow(name='Workflow One')
new_status = workflow.add_status(name='New status')
sendsms = new_status.add_action('sendsms')
sendsms.to = ['0123456789']
sendsms.body = 'Hello'
workflow.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = user.id
testdef.workflow_tests.actions = [
workflow_tests.AssertSMS(phone_numbers=['0123456789'], body='Hello'),
]
testdef.run(formdef)
testdef.workflow_tests.actions.append(workflow_tests.AssertSMS())
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'No SMS was sent.'
testdef.workflow_tests.actions = [
workflow_tests.AssertSMS(phone_numbers=['0612345678']),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'SMS was not sent to 0612345678.'
assert 'SMS phone numbers: 0123456789' in excinfo.value.details
testdef.workflow_tests.actions = [
workflow_tests.AssertSMS(body='Goodbye'),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'SMS body mismatch.'
assert 'SMS body: "Hello"' in excinfo.value.details
def test_workflow_tests_backoffice_fields(pub):
user = pub.user_class(name='test user')
user.store()
@ -700,6 +860,10 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
set_backoffice_fields = transition_status.add_action('set-backoffice-fields')
set_backoffice_fields.fields = [{'field_id': 'bo1', 'value': 'xxx'}]
sendsms = transition_status.add_action('sendsms')
sendsms.to = ['0123456789']
sendsms.body = 'Hello'
jump = transition_status.add_action('jump')
jump.status = end_status.id
@ -732,7 +896,7 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
testdef.run(formdef)
actions = testdef.workflow_tests.actions
assert len(actions) == 8
assert len(actions) == 9
assert actions[0].key == 'assert-status'
assert actions[0].status_name == 'Status with timeout jump'
@ -749,6 +913,7 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
assert actions[4].key == 'assert-webservice-call'
assert actions[5].key == 'assert-email'
assert actions[6].key == 'assert-backoffice-field'
assert actions[7].key == 'assert-sms'
assert actions[-1].key == 'assert-status'
assert actions[-1].status_name == 'End status'

View File

@ -36,7 +36,7 @@ from wcs.qommon import force_str
from wcs.qommon.form import UploadedFile
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.upload_storage import PicklableUpload
from wcs.wf.export_to_model import ExportToModel, UploadValidationError, transform_to_pdf
from wcs.wf.export_to_model import ExportToModel, transform_to_pdf
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef
from ..admin_pages.test_all import create_superuser
@ -320,6 +320,8 @@ def test_export_to_model_xml(pub):
item.attach_to_history = True
def run(template, filename='/foo/template.xml', content_type='application/xml'):
formdata.evolution[-1].parts = None
formdata.store()
pub.loggederror_class.wipe()
upload = QuixoteUpload(filename, content_type=content_type)
upload.fp = io.BytesIO()
@ -330,8 +332,9 @@ def test_export_to_model_xml(pub):
pub.substitutions.reset()
pub.substitutions.feed(formdata)
item.perform(formdata)
with open(formdata.evolution[0].parts[-1].get_file_path()) as fd:
return fd.read()
if formdata.evolution[0].parts:
with open(formdata.evolution[0].parts[-1].get_file_path()) as fd:
return fd.read()
# good XML
assert run(template='<a>{{ form_var_string }}</a>') == '<a>écho</a>'
@ -341,23 +344,53 @@ def test_export_to_model_xml(pub):
assert run(template='<a>{{ form_var_string }}</a>', filename='/foo/template.svg') == '<a>écho</a>'
# unknown file format
with pytest.raises(UploadValidationError) as e:
run(
template='<a>{{ form_var_string }}</a>',
filename='/foo/template.txt',
content_type='application/octet-stream',
)
assert str(e.value) == 'Only OpenDocument and XML files can be used.'
assert not run(
template='<a>{{ form_var_string }}</a>',
filename='/foo/template.txt',
content_type='application/octet-stream',
)
assert pub.loggederror_class.count() == 1
assert pub.loggederror_class.select()[0].summary == 'Only OpenDocument and XML files can be used.'
# invalid UTF-8
with pytest.raises(UploadValidationError) as e:
assert run(template=b'<name>test \xE0 {{form_var_string}}</name>') == ''
assert str(e.value) == 'XML model files must be UTF-8.'
assert not run(template=b'<name>test \xE0 {{form_var_string}}</name>')
assert pub.loggederror_class.count() == 1
assert pub.loggederror_class.select()[0].summary == 'XML model files must be UTF-8.'
# malformed XML
assert run(template='<a>{{ form_var_string }}<a>') == '<a>écho<a>'
# on error in the XML correctness no exception is raised but an error is logged
assert pub.loggederror_class.count() == 1
assert pub.loggederror_class.select()[0].summary == 'The rendered template is not a valid XML document.'
def test_export_to_model_disabled_rtf(pub):
formdef = FormDef()
formdef.name = 'foo-export-to-template-with-django'
formdef.fields = [
StringField(id='1', label='String', varname='string'),
]
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
item = ExportToModel()
item.method = 'non-interactive'
item.attach_to_history = True
upload = QuixoteUpload('test.rtf', content_type='application/rtf')
upload.fp = io.BytesIO()
upload.fp.write(b'{\\rtf...')
upload.fp.seek(0)
item.model_file = UploadedFile(pub.app_dir, None, upload)
item.convert_to_pdf = False
pub.substitutions.reset()
pub.substitutions.feed(formdata)
pub.loggederror_class.wipe()
item.perform(formdata)
assert pub.loggederror_class.count() == 1
assert pub.loggederror_class.select()[0].summary == 'Only OpenDocument and XML files can be used.'
@pytest.mark.parametrize('filename', ['template-form-details.odt', 'template-form-details-no-styles.odt'])

View File

@ -27,6 +27,7 @@ from wcs.categories import CardDefCategory, DataSourceCategory
from wcs.data_sources import (
DataSourceSelectionWidget,
NamedDataSource,
NamedDataSourceImportError,
RefreshAgendas,
get_structured_items,
has_chrono,
@ -667,15 +668,22 @@ class NamedDataSourcesDirectory(Directory):
def import_submit(self, form):
fp = form.get_widget('file').parse().fp
error = False
error, reason = False, None
try:
datasource = NamedDataSource.import_from_xml(fp)
get_session().message = ('info', _('This datasource has been successfully imported.'))
except NamedDataSourceImportError as e:
error = True
reason = str(e)
except ValueError:
error = True
if error:
form.set_error('file', _('Invalid File'))
if reason:
msg = _('Invalid File (%s)') % reason
else:
msg = _('Invalid File')
form.set_error('file', msg)
raise ValueError()
try:

View File

@ -36,7 +36,7 @@ from quixote.html import TemplateIO, htmltext
from wcs.api_access import ApiAccess
from wcs.blocks import BlockDef, BlockdefImportError
from wcs.carddef import CardDef
from wcs.data_sources import NamedDataSource
from wcs.data_sources import NamedDataSource, NamedDataSourceImportError
from wcs.fields.map import MapOptionsMixin
from wcs.formdef import FormDef, FormdefImportError, get_formdefs_of_all_kinds
from wcs.qommon import _, audit, errors, get_cfg, ident, misc, pgettext_lazy, template
@ -61,6 +61,7 @@ from wcs.qommon.form import (
TextWidget,
)
from wcs.workflows import Workflow, WorkflowImportError
from wcs.wscalls import NamedWsCallImportError
from .api_access import ApiAccessDirectory
from .data_sources import NamedDataSourcesDirectory
@ -1512,7 +1513,10 @@ class SiteImportAfterJob(AfterJob):
msg = _(e.msg) % e.msg_args
if e.details:
msg += ' [%s]' % e.details
error = _('Failed to import a workflow (%s); site import did not complete.') % msg
error = _('Failed to import objects (%s); site import did not complete.') % msg
except (NamedDataSourceImportError, NamedWsCallImportError) as e:
results = None
error = _('Failed to import objects (%s); site import did not complete.') % str(e)
self.results = results
if error:

View File

@ -26,7 +26,7 @@ from wcs.backoffice.snapshots import SnapshotsDirectory
from wcs.qommon import _, errors, misc, template
from wcs.qommon.form import CheckboxWidget, FileWidget, Form, HtmlWidget, SlugWidget, StringWidget, TextWidget
from wcs.utils import grep_strings
from wcs.wscalls import NamedWsCall, WsCallRequestWidget
from wcs.wscalls import NamedWsCall, NamedWsCallImportError, WsCallRequestWidget
class NamedWsCallUI:
@ -317,15 +317,22 @@ class NamedWsCallsDirectory(Directory):
def import_submit(self, form):
fp = form.get_widget('file').parse().fp
error = False
error, reason = False, None
try:
wscall = NamedWsCall.import_from_xml(fp)
get_session().message = ('info', _('This webservice call has been successfully imported.'))
except NamedWsCallImportError as e:
error = True
reason = str(e)
except ValueError:
error = True
if error:
form.set_error('file', _('Invalid File'))
if reason:
msg = _('Invalid File (%s)') % reason
else:
msg = _('Invalid File')
form.set_error('file', msg)
raise ValueError()
try:

View File

@ -26,7 +26,7 @@ from quixote import get_publisher, get_response
from wcs.api_utils import is_url_signed
from wcs.applications import Application, ApplicationElement
from wcs.blocks import BlockDef
from wcs.blocks import BlockDef, BlockdefImportError
from wcs.carddef import CardDef
from wcs.categories import (
BlockCategory,
@ -38,12 +38,12 @@ from wcs.categories import (
WorkflowCategory,
)
from wcs.comment_templates import CommentTemplate
from wcs.data_sources import NamedDataSource
from wcs.formdef import FormDef
from wcs.data_sources import NamedDataSource, NamedDataSourceImportError
from wcs.formdef import FormDef, FormdefImportError
from wcs.mail_templates import MailTemplate
from wcs.sql import Equal, Role
from wcs.workflows import Workflow
from wcs.wscalls import NamedWsCall
from wcs.workflows import Workflow, WorkflowImportError
from wcs.wscalls import NamedWsCall, NamedWsCallImportError
from .qommon import _
from .qommon.afterjobs import AfterJob
@ -270,101 +270,118 @@ def object_dependencies(request, objects, slug):
@signature_required
def bundle_check(request):
tar_io = io.BytesIO(request.body)
with tarfile.open(fileobj=tar_io) as tar:
manifest = json.loads(tar.extractfile('manifest.json').read().decode())
application_slug = manifest.get('slug')
application_version = manifest.get('version_number')
if not application_slug or not application_version:
return JsonResponse({'data': {}})
differences = []
unknown_elements = []
no_history_elements = []
legacy_elements = []
for element in manifest.get('elements'):
if element['type'] not in klasses or element['type'] == 'roles':
continue
element_klass = klasses[element['type']]
element_content = tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
tree = ET.fromstring(element_content)
if hasattr(element_klass, 'url_name'):
slug = xml_node_text(tree.find('url_name'))
else:
slug = xml_node_text(tree.find('slug'))
try:
with tarfile.open(fileobj=tar_io) as tar:
try:
obj = element_klass.get_by_slug(slug)
if obj is None:
raise KeyError
manifest = json.loads(tar.extractfile('manifest.json').read().decode())
except KeyError:
# element not found, report this as unexisting
unknown_elements.append(
{
'type': element['type'],
'slug': element['slug'],
}
)
continue
applications = Application.select([Equal('slug', application_slug)])
legacy = False
if not applications:
legacy = True
else:
application = applications[0]
elements = ApplicationElement.select(
return JsonResponse({'err': 1, 'err_desc': _('Invalid tar file, missing manifest')})
application_slug = manifest.get('slug')
application_version = manifest.get('version_number')
if not application_slug or not application_version:
return JsonResponse({'data': {}})
differences = []
unknown_elements = []
no_history_elements = []
legacy_elements = []
for element in manifest.get('elements'):
if element['type'] not in klasses or element['type'] == 'roles':
continue
element_klass = klasses[element['type']]
try:
element_content = tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
except KeyError:
return JsonResponse(
{
'err': 1,
'err_desc': _(
'Invalid tar file, missing component %s/%s'
% (element['type'], element['slug'])
),
}
)
tree = ET.fromstring(element_content)
if hasattr(element_klass, 'url_name'):
slug = xml_node_text(tree.find('url_name'))
else:
slug = xml_node_text(tree.find('slug'))
try:
obj = element_klass.get_by_slug(slug)
if obj is None:
raise KeyError
except KeyError:
# element not found, report this as unexisting
unknown_elements.append(
{
'type': element['type'],
'slug': element['slug'],
}
)
continue
applications = Application.select([Equal('slug', application_slug)])
legacy = False
if not applications:
legacy = True
else:
application = applications[0]
elements = ApplicationElement.select(
[
Equal('application_id', application.id),
Equal('object_type', obj.xml_root_node),
Equal('object_id', obj.id),
]
)
if not elements:
legacy = True
if legacy:
# object exists, but not linked to the application
legacy_elements.append(
{
'type': element['type'],
'slug': element['slug'],
# information needed here, Relation objects may not exist yet in hobo
'text': obj.name,
'url': request.build_absolute_uri(
reverse(
'api-export-import-object-redirect',
kwargs={'objects': element['type'], 'slug': element['slug']},
)
),
}
)
continue
snapshots_for_app = get_publisher().snapshot_class.select(
[
Equal('application_id', application.id),
Equal('object_type', obj.xml_root_node),
Equal('object_id', obj.id),
]
)
if not elements:
legacy = True
if legacy:
# object exists, but not linked to the application
legacy_elements.append(
{
'type': element['type'],
'slug': element['slug'],
# information needed here, Relation objects may not exist yet in hobo
'text': obj.name,
'url': request.build_absolute_uri(
reverse(
'api-export-import-object-redirect',
kwargs={'objects': element['type'], 'slug': element['slug']},
)
),
}
)
continue
snapshots_for_app = get_publisher().snapshot_class.select(
[
Equal('object_type', obj.xml_root_node),
Equal('object_id', obj.id),
Equal('application_slug', application_slug),
Equal('application_version', application_version),
],
order_by='-timestamp',
)
if not snapshots_for_app:
# legacy, no snapshot for this bundle
no_history_elements.append(
{
'type': element['type'],
'slug': element['slug'],
}
)
continue
snapshot_for_app = snapshots_for_app[0]
last_snapshot = get_publisher().snapshot_class.select_object_history(obj)[0]
if snapshot_for_app.id != last_snapshot.id:
differences.append(
{
'type': element['type'],
'slug': element['slug'],
'url': '%shistory/compare?version1=%s&version2=%s'
% (obj.get_admin_url(), snapshot_for_app.id, last_snapshot.id),
}
Equal('application_slug', application_slug),
Equal('application_version', application_version),
],
order_by='-timestamp',
)
if not snapshots_for_app:
# legacy, no snapshot for this bundle
no_history_elements.append(
{
'type': element['type'],
'slug': element['slug'],
}
)
continue
snapshot_for_app = snapshots_for_app[0]
last_snapshot = get_publisher().snapshot_class.select_object_history(obj)[0]
if snapshot_for_app.id != last_snapshot.id:
differences.append(
{
'type': element['type'],
'slug': element['slug'],
'url': '%shistory/compare?version1=%s&version2=%s'
% (obj.get_admin_url(), snapshot_for_app.id, last_snapshot.id),
}
)
except tarfile.TarError:
return JsonResponse({'err': 1, 'err_desc': _('Invalid tar file')})
return JsonResponse(
{
@ -378,6 +395,10 @@ def bundle_check(request):
)
class BundleKeyError(Exception):
pass
class BundleImportJob(AfterJob):
def __init__(self, tar_content, **kwargs):
super().__init__(**kwargs)
@ -389,48 +410,77 @@ class BundleImportJob(AfterJob):
object_types = sorted(object_types, key=lambda a: 'categories' in a, reverse=True)
tar_io = io.BytesIO(self.tar_content)
with tarfile.open(fileobj=tar_io) as self.tar:
manifest = json.loads(self.tar.extractfile('manifest.json').read().decode())
self.application = Application.update_or_create_from_manifest(
manifest, self.tar, editable=False, install=False
)
error = None
try:
with tarfile.open(fileobj=tar_io) as self.tar:
try:
manifest = json.loads(self.tar.extractfile('manifest.json').read().decode())
except KeyError:
raise BundleKeyError(_('Invalid tar file, missing manifest.'))
self.application = Application.update_or_create_from_manifest(
manifest, self.tar, editable=False, install=False
)
# count number of actions
self.total_count = 0
self.total_count += len(
[
x
for x in manifest.get('elements')
if x.get('type') in ('forms', 'cards', 'blocks', 'workflows')
]
)
self.total_count += (
len([x for x in manifest.get('elements') if x.get('type') in object_types]) * 2
)
# count number of actions
self.total_count = 0
self.total_count += len(
[
x
for x in manifest.get('elements')
if x.get('type') in ('forms', 'cards', 'blocks', 'workflows')
]
)
self.total_count += (
len([x for x in manifest.get('elements') if x.get('type') in object_types]) * 2
)
# init cache of application elements, from imported manifest
self.application_elements = set()
# init cache of application elements, from imported manifest
self.application_elements = set()
# first pass on formdef/carddef/blockdef/workflows to create them empty
# (name and slug); so they can be found for sure in import pass
for _type in ('forms', 'cards', 'blocks', 'workflows'):
self.pre_install([x for x in manifest.get('elements') if x.get('type') == _type])
# first pass on formdef/carddef/blockdef/workflows to create them empty
# (name and slug); so they can be found for sure in import pass
for _type in ('forms', 'cards', 'blocks', 'workflows'):
self.pre_install([x for x in manifest.get('elements') if x.get('type') == _type])
# real installation pass
for _type in object_types:
self.install([x for x in manifest.get('elements') if x.get('type') == _type])
# real installation pass
for _type in object_types:
self.install([x for x in manifest.get('elements') if x.get('type') == _type])
# again, to remove [pre-install] in dependencies labels
for _type in object_types:
self.install([x for x in manifest.get('elements') if x.get('type') == _type], finalize=True)
# again, to remove [pre-install] in dependencies labels
for _type in object_types:
self.install(
[x for x in manifest.get('elements') if x.get('type') == _type], finalize=True
)
# remove obsolete application elements
self.unlink_obsolete_objects()
# remove obsolete application elements
self.unlink_obsolete_objects()
except (
BlockdefImportError,
FormdefImportError,
WorkflowImportError,
NamedDataSourceImportError,
NamedWsCallImportError,
) as e:
error = str(e)
except tarfile.TarError:
error = _('Invalid tar file.')
except BundleKeyError as e:
error = str(e)
if error:
self.status = 'failed'
self.failure_label = str(_('Error: %s') % error)
def pre_install(self, elements):
for element in elements:
element_klass = klasses[element['type']]
element_content = self.tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
try:
element_content = self.tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
except KeyError:
raise BundleKeyError(
'Invalid tar file, missing component %s/%s.' % (element['type'], element['slug'])
)
tree = ET.fromstring(element_content)
if hasattr(element_klass, 'url_name'):
slug = xml_node_text(tree.find('url_name'))
@ -472,7 +522,12 @@ class BundleImportJob(AfterJob):
imported_positions = {}
for element in elements:
element_content = self.tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
try:
element_content = self.tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
except KeyError:
raise BundleKeyError(
'Invalid tar file, missing component %s/%s.' % (element['type'], element['slug'])
)
new_object = element_klass.import_from_xml_tree(
ET.fromstring(element_content), include_id=False, check_datasources=False
)
@ -587,24 +642,40 @@ class BundleDeclareJob(BundleImportJob):
object_types = [x for x in klasses if x != 'roles']
tar_io = io.BytesIO(self.tar_content)
with tarfile.open(fileobj=tar_io) as self.tar:
manifest = json.loads(self.tar.extractfile('manifest.json').read().decode())
self.application = Application.update_or_create_from_manifest(
manifest, self.tar, editable=True, install=True
)
error = None
try:
with tarfile.open(fileobj=tar_io) as self.tar:
try:
manifest = json.loads(self.tar.extractfile('manifest.json').read().decode())
except KeyError:
raise BundleKeyError(_('Invalid tar file, missing manifest.'))
else:
self.application = Application.update_or_create_from_manifest(
manifest, self.tar, editable=True, install=True
)
# count number of actions
self.total_count = len([x for x in manifest.get('elements') if x.get('type') in object_types])
# count number of actions
self.total_count = len(
[x for x in manifest.get('elements') if x.get('type') in object_types]
)
# init cache of application elements, from manifest
self.application_elements = set()
# init cache of application elements, from manifest
self.application_elements = set()
# declare elements
for type in object_types:
self.declare([x for x in manifest.get('elements') if x.get('type') == type])
# declare elements
for type in object_types:
self.declare([x for x in manifest.get('elements') if x.get('type') == type])
# remove obsolete application elements
self.unlink_obsolete_objects()
# remove obsolete application elements
self.unlink_obsolete_objects()
except tarfile.TarError:
error = _('Invalid tar file.')
except BundleKeyError as e:
error = str(e)
if error:
self.status = 'failed'
self.failure_label = str(_('Error: %s') % error)
def declare(self, elements):
for element in elements:

View File

@ -22,17 +22,24 @@ import re
from quixote import get_publisher, get_request, get_response, redirect
from quixote.directory import Directory
from wcs.blocks import BlockDef
from wcs.carddef import CardDef
from wcs.data_sources import NamedDataSource
from wcs.formdef import get_formdefs_of_all_kinds
from wcs.formdef import FormDef, get_formdefs_of_all_kinds
from wcs.mail_templates import MailTemplate
from wcs.qommon import _, ezt, template
from wcs.qommon.afterjobs import AfterJob
from wcs.qommon.template import Template
from wcs.wf.export_to_model import UploadValidationError
from wcs.wf.form import FormWorkflowStatusItem
from wcs.workflows import Workflow
from wcs.wscalls import NamedWsCall
class DeprecatedElementsDetected(Exception):
pass
class DeprecationsDirectory(Directory):
do_not_call_in_templates = True
_q_exports = ['', 'scan']
@ -66,7 +73,7 @@ class DeprecationsDirectory(Directory):
def scan(self):
job = get_response().add_after_job(
DeprecationsScanAfterJob(
DeprecationsScan(
label=_('Scanning for deprecations'),
user_id=get_request().user.id,
return_url='/backoffice/studio/deprecations/',
@ -127,7 +134,7 @@ class DeprecationsDirectory(Directory):
}
class DeprecationsScanAfterJob(AfterJob):
class DeprecationsScan(AfterJob):
def done_action_url(self):
return self.kwargs['return_url']
@ -155,181 +162,25 @@ class DeprecationsScanAfterJob(AfterJob):
)
self.store()
for formdef in formdefs:
if formdef.id:
source = f'{formdef.xml_root_node}:{formdef.id}' if formdef.id else ''
elif formdef.get_workflow():
source = f'workflow:{formdef.get_workflow().id}'
else:
source = '-'
for field in formdef.fields or []:
location_label = _('%(name)s / Field "%(label)s"') % {
'name': formdef.name,
'label': field.ellipsized_label,
}
url = formdef.get_field_admin_url(field)
self.check_data_source(
getattr(field, 'data_source', None),
location_label=location_label,
url=url,
source=source,
)
prefill = getattr(field, 'prefill', None)
if prefill:
if prefill.get('type') == 'formula':
self.add_report_line(
location_label=location_label,
url=url,
category='python-prefill',
source=source,
)
else:
self.check_string(
prefill.get('value'),
location_label=location_label,
url=url,
python_check=False,
source=source,
)
if field.key == 'page':
for condition in field.get_conditions():
if condition and condition.get('type') == 'python':
self.add_report_line(
location_label=location_label,
url=url,
category='python-condition',
source=source,
)
break
if field.key in ('title', 'subtitle', 'comment'):
self.check_string(
field.label,
location_label=location_label,
url=url,
python_check=False,
source=source,
)
if field.key in ('table', 'table-select', 'tablerows', 'ranked-items'):
self.add_report_line(
location_label=location_label,
url=url,
category='fields',
source=source,
)
self.increment_count()
for workflow in workflows:
source = f'workflow:{workflow.id}'
for action in workflow.get_all_items():
location_label = '%s / %s' % (workflow.name, action.description)
url = action.get_admin_url()
for string in action.get_computed_strings():
self.check_string(string, location_label=location_label, url=url, source=source)
if getattr(action, 'condition', None):
if action.condition.get('type') == 'python':
self.add_report_line(
location_label=location_label,
url=url,
category='python-condition',
css_class='important' if (action.key == 'jump' and action.timeout) else '',
source=source,
)
if action.key == 'export_to_model':
try:
kind = action.model_file_validation(action.model_file, allow_rtf=True)
except UploadValidationError:
pass
else:
if kind == 'rtf':
self.add_report_line(
location_label=location_label, url=url, category='rtf', source=source
)
if action.key in ('aggregationemail', 'resubmit'):
self.add_report_line(
location_label=location_label, url=url, category='actions', source=source
)
if action.key in ('register-comment', 'sendmail'):
for attachment in getattr(action, 'attachments', None) or []:
if attachment and not ('{%' in attachment or '{{' in attachment):
self.add_report_line(
location_label=location_label,
url=url,
category='python-expression',
source=source,
)
break
if action.key == 'webservice_call':
self.check_remote_call_url(
action.url, location_label=location_label, url=url, source=source
)
for global_action in workflow.global_actions or []:
location_label = '%s / %s' % (workflow.name, _('trigger in %s') % global_action.name)
for trigger in global_action.triggers or []:
url = '%striggers/%s/' % (global_action.get_admin_url(), trigger.id)
if trigger.key == 'timeout' and trigger.anchor == 'python':
self.add_report_line(
location_label=location_label,
url=url,
category='python-expression',
source=source,
)
break
self.increment_count()
for named_data_source in named_data_sources:
source = f'datasource:{named_data_source.id}'
location_label = _('%(title)s "%(name)s"') % {
'title': _('Data source'),
'name': named_data_source.name,
}
url = named_data_source.get_admin_url()
self.check_data_source(
getattr(named_data_source, 'data_source', None),
location_label=location_label,
url=url,
source=source,
)
self.increment_count()
for named_ws_call in named_ws_calls:
source = f'wscall:{named_ws_call.id}'
location_label = _('%(title)s "%(name)s"') % {
'title': _('Webservice'),
'name': named_ws_call.name,
}
url = named_ws_call.get_admin_url()
for string in named_ws_call.get_computed_strings():
self.check_string(string, location_label=location_label, url=url, source=source)
if named_ws_call.request and named_ws_call.request.get('url'):
self.check_remote_call_url(
named_ws_call.request['url'], location_label=location_label, url=url, source=source
)
self.increment_count()
for mail_template in mail_templates:
source = f'mail_template:{mail_template.id}'
location_label = _('%(title)s "%(name)s"') % {
'title': _('Mail Template'),
'name': mail_template.name,
}
url = mail_template.get_admin_url()
for string in mail_template.get_computed_strings():
self.check_string(string, location_label=location_label, url=url, source=source)
for string in mail_template.attachments or []:
# legacy was to have straight python expressions (not prefixed by "=").
if not Template.is_template_string(string):
self.add_report_line(
location_label=location_label, url=url, category='python-expression', source=source
)
self.increment_count()
self.check_objects(formdefs + workflows + named_data_sources + named_ws_calls + mail_templates)
self.build_report_file()
self.increment_count()
def check_objects(self, objects):
for obj in objects:
if isinstance(obj, (FormDef, CardDef, BlockDef)):
self.check_formdef(obj)
elif isinstance(obj, Workflow):
self.check_workflow(obj)
elif isinstance(obj, NamedDataSource):
self.check_named_data_source(obj)
elif isinstance(obj, NamedWsCall):
self.check_named_ws_call(obj)
elif isinstance(obj, MailTemplate):
self.check_mail_template(obj)
self.increment_count()
def check_data_source(self, data_source, location_label, url, source):
if not data_source:
return
@ -386,6 +237,169 @@ class DeprecationsScanAfterJob(AfterJob):
location_label=location_label, url=url, category='json-data-store', source=source
)
def check_formdef(self, formdef):
if formdef.id:
source = f'{formdef.xml_root_node}:{formdef.id}' if formdef.id else ''
elif hasattr(formdef, 'get_workflow') and formdef.get_workflow():
source = f'workflow:{formdef.get_workflow().id}'
else:
source = '-'
for field in formdef.fields or []:
location_label = _('%(name)s / Field "%(label)s"') % {
'name': formdef.name,
'label': field.ellipsized_label,
}
url = formdef.get_field_admin_url(field)
self.check_data_source(
getattr(field, 'data_source', None),
location_label=location_label,
url=url,
source=source,
)
prefill = getattr(field, 'prefill', None)
if prefill:
if prefill.get('type') == 'formula':
self.add_report_line(
location_label=location_label,
url=url,
category='python-prefill',
source=source,
)
else:
self.check_string(
prefill.get('value'),
location_label=location_label,
url=url,
python_check=False,
source=source,
)
if field.key == 'page':
for condition in field.get_conditions():
if condition and condition.get('type') == 'python':
self.add_report_line(
location_label=location_label,
url=url,
category='python-condition',
source=source,
)
break
if field.key in ('title', 'subtitle', 'comment'):
self.check_string(
field.label,
location_label=location_label,
url=url,
python_check=False,
source=source,
)
if field.key in ('table', 'table-select', 'tablerows', 'ranked-items'):
self.add_report_line(
location_label=location_label,
url=url,
category='fields',
source=source,
)
def check_workflow(self, workflow):
source = f'workflow:{workflow.id}'
for action in workflow.get_all_items():
location_label = '%s / %s' % (workflow.name, action.description)
url = action.get_admin_url()
for string in action.get_computed_strings():
self.check_string(string, location_label=location_label, url=url, source=source)
if getattr(action, 'condition', None):
if action.condition.get('type') == 'python':
self.add_report_line(
location_label=location_label,
url=url,
category='python-condition',
css_class='important' if (action.key == 'jump' and action.timeout) else '',
source=source,
)
if action.key == 'export_to_model':
try:
kind = action.model_file_validation(action.model_file, allow_rtf=True)
except UploadValidationError:
pass
else:
if kind == 'rtf':
self.add_report_line(
location_label=location_label, url=url, category='rtf', source=source
)
if action.key in ('aggregationemail', 'resubmit'):
self.add_report_line(
location_label=location_label, url=url, category='actions', source=source
)
if action.key in ('register-comment', 'sendmail'):
for attachment in getattr(action, 'attachments', None) or []:
if attachment and not ('{%' in attachment or '{{' in attachment):
self.add_report_line(
location_label=location_label,
url=url,
category='python-expression',
source=source,
)
break
if action.key == 'webservice_call':
self.check_remote_call_url(action.url, location_label=location_label, url=url, source=source)
for global_action in workflow.global_actions or []:
location_label = '%s / %s' % (workflow.name, _('trigger in %s') % global_action.name)
for trigger in global_action.triggers or []:
url = '%striggers/%s/' % (global_action.get_admin_url(), trigger.id)
if trigger.key == 'timeout' and trigger.anchor == 'python':
self.add_report_line(
location_label=location_label,
url=url,
category='python-expression',
source=source,
)
break
def check_named_data_source(self, named_data_source):
source = f'datasource:{named_data_source.id}'
location_label = _('%(title)s "%(name)s"') % {
'title': _('Data source'),
'name': named_data_source.name,
}
url = named_data_source.get_admin_url()
self.check_data_source(
getattr(named_data_source, 'data_source', None),
location_label=location_label,
url=url,
source=source,
)
def check_named_ws_call(self, named_ws_call):
source = f'wscall:{named_ws_call.id}'
location_label = _('%(title)s "%(name)s"') % {
'title': _('Webservice'),
'name': named_ws_call.name,
}
url = named_ws_call.get_admin_url()
for string in named_ws_call.get_computed_strings():
self.check_string(string, location_label=location_label, url=url, source=source)
if named_ws_call.request and named_ws_call.request.get('url'):
self.check_remote_call_url(
named_ws_call.request['url'], location_label=location_label, url=url, source=source
)
def check_mail_template(self, mail_template):
source = f'mail_template:{mail_template.id}'
location_label = _('%(title)s "%(name)s"') % {
'title': _('Mail Template'),
'name': mail_template.name,
}
url = mail_template.get_admin_url()
for string in mail_template.get_computed_strings():
self.check_string(string, location_label=location_label, url=url, source=source)
for string in mail_template.attachments or []:
# legacy was to have straight python expressions (not prefixed by "=").
if not Template.is_template_string(string):
self.add_report_line(
location_label=location_label, url=url, category='python-expression', source=source
)
def add_report_line(self, **kwargs):
if kwargs not in self.report_lines:
self.report_lines.append(kwargs)
@ -400,3 +414,32 @@ class DeprecationsScanAfterJob(AfterJob):
fd,
indent=2,
)
def check_deprecated_elements_in_object(self, obj):
if not get_publisher().has_site_option('forbid-new-python-expressions'):
# for perfs, don't check object if nothing is forbidden
return
self.report_lines = []
objects = [obj]
if isinstance(obj, Workflow):
for status in obj.possible_status:
for item in status.items:
if isinstance(item, FormWorkflowStatusItem) and item.formdef:
objects.append(item.formdef)
if obj.variables_formdef:
objects.append(obj.variables_formdef)
if obj.backoffice_fields_formdef:
objects.append(obj.backoffice_fields_formdef)
self.check_objects(objects)
for report_line in self.report_lines:
if 'python' in report_line['category'] and get_publisher().has_site_option(
'forbid-new-python-expressions'
):
raise DeprecatedElementsDetected(_('Python expression detected'))
class DeprecationsScanAfterJob(DeprecationsScan):
pass # legacy name, to load old pickle files

View File

@ -298,7 +298,10 @@ class ManagementDirectory(Directory):
get_session().message = None
return redirect(formdata.get_url(backoffice=True))
return redirect(get_request().form.get('back') or '.')
back_place = get_request().form.get('back')
if back_place not in ('listing', 'forms'):
back_place = '.' # auto
return redirect(back_place)
def get_lookup_sidebox(self, back_place=''):
r = TemplateIO(html=True)

View File

@ -179,12 +179,17 @@ class BlockDef(StorableObject):
return root
@classmethod
def import_from_xml(cls, fd, include_id=False, check_datasources=True):
def import_from_xml(cls, fd, include_id=False, check_datasources=True, check_deprecated=True):
try:
tree = ET.parse(fd)
except Exception:
raise ValueError()
blockdef = cls.import_from_xml_tree(tree, include_id=include_id, check_datasources=check_datasources)
blockdef = cls.import_from_xml_tree(
tree,
include_id=include_id,
check_datasources=check_datasources,
check_deprecated=check_deprecated,
)
if blockdef.slug:
try:
@ -197,7 +202,11 @@ class BlockDef(StorableObject):
return blockdef
@classmethod
def import_from_xml_tree(cls, tree, include_id=False, check_datasources=True, **kwargs):
def import_from_xml_tree(
cls, tree, include_id=False, check_datasources=True, check_deprecated=True, **kwargs
):
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
blockdef = cls()
if tree.find('name') is None or not tree.find('name').text:
raise BlockdefImportError(_('Missing name'))
@ -279,6 +288,14 @@ class BlockDef(StorableObject):
details[_('Unknown datasources')].update(unknown_datasources)
raise BlockdefImportUnknownReferencedError(_('Unknown referenced objects'), details=details)
if check_deprecated:
# check for deprecated elements
job = DeprecationsScan()
try:
job.check_deprecated_elements_in_object(blockdef)
except DeprecatedElementsDetected as e:
raise BlockdefImportError(str(e))
return blockdef
def get_usage_fields(self):

View File

@ -203,13 +203,14 @@ class UpdateRelationsAfterJob(AfterJob):
objdata_changed = False
for field in fields:
if getattr(field, 'block_field', None):
blockdata_changed = False
for block_row_data in objdata.data[field.block_field.id]['data']:
blockdata_changed |= update_data(field, block_row_data)
if blockdata_changed:
# if block data changed, maybe block digest changed too
update_data(field.block_field, objdata.data)
objdata_changed |= blockdata_changed
if objdata.data.get(field.block_field.id):
blockdata_changed = False
for block_row_data in objdata.data[field.block_field.id]['data']:
blockdata_changed |= update_data(field, block_row_data)
if blockdata_changed:
# if block data changed, maybe block digest changed too
update_data(field.block_field, objdata.data)
objdata_changed |= blockdata_changed
else:
objdata_changed |= update_data(field, objdata.data)
if objdata_changed:

View File

@ -51,6 +51,10 @@ from .qommon.xml_storage import XmlStorableObject
data_source_functions = {}
class NamedDataSourceImportError(Exception):
pass
class DataSourceError(Exception):
pass
@ -913,9 +917,22 @@ class NamedDataSource(XmlStorableObject):
return root
@classmethod
def import_from_xml_tree(cls, tree, include_id=False, **kwargs):
data_source = super().import_from_xml_tree(tree, include_id=include_id, **kwargs)
def import_from_xml_tree(cls, tree, include_id=False, check_deprecated=True, **kwargs):
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
data_source = super().import_from_xml_tree(
tree, include_id=include_id, check_deprecated=check_deprecated, **kwargs
)
DataSourceCategory.object_category_xml_import(data_source, tree, include_id=include_id)
if check_deprecated:
# check for deprecated elements
job = DeprecationsScan()
try:
job.check_deprecated_elements_in_object(data_source)
except DeprecatedElementsDetected as e:
raise NamedDataSourceImportError(str(e))
return data_source
@classmethod

View File

@ -237,7 +237,10 @@ class FileField(WidgetField):
def from_json_value(self, value):
if value and 'filename' in value and 'content' in value:
content = base64.b64decode(value['content'])
try:
content = base64.b64decode(value['content'])
except ValueError:
return None
content_type = value.get('content_type', 'application/octet-stream')
if content_type.startswith('text/'):
charset = 'utf-8'

View File

@ -107,6 +107,7 @@ class FormDefForm(Form):
def __init__(self):
super().__init__(enctype='multipart/form-data', use_tokens=False)
self.attrs['data-warn-on-unsaved-content'] = 'true'
def _render_error_notice_content(self, errors):
t = TemplateIO(html=True)
@ -1464,7 +1465,9 @@ class FormDef(StorableObject):
return root
@classmethod
def import_from_xml(cls, fd, include_id=False, fix_on_error=False, check_datasources=True):
def import_from_xml(
cls, fd, include_id=False, fix_on_error=False, check_datasources=True, check_deprecated=True
):
try:
tree = ET.parse(fd)
except Exception:
@ -1474,6 +1477,7 @@ class FormDef(StorableObject):
include_id=include_id,
fix_on_error=fix_on_error,
check_datasources=check_datasources,
check_deprecated=check_deprecated,
)
if formdef.url_name:
@ -1496,8 +1500,15 @@ class FormDef(StorableObject):
@classmethod
def import_from_xml_tree(
cls, tree, include_id=False, fix_on_error=False, snapshot=False, check_datasources=True
cls,
tree,
include_id=False,
fix_on_error=False,
snapshot=False,
check_datasources=True,
check_deprecated=True,
):
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
from wcs.carddef import CardDef
formdef = cls()
@ -1713,6 +1724,14 @@ class FormDef(StorableObject):
details[_('Unknown datasources')].update(unknown_datasources)
raise FormdefImportUnknownReferencedError(_('Unknown referenced objects'), details=details)
if check_deprecated:
# check for deprecated elements
job = DeprecationsScan()
try:
job.check_deprecated_elements_in_object(formdef)
except DeprecatedElementsDetected as e:
raise FormdefImportError(str(e))
return formdef
def finish_tests_xml_import(self):

View File

@ -4,8 +4,8 @@ msgid ""
msgstr ""
"Project-Id-Version: wcs 0\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2024-03-15 07:48+0100\n"
"PO-Revision-Date: 2024-03-15 07:48+0100\n"
"POT-Creation-Date: 2024-03-18 14:22+0100\n"
"PO-Revision-Date: 2024-03-18 14:22+0100\n"
"Last-Translator: Thomas Noël <tnoel@entrouvert.com>\n"
"Language-Team: french\n"
"Language: fr\n"
@ -383,7 +383,8 @@ msgid "You can install a new fields block by uploading a file."
msgstr ""
"Vous pouvez installer un nouveau bloc de champs en téléchargeant un fichier."
#: admin/blocks.py admin/forms.py admin/workflows.py
#: admin/blocks.py admin/data_sources.py admin/forms.py admin/workflows.py
#: admin/wscalls.py
#, python-format
msgid "Invalid File (%s)"
msgstr "Fichier invalide (%s)"
@ -1885,6 +1886,7 @@ msgid "Subject"
msgstr "Sujet"
#: admin/mail_templates.py wf/notification.py wf/sendmail.py wf/sms.py
#: workflow_tests.py
msgid "Body"
msgstr "Corps"
@ -2547,11 +2549,11 @@ msgstr "Erreur : ce nest pas un fichier valide"
#: admin/settings.py
#, python-format
msgid "Failed to import a workflow (%s); site import did not complete."
msgid "Failed to import objects (%s); site import did not complete."
msgstr ""
"Erreur à limport dun workflow (%s). Limport du site na pas pu terminer."
"Erreur à limport déléments (%s). Limport du site na pas pu terminer."
#: admin/settings.py
#: admin/settings.py api_export_import.py
#, python-format
msgid "Error: %s"
msgstr "Erreur : %s"
@ -2869,7 +2871,7 @@ msgstr "Modifier laction"
msgid "Deleting action:"
msgstr "Suppression de laction :"
#: admin/workflow_tests.py
#: admin/workflow_tests.py workflow_tests.py
msgid "Backoffice user"
msgstr "Utilisateur agent"
@ -3688,6 +3690,27 @@ msgstr "Catégories (sources de données)"
msgid "Category (data Sources)"
msgstr "Catégorie (sources de données)"
#: api_export_import.py
msgid "Invalid tar file, missing manifest"
msgstr "Fichier tar invalide, manifeste manquant"
#: api_export_import.py
#, python-format
msgid "Invalid tar file, missing component %s/%s"
msgstr "Fichier tar invalide, composant %s/%s manquant"
#: api_export_import.py
msgid "Invalid tar file"
msgstr "Fichier tar invalide"
#: api_export_import.py
msgid "Invalid tar file, missing manifest."
msgstr "Fichier tar invalide, manifeste manquant."
#: api_export_import.py
msgid "Invalid tar file."
msgstr "Fichier tar invalide."
#: api_export_import.py
#, python-format
msgid "Application (%s) initial installation"
@ -4128,6 +4151,10 @@ msgstr "Source de données"
msgid "Webservice"
msgstr "Webservice"
#: backoffice/deprecations.py
msgid "Python expression detected"
msgstr "Expression Python détectée"
#: backoffice/i18n.py backoffice/root.py templates/wcs/backoffice/i18n.html
msgid "Multilinguism"
msgstr "Multilinguisme"
@ -6816,7 +6843,6 @@ msgid ""
"{% endif %}\n"
"{% endif %}\n"
" "
msgstr ""
"\n"
"Le formulaire a été enregistré le {{ form_receipt_datetime }}.\n"
@ -10167,6 +10193,10 @@ msgstr "Vous pouvez créer la réponse webservice correspondante ici."
msgid "Result"
msgstr "Résultat"
#: templates/wcs/backoffice/test-result.html
msgid "Run tests again"
msgstr "Relancer les tests"
#: templates/wcs/backoffice/test-result.html
msgid "Details"
msgstr "Détails"
@ -11972,7 +12002,7 @@ msgstr "Erreur dans le gabarit du sujet, le courriel ne peut pas être généré
msgid "Email too big to be sent"
msgstr "Courriel trop volumineux"
#: wf/sms.py
#: wf/sms.py workflow_tests.py
msgid "Submitter"
msgstr "Demandeur"
@ -12158,10 +12188,26 @@ msgstr "Clic sur un bouton daction"
msgid "Workflow has no action that displays a button."
msgstr "Le workflow ne contient pas daction qui affiche un bouton."
#: workflow_tests.py
msgid "backoffice user"
msgstr "utilisateur agent"
#: workflow_tests.py
msgid "submitter"
msgstr "demandeur"
#: workflow_tests.py
msgid "missing user"
msgstr "utilisateur manquant"
#: workflow_tests.py
#, python-format
msgid "Click on \"%s\""
msgstr "Clic sur « %s »"
msgid "Click on \"%(button_name)s\" by %(user)s"
msgstr "Clic sur « %(button_name)s » par %(user)s"
#: workflow_tests.py
msgid "Broken, missing user"
msgstr "Cassé, utilisateur manquant"
#: workflow_tests.py
#, python-format
@ -12176,6 +12222,14 @@ msgstr "pas disponible"
msgid "Button name"
msgstr "Texte du bouton"
#: workflow_tests.py
msgid "User who clicks on button"
msgstr "Usager qui clique sur le bouton"
#: workflow_tests.py
msgid "Other user"
msgstr "Autre utilisateur"
#: workflow_tests.py
msgid "Assert form status"
msgstr "Vérifier le statut de la demande"
@ -12328,6 +12382,46 @@ msgstr "Réponse webservice"
msgid "Call count"
msgstr "Nombre dappels"
#: workflow_tests.py
msgid "Assert SMS is sent"
msgstr "Vérifier lenvoi dun SMS"
#: workflow_tests.py
#, python-format
msgid "SMS to %s"
msgstr "SMS vers %s"
#: workflow_tests.py
msgid "No SMS was sent."
msgstr "Aucun SMS na été envoyé."
#: workflow_tests.py
#, python-format
msgid "SMS phone numbers: %s"
msgstr "Numéros de téléphones pour le SMS : %s"
#: workflow_tests.py
#, python-format
msgid "SMS was not sent to %s."
msgstr "Un SMS na pas été envoyé à %s."
#: workflow_tests.py
#, python-format
msgid "SMS body: \"%s\""
msgstr "Contenu du SMS : « %s »"
#: workflow_tests.py
msgid "SMS body mismatch."
msgstr "Différence dans le contenu du SMS."
#: workflow_tests.py
msgid "Phone numbers"
msgstr "Numéros de téléphone"
#: workflow_tests.py
msgid "Add phone number"
msgstr "Ajouter un numéro de téléphone"
#: workflow_traces.py
msgid "Created (by API)"
msgstr "Création (par lAPI)"

View File

@ -154,8 +154,10 @@ class MaintenanceMiddleware(MiddlewareMixin):
pub = get_publisher()
maintenance_mode = pub.get_site_option('maintenance_page', 'variables')
if maintenance_mode and not pass_through(request, pub):
pub.install_lang()
context = pub.get_site_options('variables')
maintenance_message = pub.get_site_option('maintenance_page_message', 'variables')
context = {'maintenance_message': maintenance_message or ''}
context['maintenance_message'] = maintenance_message or ''
return TemplateResponse(
request,
['hobo/maintenance/maintenance_page.html', 'wcs/maintenance_page.html'],

View File

@ -183,9 +183,9 @@ class WcsPublisher(QommonPublisher):
formdef.register_cronjobs()
def update_deprecations_report(self, **kwargs):
from .backoffice.deprecations import DeprecationsScanAfterJob
from .backoffice.deprecations import DeprecationsScan
DeprecationsScanAfterJob().execute()
DeprecationsScan().execute()
def has_postgresql_config(self):
return bool(self.cfg.get('postgresql', {}))
@ -292,6 +292,8 @@ class WcsPublisher(QommonPublisher):
'workflows_xml',
'blockdefs_xml',
'roles_xml',
'datasources',
'wscalls',
):
continue
path = os.path.join(self.app_dir, f)
@ -346,6 +348,22 @@ class WcsPublisher(QommonPublisher):
if os.path.split(f)[0] in results:
results[os.path.split(f)[0]] += 1
# import datasources and wscalls
from wcs.data_sources import NamedDataSource
from wcs.wscalls import NamedWsCall
for f in z.namelist():
if os.path.dirname(f) == 'datasources' and os.path.basename(f):
with z.open(f) as fd:
data_source = NamedDataSource.import_from_xml(fd, include_id=True)
data_source.store()
results['datasources'] += 1
if os.path.dirname(f) == 'wscalls' and os.path.basename(f):
with z.open(f) as fd:
wscall = NamedWsCall.import_from_xml(fd, include_id=True)
wscall.store()
results['wscalls'] += 1
# second pass, fields blocks
from wcs.blocks import BlockDef

View File

@ -909,7 +909,7 @@ class FileWithPreviewWidget(CompositeWidget):
attrs['data-url'] += '?storage=%s' % self.storage
self.file_type = kwargs.pop('file_type', None)
if self.file_type:
attrs['accept'] = ','.join(self.file_type)
attrs['accept'] = ','.join([x for x in self.file_type if x])
if self.max_file_size_bytes:
# this could be used for client size validation of file size
attrs['data-max-file-size'] = str(self.max_file_size_bytes)

View File

@ -823,6 +823,14 @@ class QommonPublisher(Publisher):
)
return attrs
def get_nominatim_extra_params(self):
params = {}
for option_name, query_name in (('nominatim_key', 'key'), ('nominatim_contact_email', 'email')):
value = self.get_site_option(option_name)
if value:
params[query_name] = value
return params
def get_reverse_geocoding_service_url(self):
url = self.get_site_option('reverse_geocoding_service_url')
if url:
@ -834,11 +842,9 @@ class QommonPublisher(Publisher):
)
url += '/reverse'
reverse_zoom_level = self.get_site_option('nominatim_reverse_zoom_level') or 18
url += '?zoom=%s' % reverse_zoom_level
key = self.get_site_option('nominatim_key')
if key:
url += '&key=%s' % key
return url
params = {'zoom': reverse_zoom_level}
params.update(self.get_nominatim_extra_params())
return urllib.parse.urljoin(url, '?' + urllib.parse.urlencode(params))
def get_geocoding_service_url(self):
url = self.get_site_option('geocoding_service_url')
@ -850,15 +856,13 @@ class QommonPublisher(Publisher):
or 'https://nominatim.entrouvert.org'
)
url += '/search'
key = self.get_site_option('nominatim_key')
if key:
url += '?key=%s' % key
params = self.get_nominatim_extra_params()
if self.get_site_option('map-bounds-top-left'):
url += '&' if '?' in url else '?'
top, left = self.get_site_option('map-bounds-top-left').split(';')
bottom, right = self.get_site_option('map-bounds-bottom-right').split(';')
url += 'viewbox=%s,%s,%s,%s&bounded=1' % (left, top, right, bottom)
return url
params['viewbox'] = f'{left},{top},{right},{bottom}'
params['bounded'] = 1
return urllib.parse.urljoin(url, '?' + urllib.parse.urlencode(params))
def get_working_day_calendar(self):
return self.get_site_option('working_day_calendar') or settings.WORKING_DAY_CALENDAR

View File

@ -9,9 +9,13 @@ $(function() {
$('a[rel=popup], a[data-popup]').data('title-selector', 'h2');
$('a[rel=popup], a[data-popup]').data('close-button-text', WCS_I18N.close);
$(document).on('gadjo:dialog-loaded', function(e, dialog) {
window.disable_beforeunload = true;
if ($(dialog).find('[name$=add_element]').length) {
prepare_widget_list_elements();
}
if ($(dialog).data('enable-select2')) {
add_js_behaviours($(dialog));
}
if (jQuery.fn.colourPicker !== undefined) {
jQuery('select.colour-picker').colourPicker({title: ''});
}

View File

@ -123,6 +123,212 @@ Responsive_table_widget.prototype.init = function () {
});
};
function add_js_behaviours($base) {
$base.find('input[type=email]').on('change wcs:change', function() {
var $email_input = $(this);
var val = $email_input.val();
var val_domain = val.split('@')[1];
var $domain_hint_div = this.domain_hint_div;
var highest_ratio = 0;
var suggestion = null;
if (typeof val_domain === 'undefined' || known_domains.indexOf(val_domain) > -1) {
// domain not yet typed in, or known domain, don't suggest anything.
if ($domain_hint_div) {
$domain_hint_div.hide();
}
return;
}
for (var i=0; i < well_known_domains.length; i++) {
var domain = well_known_domains[i];
var ratio = val_domain.similarity(domain);
if (ratio > highest_ratio) {
highest_ratio = ratio;
suggestion = domain;
}
}
if (highest_ratio > 0.80 && highest_ratio < 1) {
if ($domain_hint_div === undefined) {
$domain_hint_div = $('<div class="field-live-hint"><p class="message"></p><button type="button" class="action"></button><button type="button" class="close"><span class="sr-only"></span></button></div>');
this.domain_hint_div = $domain_hint_div;
$(this).after($domain_hint_div);
$domain_hint_div.find('button.action').on('click', function() {
$email_input.val($email_input.val().replace(/@.*/, '@' + $(this).data('suggestion')));
$email_input.trigger('wcs:change');
$domain_hint_div.hide();
return false;
});
$domain_hint_div.find('button.close').on('click', function() {
$domain_hint_div.hide();
return false;
});
}
$domain_hint_div.find('p').text(WCS_I18N.email_domain_suggest + ' @' + suggestion + ' ?');
$domain_hint_div.find('button.action').text(WCS_I18N.email_domain_fix);
$domain_hint_div.find('button.action').data('suggestion', suggestion);
$domain_hint_div.find('button.close span.sr-only').text(WCS_I18N.close);
$domain_hint_div.show();
} else if ($domain_hint_div) {
$domain_hint_div.hide();
}
});
$base.find('.date-pick').each(function() {
if (this.type == "date" || this.type == "time") {
return; // prefer native date/time widgets
}
var $date_input = $(this);
$date_input.attr('type', 'text');
if ($date_input.data('formatted-value')) {
$date_input.val($date_input.data('formatted-value'));
}
var options = Object();
options.autoclose = true;
options.weekStart = 1;
options.format = $date_input.data('date-format');
options.minView = $date_input.data('min-view');
options.maxView = $date_input.data('max-view');
options.startView = $date_input.data('start-view');
if ($date_input.data('start-date')) options.startDate = $date_input.data('start-date');
if ($date_input.data('end-date')) options.endDate = $date_input.data('end-date');
$date_input.datetimepicker(options);
});
/* searchable select */
$base.find('select[data-autocomplete]').each(function(i, elem) {
var required = $(elem).data('required');
var options = {
language: {
errorLoading: function() { return WCS_I18N.s2_errorloading; },
noResults: function () { return WCS_I18N.s2_nomatches; },
inputTooShort: function (input, min) { return WCS_I18N.s2_tooshort; },
loadingMore: function () { return WCS_I18N.s2_loadmore; },
searching: function () { return WCS_I18N.s2_searching; }
}
};
options.placeholder = $(elem).find('[data-hint]').data('hint');
if (!required) {
if (!options.placeholder) options.placeholder = '...';
options.allowClear = true;
}
$(elem).select2(options);
});
/* searchable select using a data source */
$base.find('select[data-select2-url]').each(function(i, elem) {
var required = $(elem).data('required');
// create an additional hidden field to hold the label of the selected
// option, it is necessary as the server may not have any knowledge of
// possible options.
var $input_display_value = $('<input>', {
type: 'hidden',
name: $(elem).attr('name') + '_display',
value: $(elem).data('initial-display-value')
});
$input_display_value.insertAfter($(elem));
var options = {
minimumInputLength: 1,
formatResult: function(result) { return result.text; },
language: {
errorLoading: function() { return WCS_I18N.s2_errorloading; },
noResults: function () { return WCS_I18N.s2_nomatches; },
inputTooShort: function (input, min) { return WCS_I18N.s2_tooshort; },
loadingMore: function () { return WCS_I18N.s2_loadmore; },
searching: function () { return WCS_I18N.s2_searching; }
},
templateSelection: function(data, container) {
if (data.edit_related_url) {
$(data.element).attr('data-edit-related-url', data.edit_related_url);
}
if (data.view_related_url) {
$(data.element).attr('data-view-related-url', data.view_related_url);
}
return data.text;
}
};
if (!required) {
options.placeholder = '...';
options.allowClear = true;
}
var url = $(elem).data('select2-url');
if (url.indexOf('/api/') == 0) { // local proxying
var data_type = 'json';
} else {
var data_type = 'jsonp';
}
options.ajax = {
delay: 250,
dataType: data_type,
data: function(params) {
return {q: params.term, page_limit: 50};
},
processResults: function (data, params) {
return {results: data.data};
},
url: function() {
var url = $(elem).data('select2-url');
url = url.replace(/\[var_.+?\]/g, function(match, g1, g2) {
// compatibility: if there are [var_...] references in the URL
// replace them by looking for other select fields on the same
// page.
var related_select = $('#' + match.slice(1, -1));
var value_container_id = $(related_select).data('valuecontainerid');
return $('#' + value_container_id).val() || '';
});
return url;
}
};
var select2 = $(elem).select2(options);
$(elem).on('change', function() {
// update _display hidden field with selected text
var $selected = $(elem).find(':selected').first();
var text = $selected.text();
$input_display_value.val(text);
// update edit-related button and view-related link href
$(elem).siblings('.edit-related').attr('href', '').hide();
$(elem).siblings('.view-related').attr('href', '').hide();
if ($selected.attr('data-edit-related-url')) {
$(elem).siblings('.edit-related').attr('href', $selected.attr('data-edit-related-url') + '?_popup=1').show();
}
if ($selected.attr('data-view-related-url')) {
$(elem).siblings('.view-related').attr('href', $selected.attr('data-view-related-url')).show();
}
});
if ($input_display_value.val()) {
// if the _display hidden field was created with an initial value take it
// and create a matching <option> in the real <select> widget, and use it
// to set select2 initial state.
var option = $('<option></option>', {value: $(elem).data('value')});
option.appendTo($(elem));
option.text($input_display_value.val());
if ($(elem).data('initial-edit-related-url')) {
option.attr('data-edit-related-url', $(elem).data('initial-edit-related-url'));
}
if ($(elem).data('initial-view-related-url')) {
option.attr('data-view-related-url', $(elem).data('initial-view-related-url'));
}
select2.val($(elem).data('value')).trigger('change');
$(elem).select2('data', {id: $(elem).data('value'), text: $(elem).data('initial-display-value')});
}
});
/* Make table widgets responsive */
$base.find('.TableWidget, .SingleSelectTableWidget, .TableListRowsWidget').each(function (i, elem) {
const table = elem.querySelector('table');
new Responsive_table_widget(table);
});
/* Add class to reset error style on change */
$base.find('.widget-with-error').each(function(i, elem) {
$(elem).find('input, select, textarea').on('change', function() {
$(this).parents('.widget-with-error').addClass('widget-reset-error');
});
});
$base.find('div.widget-prefilled').on('change input paste', function(ev) {
$(this).removeClass('widget-prefilled');
});
}
$(function() {
$('.section.foldable').addClass('gadjo-foldable-ignore');
$('.section.foldable > h2 [role=button]').each(function() {
@ -149,6 +355,19 @@ $(function() {
var autosave_button_to_click_on_complete = null;
var last_auto_save = $('form[data-has-draft]').serialize();
if ($('form[data-warn-on-unsaved-content]').length) {
window.addEventListener('beforeunload', function (e) {
var $form = $('form[data-warn-on-unsaved-content]');
var current_data = $form.serialize();
if (last_auto_save == current_data) return;
if (window.disable_beforeunload) return;
// preventDefault() and returnValue will trigger the browser alert
// warning user about closing tag/window and losing data.
e.preventDefault();
e.returnValue = true;
});
}
if ($('form[data-has-draft]:not([data-autosave=false])').length == 1) {
var error_counter = 0;
@ -233,213 +452,7 @@ $(function() {
var known_domains = WCS_VALID_KNOWN_DOMAINS;
}
function add_js_behaviours($base) {
$base.find('input[type=email]').on('change wcs:change', function() {
var $email_input = $(this);
var val = $email_input.val();
var val_domain = val.split('@')[1];
var $domain_hint_div = this.domain_hint_div;
var highest_ratio = 0;
var suggestion = null;
if (typeof val_domain === 'undefined' || known_domains.indexOf(val_domain) > -1) {
// domain not yet typed in, or known domain, don't suggest anything.
if ($domain_hint_div) {
$domain_hint_div.hide();
}
return;
}
for (var i=0; i < well_known_domains.length; i++) {
var domain = well_known_domains[i];
var ratio = val_domain.similarity(domain);
if (ratio > highest_ratio) {
highest_ratio = ratio;
suggestion = domain;
}
}
if (highest_ratio > 0.80 && highest_ratio < 1) {
if ($domain_hint_div === undefined) {
$domain_hint_div = $('<div class="field-live-hint"><p class="message"></p><button type="button" class="action"></button><button type="button" class="close"><span class="sr-only"></span></button></div>');
this.domain_hint_div = $domain_hint_div;
$(this).after($domain_hint_div);
$domain_hint_div.find('button.action').on('click', function() {
$email_input.val($email_input.val().replace(/@.*/, '@' + $(this).data('suggestion')));
$email_input.trigger('wcs:change');
$domain_hint_div.hide();
return false;
});
$domain_hint_div.find('button.close').on('click', function() {
$domain_hint_div.hide();
return false;
});
}
$domain_hint_div.find('p').text(WCS_I18N.email_domain_suggest + ' @' + suggestion + ' ?');
$domain_hint_div.find('button.action').text(WCS_I18N.email_domain_fix);
$domain_hint_div.find('button.action').data('suggestion', suggestion);
$domain_hint_div.find('button.close span.sr-only').text(WCS_I18N.close);
$domain_hint_div.show();
} else if ($domain_hint_div) {
$domain_hint_div.hide();
}
});
$base.find('.date-pick').each(function() {
if (this.type == "date" || this.type == "time") {
return; // prefer native date/time widgets
}
var $date_input = $(this);
$date_input.attr('type', 'text');
if ($date_input.data('formatted-value')) {
$date_input.val($date_input.data('formatted-value'));
}
var options = Object();
options.autoclose = true;
options.weekStart = 1;
options.format = $date_input.data('date-format');
options.minView = $date_input.data('min-view');
options.maxView = $date_input.data('max-view');
options.startView = $date_input.data('start-view');
if ($date_input.data('start-date')) options.startDate = $date_input.data('start-date');
if ($date_input.data('end-date')) options.endDate = $date_input.data('end-date');
$date_input.datetimepicker(options);
});
/* searchable select */
$base.find('select[data-autocomplete]').each(function(i, elem) {
var required = $(elem).data('required');
var options = {
language: {
errorLoading: function() { return WCS_I18N.s2_errorloading; },
noResults: function () { return WCS_I18N.s2_nomatches; },
inputTooShort: function (input, min) { return WCS_I18N.s2_tooshort; },
loadingMore: function () { return WCS_I18N.s2_loadmore; },
searching: function () { return WCS_I18N.s2_searching; }
}
};
options.placeholder = $(elem).find('[data-hint]').data('hint');
if (!required) {
if (!options.placeholder) options.placeholder = '...';
options.allowClear = true;
}
$(elem).select2(options);
});
/* searchable select using a data source */
$base.find('select[data-select2-url]').each(function(i, elem) {
var required = $(elem).data('required');
// create an additional hidden field to hold the label of the selected
// option, it is necessary as the server may not have any knowledge of
// possible options.
var $input_display_value = $('<input>', {
type: 'hidden',
name: $(elem).attr('name') + '_display',
value: $(elem).data('initial-display-value')
});
$input_display_value.insertAfter($(elem));
var options = {
minimumInputLength: 1,
formatResult: function(result) { return result.text; },
language: {
errorLoading: function() { return WCS_I18N.s2_errorloading; },
noResults: function () { return WCS_I18N.s2_nomatches; },
inputTooShort: function (input, min) { return WCS_I18N.s2_tooshort; },
loadingMore: function () { return WCS_I18N.s2_loadmore; },
searching: function () { return WCS_I18N.s2_searching; }
},
templateSelection: function(data, container) {
if (data.edit_related_url) {
$(data.element).attr('data-edit-related-url', data.edit_related_url);
}
if (data.view_related_url) {
$(data.element).attr('data-view-related-url', data.view_related_url);
}
return data.text;
}
};
if (!required) {
options.placeholder = '...';
options.allowClear = true;
}
var url = $(elem).data('select2-url');
if (url.indexOf('/api/autocomplete/') == 0) { // local proxying
var data_type = 'json';
} else {
var data_type = 'jsonp';
}
options.ajax = {
delay: 250,
dataType: data_type,
data: function(params) {
return {q: params.term, page_limit: 50};
},
processResults: function (data, params) {
return {results: data.data};
},
url: function() {
var url = $(elem).data('select2-url');
url = url.replace(/\[var_.+?\]/g, function(match, g1, g2) {
// compatibility: if there are [var_...] references in the URL
// replace them by looking for other select fields on the same
// page.
var related_select = $('#' + match.slice(1, -1));
var value_container_id = $(related_select).data('valuecontainerid');
return $('#' + value_container_id).val() || '';
});
return url;
}
};
var select2 = $(elem).select2(options);
$(elem).on('change', function() {
// update _display hidden field with selected text
var $selected = $(elem).find(':selected').first();
var text = $selected.text();
$input_display_value.val(text);
// update edit-related button and view-related link href
$(elem).siblings('.edit-related').attr('href', '').hide();
$(elem).siblings('.view-related').attr('href', '').hide();
if ($selected.attr('data-edit-related-url')) {
$(elem).siblings('.edit-related').attr('href', $selected.attr('data-edit-related-url') + '?_popup=1').show();
}
if ($selected.attr('data-view-related-url')) {
$(elem).siblings('.view-related').attr('href', $selected.attr('data-view-related-url')).show();
}
});
if ($input_display_value.val()) {
// if the _display hidden field was created with an initial value take it
// and create a matching <option> in the real <select> widget, and use it
// to set select2 initial state.
var option = $('<option></option>', {value: $(elem).data('value')});
option.appendTo($(elem));
option.text($input_display_value.val());
if ($(elem).data('initial-edit-related-url')) {
option.attr('data-edit-related-url', $(elem).data('initial-edit-related-url'));
}
if ($(elem).data('initial-view-related-url')) {
option.attr('data-view-related-url', $(elem).data('initial-view-related-url'));
}
select2.val($(elem).data('value')).trigger('change');
$(elem).select2('data', {id: $(elem).data('value'), text: $(elem).data('initial-display-value')});
}
});
/* Make table widgets responsive */
$base.find('.TableWidget, .SingleSelectTableWidget, .TableListRowsWidget').each(function (i, elem) {
const table = elem.querySelector('table');
new Responsive_table_widget(table);
});
/* Add class to reset error style on change */
$base.find('.widget-with-error').each(function(i, elem) {
$(elem).find('input, select, textarea').on('change', function() {
$(this).parents('.widget-with-error').addClass('widget-reset-error');
});
});
$base.find('div.widget-prefilled').on('change input paste', function(ev) {
$(this).removeClass('widget-prefilled');
});
}
add_js_behaviours($('form[data-live-url], form[data-backoffice-preview]'));
add_js_behaviours($('form[data-live-url], form[data-backoffice-preview], form[data-enable-select2]'));
last_auto_save = $('form[data-has-draft]').serialize();
// Form with error
@ -471,6 +484,7 @@ $(function() {
});
$('form').on('submit', function(event) {
var $form = $(this);
window.disable_beforeunload = true;
/* prevent more autosave */
if (autosave_timeout_id !== null) {
window.clearTimeout(autosave_timeout_id);

View File

@ -17,11 +17,11 @@ function geoloc_prefill(element_type, element_values, widget_name=null)
}
if ($input_widget.length) {
$input_widget.val(element_value)
$input_widget.each((idx, elt) => elt.dispatchEvent(new Event('change')))
$input_widget.each((idx, elt) => elt.dispatchEvent(new Event('change', {'bubbles': true})))
found = true;
} else if ($text_widget.length) {
$text_widget.val(element_value)
$text_widget.each((idx, elt) => elt.dispatchEvent(new Event('change')))
$text_widget.each((idx, elt) => elt.dispatchEvent(new Event('change', {'bubbles': true})))
found = true;
} else if ($select_widget.length) {
if ($options.length == 0) break;
@ -31,7 +31,7 @@ function geoloc_prefill(element_type, element_values, widget_name=null)
if ($.slugify($option.val()) == slugified_value ||
$.slugify($option.text()) == slugified_value) {
$option.prop('selected', true);
$option.parent().each((idx, elt) => elt.dispatchEvent(new Event('change')))
$option.parent().each((idx, elt) => elt.dispatchEvent(new Event('change', {'bubbles': true})))
found = true;
break;
}

View File

@ -1313,7 +1313,11 @@ def temporary_access_url(
from wcs.formdef import FormDef
formdef = FormDef.get_by_urlname(formdef_urlname)
formdata = formdef.data_class().get(formdata_id)
try:
formdata = formdef.data_class().get(formdata_id)
except KeyError:
# formdata somehow got removed, ignore
return ''
duration = 0
for amount, unit in ((days, 86400), (hours, 3600), (minutes, 60), (seconds, 1)):

View File

@ -31,7 +31,7 @@ class XmlStorableObject(StorableObject):
first_byte = fd.read(1)
fd.seek(0)
if first_byte == b'<':
return cls.import_from_xml(fd, include_id=True)
return cls.import_from_xml(fd, include_id=True, check_deprecated=False)
else:
obj = StorableObject.storage_load(fd)
obj._upgrade_must_store = True
@ -84,15 +84,15 @@ class XmlStorableObject(StorableObject):
sub.text = role.name
@classmethod
def import_from_xml(cls, fd, include_id=False):
def import_from_xml(cls, fd, include_id=False, check_deprecated=True):
try:
tree = ET.parse(fd)
except Exception:
raise ValueError()
return cls.import_from_xml_tree(tree, include_id=include_id)
return cls.import_from_xml_tree(tree, include_id=include_id, check_deprecated=check_deprecated)
@classmethod
def import_from_xml_tree(cls, tree, include_id=False, **kwargs):
def import_from_xml_tree(cls, tree, include_id=False, check_deprecated=True, **kwargs):
obj = cls()
# if the tree we get is actually a ElementTree for real, we get its

View File

@ -3,6 +3,10 @@
{% block appbar-title %}{% trans "Result" %} #{{ test_result.id }}{% endblock %}
{% block appbar-actions %}
<a href="../run">{% trans "Run tests again" %}</a>
{% endblock %}
{% block body %}
<div class="section">
<h3>{% trans "Details" %}</h3>

View File

@ -222,6 +222,7 @@ class TestDef(sql.TestDef):
def build_formdata(self, objectdef, include_fields=False):
formdata = objectdef.data_class()()
formdata.just_created()
formdata.backoffice_submission = self.is_in_backoffice
formdata.workflow_traces = []
@ -292,8 +293,7 @@ class TestDef(sql.TestDef):
def _run(self, objectdef):
formdata = self.run_form_fill(objectdef)
if self.agent_id and self.workflow_tests.actions:
agent_user = get_publisher().user_class.get(self.agent_id)
self.workflow_tests.run(formdata, agent_user)
self.workflow_tests.run(formdata)
def run_form_fill(self, objectdef):
self.formdata = formdata = self.build_formdata(objectdef)
@ -369,6 +369,7 @@ class TestDef(sql.TestDef):
'carddef:'
):
x.data_source = None
x.had_data_source = True
value = self.data['fields'].get(field.id)
if value is not None:
@ -479,10 +480,14 @@ class TestDef(sql.TestDef):
testdef.missing_required_fields.append(label)
return False
if widget.error != get_selection_error_text():
return True
ignore_invalid_selection = bool(
widget.error == get_selection_error_text()
and (widget.field.data_source or hasattr(widget.field, 'had_data_source'))
)
if ignore_invalid_selection:
return False
return False
return True
@classmethod
def get_error_widget(cls, widget, testdef=None):

View File

@ -938,7 +938,11 @@ class ExportToModel(WorkflowStatusItem):
model_file = self.get_model_file()
if not model_file:
return
outstream = self.apply_template_to_formdata(formdata, model_file)
try:
outstream = self.apply_template_to_formdata(formdata, model_file)
except UploadValidationError as e:
get_publisher().record_error(str(e), formdata=formdata, exception=e)
return
filename = self.get_filename(model_file)
content_type = model_file.content_type
if self.convert_to_pdf:

View File

@ -151,13 +151,23 @@ class SendSMSWorkflowStatusItem(WorkflowStatusItem):
sms_cfg = get_cfg('sms', {})
sender = sms_cfg.get('sender', 'AuQuotidien')[:11]
try:
sms.SMS.get_sms_class().send(sender, recipients, sms_body)
self.send_sms(sender, recipients, sms_body)
except errors.SMSError as e:
get_publisher().record_error(_('Could not send SMS'), formdata=formdata, exception=e)
def send_sms(self, sender, recipients, sms_body):
sms.SMS.get_sms_class().send(sender, recipients, sms_body)
def i18n_scan(self, base_location):
location = '%sitems/%s/' % (base_location, self.id)
yield location, None, self.body
def get_workflow_test_action(self, formdata, *args, **kwargs):
def record_sms(sender, recipients, sms_body):
formdata.sent_sms.append({'phone_numbers': recipients, 'body': sms_body})
setattr(self, 'send_sms', record_sms)
return self
register_item_class(SendSMSWorkflowStatusItem)

View File

@ -17,9 +17,19 @@
import datetime
import uuid
from quixote import get_publisher, get_session
from wcs import wf
from wcs.qommon import _
from wcs.qommon.form import EmailWidget, IntWidget, SingleSelectWidget, StringWidget, WidgetList
from wcs.qommon.form import (
EmailWidget,
IntWidget,
JsonpSingleSelectWidget,
RadiobuttonsWidget,
SingleSelectWidget,
StringWidget,
WidgetList,
)
from wcs.qommon.humantime import humanduration2seconds, seconds2humanduration, timewords
from wcs.qommon.xml_storage import XmlStorableObject
from wcs.testdef import TestError, WebserviceResponse
@ -52,7 +62,7 @@ class WorkflowTests(XmlStorableObject):
_names = 'workflow_tests'
xml_root_node = 'workflow_tests'
testdef_id = None
actions = None
_actions = None
XML_NODES = [
('testdef_id', 'int'),
@ -61,15 +71,26 @@ class WorkflowTests(XmlStorableObject):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.actions = []
self._actions = []
def run(self, formdata, agent_user):
@property
def actions(self):
return self._actions
@actions.setter
def actions(self, actions):
self._actions = actions
for action in actions:
action.parent = self
def run(self, formdata):
self.mock_formdata_methods(formdata)
# mark formdata as running workflow tests
formdata.workflow_test = True
formdata.frozen_receipt_time = formdata.receipt_time
formdata.sent_sms = []
formdata.sent_emails = []
formdata.used_webservice_responses = self.testdef.used_webservice_responses = []
@ -81,11 +102,12 @@ class WorkflowTests(XmlStorableObject):
continue
if not action.is_assertion:
formdata.sent_sms.clear()
formdata.sent_emails.clear()
formdata.used_webservice_responses.clear()
try:
action.perform(formdata, agent_user)
action.perform(formdata)
except WorkflowTestError as e:
e.action_uuid = action.uuid
e.details.append(_('Form status when error occured: %s') % status.name)
@ -119,6 +141,7 @@ class WorkflowTests(XmlStorableObject):
def add_actions_from_formdata(self, formdata):
test_action_class_by_trace_id = {
'sendmail': AssertEmail,
'sendsms': AssertSMS,
'webservice_call': AssertWebserviceCall,
'set-backoffice-fields': AssertBackofficeFieldValues,
'button': ButtonClick,
@ -147,11 +170,6 @@ class WorkflowTests(XmlStorableObject):
action = self.add_action(AssertStatus)
action.set_attributes_from_trace(formdata.formdef, workflow_traces[-1])
def store(self, *args, **kwargs):
super().store(*args, **kwargs)
for action in self.actions:
action.parent = self
def export_actions_to_xml(self, element, attribute_name, **kwargs):
for action in self.actions:
element.append(action.export_to_xml())
@ -166,9 +184,7 @@ class WorkflowTests(XmlStorableObject):
except KeyError:
continue
action = klass.import_from_xml_tree(sub)
action.parent = self
actions.append(action)
actions.append(klass.import_from_xml_tree(sub))
return actions
@ -222,16 +238,32 @@ class ButtonClick(WorkflowTestAction):
key = 'button-click'
button_name = None
who = 'receiver'
who_id = None
optional_fields = ['who_id']
is_assertion = False
XML_NODES = WorkflowTestAction.XML_NODES + [
('button_name', 'str'),
('who', 'str'),
('who_id', 'int'),
]
@property
def details_label(self):
return _('Click on "%s"') % self.button_name
if self.who == 'receiver':
user = _('backoffice user')
elif self.who == 'submitter':
user = _('submitter')
else:
try:
user = get_publisher().user_class.get(self.who_id)
except KeyError:
user = _('missing user')
return _('Click on "%(button_name)s" by %(user)s') % {'button_name': self.button_name, 'user': user}
def set_attributes_from_trace(self, formdef, trace, previous_trace=None):
try:
@ -243,7 +275,21 @@ class ButtonClick(WorkflowTestAction):
self.button_name = item.label
def perform(self, formdata, user):
def perform(self, formdata):
if self.who == 'receiver':
user = get_publisher().user_class.get(self.parent.testdef.agent_id)
elif self.who == 'submitter':
if formdata.user_id:
user = get_publisher().user_class.get(formdata.user_id)
else:
get_session().mark_anonymous_formdata(formdata)
user = None
else:
try:
user = get_publisher().user_class.get(self.who_id)
except KeyError:
raise WorkflowTestError(_('Broken, missing user'))
status = formdata.get_status()
form = status.get_action_form(formdata, user)
if not form or not any(
@ -282,6 +328,31 @@ class ButtonClick(WorkflowTestAction):
value=value,
)
form.add(
RadiobuttonsWidget,
'who',
title=_('User who clicks on button'),
options=[
('receiver', _('Backoffice user'), 'receiver'),
('submitter', _('Submitter'), 'submitter'),
('other', _('Other user'), 'other'),
],
value=self.who,
attrs={'data-dynamic-display-parent': 'true'},
)
form.attrs['data-enable-select2'] = 'on'
form.add(
JsonpSingleSelectWidget,
'who_id',
url='/api/users/',
value=self.who_id,
attrs={
'data-dynamic-display-child-of': 'who',
'data-dynamic-display-value-in': 'other',
},
)
class AssertStatus(WorkflowTestAction):
label = _('Assert form status')
@ -305,7 +376,7 @@ class AssertStatus(WorkflowTestAction):
self.status_name = status.name
def perform(self, formdata, user):
def perform(self, formdata):
status = formdata.get_status()
if status.name != self.status_name:
raise WorkflowTestError(
@ -365,7 +436,7 @@ class AssertEmail(WorkflowTestAction):
return label
def perform(self, formdata, user):
def perform(self, formdata):
try:
email = formdata.sent_emails.pop(0)
except IndexError:
@ -443,7 +514,7 @@ class SkipTime(WorkflowTestAction):
formdata.receipt_time = rewind_time(formdata.receipt_time)
formdata.evolution[-1].time = rewind_time(formdata.evolution[-1].time)
def perform(self, formdata, user):
def perform(self, formdata):
self.rewind(formdata)
jump_actions = []
@ -504,7 +575,7 @@ class AssertBackofficeFieldValues(WorkflowTestAction):
def details_label(self):
return ''
def perform(self, formdata, user):
def perform(self, formdata):
for field_dict in self.fields:
field_id = field_dict['field_id']
expected_value = field_dict['value']
@ -591,7 +662,7 @@ class AssertWebserviceCall(WorkflowTestAction):
)
return r
def perform(self, formdata, user):
def perform(self, formdata):
try:
response = WebserviceResponse.get(self.webservice_response_id)
except KeyError:
@ -627,3 +698,65 @@ class AssertWebserviceCall(WorkflowTestAction):
value=self.webservice_response_id,
)
form.add(IntWidget, 'call_count', title=_('Call count'), required=True, value=self.call_count)
class AssertSMS(WorkflowTestAction):
label = _('Assert SMS is sent')
key = 'assert-sms'
phone_numbers = None
body = None
optional_fields = ['phone_numbers', 'body']
XML_NODES = WorkflowTestAction.XML_NODES + [
('phone_numbers', 'str_list'),
('body', 'str'),
]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.phone_numbers = self.phone_numbers or []
@property
def details_label(self):
if not self.phone_numbers:
return ''
label = _('SMS to %s') % self.phone_numbers[0]
if len(self.phone_numbers) > 1:
label = '%s (+%s)' % (label, len(self.phone_numbers) - 1)
return label
def perform(self, formdata):
try:
sms = formdata.sent_sms.pop(0)
except IndexError:
raise WorkflowTestError(_('No SMS was sent.'))
for recipient in self.phone_numbers:
if recipient not in sms['phone_numbers']:
details = [_('SMS phone numbers: %s') % ', '.join(sms['phone_numbers'])]
raise WorkflowTestError(_('SMS was not sent to %s.') % recipient, details=details)
if self.body != sms['body']:
details = [_('SMS body: "%s"') % sms['body']]
raise WorkflowTestError(_('SMS body mismatch.'), details=details)
def fill_admin_form(self, form, formdef):
form.add(
WidgetList,
'phone_numbers',
title=_('Phone numbers'),
value=self.phone_numbers,
add_element_label=_('Add phone number'),
element_kwargs={'render_br': False, 'size': 50},
)
form.add(
StringWidget,
'body',
title=_('Body'),
value=self.body,
)

View File

@ -1233,12 +1233,17 @@ class Workflow(StorableObject):
return root
@classmethod
def import_from_xml(cls, fd, include_id=False, check_datasources=True):
def import_from_xml(cls, fd, include_id=False, check_datasources=True, check_deprecated=True):
try:
tree = ET.parse(fd)
except Exception:
raise ValueError()
workflow = cls.import_from_xml_tree(tree, include_id=include_id, check_datasources=check_datasources)
workflow = cls.import_from_xml_tree(
tree,
include_id=include_id,
check_datasources=check_datasources,
check_deprecated=check_deprecated,
)
if workflow.slug and cls.get_by_slug(workflow.slug):
# slug already in use, reset so a new one will be generated on store()
@ -1247,7 +1252,11 @@ class Workflow(StorableObject):
return workflow
@classmethod
def import_from_xml_tree(cls, tree, include_id=False, snapshot=False, check_datasources=True):
def import_from_xml_tree(
cls, tree, include_id=False, snapshot=False, check_datasources=True, check_deprecated=True
):
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
workflow = cls()
if tree.find('name') is None or not tree.find('name').text:
raise WorkflowImportError(_('Missing name'))
@ -1359,6 +1368,14 @@ class Workflow(StorableObject):
_('Unknown referenced objects'), details=unknown_referenced_objects_details
)
if check_deprecated:
# check for deprecated elements
job = DeprecationsScan()
try:
job.check_deprecated_elements_in_object(workflow)
except DeprecatedElementsDetected as e:
raise WorkflowImportError(str(e))
return workflow
def get_list_of_roles(

View File

@ -44,6 +44,10 @@ from .qommon.template import Template
from .qommon.xml_storage import XmlStorableObject
class NamedWsCallImportError(Exception):
pass
class PayloadError(Exception):
pass
@ -280,6 +284,24 @@ class NamedWsCall(XmlStorableObject):
if self.request.get('post_data'):
yield from self.request.get('post_data').values()
@classmethod
def import_from_xml_tree(cls, tree, include_id=False, check_deprecated=True, **kwargs):
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
wscall = super().import_from_xml_tree(
tree, include_id=include_id, check_deprecated=check_deprecated, **kwargs
)
if check_deprecated:
# check for deprecated elements
job = DeprecationsScan()
try:
job.check_deprecated_elements_in_object(wscall)
except DeprecatedElementsDetected as e:
raise NamedWsCallImportError(str(e))
return wscall
def export_request_to_xml(self, element, attribute_name, **kwargs):
request = getattr(self, attribute_name)
for attr in ('url', 'request_signature_key', 'method', 'timeout', 'cache_duration'):