Compare commits

...

27 Commits

Author SHA1 Message Date
Emmanuel Cazenave 77927e64e7 misc: scan uploads with clamd (#87739)
gitea/wcs/pipeline/head There was a failure building this commit Details
2024-04-26 15:47:25 +02:00
Frédéric Péters cc0f8dda1c cron: log basic resource usage summary (#89037)
gitea/wcs/pipeline/head This commit looks good Details
2024-04-26 15:23:58 +02:00
Frédéric Péters 5ef57d4671 translation update
gitea/wcs/pipeline/head This commit looks good Details
2024-04-26 14:42:42 +02:00
Frédéric Péters 7da8954476 general: expand blocks in form_details (#44804) 2024-04-26 14:42:02 +02:00
Frédéric Péters 703ff210ae misc: remove fixed fields from top errors summary (#82633)
gitea/wcs/pipeline/head This commit looks good Details
2024-04-26 14:41:50 +02:00
Frédéric Péters 7106fbcecf misc: include subfield id in timing logs (#90079)
gitea/wcs/pipeline/head Build queued... Details
2024-04-26 13:34:17 +02:00
Serghei Mihai 449d4bfdaa misc: improve list as images rendering (#81010)
gitea/wcs/pipeline/head This commit looks good Details
2024-04-26 11:41:21 +02:00
Emmanuel Cazenave 8a7b5977ba formdata : honor with_history parameter (#89918)
gitea/wcs/pipeline/head This commit looks good Details
2024-04-26 11:33:19 +02:00
Corentin Sechet daab9515c0 api: add triggerable jump actions to carddata api (#88875)
gitea/wcs/pipeline/head This commit looks good Details
2024-04-26 10:49:28 +02:00
Corentin Sechet ba7eb70024 api: add triggerable jump actions to carddef api (#88875) 2024-04-26 10:49:28 +02:00
Corentin Sechet 8779eea796 api: add triggerable global actions to card data api (#88875) 2024-04-26 10:49:28 +02:00
Corentin Sechet ab9adecf55 api: add triggerable global actions to carddef api (#88875) 2024-04-26 10:49:28 +02:00
Frédéric Péters bb78703d7e translation update
gitea/wcs/pipeline/head This commit looks good Details
2024-04-26 09:01:55 +02:00
Frédéric Péters db0e30cfd8 misc: reset confirmation button event after listing is refreshed (#89875)
gitea/wcs/pipeline/head This commit looks good Details
2024-04-26 08:30:23 +02:00
Frédéric Péters 19454beb04 misc: use straight attribute as AttributeError parameter (#89777) 2024-04-26 08:30:18 +02:00
Frédéric Péters efd10a0564 misc: extend |getlist on blocks, for compatibility names (#89777) 2024-04-26 08:30:18 +02:00
Frédéric Péters ea2744dbc4 misc: add support for live block conditions in workflow forms (#86798) 2024-04-26 08:30:03 +02:00
Frédéric Péters 2c68356878 backoffice: make submission sidebar items configurable (#84494)
(also #58888)
2024-04-26 08:29:57 +02:00
Frédéric Péters 01716e722f tests: add trigger jump + redirect test (#83418) 2024-04-26 08:29:52 +02:00
Frédéric Péters 617e170a92 workflows: remove unnecessary handling of exception in timeout_parse (#83418) 2024-04-26 08:29:52 +02:00
Frédéric Péters 5365983716 workflows: be explicit about jump modes (#83418) 2024-04-26 08:29:52 +02:00
Frédéric Péters db8e6bc0a5 workflows: add generic status options to global action manual trigger (#77926)
gitea/wcs/pipeline/head This commit looks good Details
2024-04-26 08:29:36 +02:00
Frédéric Péters 0abc13aab0 blocks: add support for post conditions (#71778)
gitea/wcs/pipeline/head Build queued... Details
2024-04-26 08:29:31 +02:00
Frédéric Péters 26668ba3ea blocks: add/manage post conditions attribute (#71778) 2024-04-26 08:29:31 +02:00
Frédéric Péters b8b6dc1bad backoffice: link to individual fields in data source usage pane (#63947) 2024-04-26 08:29:26 +02:00
Valentin Deniaud 2927168c83 workflow_tests: ignore html in alert message (#90038)
gitea/wcs/pipeline/head This commit looks good Details
2024-04-25 16:41:19 +02:00
Valentin Deniaud d8cb5e4737 workflow_tests: check all email recipients (#90035)
gitea/wcs/pipeline/head This commit looks good Details
2024-04-25 15:58:13 +02:00
76 changed files with 1909 additions and 294 deletions

View File

@ -344,6 +344,17 @@ ladresse.
contenu des champs de type « Fichier » nest pas exporté.
</p>
<p>
Un paramètre <code>include-actions</code> permet dinclure (<code>on</code>) ou
non (<code>off</code>) la liste des actions globales et des déclencheurs de
sauts automatiques actuellement accessible via l'API à l'utilisateur qui
effectue la requête.
</p>
<screen>
<input>GET https://www.example.net/api/forms/inscriptions/list?include-actions=on</input>
</screen>
</section>
<section>

View File

@ -625,7 +625,9 @@ def test_data_sources_view(pub):
resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
assert 'Usage in forms' in resp.text
assert '/backoffice/forms/%s/' % formdef.id in resp.text
assert [x.attrib['href'] for x in resp.pyquery('.usage-in-forms a')] == [
'http://example.net/backoffice/forms/1/fields/1/'
]
# additional formdef types
user_formdef = UserFieldsFormDef(pub)
@ -664,12 +666,14 @@ def test_data_sources_view(pub):
resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
assert 'Usage in forms' in resp.text
assert '/backoffice/forms/%s/' % formdef.id in resp.text
assert '/backoffice/workflows/%s/backoffice-fields/fields/' % workflow.id in resp.text
assert '/backoffice/workflows/%s/variables/fields/' % workflow.id in resp.text
assert '/backoffice/workflows/%s/status/1/items/_x/fields/' % workflow.id in resp.text
assert '/backoffice/settings/users/fields/' in resp.text
assert '/backoffice/cards/%s/' % carddef.id in resp.text
assert sorted([x.attrib['href'] for x in resp.pyquery('.usage-in-forms a')]) == [
f'http://example.net/backoffice/cards/{carddef.id}/fields/1/',
f'http://example.net/backoffice/forms/{formdef.id}/fields/1/',
'http://example.net/backoffice/settings/users/fields/fields/1/',
f'http://example.net/backoffice/workflows/{workflow.id}/backoffice-fields/fields/1/',
f'http://example.net/backoffice/workflows/{workflow.id}/status/1/items/_x/fields/1/',
f'http://example.net/backoffice/workflows/{workflow.id}/variables/fields/1/',
]
# cleanup
user_formdef = UserFieldsFormDef(pub)

View File

@ -164,6 +164,7 @@ def test_deprecations(pub):
timeout_jump = st0.add_action('jump')
timeout_jump.timeout = '213'
timeout_jump.mode = 'timeout'
timeout_jump.condition = {'type': 'python', 'value': 'True'}
for klass in (

View File

@ -273,6 +273,46 @@ def test_forms_edit_management(pub, formdef):
assert FormDef.get(1).management_sidebar_items == set()
def test_forms_edit_backoffice_submission(pub, formdef):
create_superuser(pub)
create_role(pub)
app = login(get_app(pub))
resp = app.get('/backoffice/forms/1/')
assert 'Backoffice submission' not in resp.text
formdef.backoffice_submission_roles = ['x']
formdef.store()
resp = app.get('/backoffice/forms/1/')
assert_option_display(resp, 'Backoffice submission', 'Default')
resp = resp.click('Backoffice submission', href='options/backoffice-submission')
assert resp.forms[0]['submission_sidebar_items$elementgeneral'].checked is True
assert resp.forms[0]['submission_sidebar_items$elementsubmission-context'].checked is True
assert resp.forms[0]['submission_sidebar_items$elementuser'].checked is True
assert resp.forms[0]['submission_sidebar_items$elementcustom-template'].checked is True
resp.forms[0]['submission_sidebar_items$elementuser'].checked = False
resp = resp.forms[0].submit()
assert resp.location == 'http://example.net/backoffice/forms/1/'
resp = resp.follow()
assert_option_display(resp, 'Backoffice submission', 'Custom')
assert 'general' in FormDef.get(1).submission_sidebar_items
assert 'user' not in FormDef.get(1).submission_sidebar_items
resp = resp.click('Backoffice submission', href='options/backoffice-submission')
resp.forms[0]['submission_sidebar_items$elementuser'].checked = True
resp = resp.forms[0].submit().follow()
assert FormDef.get(1).submission_sidebar_items == {'__default__'}
# unselect all
resp = resp.click('Backoffice submission', href='options/backoffice-submission')
for field in resp.forms[0].fields:
if field.startswith('submission_sidebar_items$'):
resp.forms[0][field].checked = False
resp = resp.forms[0].submit().follow()
assert FormDef.get(1).submission_sidebar_items == set()
def test_forms_edit_tracking_code(pub, formdef):
create_superuser(pub)
create_role(pub)

View File

@ -102,6 +102,9 @@ def test_i18n_page(pub):
block = BlockDef(name='test')
# check strings will be stripped
block.fields = [StringField(id='1', label='text field ')]
block.post_conditions = [
{'condition': {'type': 'django', 'value': 'blah1'}, 'error_message': 'block post condition error'},
]
block.store()
carddef = CardDef()
@ -145,6 +148,9 @@ def test_i18n_page(pub):
# check custom validation message
assert TranslatableMessage.count([Equal('string', 'Custom Error')]) == 1
# check block post condition
assert TranslatableMessage.count([Equal('string', 'block post condition error')]) == 1
# check table
assert resp.pyquery('tr').length == TranslatableMessage.count()

View File

@ -1538,6 +1538,7 @@ def test_workflows_edit_jump_previous(pub):
jump = st1.add_action('jump', id='_jump')
jump.timeout = 86400
jump.mode = 'timeout'
ac1 = workflow.add_global_action('Action', 'ac1')
@ -1584,6 +1585,7 @@ def test_workflows_edit_jump_timeout(pub):
jump = st1.add_action('jump', id='_jump')
jump.status = '1'
jump.timeout = 86400
jump.mode = 'timeout'
workflow.store()
@ -1639,6 +1641,7 @@ def test_workflows_jump_target_links(pub):
jump = st1.add_action('jump', id='_jump')
jump.timeout = 86400
jump.mode = 'timeout'
jump.status = st2.id
workflow.store()
@ -3920,6 +3923,7 @@ def test_workflows_inspect_view(pub):
jump = baz_status.add_action('jump', id='_jump')
jump.timeout = 86400
jump.mode = 'timeout'
jump.status = foo_status.id
jump.condition = {'type': 'django', 'value': '1 == 1'}

View File

@ -10,6 +10,7 @@ from wcs.api_access import ApiAccess
from wcs.carddef import CardDef
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.upload_storage import PicklableUpload
from wcs.workflows import Workflow
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app
from .utils import sign_uri
@ -175,6 +176,8 @@ def test_carddata_include_params(pub, local_user, auth):
resp = get_url('/api/cards/test/list?include-workflow-data=on')
assert 'workflow' in resp.json['data'][0]
assert 'data' in resp.json['data'][0]['workflow']
resp = get_url('/api/cards/test/list?include-actions=on')
assert 'actions' in resp.json['data'][0]
resp = get_url('/api/cards/test/%s/?include-fields=off' % carddata.id)
assert 'fields' not in resp.json
@ -192,6 +195,8 @@ def test_carddata_include_params(pub, local_user, auth):
assert 'data' not in resp.json['workflow']
resp = get_url('/api/cards/test/%s/?include-workflow=off&include-workflow-data=off' % carddata.id)
assert 'workflow' not in resp.json
resp = get_url('/api/cards/test/%s/?include-actions=off' % carddata.id)
assert 'actions' not in resp.json
resp = get_url('/api/cards/test/list')
assert len(resp.json['data']) == 1
@ -499,3 +504,184 @@ def test_api_card_list_custom_id_filter_identifier(pub):
assert len(resp.json['data']) == 1
assert resp.json['data'][0]['id'] == 'bar'
assert resp.json['data'][0]['internal_id'] == str(card.id)
@pytest.mark.parametrize('auth', ['signature', 'http-basic'])
def test_carddata_global_actions(auth, pub, local_user):
CardDef.wipe()
Workflow.wipe()
ApiAccess.wipe()
pub.role_class.wipe()
role = pub.role_class(name='allowed-action-role')
role.store()
local_user.roles = [role.id]
local_user.store()
access = ApiAccess()
access.name = 'test'
access.access_identifier = 'test'
access.access_key = '12345'
access.store()
app = get_app(pub)
if auth == 'http-basic':
access.roles = [role]
access.store()
def get_url(url, **kwargs):
app.set_authorization(('Basic', ('test', '12345')))
return app.get(url, **kwargs)
else:
def get_url(url, **kwargs):
return app.get(
sign_uri(
url,
user=local_user,
orig=access.access_identifier,
key=access.access_key,
),
**kwargs,
)
workflow = Workflow(name='test-workflow')
workflow.add_status('test-status')
action = workflow.add_global_action('Global Action')
trigger = action.append_trigger('webservice')
trigger.identifier = 'test-trigger'
trigger.roles = [role.id]
workflow.store()
CardDef.wipe()
carddef = CardDef()
carddef.name = 'test-carddef'
carddef.workflow_id = workflow.id
carddef.workflow_roles = {'_receiver': role.id}
carddef.store()
carddata = carddef.data_class()()
carddata.just_created()
carddata.jump_status('workflow-status')
carddata.store()
resp = get_url('/api/cards/test-carddef/%s/?include-actions=on' % carddata.id)
assert resp.json['actions'] == {
'global-action:test-trigger': f'{carddata.get_api_url()}hooks/test-trigger/'
}
trigger.identifier = None
workflow.store()
resp = get_url('/api/cards/test-carddef/%s/?include-actions=on' % carddata.id)
assert resp.json['actions'] == {}
trigger.identifier = 'test-trigger'
trigger.roles = ['_unhautorized']
workflow.store()
resp = get_url('/api/cards/test-carddef/%s/?include-actions=on' % carddata.id)
assert resp.json['actions'] == {}
@pytest.mark.parametrize('auth', ['signature', 'http-basic'])
def test_carddata_jump_trigger_action(auth, pub, local_user):
CardDef.wipe()
Workflow.wipe()
ApiAccess.wipe()
pub.role_class.wipe()
role = pub.role_class(name='allowed-role')
role.store()
local_user.roles = [role.id]
local_user.store()
access = ApiAccess()
access.name = 'test'
access.access_identifier = 'test'
access.access_key = '12345'
access.store()
app = get_app(pub)
if auth == 'http-basic':
access.roles = [role]
access.store()
def get_url(url, **kwargs):
app.set_authorization(('Basic', ('test', '12345')))
return app.get(url, **kwargs)
else:
def get_url(url, **kwargs):
return app.get(
sign_uri(
url,
user=local_user,
orig=access.access_identifier,
key=access.access_key,
),
**kwargs,
)
workflow = Workflow(name='test-workflow')
source_status = workflow.add_status('source-status')
target_status = workflow.add_status('target-status')
jump = source_status.add_action('jump')
jump.status = target_status.id
jump.trigger = 'test-trigger'
jump.by = [role.id]
workflow.store()
CardDef.wipe()
carddef = CardDef()
carddef.name = 'test-carddef'
carddef.workflow_id = workflow.id
carddef.workflow_roles = {'_receiver': role.id}
carddef.store()
carddata = carddef.data_class()()
carddata.just_created()
carddata.jump_status('source-status')
carddata.store()
resp = get_url('/api/cards/test-carddef/%s/?include-actions=on' % carddata.id)
assert resp.json['actions'] == {
'jump:test-trigger': f'{carddata.get_api_url()}jump/trigger/test-trigger/'
}
jump.trigger = None
workflow.store()
resp = get_url('/api/cards/test-carddef/%s/?include-actions=on' % carddata.id)
assert resp.json['actions'] == {}
jump.trigger = 'test-trigger'
jump.condition = {'type': 'django', 'value': 'false'}
workflow.store()
resp = get_url('/api/cards/test-carddef/%s/?include-actions=on' % carddata.id)
assert resp.json['actions'] == {}
jump.condition = None
jump.by = ['_submitter']
workflow.store()
resp = get_url('/api/cards/test-carddef/%s/?include-actions=on' % carddata.id)
assert resp.json['actions'] == {}
jump.by = [role.id]
workflow.store()
carddata.jump_status(target_status.id)
carddata.store()
resp = get_url('/api/cards/test-carddef/%s/?include-actions=on' % carddata.id)
assert resp.json['actions'] == {}

View File

@ -237,6 +237,72 @@ def test_carddef_schema(pub):
resp = get_app(pub).get('/api/cards/test/@schema', status=403)
def test_carddef_schema_global_actions(pub):
CardDef.wipe()
Workflow.wipe()
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
workflow = Workflow(name='test-workflow')
workflow.add_status('dummy-status')
action = workflow.add_global_action('Test Global Action')
trigger = action.append_trigger('webservice')
trigger.identifier = 'test-trigger'
workflow.store()
carddef = CardDef()
carddef.name = 'test-actions'
carddef.workflow = workflow
carddef.store()
resp = get_app(pub).get(sign_uri('/api/cards/test-actions/@schema'), status=200)
assert resp.json['workflow']['actions'] == {
'global-action:test-trigger': {'label': 'Test Global Action (test-trigger)'}
}
trigger.identifier = None
workflow.store()
resp = get_app(pub).get(sign_uri('/api/cards/test-actions/@schema'), status=200)
assert resp.json['workflow']['actions'] == {}
def test_carddef_schema_jump_triggers(pub):
CardDef.wipe()
Workflow.wipe()
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
workflow = Workflow(name='test-workflow')
source_status = workflow.add_status('source-status')
workflow.add_status('target-status')
jump = source_status.add_action('jump')
jump.trigger = 'test-trigger'
workflow.store()
carddef = CardDef()
carddef.name = 'test-actions'
carddef.workflow = workflow
carddef.store()
resp = get_app(pub).get(sign_uri('/api/cards/test-actions/@schema'), status=200)
assert resp.json['workflow']['actions'] == {
'jump:test-trigger': {'label': 'source-status (test-trigger)'}
}
jump.trigger = None
workflow.store()
resp = get_app(pub).get(sign_uri('/api/cards/test-actions/@schema'), status=200)
assert resp.json['workflow']['actions'] == {}
def test_carddef_schema_user_cards_datasource(pub):
CardDef.wipe()
carddef = CardDef()

View File

@ -449,6 +449,7 @@ def test_formdef_schema(pub, access):
jump = st1.add_action('jump')
jump.status = 'st2'
jump.timeout = 100
jump.mode = 'timeout'
st2 = workflow.add_status('Status2', 'st2')
jump = st2.add_action('jump')
jump.status = 'st3'

View File

@ -59,6 +59,7 @@ def formdef(pub):
jump = new_status.add_action('jump', id='_jump')
jump.status = '2'
jump.timeout = 86400
jump.mode = 'timeout'
jump = new_status.add_action('jump', id='_jump')
jump.status = '3'
jump = middle_status1.add_action('jump', id='_jump')

View File

@ -46,6 +46,7 @@ def test_workflow_trigger(pub, local_user):
st1 = workflow.add_status('Status1', 'st1')
jump = st1.add_action('jump')
jump.trigger = 'XXX'
jump.mode = 'trigger'
jump.status = 'st2'
workflow.add_status('Status2', 'st2')
workflow.store()
@ -122,6 +123,7 @@ def test_workflow_trigger_with_data(pub, local_user):
st1 = workflow.add_status('Status1', 'st1')
jump = st1.add_action('jump')
jump.trigger = 'xx-yy'
jump.mode = 'trigger'
jump.status = 'st2'
workflow.add_status('Status2', 'st2')
workflow.store()
@ -224,6 +226,7 @@ def test_workflow_trigger_with_file_data(pub, local_user):
st1 = workflow.add_status('Status1', 'st1')
jump = st1.add_action('jump')
jump.trigger = 'XXX'
jump.mode = 'trigger'
jump.status = 'st2'
st2 = workflow.add_status('Status2', 'st2')
@ -268,6 +271,7 @@ def test_workflow_trigger_with_condition(pub, local_user):
st1 = workflow.add_status('Status1', 'st1')
jump = st1.add_action('jump')
jump.trigger = 'XXX'
jump.mode = 'trigger'
jump.condition = {'type': 'django', 'value': 'form_var_foo == "bar"'}
jump.status = 'st2'
workflow.add_status('Status2', 'st2')
@ -307,9 +311,11 @@ def test_workflow_trigger_jump_once(pub, local_user):
workflow.add_status('Status3', 'st3')
jump = st1.add_action('jump')
jump.trigger = 'XXX'
jump.mode = 'trigger'
jump.status = 'st2'
jump = st2.add_action('jump')
jump.trigger = 'XXX'
jump.mode = 'trigger'
jump.status = 'st3'
workflow.store()
@ -347,6 +353,7 @@ def test_workflow_trigger_api_access(pub, local_user):
st1 = workflow.add_status('Status1', 'st1')
jump = st1.add_action('jump')
jump.trigger = 'XXX'
jump.mode = 'trigger'
jump.status = 'st2'
workflow.add_status('Status2', 'st2')
workflow.store()
@ -400,6 +407,7 @@ def test_workflow_trigger_http_auth_access(pub, local_user):
st1 = workflow.add_status('Status1', 'st1')
jump = st1.add_action('jump')
jump.trigger = 'XXX'
jump.mode = 'trigger'
jump.status = 'st2'
workflow.add_status('Status2', 'st2')
workflow.store()

View File

@ -271,6 +271,7 @@ def test_backoffice_forms(pub):
st1 = workflow.add_status('Status1')
jump = st1.add_action('jump', id='_jump')
jump.timeout = 86400
jump.mode = 'timeout'
jump.status = 'finished'
workflow.store()
@ -384,6 +385,7 @@ def test_backoffice_listing(pub):
st1.id = 'plop'
jump = st1.add_action('jump', id='_jump')
jump.timeout = 86400
jump.mode = 'timeout'
jump.status = 'finished'
workflow.store()
@ -856,6 +858,144 @@ def test_backoffice_multi_actions_some_status(pub):
assert len(resp.pyquery('[data-status_accepted]')) == 5
def test_backoffice_multi_actions_generic_status(pub):
create_superuser(pub)
Workflow.wipe()
FormDef.wipe()
workflow = Workflow.get_default_workflow()
workflow.id = '2'
action1 = workflow.add_global_action('FOOBAR')
register_comment = action1.add_action('register-comment')
register_comment.comment = 'hello'
trigger = action1.triggers[0]
trigger.statuses = ['_endpoint_status']
trigger.roles = ['_receiver']
assert set(trigger.get_statuses_ids()) == {'rejected', 'finished'}
assert trigger.render_as_line() == 'Manual, from final status, by Recipient'
action2 = workflow.add_global_action('FOOBAR2')
register_comment = action2.add_action('register-comment')
register_comment.comment = 'hello2'
trigger = action2.triggers[0]
trigger.statuses = ['_waitpoint_status']
trigger.roles = ['_receiver']
assert set(trigger.get_statuses_ids()) == {'new', 'accepted'}
assert trigger.render_as_line() == 'Manual, from pause status, by Recipient'
action3 = workflow.add_global_action('FOOBAR3')
register_comment = action3.add_action('register-comment')
register_comment.comment = 'hello3'
trigger = action3.triggers[0]
trigger.statuses = ['_transition_status']
trigger.roles = ['_receiver']
assert set(trigger.get_statuses_ids()) == {'just_submitted'}
assert trigger.render_as_line() == 'Manual, from transition status, by Recipient'
trigger.statuses = ['_transition_status', 'xxx'] # check with invalid status
assert trigger.render_as_line() == 'Manual, from transition status, by Recipient'
workflow.store()
formdef = FormDef()
formdef.name = 'form title'
formdef.workflow_roles = {'_receiver': 1}
formdef.workflow_id = workflow.id
formdef.store()
for i in range(15):
formdata = formdef.data_class()()
formdata.just_created()
if i < 5:
formdata.jump_status('accepted')
elif i % 3 == 0:
formdata.jump_status('new')
else:
formdata.jump_status('finished')
app = login(get_app(pub))
resp = app.get('/backoffice/management/form-title/?filter=all')
ids = []
for checkbox in resp.forms[0].fields['select[]']:
if checkbox._value == '_all':
continue
# check them all
ids.append(checkbox._value)
checkbox.checked = True
assert len(resp.pyquery('[data-status_new]')) == 3
assert len(resp.pyquery('[data-status_finished]')) == 7
assert len(resp.pyquery('[data-status_accepted]')) == 5
assert resp.pyquery(
f'form#multi-actions button[name="button-action-{action1.id}"]'
'[data-visible_status_finished]'
'[data-visible_status_rejected]'
':not([data-visible_status_new])'
':not([data-visible_status_accepted])'
':not([data-visible_status_just_submitted])'
)
assert resp.pyquery(
f'form#multi-actions button[name="button-action-{action2.id}"]'
':not([data-visible_status_finished])'
':not([data-visible_status_rejected])'
'[data-visible_status_new]'
'[data-visible_status_accepted]'
':not([data-visible_status_just_submitted])'
)
assert resp.pyquery(
f'form#multi-actions button[name="button-action-{action3.id}"]'
':not([data-visible_status_finished])'
':not([data-visible_status_rejected])'
':not([data-visible_status_new])'
':not([data-visible_status_accepted])'
'[data-visible_status_just_submitted]'
)
resp = resp.forms[0].submit(f'button-action-{action1.id}')
assert '?job=' in resp.location
resp = resp.follow()
assert 'Executing task &quot;FOOBAR&quot; on forms' in resp.text
assert '>completed<' in resp.text
for id in ids:
formdata = formdef.data_class().get(id)
comments = [x.content for x in formdata.iter_evolution_parts(JournalEvolutionPart)]
# check action was only executed on "finished"
if formdata.status == 'wf-finished':
assert comments == ['<p>hello</p>']
else:
assert comments == []
# check not end point status
resp = app.get('/backoffice/management/form-title/?filter=all')
ids = []
for checkbox in resp.forms[0].fields['select[]']:
if checkbox._value == '_all':
continue
# check them all
ids.append(checkbox._value)
checkbox.checked = True
assert len(resp.pyquery('[data-status_new]')) == 3
assert len(resp.pyquery('[data-status_finished]')) == 7
assert len(resp.pyquery('[data-status_accepted]')) == 5
resp = resp.forms[0].submit(f'button-action-{action2.id}')
assert '?job=' in resp.location
resp = resp.follow()
assert 'Executing task &quot;FOOBAR2&quot; on forms' in resp.text
assert '>completed<' in resp.text
for id in ids:
formdata = formdef.data_class().get(id)
comments = [x.content for x in formdata.iter_evolution_parts(JournalEvolutionPart)]
# check action was only executed on not final status
if formdata.status in ('wf-finished', 'wf-rejected'):
assert comments == ['<p>hello</p>']
else:
assert comments == ['<p>hello2</p>']
def test_backoffice_multi_actions_confirmation(pub):
create_superuser(pub)
Workflow.wipe()

View File

@ -1648,14 +1648,12 @@ def test_block_card_item_link(pub):
# check cards are links in backoffice
resp = app.get('/backoffice/management' + resp.request.path)
assert (
'<div class="value"><a href="http://example.net/backoffice/data/foo/%s/">card plop</a></div></div>'
% card.id
in resp
resp.pyquery('div.value > a[href="http://example.net/backoffice/data/foo/%s/"]' % card.id).text()
== 'card plop'
)
assert (
'<div class="value"><a href="http://example.net/backoffice/data/foo/%s/">card plop2</a></div></div>'
% card2.id
in resp
resp.pyquery('div.value > a[href="http://example.net/backoffice/data/foo/%s/"]' % card2.id).text()
== 'card plop2'
)

View File

@ -2293,3 +2293,27 @@ def test_backoffice_submission_then_front(pub):
f'with the number {formdata.get_display_id()}. It has been submitted for you by '
f'admin after a phone call.'
)
def test_backoffice_submission_sidebar_elements(pub):
user = create_user(pub)
FormDef.wipe()
formdef = FormDef()
formdef.name = 'form title'
formdef.fields = [
fields.PageField(id='0', label='1st page'),
fields.StringField(id='1', label='Field on 1st page'),
]
formdef.backoffice_submission_roles = user.roles[:]
formdef.workflow_roles = {'_receiver': 1}
formdef.store()
app = login(get_app(pub))
resp = app.get(formdef.get_submission_url(backoffice=True))
assert resp.pyquery('.submit-user-selection')
formdef.submission_sidebar_items = ['general', 'custom-template']
formdef.store()
resp = app.get(formdef.get_submission_url(backoffice=True))
assert not resp.pyquery('.submit-user-selection')

View File

@ -38,6 +38,7 @@ def test_workflow_inspect_page(pub):
st1 = workflow.add_status('Status1')
jump = st1.add_action('jump', id='_jump')
jump.timeout = '=86400'
jump.mode = 'timeout'
jump.status = 'finished'
workflow.store()

View File

@ -78,30 +78,13 @@ def nocache(settings):
@pytest.fixture
def sql_queries(monkeypatch):
import psycopg2.extensions
import wcs.sql
queries = []
wcs.sql.cleanup_connection()
class LoggingCursor(psycopg2.extensions.cursor):
"""A cursor that logs queries using its connection logging facilities."""
def execute(self, query, vars=None):
queries.append(query)
return super().execute(query, vars)
class MyLoggingConnection(wcs.sql.WcsPgConnection):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cursor_factory = LoggingCursor
backup_original_class = wcs.sql.WcsPgConnection
wcs.sql.WcsPgConnection = MyLoggingConnection
wcs.sql.LoggingCursor.queries = queries
yield queries
wcs.sql.cleanup_connection()
wcs.sql.WcsPgConnection = backup_original_class
@pytest.fixture

View File

@ -2863,14 +2863,16 @@ def test_form_workflow_trigger(pub):
st1 = workflow.add_status('Status1', 'st1')
jump = st1.add_action('jump')
jump.trigger = 'XXX'
jump.mode = 'trigger'
jump.status = 'st2'
jump2 = st1.add_action('jump')
jump2.trigger = 'YYY'
jump2.mode = 'trigger'
jump2.status = 'st3'
jump2.set_marker_on_status = True
workflow.add_status('Status2', 'st2')
st2 = workflow.add_status('Status2', 'st2')
workflow.add_status('Status3', 'st3')
workflow.store()
@ -2897,8 +2899,8 @@ def test_form_workflow_trigger(pub):
user.roles = [role.id]
user.store()
app.post(formdata.get_url() + 'jump/trigger/XXX', status=302)
resp = app.post(formdata.get_url() + 'jump/trigger/XXX', status=302)
assert resp.location == formdata.get_url()
formdata = formdef.data_class().get(formdata.id)
assert formdata.status == 'wf-st2'
@ -2922,6 +2924,16 @@ def test_form_workflow_trigger(pub):
formdata = formdef.data_class().get(formdata.id)
assert formdata.workflow_data.get('data') == {'foo': 'bar'}
# check with redirect action
formdata.status = 'wf-st1'
formdata.store()
redirect = st2.add_action('redirect_to_url')
redirect.url = 'https://example.net'
workflow.store()
resp = app.post(formdata.get_url() + 'jump/trigger/XXX', status=302)
assert resp.location == 'https://example.net'
def test_form_worklow_multiple_identical_status(pub):
user = create_user(pub)
@ -2935,6 +2947,7 @@ def test_form_worklow_multiple_identical_status(pub):
st1.extra_css_class = 'CSS-STATUS1'
jump = st1.add_action('jump')
jump.trigger = 'XXX'
jump.mode = 'trigger'
jump.status = 'st1'
workflow.store()
@ -3341,6 +3354,7 @@ def test_form_worklow_multiple_identical_status_with_journal_error(pub):
st1 = workflow.add_status('Status1', 'st1')
jump = st1.add_action('jump')
jump.trigger = 'XXX'
jump.mode = 'trigger'
jump.status = 'st1'
workflow.store()

View File

@ -1741,10 +1741,10 @@ def test_block_subfields_display_locations(pub):
block = BlockDef()
block.name = 'foobar'
block.fields = [
fields.StringField(id='123', required=True, label='Test'),
fields.TitleField(id='234', label='Blah Title'),
fields.SubtitleField(id='345', label='Blah Subtitle'),
fields.CommentField(id='456', label='Blah Comment'),
fields.StringField(id='123', required=True, label='Test'),
]
block.store()
@ -1849,7 +1849,6 @@ def test_block_block_counter(pub):
block = BlockDef()
block.name = 'foobar'
block.fields = [
fields.StringField(id='123', required=True, label='Test'),
fields.TitleField(
id='234', label='Blah Title #{{ block_counter.index }} #{{ block_counter.index0 }}'
),
@ -1859,6 +1858,7 @@ def test_block_block_counter(pub):
fields.CommentField(
id='456', label='Blah Comment #{{ block_counter.index }} #{{ block_counter.index0 }}'
),
fields.StringField(id='123', required=True, label='Test'),
]
block.store()
@ -2881,3 +2881,108 @@ def test_block_multiple_rows_single_draft(pub, logged_user, tracking_code):
assert formdef.data_class().count() == 1
assert formdef.data_class().select()[0].status == 'wf-new'
def test_block_field_post_condition(pub):
FormDef.wipe()
BlockDef.wipe()
block = BlockDef()
block.name = 'foobar'
block.fields = [
fields.StringField(id='123', label='Foo', varname='foo'),
fields.StringField(id='234', label='Bar', varname='bar'),
]
block.post_conditions = [
{
'condition': {'type': 'django', 'value': 'block_var_foo|startswith:"b"'},
'error_message': 'foo must start with a b.',
},
{
'condition': {'type': 'django', 'value': 'block_var_foo == block_var_bar'},
'error_message': 'foo and bar must be identical.',
},
]
block.store()
formdef = FormDef()
formdef.name = 'form title'
formdef.fields = [
fields.BlockField(id='1', label='test', block_slug='foobar'),
]
formdef.store()
app = get_app(pub)
resp = app.get(formdef.get_url())
resp.form['f1$element0$f123'] = 'foo'
resp.form['f1$element0$f234'] = 'bar'
resp = resp.form.submit('submit') # -> error
assert (
resp.pyquery('.widget-with-error .error').text()
== 'foo must start with a b. foo and bar must be identical.'
)
resp.form['f1$element0$f123'] = 'baz'
resp.form['f1$element0$f234'] = 'bar'
resp = resp.form.submit('submit') # -> error
assert resp.pyquery('.widget-with-error .error').text() == 'foo and bar must be identical.'
resp.form['f1$element0$f123'] = 'baz'
resp.form['f1$element0$f234'] = 'baz'
resp = resp.form.submit('submit') # -> validation page
assert resp.form['f1$element0$f123'].attrs['readonly']
resp = resp.form.submit('submit') # -> end page
formdata = formdef.data_class().select()[0]
assert formdata.status == 'wf-new'
assert formdata.data == {
'1': {'data': [{'123': 'baz', '234': 'baz'}], 'schema': {'123': 'string', '234': 'string'}},
'1_display': 'foobar',
}
# multiple rows
formdef.fields[0].max_items = 3
formdef.store()
formdef.data_class().wipe()
resp = app.get(formdef.get_url())
resp.form['f1$element0$f123'] = 'baz'
resp.form['f1$element0$f234'] = 'bar'
resp = resp.form.submit('f1$add_element')
assert not resp.pyquery('.widget-with-error')
resp.form['f1$element1$f123'] = 'bar'
resp.form['f1$element1$f234'] = 'bar'
resp = resp.form.submit('submit') # -> error
assert (
resp.pyquery('.widget-with-error[data-block-row="element0"] .error').text()
== 'foo and bar must be identical.'
)
assert resp.pyquery('.widget-with-error[data-block-row="element1"] .error').text() == ''
resp.form['f1$element1$f234'] = 'baz'
resp = resp.form.submit('submit') # -> error
assert (
resp.pyquery('.widget-with-error[data-block-row="element0"] .error').text()
== 'foo and bar must be identical.'
)
assert (
resp.pyquery('.widget-with-error[data-block-row="element1"] .error').text()
== 'foo and bar must be identical.'
)
resp.form['f1$element0$f123'] = 'bar'
resp.form['f1$element1$f234'] = 'bar'
resp = resp.form.submit('submit') # -> validation page
assert resp.form['f1$element0$f123'].attrs['readonly']
resp = resp.form.submit('submit') # -> end page
formdata = formdef.data_class().select()[0]
assert formdata.status == 'wf-new'
assert formdata.data == {
'1': {
'data': [{'123': 'bar', '234': 'bar'}, {'123': 'bar', '234': 'bar'}],
'schema': {'123': 'string', '234': 'string'},
},
'1_display': 'foobar, foobar',
}

View File

@ -1,6 +1,7 @@
import os
import re
import urllib.parse
from unittest import mock
import pytest
import responses
@ -679,3 +680,166 @@ def test_file_auto_convert_heic(pub):
resp = resp.follow()
assert resp.click('image.jpeg').follow().content_type == 'image/jpeg'
assert b'JFIF' in resp.click('image.jpeg').follow().body
@pytest.mark.parametrize('enable_tracking_codes', [True, False])
def test_form_file_field_no_clamd(pub, enable_tracking_codes):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.enable_tracking_codes = enable_tracking_codes
formdef.fields = [
fields.FileField(
id='0',
label='file',
)
]
formdef.store()
formdef.data_class().wipe()
upload = Upload('test.txt', b'foobar', 'text/plain')
resp = get_app(pub).get('/test/')
resp.forms[0]['f0$file'] = upload
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.form.submit('submit') # -> validation
subp.run.assert_not_called() # -> no scan
resp = resp.form.submit('submit') # -> submit
subp.run.assert_not_called() # -> no scan
formdata = formdef.data_class().select()[0]
assert formdata.data['0'].clamd == {}
@pytest.mark.parametrize('enable_tracking_codes', [True, False])
def test_form_block_file_field_no_clamd(pub, enable_tracking_codes):
BlockDef.wipe()
block = BlockDef()
block.name = 'foobar'
block.fields = [
fields.FileField(id='234', required=True, label='field label'),
]
block.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.enable_tracking_codes = enable_tracking_codes
formdef.fields = [fields.BlockField(id='1', label='test', block_slug='foobar', max_items=3)]
formdef.store()
formdef.data_class().wipe()
upload1 = Upload('test1.txt', b'foobar', 'text/plain')
upload2 = Upload('test2.txt', b'barfoo', 'text/plain')
resp = get_app(pub).get('/test/')
resp.forms[0]['f1$element0$f234$file'] = upload1
resp = resp.form.submit('f1$add_element')
resp.forms[0]['f1$element1$f234$file'] = upload2
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.form.submit('submit') # -> validation
subp.run.assert_not_called() # -> no scan
resp = resp.form.submit('submit') # -> submit
subp.run.assert_not_called() # -> no scan
formdata = formdef.data_class().select()[0]
assert formdata.data['1']['data'][0]['234'].clamd == {}
assert formdata.data['1']['data'][1]['234'].clamd == {}
@pytest.mark.parametrize('enable_tracking_codes', [True, False])
def test_form_file_field_clamd(pub, enable_tracking_codes):
pub.load_site_options()
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'enable-clamd', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
pub.load_site_options()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.enable_tracking_codes = enable_tracking_codes
formdef.fields = [
fields.FileField(
id='0',
label='file',
)
]
formdef.store()
formdef.data_class().wipe()
upload = Upload('test.txt', b'foobar', 'text/plain')
resp = get_app(pub).get('/test/')
resp.forms[0]['f0$file'] = upload
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.form.submit('submit') # -> validation
subp.run.assert_not_called() # -> no scan
resp = resp.form.submit('submit') # -> submit
formdata = formdef.data_class().select()[0]
subp.run.assert_called_once_with(
['clamdscan', '--fdpass', formdata.data['0'].get_fs_filename()], check=False
)
assert formdata.data['0'].clamd == {'returncode': 0, 'waiting_scan': False}
@pytest.mark.parametrize('enable_tracking_codes', [True, False])
def test_form_block_file_field_clamd(pub, enable_tracking_codes):
pub.load_site_options()
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'enable-clamd', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
pub.load_site_options()
BlockDef.wipe()
block = BlockDef()
block.name = 'foobar'
block.fields = [
fields.FileField(id='234', required=True, label='field label'),
]
block.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.enable_tracking_codes = enable_tracking_codes
formdef.fields = [fields.BlockField(id='1', label='test', block_slug='foobar', max_items=3)]
formdef.store()
formdef.data_class().wipe()
upload1 = Upload('test1.txt', b'foobar', 'text/plain')
upload2 = Upload('test2.txt', b'barfoo', 'text/plain')
resp = get_app(pub).get('/test/')
resp.forms[0]['f1$element0$f234$file'] = upload1
resp = resp.form.submit('f1$add_element')
resp.forms[0]['f1$element1$f234$file'] = upload2
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.form.submit('submit') # -> validation
subp.run.assert_not_called() # -> no scan
resp = resp.form.submit('submit') # -> submit
formdata = formdef.data_class().select()[0]
assert subp.run.call_count == 2
calls = [
mock.call(
['clamdscan', '--fdpass', formdata.data['1']['data'][0]['234'].get_fs_filename()], check=False
),
mock.call(
['clamdscan', '--fdpass', formdata.data['1']['data'][1]['234'].get_fs_filename()], check=False
),
]
subp.run.assert_has_calls(calls)
assert formdata.data['1']['data'][0]['234'].clamd == {'returncode': 0, 'waiting_scan': False}
assert formdata.data['1']['data'][1]['234'].clamd == {'returncode': 0, 'waiting_scan': False}

View File

@ -411,6 +411,52 @@ def test_formdata_attachment_pick_from_portfolio(pub, fargo_url):
assert 'use-file-from-fargo' in resp.text
def test_formdata_attachment_clamd(pub):
pub.load_site_options()
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'enable-clamd', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
pub.load_site_options()
create_user(pub)
wf = Workflow(name='status')
st1 = wf.add_status('Status1', 'st1')
attach = st1.add_action('addattachment', id='_attach')
attach.by = ['_submitter']
wf.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.workflow_id = wf.id
formdef.fields = []
formdef.store()
formdef.data_class().wipe()
resp = login(get_app(pub), username='foo', password='foo').get('/test/')
resp = resp.forms[0].submit('submit')
assert 'Check values then click submit.' in resp.text
resp = resp.forms[0].submit('submit')
assert resp.status_int == 302
resp = resp.follow()
assert 'The form has been recorded' in resp.text
resp.forms[0]['attachment_attach$file'] = Upload('test.txt', b'foobar', 'text/plain')
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.forms[0].submit('button_attach')
assert formdef.data_class().count() == 1
formdata = formdef.data_class().select()[0]
assert formdata.evolution[-1].parts[0].__class__.__name__ == 'AttachmentEvolutionPart'
attachment = formdata.evolution[-1].parts[0]
subp.run.assert_called_once_with(['clamdscan', '--fdpass', attachment.get_file_path()], check=False)
assert attachment.clamd == {'returncode': 0, 'waiting_scan': False}
def test_formdata_generated_document_download(pub):
create_user(pub)
wf = Workflow(name='status')

View File

@ -1559,6 +1559,14 @@ def test_form_item_with_card_image_data_source(pub, http_requests):
resp = app.get('/test/')
assert len(resp.pyquery('div.RadiobuttonsWithImagesWidget')) == 1
assert (
'--image-desktop-width: 150px;--image-desktop-height: 150px'
in resp.pyquery('div.RadiobuttonsWithImagesWidget').attr['style']
)
assert (
'--image-mobile-width: 75px;--image-mobile-height: 75px;'
in resp.pyquery('div.RadiobuttonsWithImagesWidget').attr['style']
)
assert len(resp.pyquery(':not(template) > label > img.item-with-image--picture')) == 3
assert TransientData.count() == 3

View File

@ -97,7 +97,7 @@ def test_blocks_in_form_details(pub):
assert 'test1' not in details
assert 'test empty block' not in details
assert 'test2' in details
assert 'bar' in details
assert 'plop:\n foo\n' in details
def test_block_migrate(pub):

View File

@ -62,3 +62,24 @@ def test_import_blockdef_multiple_errors(pub):
assert excinfo.value.details == (
'Unknown datasources: carddef:foo:unknown, carddef:unknown, foobar; Unknown field types: foobaz'
)
def test_import_blockdef_post_conditions(pub):
BlockDef.wipe()
carddef = CardDef()
carddef.name = 'foo'
carddef.fields = []
carddef.store()
blockdef = BlockDef()
blockdef.name = 'foo'
blockdef.fields = []
blockdef.post_conditions = [
{'condition': {'type': 'django', 'value': 'blah1'}, 'error_message': 'bar1'},
{'condition': {'type': 'django', 'value': 'blah2'}, 'error_message': 'bar2'},
]
export = ET.tostring(export_to_indented_xml(blockdef))
blockdef2 = BlockDef.import_from_xml(io.BytesIO(export))
assert blockdef.post_conditions == blockdef2.post_conditions

View File

@ -159,6 +159,7 @@ def test_trigger_jumps(pub):
st1 = workflow.add_status('Status1', 'st1')
jump = st1.add_action('jump')
jump.trigger = 'goto2'
jump.mode = 'trigger'
jump.status = 'st2'
st2 = workflow.add_status('Status2', 'st2')
workflow.store()

View File

@ -1613,15 +1613,15 @@ def test_named_datasource_in_formdef(pub):
assert datasource.slug == 'foobar'
formdef = FormDef()
assert not datasource.is_used_in_formdef(formdef)
assert not any(datasource.usage_in_formdef(formdef))
formdef.fields = [
fields.ItemField(id='0', label='string', data_source={'type': 'foobar'}),
]
assert datasource.is_used_in_formdef(formdef)
assert any(datasource.usage_in_formdef(formdef))
datasource.slug = 'barfoo'
assert not datasource.is_used_in_formdef(formdef)
assert not any(datasource.usage_in_formdef(formdef))
def test_data_source_in_template(pub):

View File

@ -367,7 +367,7 @@ def test_build_agenda_datasources(mock_collect, pub, chrono_url):
fields.ItemField(id='0', label='string', data_source={'type': datasource3.slug}),
]
formdef.store()
assert datasource3.is_used_in_formdef(formdef)
assert any(datasource3.usage_in_formdef(formdef))
datasource3.data_source['value'] = '{{ agendas_url }}api/agenda/events-FOOBAR/datetimes/'
datasource3.store()
build_agenda_datasources(pub)

View File

@ -5155,6 +5155,81 @@ def test_getlist_of_lazyformdata_field(pub):
assert tmpl.render(context) == '1,2,3'
def test_getlist_of_relation_in_block_field(pub):
BlockDef.wipe()
CardDef.wipe()
FormDef.wipe()
carddef = CardDef()
carddef.name = 'Card'
carddef.fields = [
fields.StringField(id='1', label='string', varname='str'),
fields.NumericField(id='2', label='numeric', varname='num'),
]
carddef.store()
ds = {'type': 'carddef:%s' % carddef.url_name}
block = BlockDef()
block.name = 'foobar'
block.fields = [
fields.ItemField(id='1', label='card', varname='card', data_source=ds),
]
block.store()
formdef = FormDef()
formdef.name = 'test form'
formdef.fields = [
fields.BlockField(id='1', label='test', block_slug='foobar', max_items=3, varname='block'),
]
formdef.store()
carddef.data_class().wipe()
formdef.data_class().wipe()
carddatas = []
for i in range(3):
carddata = carddef.data_class()()
carddata.data = {'1': str(i + 1), '2': i + 1}
carddata.just_created()
carddata.store()
carddatas.append(carddata)
# add empty data
carddata = carddef.data_class()()
carddata.data = {}
carddata.just_created()
carddata.store()
carddatas.append(carddata)
formdata = formdef.data_class()()
formdata.data = {
'1': {
'data': [
{'1': carddatas[0].id, '1_display': 'card'},
{'1': carddatas[1].id, '1_display': 'card'},
{'1': carddatas[2].id, '1_display': 'card'},
{'1': carddatas[0].id, '1_display': 'card'},
{'1': carddatas[3].id, '1_display': 'card'},
],
'schema': {'1': 'item'},
},
}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
context = pub.substitutions.get_context_variables(mode='lazy')
tmpl = Template('{{ form_var_block|getlist:"card_live_var_str"|sum }}')
assert tmpl.render(context) == '7'
tmpl = Template('{{ form_var_block|getlist:"card_live_var_num"|sum }}')
assert tmpl.render(context) == '7'
with pytest.raises(AttributeError):
tmpl = Template('{{ form_var_block|getlist:"card_live_var_plop"|sum }}')
tmpl.render(context)
def test_items_field_contains(pub):
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
@ -5619,7 +5694,9 @@ def test_rst_form_details_all_fields(pub):
assert '**Subtitle**\n\n' in rst
assert 'Text:\n para1\n para2' in rst
assert 'File:\n test.jpeg' in rst
assert 'Block Field:\n XfooY, Xfoo2Y' in rst
assert 'Block Field:\n' in rst
assert 'Test:\n foo\n' in rst
assert 'Test:\n foo2\n' in rst
assert 'Map:\n 2;4\n\n' in rst

View File

@ -261,7 +261,11 @@ def get_logs(hostname=None):
os.path.join(base_dir, 'cron-logs', now.strftime('%Y'), 'cron.log-%s' % now.strftime('%Y%m%d'))
) as fd:
lines = fd.readlines()
lines = [line.split(']', 1)[1].strip() for line in lines] # split on ] to get what follows the PID
# split on ] to get what follows the PID
lines = [line.split(']', 1)[1].strip() for line in lines]
# remove resource usage details
clean_usage_line_re = re.compile('(resource usage summary).*')
lines = [clean_usage_line_re.sub(r'\1', x) for x in lines]
return lines
@ -440,7 +444,7 @@ def test_cron_command_jobs(settings):
clear_log_files()
call_command('cron', job_name='job2', domain='example.net')
assert jobs == ['job2']
assert get_logs('example.net') == ['start', "running jobs: ['job2']"]
assert get_logs('example.net') == ['start', "running jobs: ['job2']", 'resource usage summary']
get_publisher_class().cronjobs = []
jobs = []
clear_log_files()
@ -450,6 +454,7 @@ def test_cron_command_jobs(settings):
'start',
"running jobs: ['job2']",
'long job: job2 (took 0 minutes, 0 CPU minutes)',
'resource usage summary',
]
assert jobs == ['job2']
get_publisher_class().cronjobs = []
@ -464,6 +469,7 @@ def test_cron_command_jobs(settings):
'job3: running on "bar" took 0 minutes, 0 CPU minutes',
'job3: running on "blah" took 0 minutes, 0 CPU minutes',
'long job: job3 (took 0 minutes, 0 CPU minutes)',
'resource usage summary',
]
assert jobs == ['job3']
@ -547,6 +553,7 @@ def test_cron_command_job_exception(settings):
'start',
"running jobs: ['job1']",
'exception running job job1: Error',
'resource usage summary',
]
clean_temporary_pub()
@ -570,7 +577,12 @@ def test_cron_command_job_log(settings):
get_publisher_class().cronjobs = []
clear_log_files()
call_command('cron', job_name='job1', domain='example.net')
assert get_logs('example.net') == ['start', "running jobs: ['job1']", 'hello']
assert get_logs('example.net') == [
'start',
"running jobs: ['job1']",
'hello',
'resource usage summary',
]
pub.load_site_options()
pub.site_options.set('options', 'cron-log-level', 'debug')

View File

@ -2167,6 +2167,7 @@ def test_view_performances(pub):
jump = status.add_action('jump', id='_jump%s' % j)
jump.by = []
jump.timeout = 5
jump.mode = 'timeout'
jump.status = 'st%s' % (j + 1)
workflow.store()
workflows.append(workflow)

View File

@ -478,6 +478,7 @@ def test_workflow_tests_automatic_jump_timeout(pub):
jump = new_status.add_action('jump')
jump.status = stalled_status.id
jump.timeout = 120 * 60 # 2 hours
jump.mode = 'timeout'
jump.condition = {'type': 'django', 'value': 'form_receipt_datetime|age_in_days >= 1'}
sendmail = new_status.add_action('sendmail')
@ -620,7 +621,7 @@ def test_workflow_tests_sendmail(mocked_send_email, pub):
end_status = workflow.add_status(name='End status')
sendmail = new_status.add_action('sendmail')
sendmail.to = ['test@example.org']
sendmail.to = ['test@example.org', 'test2@example.org']
sendmail.subject = 'In new status'
sendmail.body = 'xxx'
@ -687,7 +688,7 @@ def test_workflow_tests_sendmail(mocked_send_email, pub):
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Email was not sent to address "other@example.org".'
assert 'Email addresses: test@example.org' in excinfo.value.details
assert 'Email addresses: test2@example.org, test@example.org' in excinfo.value.details
testdef.workflow_tests.actions = [
workflow_tests.ButtonClick(button_name='Go to end status'),
@ -895,17 +896,17 @@ def test_workflow_tests_alert(pub):
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.workflow_tests.actions = [
workflow_tests.AssertAlert(message='Hello 42'),
workflow_tests.AssertAlert(message='Héllo 42 abc'),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'No alert matching message.'
assert 'Displayed alerts: None' in excinfo.value.details
assert 'Expected alert: Hello 42' in excinfo.value.details
assert 'Expected alert: Héllo 42 abc' in excinfo.value.details
alert = new_status.add_action('displaymsg')
alert.message = 'Hello {{ 41|add:1 }}'
alert.message = 'Héllo <strong>{{ 41|add:1 }}</strong> abc'
workflow.store()
formdef.refresh_from_storage()
@ -918,7 +919,7 @@ def test_workflow_tests_alert(pub):
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'No alert matching message.'
assert 'Displayed alerts: <p>Hello 42</p>' in excinfo.value.details
assert 'Displayed alerts: Héllo 42 abc' in excinfo.value.details
assert 'Expected alert: Hello 43' in excinfo.value.details
@ -1267,6 +1268,7 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
jump = status_with_timeout_jump.add_action('jump')
jump.status = status_with_button.id
jump.timeout = '{{ 1 }} day'
jump.mode = 'timeout'
choice = status_with_button.add_action('choice')
choice.label = 'Accept'

View File

@ -94,6 +94,7 @@ def test_get_json_export_dict(pub):
jump = st1.add_action('jump', id='_jump')
jump.by = ['_submitter', '_receiver']
jump.timeout = 0.1
jump.mode = 'timeout'
jump.status = 'st2'
workflow.roles['_other'] = 'Other Function'
@ -122,6 +123,7 @@ def test_action_repr(pub):
jump = st1.add_action('jump', id='_jump')
jump.by = ['_submitter', '_receiver']
jump.timeout = 0.1
jump.mode = 'timeout'
jump.status = 'st2'
action = workflow.add_global_action('Timeout')

View File

@ -491,8 +491,12 @@ def test_export_to_model_form_details_section(pub, filename):
assert '>2015-05-12<' in new_content
assert 'Invisible' not in new_content
assert new_content.count('/table:table-cell') == 8
assert 'XfooY, Xfoo2Y' in new_content
assert '>14.4<' in new_content
# block sub fields
assert '>foo<' in new_content
assert '>bar<' in new_content
assert '>foo2<' in new_content
assert '>bar2<' in new_content
if filename == 'template-form-details-no-styles.odt':
with open(formdata.evolution[0].parts[-1].get_file_path(), 'rb') as fd:

View File

@ -1,6 +1,7 @@
import io
import json
import os
from unittest import mock
import pytest
import responses
@ -829,3 +830,159 @@ def test_workflow_form_line_details(pub):
fields.BlockField(id='1', label='test', block_slug='foobar', varname='fooblock'),
]
assert display_form.get_line_details() == 'by foorole'
def test_workflow_form_block_condition(pub):
FormDef.wipe()
Workflow.wipe()
BlockDef.wipe()
user = create_user(pub)
block = BlockDef()
block.name = 'foobar'
block.fields = [
fields.StringField(id='123', required=True, label='One', varname='one'),
fields.StringField(
id='234',
required=True,
label='Two',
condition={'type': 'django', 'value': 'block_var_one|startswith:"test"'},
),
]
block.store()
wf = Workflow(name='test')
status = wf.add_status('New', 'st1')
status.items = []
display_form = status.add_action('form', id='_display_form')
display_form.by = ['_submitter']
display_form.varname = 'blah'
display_form.hide_submit_button = False
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
display_form.formdef.fields = [
fields.BlockField(id='1', label='test', block_slug='foobar', varname='fooblock', max_items=3),
]
jump = status.add_action('jumponsubmit', id='_jump')
jump.status = status.id
wf.store()
formdef = create_formdef()
formdef.workflow_id = wf.id
formdef.fields = []
formdef.store()
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.user_id = user.id
formdata.just_created()
formdata.store()
app = login(get_app(pub), username='foo', password='foo')
resp = app.get(formdata.get_url(backoffice=False))
assert resp.pyquery('[data-widget-name="fblah_1$element0$f234"]').attr.style == 'display: none'
live_url = resp.html.find('form').attrs['data-live-url']
live_resp = app.post(live_url + '?modified_field_id[]=123', params=resp.form.submit_fields())
assert live_resp.json['result']['blah_1-234-0']['visible'] is False
resp.form['fblah_1$element0$f123'] = 'test'
live_resp = app.post(live_url + '?modified_field_id[]=123', params=resp.form.submit_fields())
assert live_resp.json['result']['blah_1-234-0']['visible'] is True
resp = resp.form.submit('fblah_1$add_element')
resp = resp.form.submit('fblah_1$add_element')
live_resp = app.post(live_url + '?modified_field_id[]=123', params=resp.form.submit_fields())
assert live_resp.json['result']['blah_1-234-0']['visible'] is True
assert live_resp.json['result']['blah_1-234-1']['visible'] is False
assert live_resp.json['result']['blah_1-234-2']['visible'] is False
resp.form['fblah_1$element2$f123'] = 'test3'
live_resp = app.post(live_url + '?modified_field_id[]=123', params=resp.form.submit_fields())
assert live_resp.json['result']['blah_1-234-0']['visible'] is True
assert live_resp.json['result']['blah_1-234-1']['visible'] is False
assert live_resp.json['result']['blah_1-234-2']['visible'] is True
def test_workflow_form_file_clamd(pub):
pub.load_site_options()
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'enable-clamd', 'true')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
pub.load_site_options()
FormDef.wipe()
Workflow.wipe()
BlockDef.wipe()
user = create_user(pub)
block = BlockDef()
block.name = 'foobar'
block.fields = [
fields.FileField(id='123', required=True, label='Test', varname='test'),
]
block.store()
wf = Workflow(name='test')
status = wf.add_status('New', 'st1')
status.items = []
display_form = status.add_action('form', id='_display_form')
display_form.by = ['_submitter']
display_form.varname = 'blah'
display_form.hide_submit_button = False
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
display_form.formdef.fields = [
fields.BlockField(id='1', label='test', block_slug='foobar', varname='fooblock', max_items=3),
fields.FileField(id='2', label='test2', varname='file'),
]
jump = status.add_action('jumponsubmit', id='_jump')
jump.status = status.id
wf.store()
formdef = create_formdef()
formdef.workflow_id = wf.id
formdef.fields = []
formdef.store()
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.user_id = user.id
formdata.just_created()
formdata.store()
app = login(get_app(pub), username='foo', password='foo')
resp = app.get(formdata.get_url(backoffice=False))
resp.form['fblah_1$element0$f123$file'] = Upload('test1.txt', b'foobar1', 'text/plain')
resp = resp.form.submit('fblah_1$add_element')
resp.form['fblah_1$element1$f123$file'] = Upload('test2.txt', b'foobar2', 'text/plain')
resp.form['fblah_2$file'] = Upload('test3.txt', b'foobar3', 'text/plain')
with mock.patch('wcs.clamd.subprocess') as subp:
attrs = {'run.return_value': mock.Mock(returncode=0)}
subp.configure_mock(**attrs)
resp = resp.form.submit('submit').follow()
assert subp.run.call_count == 3
formdata = formdef.data_class().select()[0]
data_part = formdata.evolution[0].parts[-1].data
calls = [
mock.call(
['clamdscan', '--fdpass', data_part['blah_1']['data'][0]['123'].get_fs_filename()],
check=False,
),
mock.call(
['clamdscan', '--fdpass', data_part['blah_1']['data'][1]['123'].get_fs_filename()],
check=False,
),
mock.call(['clamdscan', '--fdpass', data_part['blah_2'].get_fs_filename()], check=False),
]
subp.run.assert_has_calls(calls)

View File

@ -46,6 +46,28 @@ def rewind(formdata, seconds):
formdata.evolution[-1].time = formdata.evolution[-1].time - datetime.timedelta(seconds=seconds)
def test_jump_migrate_mode(pub):
item = JumpWorkflowStatusItem()
item.migrate()
assert item.mode == 'immediate'
item = JumpWorkflowStatusItem()
item.trigger = 'plop'
item.migrate()
assert item.mode == 'trigger'
item = JumpWorkflowStatusItem()
item.timeout = '12'
item.migrate()
assert item.mode == 'timeout'
item = JumpWorkflowStatusItem()
item.trigger = 'plop'
item.timeout = '12'
item.migrate()
assert item.mode == 'trigger'
def test_jump_nothing(pub):
FormDef.wipe()
formdef = FormDef()
@ -246,10 +268,17 @@ def test_timeout(pub):
jump = st1.add_action('jump', id='_jump')
jump.by = ['_submitter', '_receiver']
jump.timeout = 30 * 60 # 30 minutes
jump.mode = 'timeout'
jump.status = 'st2'
workflow.store()
# test timeout_parse
assert jump.timeout_parse(None) is None # no value, kept as is
assert jump.timeout_parse('') == '' # no value, kept as is
assert jump.timeout_parse('20 minutes') == 20 * 60
assert jump.timeout_parse('20') == 0 # not unit
assert jump.timeout_parse('error') == 0 # not a valid value
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
@ -311,6 +340,7 @@ def test_timeout_with_humantime_template(pub):
jump = st1.add_action('jump', id='_jump')
jump.by = ['_submitter', '_receiver']
jump.timeout = '{{ 30 }} minutes'
jump.mode = 'timeout'
jump.status = 'st2'
workflow.store()
@ -385,6 +415,7 @@ def test_legacy_timeout(pub):
jump = st1.add_action('timeout', id='_jump')
jump.timeout = 30 * 60 # 30 minutes
jump.mode = 'timeout'
jump.status = 'st2'
workflow.store()
@ -415,6 +446,7 @@ def test_timeout_then_remove(pub):
jump = st1.add_action('jump', id='_jump')
jump.by = ['_submitter', '_receiver']
jump.timeout = 30 * 60 # 30 minutes
jump.mode = 'timeout'
jump.status = 'st2'
st2.add_action('remove')
@ -453,6 +485,7 @@ def test_timeout_with_mark(pub):
jump = st1.add_action('jump', id='_jump')
jump.by = ['_submitter', '_receiver']
jump.timeout = 30 * 60 # 30 minutes
jump.mode = 'timeout'
jump.status = 'st2'
jump.set_marker_on_status = True
@ -484,6 +517,7 @@ def test_timeout_on_anonymised(pub):
jump = st1.add_action('timeout', id='_jump')
jump.timeout = 30 * 60 # 30 minutes
jump.mode = 'timeout'
jump.status = 'st2'
workflow.store()
@ -518,6 +552,7 @@ def test_jump_missing_previous_mark(pub):
jump.by = ['_submitter', '_receiver']
jump.status = '_previous'
jump.timeout = 30 * 60 # 30 minutes
jump.mode = 'timeout'
workflow.store()
@ -575,6 +610,7 @@ def test_timeout_tracing(pub, admin_user):
jump = st1.add_action('timeout', id='_jump')
jump.timeout = 30 * 60 # 30 minutes
jump.mode = 'timeout'
jump.status = 'st2'
add_message = st2.add_action('register-comment')
@ -616,6 +652,7 @@ def test_jump_self_timeout(pub):
jump = st1.add_action('jump')
jump.timeout = 30 * 60 # 30 minutes
jump.mode = 'timeout'
jump.status = 'st1'
workflow.store()
@ -645,6 +682,7 @@ def test_timeout_cron_debug_log(pub):
jump = st1.add_action('jump', id='_jump')
jump.by = ['_submitter', '_receiver']
jump.timeout = 30 * 60 # 30 minutes
jump.mode = 'timeout'
jump.status = 'st2'
workflow.store()
@ -672,4 +710,5 @@ def test_timeout_cron_debug_log(pub):
assert formdef.data_class().get(formdata_id).status == 'wf-st2'
assert get_logs('example.net')[:2] == ['start', "running jobs: ['evaluate_jumps']"]
assert 'applying timeouts on baz' in get_logs('example.net')[2]
assert 'event: timeout-jump' in get_logs('example.net')[3]
assert 'SELECT' in get_logs('example.net')[3]
assert 'event: timeout-jump' in [x for x in get_logs('example.net') if 'SQL' not in x][3]

View File

@ -26,6 +26,7 @@ from wcs.backoffice.deprecations import DeprecationsDirectory
from wcs.backoffice.snapshots import SnapshotsDirectory
from wcs.blocks import BlockDef, BlockdefImportError
from wcs.categories import BlockCategory
from wcs.fields.page import PostConditionsTableWidget
from wcs.formdef import UpdateStatisticsDataAfterJob
from wcs.qommon import _, misc, template
from wcs.qommon.errors import AccessForbiddenError, TraversalError
@ -115,7 +116,7 @@ class BlockDirectory(FieldsDirectory):
r += htmltext('<li><a href="delete" rel="popup">%s</a></li>') % _('Delete')
r += htmltext('</ul>')
r += self.get_documentable_button()
r += htmltext('<a href="settings" rel="popup" role="button">%s</a>') % _('Settings')
r += htmltext('<a href="settings" role="button">%s</a>') % _('Settings')
r += htmltext('</span>')
r += htmltext('</div>')
r += utils.last_modification_block(obj=self.objectdef)
@ -297,6 +298,13 @@ class BlockDirectory(FieldsDirectory):
size=50,
hint=_('Use block_var_... to refer to fields.'),
)
form.add(
PostConditionsTableWidget,
'post_conditions',
title=_('Validation conditions'),
value=self.objectdef.post_conditions,
)
if not self.objectdef.is_readonly():
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
@ -314,6 +322,7 @@ class BlockDirectory(FieldsDirectory):
form.get_widget('slug').set_error(_('This identifier is already used.'))
if form.get_widget('category_id'):
self.objectdef.category_id = form.get_widget('category_id').parse()
self.objectdef.post_conditions = form.get_widget('post_conditions').parse()
widget_template = form.get_widget('digest_template')
if widget_template.parse() and 'form_var_' in widget_template.parse():
widget_template.set_error(

View File

@ -371,12 +371,11 @@ class NamedDataSourcePage(Directory, DocumentableMixin):
)
def usage_in_formdefs(self):
formdefs = []
fields = []
for formdef in get_formdefs_of_all_kinds():
if self.datasource.is_used_in_formdef(formdef):
formdefs.append(formdef)
formdefs.sort(key=lambda x: x.name.lower())
return formdefs
fields.extend(list(self.datasource.usage_in_formdef(formdef)))
fields.sort(key=lambda x: x._formdef.name.lower())
return fields
def has_preview_block(self):
return bool(self.datasource.type in ('json', 'geojson', 'formula', 'jsonvalue', 'wcs:users'))

View File

@ -243,6 +243,7 @@ class OptionsDirectory(Directory):
category_class = Category
category_empty_choice = _('Select a category for this form')
section = 'forms'
backoffice_submission_options_label = _('Backoffice submission')
_q_exports = [
'confirmation',
@ -256,6 +257,7 @@ class OptionsDirectory(Directory):
'appearance',
'templates',
'user_support',
('backoffice-submission', 'backoffice_submission'),
]
def __init__(self, formdef, formdefui):
@ -368,6 +370,18 @@ class OptionsDirectory(Directory):
)
return self.handle(form, _('Management'))
def backoffice_submission(self):
form = Form(enctype='multipart/form-data')
form.add(
CheckboxesWidget,
'submission_sidebar_items',
title=_('Sidebar elements'),
options=[(x[0], x[1], x[0]) for x in self.formdef.get_submission_sidebar_available_items()],
value=self.formdef.get_submission_sidebar_items(),
inline=False,
)
return self.handle(form, self.backoffice_submission_options_label)
def online_status(self):
form = Form(enctype='multipart/form-data')
form.add(CheckboxWidget, 'disabled', title=_('Disable access to form'), value=self.formdef.disabled)
@ -519,6 +533,7 @@ class OptionsDirectory(Directory):
'drafts_max_per_user',
'user_support',
'management_sidebar_items',
'submission_sidebar_items',
'history_pane_default_mode',
]
for attr in attrs:
@ -533,6 +548,10 @@ class OptionsDirectory(Directory):
new_value = set(new_value or [])
if new_value == self.formdef.get_default_management_sidebar_items():
new_value = {'__default__'}
if attr == 'submission_sidebar_items':
new_value = set(new_value or [])
if new_value == self.formdef.get_default_submission_sidebar_items():
new_value = {'__default__'}
if attr == 'digest_template':
if self.formdef.default_digest_template != new_value:
self.changed = True
@ -837,6 +856,16 @@ class FormDefPage(Directory, TempfileDirectoryMixin, DocumentableMixin):
and self.formdef.appearance_keywords
or pgettext_lazy('appearance', 'Standard'),
),
'backoffice_submission_options': self.add_option_line(
'options/backoffice-submission',
self.options_directory_class.backoffice_submission_options_label,
_('Custom')
if (
self.formdef.submission_sidebar_items
not in ({'__default__'}, self.formdef.get_default_submission_sidebar_items())
)
else _('Default'),
),
}
unknown_wf = self.formdef.workflow.id == Workflow.get_unknown_workflow().id
if get_publisher().get_backoffice_root().is_accessible('workflows') and not unknown_wf:

View File

@ -458,7 +458,7 @@ class CardFileByTokenDirectory(Directory):
context = token.data
carddef = CardDef.get_by_urlname(context['carddef_slug'])
data = carddef.data_class().get(context['data_id'])
for field_data in data.get_all_file_data():
for field_data in data.get_all_file_data(with_history=True):
if not hasattr(field_data, 'file_digest'):
continue
if field_data.file_digest() == context['file_digest']:

View File

@ -43,6 +43,7 @@ class CardDefUI(FormDefUI):
class CardDefOptionsDirectory(OptionsDirectory):
category_class = CardDefCategory
category_empty_choice = _('Select a category for this card model')
backoffice_submission_options_label = _('Submission')
section = 'cards'
def get_templates_form(self):

View File

@ -2618,6 +2618,7 @@ class FormPage(Directory, TempfileDirectoryMixin):
include_submission = get_query_flag('include-submission') or full
include_workflow = get_query_flag('include-workflow') or full
include_workflow_data = get_query_flag('include-workflow-data') or full
include_actions = get_query_flag('include-actions') or full
if include_evolution or include_workflow:
self.formdef.data_class().load_all_evolutions(items)
# noqa pylint: disable=too-many-boolean-expressions
@ -2628,6 +2629,7 @@ class FormPage(Directory, TempfileDirectoryMixin):
or include_submission
or include_workflow
or include_workflow_data
or include_actions
):
job = JsonFileExportAfterJob(self.formdef)
output = job.create_json_export(
@ -2643,6 +2645,7 @@ class FormPage(Directory, TempfileDirectoryMixin):
include_unnamed_fields=False,
include_workflow=include_workflow,
include_workflow_data=include_workflow_data,
include_actions=include_actions,
values_at=get_request().form.get('at'),
)
if job.id:
@ -4467,6 +4470,7 @@ class JsonFileExportAfterJob(CsvExportAfterJob):
include_unnamed_fields,
include_workflow,
include_workflow_data,
include_actions,
values_at=None,
):
# noqa pylint: disable=too-many-arguments
@ -4528,6 +4532,7 @@ class JsonFileExportAfterJob(CsvExportAfterJob):
include_unnamed_fields=include_unnamed_fields,
include_workflow=include_workflow,
include_workflow_data=include_workflow_data,
include_actions=include_actions,
values_at=values_at,
)
except NoContentSnapshotAt:
@ -4555,6 +4560,7 @@ class JsonFileExportAfterJob(CsvExportAfterJob):
include_unnamed_fields=True,
include_workflow=True,
include_workflow_data=True,
include_actions=False,
)
},
indent=2,

View File

@ -235,6 +235,8 @@ class FormFillPage(PublicFormFillPage):
def get_sidebar(self, data):
r = TemplateIO(html=True)
sidebar_items = self.formdef.get_submission_sidebar_items()
formdata = None
if self.edit_mode:
formdata = self.edited_data
@ -249,12 +251,13 @@ class FormFillPage(PublicFormFillPage):
if formdata and self.selected_user_id:
formdata.user_id = self.selected_user_id
if self.formdef.enable_tracking_codes and not self.edit_mode:
r += htmltext('<h3>%s</h3>') % _('Tracking Code')
if formdata and formdata.tracking_code:
r += htmltext('<p>%s</p>') % formdata.tracking_code
else:
r += htmltext('<p>-</p>')
if 'general' in sidebar_items:
if self.formdef.enable_tracking_codes and not self.edit_mode:
r += htmltext('<h3>%s</h3>') % _('Tracking Code')
if formdata and formdata.tracking_code:
r += htmltext('<p>%s</p>') % formdata.tracking_code
else:
r += htmltext('<p>-</p>')
if formdata and self.on_validation_page:
if self.has_channel_support and self.selected_submission_channel:
@ -269,13 +272,16 @@ class FormFillPage(PublicFormFillPage):
r += FormBackOfficeStatusPage(self.formdef, formdata).get_extra_context_bar(parent=self)
else:
if (
formdata
'submission-context' in sidebar_items
and formdata
and formdata.submission_context
and set(formdata.submission_context.keys()).difference({'return_url', 'cancel_url'})
):
r += FormBackOfficeStatusPage(self.formdef, formdata).get_extra_submission_context_bar()
if formdata and formdata.submission_channel:
if 'submission-context' not in sidebar_items:
pass
elif formdata and formdata.submission_channel:
r += FormBackOfficeStatusPage(self.formdef, formdata).get_extra_submission_channel_bar()
elif self.has_channel_support:
r += htmltext('<div class="submit-channel-selection" style="display: none;">')
@ -292,16 +298,16 @@ class FormFillPage(PublicFormFillPage):
r += htmltext('</select>')
r += htmltext('</div>')
if formdata and formdata.user_id:
if 'user' in sidebar_items and formdata and formdata.user_id:
r += FormBackOfficeStatusPage(self.formdef, formdata).get_extra_submission_user_id_bar(
parent=self
)
elif self.has_user_support:
elif 'user' in sidebar_items and self.has_user_support:
r += FormBackOfficeStatusPage(self.formdef, formdata).get_extra_submission_user_selection_bar(
parent=self
)
if self.formdef.submission_lateral_template:
if 'custom-template' in sidebar_items and self.formdef.submission_lateral_template:
r += htmltext(
'<div data-async-url="%slateral-block?ctx=%s"></div>'
% (

View File

@ -24,7 +24,7 @@ from contextlib import contextmanager
from quixote import get_publisher, get_request, get_response
from quixote.html import htmltag, htmltext
from . import data_sources, fields
from . import conditions, data_sources, fields
from .categories import BlockCategory
from .formdata import FormData
from .qommon import _, misc
@ -33,6 +33,7 @@ from .qommon.form import CompositeWidget, SingleSelectHintWidget, WidgetList
from .qommon.storage import Equal, StorableObject
from .qommon.substitution import CompatibilityNamesDict
from .qommon.template import Template
from .qommon.xml_storage import PostConditionsXmlMixin
class BlockdefImportError(Exception):
@ -46,7 +47,7 @@ class BlockdefImportUnknownReferencedError(UnknownReferencedErrorMixin, Blockdef
pass
class BlockDef(StorableObject):
class BlockDef(StorableObject, PostConditionsXmlMixin):
_names = 'blockdefs'
_indexes = ['slug']
backoffice_class = 'wcs.admin.blocks.BlockDirectory'
@ -61,6 +62,7 @@ class BlockDef(StorableObject):
digest_template = None
category_id = None
documentation = None
post_conditions = None
SLUG_DASH = '_'
@ -177,6 +179,8 @@ class BlockDef(StorableObject):
for field in self.fields or []:
fields.append(field.export_to_xml(include_id=True))
self.post_conditions_export_to_xml(root, include_id=include_id)
return root
@classmethod
@ -247,6 +251,9 @@ class BlockDef(StorableObject):
BlockCategory.object_category_xml_import(blockdef, tree, include_id=include_id)
post_conditions_node = tree.find('post_conditions')
blockdef.post_conditions_init_with_xml(post_conditions_node, include_id=include_id)
unknown_datasources = set()
if check_datasources:
from wcs.carddef import CardDef
@ -333,6 +340,8 @@ class BlockDef(StorableObject):
location = 'forms/blocks/%s/' % self.id
for field in self.fields or []:
yield from field.i18n_scan(base_location=location)
for post_condition in self.post_conditions or []:
yield location, None, post_condition.get('error_message')
def get_all_fields(self):
return self.fields
@ -446,6 +455,22 @@ class BlockSubWidget(CompositeWidget):
all_lists = False
if widget_value.get(widget.field.id) not in empty_values:
empty = False
if not empty and self.block.post_conditions:
error_messages = []
with self.block.visibility_context(value, self.index):
for i, post_condition in enumerate(self.block.post_conditions):
condition = post_condition.get('condition')
try:
if conditions.Condition(condition, record_errors=False).evaluate():
continue
except RuntimeError:
pass
error_message = post_condition.get('error_message')
error_messages.append(get_publisher().translate(error_message))
if error_messages:
self.set_error(' '.join(error_messages))
if empty and not all_lists and not get_publisher().keep_all_block_rows_mode:
value = None
for widget in self.get_widgets(): # reset "required" errors
@ -550,6 +575,10 @@ class BlockWidget(WidgetList):
self._parse(request)
if self.required and self.value is None:
self.set_error(_(self.REQUIRED_ERROR))
for widget in self.widgets:
# mark required rows with a special attribute, to avoid doubling the
# error messages in the template.
widget.is_required_error = bool(widget.error == self.REQUIRED_ERROR)
return self.value
def add_media(self):

View File

@ -334,6 +334,18 @@ class CardDef(FormDef):
excluded_parts.append('user')
return [x for x in super().get_management_sidebar_available_items() if x[0] not in excluded_parts]
def get_default_submission_sidebar_items(self):
sidebar_items = super().get_default_submission_sidebar_items()
if not self.user_support:
sidebar_items.remove('user')
return sidebar_items
def get_submission_sidebar_available_items(self):
excluded_parts = []
if not self.user_support:
excluded_parts.append('user')
return [x for x in super().get_submission_sidebar_available_items() if x[0] not in excluded_parts]
def get_cards_graph(category=None, show_orphans=False):
out = io.StringIO()

72
wcs/clamd.py Normal file
View File

@ -0,0 +1,72 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2024 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import subprocess
from quixote import get_publisher, get_response
from wcs.qommon import _
from wcs.qommon.afterjobs import AfterJob
RETURNCODE_MAP = {
0: _('No virus found'),
1: _('Virus(es) found'),
2: _('An error occurred'),
}
class PickableClamD:
@property
def clamd(self):
if getattr(self, '_clamd', None) is None:
self._clamd = {}
return self._clamd
def init_clamd(self):
if get_publisher().has_site_option('enable-clamd') and 'waiting_scan' not in self.clamd:
self.clamd['waiting_scan'] = True
class ClamDScanJob(AfterJob):
def __init__(self, formdata, **kwargs):
super().__init__(**kwargs)
self.formdata = formdata
def run_clamd(self, obj):
path = None
if hasattr(obj, 'get_fs_filename'):
path = obj.get_fs_filename()
elif hasattr(obj, 'get_file_path'):
path = obj.get_file_path()
clamd = subprocess.run(['clamdscan', '--fdpass', path], check=False)
obj.clamd['returncode'] = clamd.returncode
obj.clamd['waiting_scan'] = False
def execute(self):
store = False
for field_data in self.formdata.get_all_file_data(with_history=False):
if field_data.clamd.get('waiting_scan', False):
self.run_clamd(field_data)
store = True
if store:
self.formdata.store()
self.status = 'completed'
def add_clamd_scan_job(formdata):
if get_publisher().has_site_option('enable-clamd'):
job = get_response().add_after_job(ClamDScanJob(formdata=formdata))
job.store()

View File

@ -1192,18 +1192,18 @@ class NamedDataSource(XmlStorableObject):
from wcs.formdef import get_formdefs_of_all_kinds
for formdef in get_formdefs_of_all_kinds():
if self.is_used_in_formdef(formdef):
if any(self.usage_in_formdef(formdef)):
return True
return False
def is_used_in_formdef(self, formdef):
def usage_in_formdef(self, formdef):
for field in formdef.fields or []:
data_source = getattr(field, 'data_source', None)
if not data_source:
continue
if data_source.get('type') == self.slug:
return True
return False
field._formdef = formdef
yield field
class StubNamedDataSource(NamedDataSource):

View File

@ -256,6 +256,12 @@ class Field:
return ''
return self._formdef.get_field_admin_url(field=self)
def get_admin_url_label(self):
return _('%(form)s, field: "%(field)s"') % {
'form': self._formdef.name,
'field': self.ellipsized_label,
}
@property
def include_in_listing(self):
return 'listings' in (self.display_locations or [])

View File

@ -15,17 +15,13 @@
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import copy
import xml.etree.ElementTree as ET
from django.utils.encoding import force_str
from quixote import get_publisher
from quixote.html import TemplateIO, htmltext
from wcs.blocks import BlockDef, BlockWidget
from wcs.qommon import _
from wcs.qommon.form import CheckboxWidget, IntWidget, SingleSelectWidget, StringWidget
from wcs.qommon.ods import NS as OD_NS
from wcs.qommon.ods import clean_text as od_clean_text
from .base import SetValueError, WidgetField
from .item import UnknownCardValueError
@ -200,67 +196,25 @@ class BlockField(WidgetField):
parts.append(self.block.get_display_value(subvalue) or '')
return ', '.join(parts)
def get_view_value(self, value, summary=False, include_unset_required_fields=False, **kwargs):
from wcs.workflows import template_on_formdata
if 'value_id' not in kwargs:
# when called from get_rst_view_value()
return str(value or '')
value = kwargs['value_id']
if value is None:
return ''
r = TemplateIO(html=True)
for i, row_value in enumerate(value['data']):
def get_value_details(self, formdata, value, include_unset_required_fields):
for i, row_value in enumerate((value or {}).get('data') or []):
try:
block = self.block
except KeyError:
# block was deleted, ignore
continue
context = block.get_substitution_counter_variables(i)
for field in block.fields:
if summary and not field.include_in_summary_page:
continue
if not hasattr(field, 'get_value_info'):
# inert field
if field.include_in_summary_page:
with get_publisher().substitutions.temporary_feed(context):
if field.key == 'title':
label = template_on_formdata(None, field.label, autoescape=False)
r += htmltext('<div class="title %s"><h3>%s</h3></div>') % (
field.extra_css_class or '',
label,
)
elif field.key == 'subtitle':
label = template_on_formdata(None, field.label, autoescape=False)
r += htmltext('<div class="subtitle %s"><h4>%s</h4></div>') % (
field.extra_css_class or '',
label,
)
elif field.key == 'comment':
r += htmltext(
'<div class="comment-field %s">%s</div>'
% (field.extra_css_class or '', field.get_text())
)
continue
css_classes = ['field', 'field-type-%s' % field.key]
if field.extra_css_class:
css_classes.append(field.extra_css_class)
sub_value, sub_value_details = field.get_value_info(row_value)
if sub_value is None and not (field.required and include_unset_required_fields):
continue
label_id = f'form-field-label-f{self.id}-r{i}-s{field.id}'
r += htmltext('<div class="%s">' % ' '.join(css_classes))
r += htmltext('<p id="%s" class="label">%s</p> ') % (label_id, field.label)
if sub_value is None:
r += htmltext('<div class="value"><i>%s</i></div>') % _('Not set')
else:
r += htmltext('<div class="value">')
kwargs = {'parent_field': self, 'parent_field_index': i, 'label_id': label_id}
kwargs.update(**sub_value_details)
r += field.get_view_value(sub_value, **kwargs)
r += htmltext('</div>')
r += htmltext('</div>\n')
return r.getvalue()
with get_publisher().substitutions.temporary_feed(context):
yield from formdata.get_summary_field_details(
fields=block.fields,
include_unset_required_fields=include_unset_required_fields,
data=row_value,
parent_field=self,
parent_field_index=i,
)
def get_view_value(self, value, summary=False, include_unset_required_fields=False, **kwargs):
return str(value or '')
def get_value_info(self, data):
value = data.get(self.id)
@ -338,11 +292,6 @@ class BlockField(WidgetField):
return {'data': result, 'schema': {x.id: x.key for x in self.block.fields}}
def get_opendocument_node_value(self, value, formdata=None, **kwargs):
node = ET.Element('{%s}span' % OD_NS['text'])
node.text = od_clean_text(force_str(value))
return node
def __getstate__(self):
# do not store _block cache
odict = super().__getstate__()

View File

@ -26,7 +26,6 @@ from wcs.qommon.form import (
CheckboxWidget,
ComputedExpressionWidget,
HiddenWidget,
IntWidget,
JsonpSingleSelectWidget,
MapMarkerSelectionWidget,
RadiobuttonsWidget,
@ -60,20 +59,32 @@ class ItemWithImageFieldMixin:
image_mobile_size = 75
def fill_image_options_admin_form(self, form, **kwargs):
def validate_image_size(value):
if value.isnumeric():
return
elif 'x' in value:
width, height = value.split('x')
if not (width.isnumeric() and height.isnumeric()):
raise ValueError(_('Wrong format'))
else:
raise ValueError(_('Wrong format'))
form.add(
IntWidget,
StringWidget,
'image_desktop_size',
title=_('Image size on desktop'),
value=self.image_desktop_size,
validation_function=validate_image_size,
hint=_('In pixels.'),
**kwargs,
)
form.add(
IntWidget,
StringWidget,
'image_mobile_size',
title=_('Image size on mobile'),
value=self.image_mobile_size,
validation_function=validate_image_size,
hint=_('In pixels.'),
**kwargs,
)

View File

@ -15,16 +15,15 @@
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import copy
import xml.etree.ElementTree as ET
from django.utils.encoding import force_str
from quixote import get_publisher
from quixote.html import TemplateIO, htmltext
from wcs.conditions import Condition
from wcs.qommon import _
from wcs.qommon.form import CompositeWidget, ConditionWidget, StringWidget, VarnameWidget, WidgetListAsTable
from wcs.qommon.misc import get_dependencies_from_template, xml_node_text
from wcs.qommon.misc import get_dependencies_from_template
from wcs.qommon.xml_storage import PostConditionsXmlMixin
from .base import Field, register_field_class
@ -138,55 +137,13 @@ class PageCondition(Condition):
return data
class PageField(Field):
class PageField(Field, PostConditionsXmlMixin):
key = 'page'
description = _('Page')
is_no_data_field = True
post_conditions = None
def post_conditions_init_with_xml(self, node, include_id=False, snapshot=False):
if node is None:
return
self.post_conditions = []
for post_condition_node in node.findall('post_condition'):
if post_condition_node.findall('condition/type'):
condition = {
'type': xml_node_text(post_condition_node.find('condition/type')),
'value': xml_node_text(post_condition_node.find('condition/value')),
}
elif post_condition_node.find('condition').text:
condition = {
'type': 'python',
'value': xml_node_text(post_condition_node.find('condition')),
}
else:
continue
self.post_conditions.append(
{
'condition': condition,
'error_message': xml_node_text(post_condition_node.find('error_message')),
}
)
def post_conditions_export_to_xml(self, node, include_id=False):
if not self.post_conditions:
return
conditions_node = ET.SubElement(node, 'post_conditions')
for post_condition in self.post_conditions:
post_condition_node = ET.SubElement(conditions_node, 'post_condition')
condition_node = ET.SubElement(post_condition_node, 'condition')
ET.SubElement(condition_node, 'type').text = force_str(
(post_condition['condition'] or {}).get('type') or ''
)
ET.SubElement(condition_node, 'value').text = force_str(
(post_condition['condition'] or {}).get('value') or ''
)
ET.SubElement(post_condition_node, 'error_message').text = force_str(
post_condition['error_message'] or ''
)
def fill_admin_form(self, form):
form.add(StringWidget, 'label', title=_('Label'), value=self.label, required=True, size=50)
form.add(

View File

@ -429,7 +429,7 @@ class FormData(StorableObject):
elif isinstance(part, WorkflowFormEvolutionPart):
for field_data in (part.data or {}).values():
yield from check_field_data(field_data)
elif isinstance(part, ContentSnapshotPart):
elif isinstance(part, ContentSnapshotPart) and with_history:
# look into old and new values (belt and suspenders)
for field_data in list((part.old_data or {}).values()) + list((part.new_data or {}).values()):
yield from check_field_data(field_data)
@ -1547,6 +1547,7 @@ class FormData(StorableObject):
include_unnamed_fields=False,
include_workflow=True,
include_workflow_data=True,
include_actions=True,
values_at=None,
):
# noqa pylint: disable=too-many-arguments
@ -1647,6 +1648,32 @@ class FormData(StorableObject):
data['workflow'] = {}
data['workflow']['data'] = self.workflow_data
# include actions
if include_actions:
actions = {}
data['actions'] = actions
for trigger in self.formdef.workflow.get_all_global_action_triggers():
if (
trigger.key == 'webservice'
and trigger.identifier
and trigger.check_executable(self, user)
):
actions[
f'global-action:{trigger.identifier}'
] = f'{self.get_api_url()}hooks/{trigger.identifier}/'
status = self.get_status()
if status:
for item in self.get_status().items:
if (
item.key == 'jump'
and item.trigger
and item.check_auth(self, user)
and item.check_condition(self, trigger=item.trigger)
):
actions[f'jump:{item.trigger}'] = f'{self.get_api_url()}jump/trigger/{item.trigger}/'
if include_roles:
# add a roles dictionary, with workflow functions and two special
# entries for concerned/actions roles.
@ -1720,6 +1747,7 @@ class FormData(StorableObject):
def export_to_json(
self,
anonymise=False,
user=None,
include_evolution=True,
include_files=True,
include_roles=True,
@ -1728,11 +1756,13 @@ class FormData(StorableObject):
include_unnamed_fields=False,
include_workflow=True,
include_workflow_data=True,
include_actions=True,
values_at=None,
):
# noqa pylint: disable=too-many-arguments
data = self.get_json_export_dict(
anonymise=anonymise,
user=user,
include_evolution=include_evolution,
include_files=include_files,
include_roles=include_roles,
@ -1741,6 +1771,7 @@ class FormData(StorableObject):
include_unnamed_fields=include_unnamed_fields,
include_workflow=include_workflow,
include_workflow_data=include_workflow_data,
include_actions=include_actions,
values_at=values_at,
)
return json.dumps(data, cls=misc.JSONEncoder)
@ -1754,10 +1785,20 @@ class FormData(StorableObject):
for field in self.formdef.fields:
field.feed_session(self.data.get(field.id), self.data.get('%s_display' % field.id))
def get_summary_field_details(self, fields=None, include_unset_required_fields=False):
def get_summary_field_details(
self,
fields=None,
include_unset_required_fields=False,
data=None,
parent_field=None,
parent_field_index=None,
):
if fields is None:
fields = self.formdef.fields
if data is None:
data = self.data
on_page = False
current_page_fields = []
pages = []
@ -1819,10 +1860,14 @@ class FormData(StorableObject):
if not f.include_in_summary_page:
continue
value, value_details = f.get_value_info(self.data)
value, value_details = f.get_value_info(data)
if value is None and not (f.required and include_unset_required_fields):
continue
if parent_field:
value_details['parent_field'] = parent_field
value_details['parent_field_index'] = parent_field_index
current_page_fields.append({'field': f, 'value': value, 'value_details': value_details})
has_contents_since_latest_subtitle = True
has_contents_since_latest_title = True
@ -1959,15 +2004,23 @@ class FormData(StorableObject):
)
def get_summary_display_actions(self, fields=None, form_url='', include_unset_required_fields=False):
from wcs.workflows import template_on_formdata
field_details = self.get_summary_field_details(
fields, include_unset_required_fields=include_unset_required_fields
)
yield from self.iter_summary_display_actions(
field_details,
form_url=form_url,
include_unset_required_fields=include_unset_required_fields,
)
def iter_summary_display_actions(self, field_details, form_url='', include_unset_required_fields=False):
from wcs.workflows import template_on_formdata
on_page = None
for field_value_info in field_details:
f = field_value_info['field']
parent_field = field_value_info.get('value_details', {}).get('parent_field')
parent_field_index = field_value_info.get('value_details', {}).get('parent_field_index')
if f.key == 'page':
if on_page:
yield {'action': 'close-page'}
@ -1994,6 +2047,8 @@ class FormData(StorableObject):
css_classes.append(f.extra_css_class)
css_classes = ' '.join(css_classes)
label_id = f'form-field-label-f{f.id}'
if parent_field:
label_id = f'form-field-label-f{parent_field.id}-r{parent_field_index}-s{f.id}'
yield {'action': 'open-field', 'css': css_classes}
if f.key == 'block' and f.label_display == 'subtitle':
yield {
@ -2014,6 +2069,19 @@ class FormData(StorableObject):
if value is None:
if not (f.key == 'block' and f.label_display == 'hidden'):
yield {'action': 'value', 'value': None}
elif f.key == 'block':
block_field_details = f.get_value_details(
formdata=self,
value=value_details.get('value_id'),
include_unset_required_fields=include_unset_required_fields,
)
yield {'action': 'open-block-value'}
yield from self.iter_summary_display_actions(
block_field_details,
form_url=form_url,
include_unset_required_fields=include_unset_required_fields,
)
yield {'action': 'close-block-value'}
else:
s = f.get_view_value(
value,

View File

@ -118,7 +118,7 @@ class FormDefForm(Form):
if hasattr(widget, 'field') and widget.has_error() and not getattr(widget, 'is_hidden', False):
widget_with_errors.append(widget)
if widget_with_errors:
t += htmltext('<p>')
t += htmltext('<p id="field-error-links">')
t += str(
ngettext(
'The following field has an error:',
@ -128,9 +128,13 @@ class FormDefForm(Form):
)
t += ' '
for i, widget in enumerate(widget_with_errors):
t += htmltext('<a href="#form_label_%s">%s</a>') % (widget.get_name_for_id(), widget.title)
t += htmltext('<a data-field-name="%s" href="#form_label_%s">%s</a>') % (
widget.get_name_for_id(),
widget.get_name_for_id(),
widget.title,
)
if i < len(widget_with_errors) - 1:
t += str(_(', '))
t += htmltext('<span class="list-comma">%s</span>') % _(', ')
t += htmltext('</p>')
return t.getvalue()
@ -177,6 +181,7 @@ class FormDef(StorableObject):
has_captcha = False
skip_from_360_view = False
management_sidebar_items = {'__default__'}
submission_sidebar_items = {'__default__'}
include_download_all_button = False
appearance_keywords = None
digest_templates = None
@ -334,6 +339,34 @@ class FormDef(StorableObject):
return self.get_default_management_sidebar_items()
return self.management_sidebar_items or []
def get_default_submission_sidebar_items(self):
return {
'general',
'submission-context',
'user',
'custom-template',
}
def get_submission_sidebar_available_items(self):
return [
('general', _('General Information')),
('submission-context', _('Submission context')),
('user', _('Associated User')),
('custom-template', _('Custom template')),
]
def submission_sidebar_items_labels(self):
# return ordered labels
submission_sidebar_items = self.get_submission_sidebar_items()
for key, label in self.get_submission_sidebar_available_items():
if key in submission_sidebar_items:
yield label
def get_submission_sidebar_items(self):
if self.submission_sidebar_items == {'__default__'}:
return self.get_default_submission_sidebar_items()
return self.submission_sidebar_items or []
@property
def data_class_name(self):
return '_wcs_%s' % self.url_name.title()

View File

@ -245,6 +245,7 @@ class FormStatusPage(Directory, FormTemplateMixin):
include_unnamed_fields=False,
include_workflow=get_query_flag('include-workflow', default=True),
include_workflow_data=get_query_flag('include-workflow-data', default=True),
include_actions=get_query_flag('include-actions', default=False),
values_at=values_at,
)
@ -479,12 +480,15 @@ class FormStatusPage(Directory, FormTemplateMixin):
include_unnamed_fields=False,
include_workflow=True,
include_workflow_data=True,
include_actions=True,
values_at=None,
):
# noqa pylint: disable=too-many-arguments
get_response().set_content_type('application/json')
user = get_user_from_api_query_string() or get_request().user
return self.filled.export_to_json(
anonymise=anonymise,
user=user,
include_evolution=include_evolution,
include_files=include_files,
include_roles=include_roles,
@ -493,6 +497,7 @@ class FormStatusPage(Directory, FormTemplateMixin):
include_unnamed_fields=include_unnamed_fields,
include_workflow=include_workflow,
include_workflow_data=include_workflow_data,
include_actions=include_actions,
values_at=values_at,
)
@ -581,6 +586,10 @@ class FormStatusPage(Directory, FormTemplateMixin):
r += htmltext('<div class="page">')
r += htmltext('<h3>%s</h3>') % field_action['value']
r += htmltext('<div>')
elif field_action['action'] == 'open-block-value':
r += htmltext('<div class="value value--block">')
elif field_action['action'] == 'close-block-value':
r += htmltext('</div>')
elif field_action['action'] == 'title':
r += htmltext('<div class="title %s"><h3>%s</h3></div>') % (
field_action['css'],
@ -784,7 +793,7 @@ class FormStatusPage(Directory, FormTemplateMixin):
if get_request().form and get_request().form.get('hash'):
# look in all known formdata files for file with given hash
file_digest = get_request().form.get('hash')
for field_data in self.filled.get_all_file_data():
for field_data in self.filled.get_all_file_data(with_history=True):
if not hasattr(field_data, 'file_digest'):
continue
if field_data.file_digest() == file_digest:
@ -916,11 +925,13 @@ class FormStatusPage(Directory, FormTemplateMixin):
yield (widget.field, block_row, field_widget.field, field_widget)
block_row += 1
# get dictionary with blocks data, from workflow form, or defaults to formdata
blocks_formdata_data = getattr(form, 'blocks_formdata_data', formdata.data)
for block, block_row, field, widget in get_all_field_widgets(form):
t0 = time.time()
if block:
try:
block_data = formdata.data.get(block.id)['data'][block_row]
block_data = blocks_formdata_data.get(block.id)['data'][block_row]
except (IndexError, TypeError):
block_data = {}
@ -992,7 +1003,8 @@ class FormStatusPage(Directory, FormTemplateMixin):
entry['locked'] = locked
request.add_timing_mark(
f'field-block-{block.id}-row-{block_row}' if block else f'field-{field.id}', relative_start=t0
f'field-block-{block.id}--{field.id}-row-{block_row}' if block else f'field-{field.id}',
relative_start=t0,
)
return json.dumps({'result': result})
@ -1015,7 +1027,8 @@ class FormStatusPage(Directory, FormTemplateMixin):
if form is None:
return result_error('no more form')
self.filled.evaluate_live_workflow_form(user, form)
with get_publisher().keep_all_block_rows():
self.filled.evaluate_live_workflow_form(user, form)
get_publisher().substitutions.unfeed(lambda x: x is self.filled)
get_publisher().substitutions.feed(self.filled)
# reevaluate workflow form according to possible new content

View File

@ -38,6 +38,7 @@ from quixote.util import randbytes
from wcs.carddef import CardDef
from wcs.categories import Category
from wcs.clamd import add_clamd_scan_job
from wcs.fields import MissingBlockFieldError, PageField, SetValueError
from wcs.formdata import Evolution, FormData
from wcs.formdef import FormDef
@ -1971,6 +1972,9 @@ class FormPage(Directory, TempfileDirectoryMixin, FormTemplateMixin):
url = filled.get_url(backoffice=True)
else:
url = filled.get_url(language=get_publisher().current_language)
add_clamd_scan_job(filled)
return redirect(url)
def cancelled(self):

View File

@ -19,8 +19,7 @@ import json
from quixote import get_request, get_response
from quixote.directory import Directory
from wcs.api import get_user_from_api_query_string, is_url_signed
from wcs.roles import logged_users_role
from wcs.api import get_user_from_api_query_string
from wcs.wf.jump import WorkflowTriggeredEvolutionPart
from wcs.workflows import WorkflowGlobalActionWebserviceTrigger, perform_items, push_perform_workflow
@ -42,19 +41,8 @@ class HookDirectory(Directory):
raise errors.AccessForbiddenError('must be POST')
user = get_user_from_api_query_string() or get_request().user
if self.trigger.roles:
for role in self.trigger.roles:
if role == logged_users_role().id and (user or is_url_signed()):
break
if role == '_submitter' and self.formdata.is_submitter(user):
break
if not user:
continue
if self.formdata.get_function_roles(role).intersection(user.get_roles()):
break
else:
if not ('_signed_calls' in self.trigger.roles and is_url_signed()):
raise errors.AccessForbiddenError('insufficient roles')
if not self.trigger.check_executable(self.formdata, user):
raise errors.AccessForbiddenError('insufficient roles')
workflow_data = get_request().json if hasattr(get_request(), '_json') else None
self.formdata.evolution[-1].add_part(

View File

@ -4,8 +4,8 @@ msgid ""
msgstr ""
"Project-Id-Version: wcs 0\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2024-04-15 17:07+0200\n"
"PO-Revision-Date: 2024-04-15 17:06+0200\n"
"POT-Creation-Date: 2024-04-26 14:42+0200\n"
"PO-Revision-Date: 2024-04-26 14:42+0200\n"
"Last-Translator: Thomas Noël <tnoel@entrouvert.com>\n"
"Language-Team: french\n"
"Language: fr\n"
@ -334,6 +334,10 @@ msgstr "Gabarit du résumé"
msgid "Use block_var_... to refer to fields."
msgstr "Utilisez block_var_… pour faire référence aux champs."
#: admin/blocks.py
msgid "Validation conditions"
msgstr "Conditions de validation"
#: admin/blocks.py admin/forms.py admin/workflows.py
msgid "This identifier is already used."
msgstr "Cet identifiant est déjà utilisé."
@ -1164,6 +1168,10 @@ msgstr "Activer"
msgid "Select a category for this form"
msgstr "Sélectionner une catégorie pour ce formulaire"
#: admin/forms.py wf/create_formdata.py
msgid "Backoffice submission"
msgstr "Saisie backoffice"
#: admin/forms.py
msgid "Include confirmation page"
msgstr "Inclure une page de confirmation"
@ -2881,7 +2889,7 @@ msgstr "Marqué comme supprimé le %(date)s."
msgid "Profile"
msgstr "Profil"
#: admin/users.py fields/block.py forms/common.py wf/export_to_model.py
#: admin/users.py forms/common.py wf/export_to_model.py
msgid "Not set"
msgstr "Non renseigné"
@ -3934,6 +3942,12 @@ msgstr "Modification aux paramètres globaux"
msgid "Select a category for this card model"
msgstr "Sélectionner une catégorie pour ce modèle de fiche"
#: backoffice/cards.py backoffice/root.py backoffice/submission.py
#: templates/wcs/backoffice/submission.html
#: templates/wcs/backoffice/test_edit_sidebar.html
msgid "Submission"
msgstr "Saisie"
#: backoffice/cards.py
msgid "Identifier cannot be modified if there are existing cards."
msgstr "Lidentifiant ne peut pas être modifié car il existe des fiches."
@ -5163,12 +5177,6 @@ msgstr "Page suivante"
msgid "Per page: "
msgstr "Par page : "
#: backoffice/root.py backoffice/submission.py
#: templates/wcs/backoffice/submission.html
#: templates/wcs/backoffice/test_edit_sidebar.html
msgid "Submission"
msgstr "Saisie"
#: backoffice/root.py backoffice/studio.py templates/wcs/backoffice/studio.html
msgid "Studio"
msgstr "Studio"
@ -5211,6 +5219,11 @@ msgstr "Workflows"
msgid "Compare"
msgstr "Comparer"
#: backoffice/snapshots.py
#, python-format
msgid "Can not display snapshot (%s)"
msgstr "Impossible dafficher la sauvegarde (%s)"
#: backoffice/snapshots.py
#, python-format
msgid "Version %s"
@ -5241,11 +5254,6 @@ msgstr "Restaurer"
msgid "Restore snapshot"
msgstr "Restaurer une sauvegarde"
#: backoffice/snapshots.py
#, python-format
msgid "Can not display snapshot (%s)"
msgstr "Impossible dafficher la sauvegarde (%s)"
#: backoffice/snapshots.py
#, python-format
msgid "Can not inspect snapshot (%s)"
@ -5446,7 +5454,7 @@ msgid "%s (filtered on user)"
msgstr "%s (filtrage sur lusager)"
#: categories.py qommon/form.py qommon/publisher.py
#: templates/wcs/backoffice/logged-error.html wf/form.py
#: templates/wcs/backoffice/logged-error.html wf/form.py wf/jump.py
msgid "General"
msgstr "Général"
@ -5612,6 +5620,11 @@ msgstr "Verrouillé"
msgid "invalid expression: %s"
msgstr "expression invalide : %s"
#: fields/base.py
#, python-format
msgid "%(form)s, field: \"%(field)s\""
msgstr "%(form)s, champ « %(field)s »"
#: fields/base.py
msgid "Validation Page"
msgstr "Page de récapitulatif"
@ -5873,6 +5886,10 @@ msgstr "fichier.bin"
msgid "unknown card value (%r)"
msgstr "valeur de fiche inconnue (%r)"
#: fields/item.py
msgid "Wrong format"
msgstr "Format invalide"
#: fields/item.py
msgid "Image size on desktop"
msgstr "Taille des images sur ordinateur"
@ -5980,6 +5997,10 @@ msgstr "Position initiale"
msgid "Default position (from markers)"
msgstr "Position par défaut (selon les marqueurs)"
#: fields/item.py fields/map.py
msgid "Device geolocation"
msgstr "Selon la géolocalisation"
#: fields/item.py fields/map.py
msgid "From template"
msgstr "Selon un gabarit"
@ -6089,10 +6110,6 @@ msgstr "Position par défaut"
msgid "Specific point"
msgstr "Point spécifique"
#: fields/map.py
msgid "Device geolocation"
msgstr "Selon la géolocalisation"
#: fields/map.py
#, python-format
msgid "invalid coordinates %r (missing ;) (field id: %s)"
@ -9596,7 +9613,7 @@ msgstr "Tout statut final"
msgid "Time between %(start_status)s and %(end_status)s"
msgstr "Durée entre %(start_status)s et %(end_status)s"
#: statistics/views.py testdef.py workflows.py
#: statistics/views.py testdef.py
#, python-format
msgid "\"%s\""
msgstr "« %s »"
@ -11031,11 +11048,6 @@ msgstr "Nom complet de lutilisateur connecté"
msgid "Session User Email"
msgstr "Courriel de lutilisateur connecté"
#: utils.py
#, python-format
msgid "%(form)s, field: \"%(field)s\""
msgstr "%(form)s, champ « %(field)s »"
#: utils.py
#, python-format
msgid "%s is taking too long"
@ -11519,10 +11531,6 @@ msgstr "À partir dune page"
msgid "Page Identifier"
msgstr "Identifiant de page"
#: wf/create_formdata.py
msgid "Backoffice submission"
msgstr "Saisie backoffice"
#: wf/create_formdata.py
msgid "Keep Current User"
msgstr "Garder lutilisateur actuel"
@ -12073,6 +12081,22 @@ msgstr "cassé"
msgid "to %(name)s, %(jump_type_label)s"
msgstr "vers %(name)s, %(jump_type_label)s"
#: wf/jump.py
msgid "Execution mode"
msgstr "Mode dexécution"
#: wf/jump.py
msgid "Immediate"
msgstr "Immédiate"
#: wf/jump.py
msgid "After timeout delay"
msgstr "Après expiration dun délai"
#: wf/jump.py
msgid "After call to webservice trigger"
msgstr "Lors dun appel webservice"
#: wf/jump.py
msgid "Identifier for webservice"
msgstr "Identifiant dappel webservice"
@ -13133,10 +13157,22 @@ msgstr "Référence à un rôle inconnu"
msgid "Unknown roles"
msgstr "Rôles inconnus"
#: workflows.py
msgid "from final status"
msgstr "depuis un statut final"
#: workflows.py
msgid "from pause status"
msgstr "depuis un statut de pause"
#: workflows.py
msgid "from transition status"
msgstr "depuis un statut de transition"
#: workflows.py
#, python-format
msgid "from status %s"
msgstr "depuis le statut %s"
msgid "from status \"%s\""
msgstr "depuis le statut « %s »"
#: workflows.py
msgid " or "
@ -13150,6 +13186,18 @@ msgstr "non assigné"
msgid "Allow as mass action"
msgstr "Proposer en action de masse"
#: workflows.py
msgid "Pause status"
msgstr "Statut de pause"
#: workflows.py
msgid "Final status"
msgstr "Statut final"
#: workflows.py
msgid "Transition status"
msgstr "Statut de transition"
#: workflows.py
msgid "Only display to following statuses"
msgstr "Afficher uniquement pour les statuts suivants"

View File

@ -75,7 +75,7 @@ class CronJob:
log_dir = os.path.join(base_dir, 'cron-logs', now.strftime('%Y'))
os.makedirs(log_dir, exist_ok=True)
with open(os.path.join(log_dir, 'cron.log-%s' % now.strftime('%Y%m%d')), 'a+') as fd:
fd.write('%s [%s] %s\n' % (now.isoformat(), os.getpid(), message))
fd.write('%s [%s] %s\n' % (now.isoformat(), os.getpid(), message.replace('\n', ' ')))
def log_debug(self, message, in_tenant=True):
if get_publisher().get_site_option('cron-log-level') != 'debug':
@ -83,6 +83,10 @@ class CronJob:
memory = psutil.Process().memory_info().rss / (1024 * 1024)
self.log(f'(mem: {memory:.1f}M) {message}', in_tenant=in_tenant)
@classmethod
def log_sql(cls, message, in_tenant=True):
cls.log(f'SQL: {message}', in_tenant=in_tenant)
def is_time(self, timetuple):
minutes = self.minutes
if minutes:
@ -127,6 +131,13 @@ def cron_worker(publisher, since, job_name=None):
if jobs:
CronJob.log('running jobs: %r' % sorted([x.name or x for x in jobs]))
import wcs.sql
wcs.sql.LoggingCursor.queries_count = 0
if get_publisher().get_site_option('cron-log-level') == 'debug':
wcs.sql.LoggingCursor.queries_log_function = CronJob.log_sql
process_start = time.process_time()
memory_start = psutil.Process().memory_info().rss / (1024 * 1024)
for job in jobs:
publisher.current_cron_job = job
publisher.install_lang()
@ -139,3 +150,15 @@ def cron_worker(publisher, since, job_name=None):
except Exception as e:
job.log(f'exception running job {job.name}: {e}')
publisher.capture_exception(sys.exc_info())
if jobs:
wcs.sql.LoggingCursor.queries_log_function = None
process_end = time.process_time()
memory_end = psutil.Process().memory_info().rss / (1024 * 1024)
CronJob.log(
'resource usage summary: CPU time: %.2fs / Memory: %.2fM / SQL queries: %s'
% (
process_end - process_start,
memory_end - memory_start,
wcs.sql.LoggingCursor.queries_count,
)
)

View File

@ -1,11 +1,13 @@
.CheckboxesWithImagesWidget, .RadiobuttonsWithImagesWidget {
.content {
--image-size: var(--image-mobile-size);
--image-width: var(--image-mobile-width);
--image-height: var(--image-mobile-height);
display: flex;
flex-wrap: wrap;
margin-top: 5px;
@media (min-width: 761px) {
--image-size: var(--image-desktop-size);
--image-width: var(--image-desktop-width);
--image-height: var(--image-desktop-height);
}
&:focus-within {
outline: 1px dashed var(--primary-color, #bbb);
@ -34,10 +36,10 @@
.item-with-image {
box-sizing: content-box;
padding: 10px;
flex: 0 0 var(--image-size);
flex: 0 0 var(--image-width);
display: grid;
grid-template-columns: auto 1fr;
grid-template-rows: max-content auto;
grid-template-rows: var(--image-height) auto;
grid-row-gap: 5px;
grid-template-areas:
"picture picture"
@ -47,7 +49,8 @@
&--picture {
margin-bottom: 5px;
grid-area: picture;
width: var(--image-size);
width: var(--image-width);
height: var(--image-height);
object-fit: contain;
object-position: bottom;
}

View File

@ -1048,6 +1048,21 @@ const LiveValidation = (function(){
field.setAttribute("aria-describedby", this.errorEl.id)
this.widget.classList.remove(this.errorClass)
this.hasError = false
var base_field_widget_id = null
var current_widget = this.widget
// for fields in blocks, a single error is displayed on top, using the block name,
// look for it and remove it as soon as the user is correcting the form
// (even if there are still some errors in other subfields)
while (current_widget.nodeName != 'FORM') {
if (current_widget.dataset.widgetNameForId) base_field_widget_id = current_widget.dataset.widgetNameForId
current_widget = current_widget.parentNode
}
var comma = document.querySelector(`#field-error-links [data-field-name="${base_field_widget_id}"] + span.list-comma`)
if (comma) comma.remove()
document.querySelector(`#field-error-links [data-field-name="${base_field_widget_id}"]`).remove()
if (! document.querySelector('#field-error-links a')) {
document.querySelector('#field-error-links').remove()
}
}
init() {

View File

@ -64,6 +64,7 @@ $(function() {
}
});
}
window.prepare_confirmation_buttons = prepare_confirmation_buttons;
function prepare_select_empty_label() {
$('[data-first-element-empty-label]').off('change').on('change', function() {

View File

@ -233,6 +233,7 @@ $(document).on('backoffice-filter-change', function(event, listing_settings) {
prepare_page_links();
prepare_row_links();
prepare_column_headers();
window.prepare_confirmation_buttons();
$('a[data-base-href]').each(function(idx, elem) {
$(elem).attr('href', $(elem).data('base-href') + '?' + listing_settings.qs);
});

View File

@ -6,6 +6,7 @@
{% endblock %}
{% block widget-content %}
{% if not widget.readonly and widget.error and not widget.is_required_error %}<div class="error"><p>{{ widget.error }}</p></div>{% endif %}
{% for subwidget in widget.get_widgets %}
{% if widget.readonly and not subwidget.field.include_in_validation_page %}<div style="display: none">{% endif %}
{{ subwidget.render|safe }}

View File

@ -1,8 +1,7 @@
{% extends "qommon/forms/widgets/checkboxes.html" %}
{% block widget-style-attributes %}
--image-desktop-size: {{ widget.field.image_desktop_size }}px;--image-mobile-size: {{ widget.field.image_mobile_size }}px;
{{ block.super }}
{% if widget.field.image_desktop_size %}--image-desktop-width: {{ widget.field.image_desktop_size|split:'x'|first }}px;--image-desktop-height: {{ widget.field.image_desktop_size|split:'x'|last }}px;{% endif %}{% if widget.field.image_mobile_size %}--image-mobile-width: {{ widget.field.image_mobile_size|split:'x'|first }}px;--image-mobile-height: {{ widget.field.image_mobile_size|split:'x'|last }}px;{% endif %}{{ block.super }}
{% endblock %}
{% block widget-control %}

View File

@ -1,8 +1,7 @@
{% extends "qommon/forms/widgets/radiobuttons.html" %}
{% block widget-style-attributes %}
--image-desktop-size: {{ widget.field.image_desktop_size }}px;--image-mobile-size: {{ widget.field.image_mobile_size }}px;
{{ block.super }}
{% if widget.field.image_desktop_size %}--image-desktop-width: {{ widget.field.image_desktop_size|split:'x'|first }}px;--image-desktop-height: {{ widget.field.image_desktop_size|split:'x'|last }}px;{% endif %}{% if widget.field.image_mobile_size %}--image-mobile-width: {{ widget.field.image_mobile_size|split:'x'|first }}px;--image-mobile-height: {{ widget.field.image_mobile_size|split:'x'|last }}px;{% endif %}{{ block.super }}
{% endblock %}
{% block widget-control %}

View File

@ -24,13 +24,16 @@ from django.utils.module_loading import import_string
from quixote import get_publisher
from quixote.http_request import Upload
from wcs.clamd import PickableClamD
from .errors import ConnectionError
from .misc import Image, can_thumbnail, file_digest
from .storage import atomic_write
class PicklableUpload(Upload):
class PicklableUpload(Upload, PickableClamD):
def __getstate__(self):
self.init_clamd()
odict = self.__dict__.copy()
if 'fp' in odict:
del odict['fp']

View File

@ -177,3 +177,47 @@ class XmlStorableObject(StorableObject):
sub.attrib['role-id'] = role.id # always include id
sub.attrib['role-slug'] = role.slug
sub.text = role.name
class PostConditionsXmlMixin:
def post_conditions_init_with_xml(self, node, include_id=False, snapshot=False):
if node is None:
return
self.post_conditions = []
for post_condition_node in node.findall('post_condition'):
if post_condition_node.findall('condition/type'):
condition = {
'type': xml_node_text(post_condition_node.find('condition/type')),
'value': xml_node_text(post_condition_node.find('condition/value')),
}
elif post_condition_node.find('condition').text:
condition = {
'type': 'python',
'value': xml_node_text(post_condition_node.find('condition')),
}
else:
continue
self.post_conditions.append(
{
'condition': condition,
'error_message': xml_node_text(post_condition_node.find('error_message')),
}
)
def post_conditions_export_to_xml(self, node, include_id=False):
if not self.post_conditions:
return
conditions_node = ET.SubElement(node, 'post_conditions')
for post_condition in self.post_conditions:
post_condition_node = ET.SubElement(conditions_node, 'post_condition')
condition_node = ET.SubElement(post_condition_node, 'condition')
ET.SubElement(condition_node, 'type').text = str(
(post_condition['condition'] or {}).get('type') or ''
)
ET.SubElement(condition_node, 'value').text = str(
(post_condition['condition'] or {}).get('value') or ''
)
ET.SubElement(post_condition_node, 'error_message').text = str(
post_condition['error_message'] or ''
)

View File

@ -96,9 +96,25 @@ SQL_TYPE_MAPPING = {
}
class LoggingCursor(psycopg2.extensions.cursor):
# keep track of (number of) queries, for tests and cron logging and usage summary.
queries = None
queries_count = 0
queries_log_function = None
def execute(self, query, vars=None):
LoggingCursor.queries_count += 1
if self.queries_log_function:
self.queries_log_function(query)
if self.queries is not None:
self.queries.append(query)
return super().execute(query, vars)
class WcsPgConnection(psycopg2.extensions.connection):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cursor_factory = LoggingCursor
self._wcs_in_transaction = False
self._wcs_savepoints = []

View File

@ -44,6 +44,7 @@
<ul class="biglist optionslist">
{{ options.templates|safe }}
{{ options.user_support|safe }}
{% if formdef.backoffice_submission_roles %}{{ options.backoffice_submission_options|safe }}{% endif %}
{{ options.management|safe }}
</ul>
</div>

View File

@ -89,13 +89,13 @@
</div>
{% endif %}
{% with formdefs=view.usage_in_formdefs %}
{% if formdefs %}
<div class="section">
{% with fields=view.usage_in_formdefs %}
{% if fields %}
<div class="section usage-in-forms">
<h3>{% trans "Usage in forms" %}</h3>
<ul class="objects-list single-links">
{% for formdef in formdefs %}
<li><a href="{{ formdef.get_admin_url }}">{{ formdef.name }}</a></li>
{% for field in fields %}
<li><a href="{{ field.get_admin_url }}">{{ field.get_admin_url_label }}</a></li>
{% endfor %}
</ul>
</div>

View File

@ -70,6 +70,7 @@
{{ options.confirmation|safe }}
{{ options.only_allow_one|safe }}
{% if formdef.roles %}{{ options.always_advertise|safe }}{% endif %}
{% if formdef.backoffice_submission_roles %}{{ options.backoffice_submission_options|safe }}{% endif %}
{{ options.management|safe }}
{{ options.tracking_code|safe }}
{% if has_captcha_option %}{{ options.captcha|safe }}{% endif %}

View File

@ -84,11 +84,9 @@ def grep_strings(string, hit_function):
break
for field in formdef.fields or []:
field._formdef = formdef
url = formdef.get_field_admin_url(field)
source_name = _('%(form)s, field: "%(field)s"') % {
'form': formdef.name,
'field': field.ellipsized_label,
}
source_name = field.get_admin_url_label()
for attr in field.get_admin_attributes():
if check_string(getattr(field, attr, None), source_url=url, source_name=source_name):
break

View File

@ -664,7 +664,7 @@ class LazyFormDefObjectsManager:
if attribute == 'formdef':
warnings.warn('Deprecated access to formdef', DeprecationWarning)
return self._formdef
raise AttributeError('No such attribute %r' % attribute)
raise AttributeError(attribute)
def __len__(self):
if self._cached_resultset is not None:
@ -1903,7 +1903,11 @@ class LazyFieldVarBlock(LazyFieldVar):
if field.varname == key:
break
else:
raise AttributeError('No such attribute %r' % key)
try:
value = [CompatibilityNamesDict({'X': x})[f'X_{key}'] for x in self]
except KeyError:
raise AttributeError(str(key))
return value
return [data.get(field.id) for data in self._formdata.data.get(self._field.id)['data']]
def getlistdict(self, keys):

View File

@ -18,8 +18,9 @@ import os
import urllib.parse
import xml.etree.ElementTree as ET
from quixote import get_request, redirect
from quixote import get_publisher, get_request, get_response, redirect
from wcs.clamd import add_clamd_scan_job
from wcs.forms.common import FileDirectory, FormStatusPage
from wcs.portfolio import has_portfolio, push_document
from wcs.workflows import AttachmentEvolutionPart, WorkflowStatusItem, register_item_class
@ -171,6 +172,7 @@ class AddAttachmentWorkflowStatusItem(WorkflowStatusItem):
evo_part = AttachmentEvolutionPart.from_upload(f, varname=self.varname)
evo_part.display_in_history = self.attach_to_history
evo.add_part(evo_part)
add_clamd_scan_job(formdata)
def get_parameters(self):
parameters = (

View File

@ -20,6 +20,7 @@ from quixote import get_publisher
from quixote.html import TemplateIO, htmltext
from wcs.admin.fields import FieldDefPage, FieldsDirectory
from wcs.clamd import add_clamd_scan_job
from wcs.fields import SetValueError
from wcs.formdata import get_dict_with_varnames
from wcs.formdef import FormDef
@ -327,6 +328,16 @@ class FormWorkflowStatusItem(WorkflowStatusItem):
self.formdef.set_live_condition_sources(form, self.formdef.fields)
if (
formdata.evolution
and formdata.evolution[-1].parts
and isinstance(formdata.evolution[-1].parts[-1], WorkflowFormEvolutionPart)
and formdata.evolution[-1].parts[-1].live
):
# attach live evaluated data to form object, to be used in live_process_fields
# for block conditions.
form.blocks_formdata_data = formdata.evolution[-1].parts[-1].data
if form.is_submitted():
# skip prefilling part when form is being submitted
return
@ -356,6 +367,7 @@ class FormWorkflowStatusItem(WorkflowStatusItem):
formdata.evolution[-1].add_part(
WorkflowFormEvolutionPart(self, formdef_data, live=bool(not submit))
)
form.formdata_data = formdef_data
def submit_form(self, form, formdata, user, evo):
if not self.formdef:
@ -373,6 +385,8 @@ class FormWorkflowStatusItem(WorkflowStatusItem):
if button and not getattr(button, 'ignore_form_errors', False):
self.evaluate_live_form(form, formdata, user, submit=True)
formdata.store()
add_clamd_scan_job(formdata)
get_publisher().substitutions.unfeed(lambda x: x.__class__.__name__ == 'ConditionVars')
def get_parameters_view(self):

View File

@ -38,7 +38,13 @@ from wcs.workflows import (
from ..qommon import _, errors, force_str, misc
from ..qommon.cron import CronJob
from ..qommon.form import ComputedExpressionWidget, SingleSelectWidget, StringWidget, WidgetList
from ..qommon.form import (
ComputedExpressionWidget,
RadiobuttonsWidget,
SingleSelectWidget,
StringWidget,
WidgetList,
)
from ..qommon.humantime import humanduration2seconds, seconds2humanduration, timewords
from ..qommon.publisher import get_publisher_class
from ..qommon.template import Template
@ -94,9 +100,7 @@ class TriggerDirectory(Directory):
for item in self.wfstatus.items:
if not isinstance(item, JumpWorkflowStatusItem):
continue
if not hasattr(item, 'trigger'):
continue
if component == item.trigger:
if item.mode == 'trigger' and item.trigger == component:
if not item.get_target_status():
raise errors.PublishError('broken jump / missing target')
if not get_request().get_method() == 'POST':
@ -152,6 +156,7 @@ class JumpWorkflowStatusItem(WorkflowStatusJumpItem):
key = 'jump'
by = []
mode = None
condition = None
trigger = None
timeout = None
@ -160,6 +165,18 @@ class JumpWorkflowStatusItem(WorkflowStatusJumpItem):
directory_name = 'jump'
directory_class = JumpDirectory
def migrate(self):
changed = super().migrate()
if not self.mode: # 2024-03-29
if self.trigger:
self.mode = 'trigger'
elif self.timeout:
self.mode = 'timeout'
else:
self.mode = 'immediate'
changed = True
return changed
def timeout_init_with_xml(self, elem, include_id=False, snapshot=False):
if elem is None or elem.text is None:
self.timeout = None
@ -217,12 +234,38 @@ class JumpWorkflowStatusItem(WorkflowStatusJumpItem):
def get_parameters(self):
if hasattr(self, 'parent') and isinstance(self.parent, WorkflowGlobalAction):
return ('status', 'condition', 'set_marker_on_status')
return ('status', 'condition', 'trigger', 'by', 'timeout', 'set_marker_on_status')
return ('status', 'condition', 'mode', 'trigger', 'by', 'timeout', 'set_marker_on_status')
def get_inspect_parameters(self):
parameters = list(self.get_parameters())
if self.mode != 'trigger' and 'trigger' in parameters:
parameters.remove('trigger')
if self.mode != 'trigger' and 'by' in parameters:
parameters.remove('by')
if self.mode != 'timeout' and 'timeout' in parameters:
parameters.remove('timeout')
return parameters
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
super().add_parameters_widgets(form, parameters, prefix, formdef, **kwargs)
if 'condition' in parameters:
form.get_widget('%scondition' % prefix).advanced = False
form.get_widget('%scondition' % prefix).tab = ('general', _('General'))
if 'mode' in parameters:
form.add(
RadiobuttonsWidget,
'%smode' % prefix,
title=_('Execution mode'),
options=[
('immediate', _('Immediate'), 'immediate'),
('timeout', _('After timeout delay'), 'timeout'),
('trigger', _('After call to webservice trigger'), 'trigger'),
],
value=self.mode if self.mode else 'immediate',
default_value='immediate',
attrs={'data-dynamic-display-parent': 'true'},
extra_css_class='widget-inline-radio',
)
if 'trigger' in parameters:
form.add(
StringWidget,
@ -234,6 +277,10 @@ class JumpWorkflowStatusItem(WorkflowStatusJumpItem):
),
value=self.trigger,
size=40,
attrs={
'data-dynamic-display-child-of': '%smode' % prefix,
'data-dynamic-display-value': 'trigger',
},
)
if 'by' in parameters:
if get_publisher().has_site_option('workflow-functions-only'):
@ -251,6 +298,10 @@ class JumpWorkflowStatusItem(WorkflowStatusJumpItem):
'render_br': False,
'options': [(None, '---', None)] + self.get_list_of_roles(include_logged_in_users=False),
},
attrs={
'data-dynamic-display-child-of': '%smode' % prefix,
'data-dynamic-display-value': 'trigger',
},
)
if 'timeout' in parameters:
_hint = htmltext(
@ -267,6 +318,10 @@ class JumpWorkflowStatusItem(WorkflowStatusJumpItem):
title=_('Timeout'),
value=self.timeout,
hint=_hint,
attrs={
'data-dynamic-display-child-of': '%smode' % prefix,
'data-dynamic-display-value': 'timeout',
},
)
else:
form.add(
@ -275,6 +330,10 @@ class JumpWorkflowStatusItem(WorkflowStatusJumpItem):
title=_('Timeout'),
value=seconds2humanduration(self.timeout),
hint=_hint,
attrs={
'data-dynamic-display-child-of': '%smode' % prefix,
'data-dynamic-display-value': 'timeout',
},
)
def timeout_parse(self, value):
@ -282,10 +341,7 @@ class JumpWorkflowStatusItem(WorkflowStatusJumpItem):
return value
if self.get_expression(value)['type'] != 'text':
return value
try:
return humanduration2seconds(value)
except ValueError:
return None
return humanduration2seconds(value)
def get_computed_strings(self):
yield from super().get_computed_strings()
@ -302,7 +358,7 @@ class JumpWorkflowStatusItem(WorkflowStatusJumpItem):
if not result:
return False
if self.timeout:
if self.mode == 'timeout' and self.timeout:
timeout_str = self.compute(self.timeout)
try:
timeout_seconds = float(timeout_str)
@ -327,7 +383,7 @@ class JumpWorkflowStatusItem(WorkflowStatusJumpItem):
if diff < timeout_seconds:
return False
if self.trigger:
if self.mode == 'trigger' and self.trigger:
if trigger is None or trigger != self.trigger:
return False

View File

@ -19,6 +19,7 @@ import uuid
import freezegun
from django.utils.timezone import localtime
from pyquery import PyQuery as pq
from quixote import get_publisher, get_session
from quixote.html import htmltext
@ -504,8 +505,8 @@ class AssertEmail(WorkflowTestAction):
raise WorkflowTestError(_('No email was sent.'))
for address in self.addresses:
details = [_('Email addresses: %s') % ', '.join(email.email_msg.to)]
if address not in email.email_msg.to:
details = [_('Email addresses: %s') % ', '.join(sorted(email.email_msg.recipients()))]
if address not in email.email_msg.recipients():
raise WorkflowTestError(_('Email was not sent to address "%s".') % address, details=details)
for subject in self.subject_strings:
@ -938,7 +939,7 @@ class AssertAlert(WorkflowTestAction):
return misc.ellipsize(self.message)
def perform(self, formdata):
messages = formdata.get_workflow_messages()
messages = [pq(x).text() for x in formdata.get_workflow_messages()]
for message in messages:
if self.message in message:

View File

@ -35,6 +35,8 @@ from quixote import get_publisher, get_request, get_response, get_session
from quixote.html import TemplateIO, htmlescape, htmltext
import wcs.qommon.storage as st
from wcs.api_utils import is_url_signed
from wcs.clamd import PickableClamD
from wcs.qommon.storage import StorableObject, atomic_write
from wcs.sql_criterias import Contains, LessOrEqual, Null, StatusReachedTimeoutCriteria, StrictNotEqual
@ -364,7 +366,7 @@ class EvolutionPart:
return illegal_fts_chars.sub(' ', misc.html2text(self.view() or ''))
class AttachmentEvolutionPart(EvolutionPart):
class AttachmentEvolutionPart(EvolutionPart, PickableClamD):
orig_filename = None
base_filename = None
content_type = None
@ -423,6 +425,7 @@ class AttachmentEvolutionPart(EvolutionPart):
return open(self.get_file_path(), 'rb') # pylint: disable=consider-using-with
def __getstate__(self):
self.init_clamd()
odict = self.__dict__.copy()
if not odict.get('fp') and 'filename' not in odict:
# we need a filename as an identifier: create one from nothing
@ -1101,7 +1104,7 @@ class Workflow(StorableObject):
if not trigger.allow_as_mass_action:
continue
roles.extend(trigger.roles or [])
statuses.extend(trigger.statuses or [])
statuses.extend(trigger.get_statuses_ids())
action.require_confirmation = trigger.require_confirmation
action.confirmation_text = trigger.confirmation_text
functions = [x for x in roles if x in self.roles]
@ -1459,6 +1462,21 @@ class Workflow(StorableObject):
root['fields'] = []
for field in self.get_backoffice_fields():
root['fields'].append(field.export_to_json(include_id=include_id))
root['actions'] = {}
for trigger in self.get_all_global_action_triggers():
if trigger.key == 'webservice' and trigger.identifier:
root['actions'][f'global-action:{trigger.identifier}'] = {
'label': f'{trigger.parent.name} ({trigger.identifier})'
}
for status in self.possible_status:
for item in status.items:
if item.key != 'jump' or not item.trigger:
continue
root['actions'][f'jump:{item.trigger}'] = {'label': f'{item.parent.name} ({item.trigger})'}
return root
@classmethod
@ -1789,12 +1807,35 @@ class WorkflowGlobalActionManualTrigger(WorkflowGlobalActionTrigger):
def get_parameters(self):
return ('roles', 'statuses', 'allow_as_mass_action', 'require_confirmation', 'confirmation_text')
def get_statuses_ids(self):
statuses = self.statuses or []
for status in self.get_workflow().possible_status or []:
if status in statuses:
yield status.id
if '_endpoint_status' in statuses and status.is_endpoint():
yield status.id
if '_waitpoint_status' in statuses and (status.is_waitpoint() and not status.is_endpoint()):
yield status.id
if '_transition_status' in statuses and not (status.is_waitpoint() or status.is_endpoint()):
yield status.id
def get_status_option_label(self, status_id):
if status_id == '_endpoint_status':
return _('from final status')
if status_id == '_waitpoint_status':
return _('from pause status')
if status_id == '_transition_status':
return _('from transition status')
try:
return _('from status "%s"') % self.get_workflow().get_status(status_id).name
except KeyError:
return None
def render_as_line(self):
parts = [_('Manual')]
if self.statuses:
labels = [x.name for x in self.get_workflow().possible_status if x.id in self.statuses]
if labels:
parts.append(_('from status %s') % _(' or ').join([_('"%s"') % x for x in labels]))
status_labels = [self.get_status_option_label(x) for x in self.statuses or []]
if status_labels:
parts.append(_(' or ').join([str(x) for x in status_labels if x]))
if self.roles:
parts.append(_('by %s') % render_list_of_roles(self.get_workflow(), self.roles))
else:
@ -1841,7 +1882,13 @@ class WorkflowGlobalActionManualTrigger(WorkflowGlobalActionTrigger):
add_element_label=workflow.get_add_role_label(),
element_kwargs={'render_br': False, 'options': options},
)
status_options = [(None, '---', None)]
status_options = [
(None, '---', ''),
('_waitpoint_status', _('Pause status'), '_wait_status'),
('_endpoint_status', _('Final status'), '_endpoint_status'),
('_transition_status', _('Transition status'), '_transition_status'),
(None, '---', ''),
]
status_options += [(str(x.id), x.name, str(x.id)) for x in self.get_workflow().possible_status]
form.add(
WidgetList,
@ -2387,6 +2434,25 @@ class WorkflowGlobalActionWebserviceTrigger(WorkflowGlobalActionManualTrigger):
def get_dependencies(self):
return []
def check_executable(self, formdata, user):
if self.roles is None:
return True
for role in self.roles: # noqa pylint: disable=not-an-iterable
if role == logged_users_role().id and (user or is_url_signed()):
return True
if role == '_submitter' and formdata.is_submitter(user):
return True
if not user:
continue
if formdata.get_function_roles(role).intersection(user.get_roles()):
return True
if '_signed_calls' in self.roles and is_url_signed():
return True
return False
class SerieOfActionsMixin:
items = None
@ -2639,7 +2705,7 @@ class WorkflowGlobalAction(SerieOfActionsMixin):
for trigger in self.triggers or []:
self.trigger = trigger # attach trigger to action, to have trigger options available in form
if trigger.key == 'manual':
if trigger.statuses and current_status_id not in trigger.statuses:
if trigger.statuses and current_status_id not in trigger.get_statuses_ids():
continue
if '_submitter' in (trigger.roles or []) and formdata.is_submitter(user):
return True