Compare commits

...

30 Commits

Author SHA1 Message Date
Pierre Ducroquet ac48ebb70d sql: test purge of search tokens (#86527)
gitea/wcs/pipeline/head There was a failure building this commit Details
2024-03-27 16:42:22 +01:00
Pierre Ducroquet 2962b9dd3f wcs_search_tokens: new FTS mechanism with fuzzy-match (#86527)
introduce a new mechanism to implement FTS with fuzzy-match.
This is made possible by adding and maintaining a table of the
FTS tokens, wcs_search_tokens, fed with searchable_formdefs
and wcs_all_forms.
When a query is issued, its tokens are matched against the
tokens with a fuzzy match when no direct match is found, and
the query is then rebuilt.
2024-03-27 16:42:22 +01:00
Pierre Ducroquet 03015aa750 tests: add a test for new FTS on formdefs (#86527) 2024-03-27 16:42:22 +01:00
Valentin Deniaud dc473b7378 workflow_tests: preserve response of webservice assertion on test duplication (#88729)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-27 11:23:33 +01:00
Frédéric Péters d0358afa40 tests: check update of items relations (#88687)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-26 14:53:21 +01:00
Frédéric Péters 4d5b309986 misc: use custom id in breadcrumb (#88557)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-23 12:15:31 +01:00
Lauréline Guérin e0857ce653
admin: rename overwrite button for blockdef (#88502)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-22 10:51:45 +01:00
Frédéric Péters 8c3374e790 translation update
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 19:00:39 +01:00
Frédéric Péters 03435d40a6 backoffice: warn about data loss when removing a page fields (#87505)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 18:59:35 +01:00
Frédéric Péters 70b7087ad9 backoffice: add support for default value for date workflow options (#88346) 2024-03-21 18:59:28 +01:00
Valentin Deniaud 9afbbccb13 workflow_tests: add support for global action in button click action (#88311)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 15:07:55 +01:00
Corentin Sechet 0c225cf254 misc: increase DATA_UPLOAD_MAX_NUMBER_FIELDS (#88443)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 14:33:40 +01:00
Valentin Deniaud a23457fdbf tests: really fix results count in test_block_test_results (#88458)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 13:52:29 +01:00
Frédéric Péters 9c12c01712 misc: ignore no_data fields in filter_by (#88454)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 13:46:32 +01:00
Thomas NOËL 89b4d350ab translation update
gitea/wcs/pipeline/head There was a failure building this commit Details
2024-03-21 12:08:31 +01:00
Valentin Deniaud 721bdc4e44 translation update
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 12:05:59 +01:00
Thomas NOËL 955f012b3d forms: add option to control max number of drafts per user (#88237)
gitea/wcs/pipeline/head There was a failure building this commit Details
2024-03-21 11:41:47 +01:00
Valentin Deniaud 0ed9d5d0a0 tests: fix results count in test_block_test_results (#88445)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 11:08:35 +01:00
Valentin Deniaud 6fd4b87ff5 workflow_tests: allow testing criticality workflow action (#88108)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 10:44:06 +01:00
Valentin Deniaud d4c3e7dc4e workflow_tests: allow testing alert workflow action (#88108) 2024-03-21 10:44:06 +01:00
Valentin Deniaud 7199e84903 workflow_tests: allow testing history message workflow action (#88108) 2024-03-21 10:44:06 +01:00
Valentin Deniaud e76e33808b workflow_tests: move formdata test attributes to method (#88108) 2024-03-21 10:44:06 +01:00
Valentin Deniaud 0d82f03e59 workflow_tests: allow testing redirect_to_url workflow action (#88108) 2024-03-21 10:44:06 +01:00
Valentin Deniaud 03669bb847 workflow_tests: allow testing anonymise workflow action (#88108) 2024-03-21 10:44:06 +01:00
Frédéric Péters c0d2d36b3c translation update
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 09:20:33 +01:00
Frédéric Péters bdb24e21e9 workflows: give correct URL for variable fields URLs (#88435)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 08:59:53 +01:00
Frédéric Péters 3a4b8c9cc7 misc: always declare lingo_url if lingo is deployed (#88419)
gitea/wcs/pipeline/head Build queued... Details
2024-03-21 08:59:44 +01:00
Frédéric Péters 083f3cf3dd misc: check "manual address" box when a manual address has been entered (#88332)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 08:59:38 +01:00
Frédéric Péters 520e52d1a7 cards: extend "update cards" checkbox to CSV imports (#88294)
gitea/wcs/pipeline/head Build queued... Details
2024-03-21 08:59:29 +01:00
Frédéric Péters 69249df789 misc: do not allow storing AnyFormData objects (#88338)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-20 17:07:33 +01:00
42 changed files with 1542 additions and 88 deletions

View File

@ -386,6 +386,7 @@ Une API existe pour récupérer le schéma de données dun modèle de fiches.
"disabled_redirection" : null, "disabled_redirection" : null,
"discussion" : false, "discussion" : false,
"drafts_lifespan" : null, "drafts_lifespan" : null,
"drafts_max_per_user" : null,
"enable_tracking_codes" : false, "enable_tracking_codes" : false,
"expiration_date" : null, "expiration_date" : null,
"fields" : [ "fields" : [

View File

@ -642,6 +642,8 @@ def test_block_field_statistics_data_update(pub):
def test_block_test_results(pub): def test_block_test_results(pub):
create_superuser(pub) create_superuser(pub)
TestDef.wipe()
TestResult.wipe()
BlockDef.wipe() BlockDef.wipe()
block = BlockDef() block = BlockDef()
block.name = 'foobar' block.name = 'foobar'

View File

@ -23,7 +23,12 @@ from wcs.wf.geolocate import GeolocateWorkflowStatusItem
from wcs.wf.jump import JumpWorkflowStatusItem from wcs.wf.jump import JumpWorkflowStatusItem
from wcs.wf.notification import SendNotificationWorkflowStatusItem from wcs.wf.notification import SendNotificationWorkflowStatusItem
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowImportError from wcs.workflows import (
Workflow,
WorkflowBackofficeFieldsFormDef,
WorkflowImportError,
WorkflowVariablesFieldsFormDef,
)
from wcs.wscalls import NamedWsCall, NamedWsCallImportError from wcs.wscalls import NamedWsCall, NamedWsCallImportError
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login
@ -102,6 +107,10 @@ def test_deprecations(pub):
workflow.backoffice_fields_formdef.fields = [ workflow.backoffice_fields_formdef.fields = [
fields.TableField(id='bo1', label='table field'), fields.TableField(id='bo1', label='table field'),
] ]
workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow)
workflow.variables_formdef.fields = [
fields.TableField(id='wfvar1', label='other table field'),
]
st0 = workflow.add_status('Status0', 'st0') st0 = workflow.add_status('Status0', 'st0')
display = st0.add_action('displaymsg') display = st0.add_action('displaymsg')
@ -266,6 +275,7 @@ def test_deprecations(pub):
assert [x.text for x in resp.pyquery('.section--fields li a')] == [ assert [x.text for x in resp.pyquery('.section--fields li a')] == [
'foobar / Field "table field"', 'foobar / Field "table field"',
'foobar / Field "ranked field"', 'foobar / Field "ranked field"',
'Options of workflow "test" / Field "other table field"',
'Backoffice fields of workflow "test" / Field "table field"', 'Backoffice fields of workflow "test" / Field "table field"',
] ]
assert [x.text for x in resp.pyquery('.section--actions li a')] == [ assert [x.text for x in resp.pyquery('.section--actions li a')] == [

View File

@ -284,6 +284,7 @@ def test_forms_edit_tracking_code(pub, formdef):
resp = resp.click('Form Tracking') resp = resp.click('Form Tracking')
assert resp.forms[0]['drafts_lifespan'].value == '' assert resp.forms[0]['drafts_lifespan'].value == ''
assert resp.forms[0]['drafts_max_per_user'].value == ''
resp = resp.forms[0].submit().follow() # check empty value is ok resp = resp.forms[0].submit().follow() # check empty value is ok
resp = resp.click('Form Tracking') resp = resp.click('Form Tracking')
@ -297,6 +298,20 @@ def test_forms_edit_tracking_code(pub, formdef):
resp = resp.forms[0].submit().follow() resp = resp.forms[0].submit().follow()
assert FormDef.get(1).drafts_lifespan == '5' assert FormDef.get(1).drafts_lifespan == '5'
resp = resp.click('Form Tracking')
resp.forms[0]['drafts_max_per_user'].value = 'xxx'
resp = resp.forms[0].submit()
assert 'Maximum must be between 2 and 100 drafts.' in resp
resp.forms[0]['drafts_max_per_user'].value = '120'
resp = resp.forms[0].submit()
assert 'Maximum must be between 2 and 100 drafts.' in resp
resp.forms[0]['drafts_max_per_user'].value = '1'
resp = resp.forms[0].submit()
assert 'Maximum must be between 2 and 100 drafts.' in resp
resp.forms[0]['drafts_max_per_user'].value = '3'
resp = resp.forms[0].submit().follow()
assert FormDef.get(1).drafts_max_per_user == '3'
formdef.fields = [ formdef.fields = [
fields.StringField(id='1', label='VerifyString'), fields.StringField(id='1', label='VerifyString'),
fields.DateField(id='2', label='VerifyDate'), fields.DateField(id='2', label='VerifyDate'),

View File

@ -1167,6 +1167,11 @@ def test_tests_duplicate(pub):
response.name = 'Response xxx' response.name = 'Response xxx'
response.store() response.store()
testdef.workflow_tests.actions.append(
workflow_tests.AssertWebserviceCall(id='3', webservice_response_uuid=response.uuid),
)
testdef.store()
app = login(get_app(pub)) app = login(get_app(pub))
assert TestDef.count() == 1 assert TestDef.count() == 1
@ -1196,6 +1201,8 @@ def test_tests_duplicate(pub):
assert testdef2.workflow_tests.actions[0].button_name == 'Go to end status' assert testdef2.workflow_tests.actions[0].button_name == 'Go to end status'
assert testdef1.get_webservice_responses()[0].name == 'Changed' assert testdef1.get_webservice_responses()[0].name == 'Changed'
assert testdef2.get_webservice_responses()[0].name == 'Response xxx' assert testdef2.get_webservice_responses()[0].name == 'Response xxx'
assert testdef1.workflow_tests.actions[2].details_label == 'Changed'
assert testdef2.workflow_tests.actions[2].details_label == 'Response xxx'
resp = app.get('/backoffice/forms/1/tests/%s/' % testdef.id) resp = app.get('/backoffice/forms/1/tests/%s/' % testdef.id)
resp = resp.click('Duplicate') resp = resp.click('Duplicate')

View File

@ -9,7 +9,7 @@ from wcs import workflow_tests
from wcs.formdef import FormDef, fields from wcs.formdef import FormDef, fields
from wcs.qommon.http_request import HTTPRequest from wcs.qommon.http_request import HTTPRequest
from wcs.testdef import TestDef, WebserviceResponse from wcs.testdef import TestDef, WebserviceResponse
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowCriticalityLevel
from ..utilities import create_temporary_pub, get_app, login from ..utilities import create_temporary_pub, get_app, login
from .test_all import create_superuser from .test_all import create_superuser
@ -237,10 +237,16 @@ def test_workflow_tests_action_button_click(pub):
jump = new_status.add_action('choice') jump = new_status.add_action('choice')
jump.label = 'Button no target status' jump.label = 'Button no target status'
workflow.add_global_action('Action 1')
interactive_action = workflow.add_global_action('Interactive action (should not be shown)')
interactive_action.add_action('form')
workflow.store() workflow.store()
resp = app.get('/backoffice/forms/1/tests/%s/workflow/1/' % testdef.id) resp = app.get('/backoffice/forms/1/tests/%s/workflow/1/' % testdef.id)
assert resp.form['button_name'].options == [ assert resp.form['button_name'].options == [
('Action 1', False, 'Action 1'),
('Button 1', False, 'Button 1'), ('Button 1', False, 'Button 1'),
('Button 2', False, 'Button 2'), ('Button 2', False, 'Button 2'),
('Button 4 (not available)', True, 'Button 4 (not available)'), ('Button 4 (not available)', True, 'Button 4 (not available)'),
@ -418,6 +424,156 @@ def test_workflow_tests_action_assert_sms(pub):
assert escape('SMS to 0123456789 (+2)') in resp.text assert escape('SMS to 0123456789 (+2)') in resp.text
def test_workflow_tests_action_assert_anonymise(pub):
create_superuser(pub)
formdef = FormDef()
formdef.name = 'test title'
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.workflow_tests.actions = [
workflow_tests.AssertAnonymise(id='1'),
]
testdef.store()
app = login(get_app(pub))
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
assert 'Edit' not in resp.text
def test_workflow_tests_action_assert_redirect(pub):
create_superuser(pub)
formdef = FormDef()
formdef.name = 'test title'
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.workflow_tests.actions = [
workflow_tests.AssertRedirect(id='1'),
]
testdef.store()
app = login(get_app(pub))
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
assert 'not configured' in resp.text
resp = resp.click('Edit')
resp.form['url'] = 'http://example.com'
resp = resp.form.submit().follow()
assert 'not configured' not in resp.text
assert 'http://example.com' in resp.text
def test_workflow_tests_action_assert_history_message(pub):
create_superuser(pub)
formdef = FormDef()
formdef.name = 'test title'
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.workflow_tests.actions = [
workflow_tests.AssertHistoryMessage(id='1'),
]
testdef.store()
app = login(get_app(pub))
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
assert 'not configured' in resp.text
resp = resp.click('Edit')
resp.form['message'] = 'Hello'
resp = resp.form.submit().follow()
assert 'not configured' not in resp.text
def test_workflow_tests_action_assert_alert(pub):
create_superuser(pub)
formdef = FormDef()
formdef.name = 'test title'
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.workflow_tests.actions = [
workflow_tests.AssertAlert(id='1'),
]
testdef.store()
app = login(get_app(pub))
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
assert 'not configured' in resp.text
resp = resp.click('Edit')
resp.form['message'] = 'Hello'
resp = resp.form.submit().follow()
assert 'not configured' not in resp.text
def test_workflow_tests_action_assert_criticality(pub):
create_superuser(pub)
workflow = Workflow(name='Workflow One')
workflow.add_status(name='New status')
workflow.store()
formdef = FormDef()
formdef.workflow_id = workflow.id
formdef.name = 'test title'
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.workflow_tests.actions = [
workflow_tests.AssertCriticality(id='1'),
]
testdef.store()
app = login(get_app(pub))
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
assert 'not configured' in resp.text
resp = resp.click('Edit')
assert 'Workflow has no criticality levels.' in resp.text
workflow.criticality_levels = [
WorkflowCriticalityLevel(name='green'),
WorkflowCriticalityLevel(name='red'),
]
workflow.store()
resp = app.get('/backoffice/forms/1/tests/%s/workflow/1/' % testdef.id)
resp.form['level_id'].select(text='green')
resp = resp.form.submit().follow()
assert 'not configured' not in resp.text
assert escape('Criticality is "green"') in resp.text
def test_workflow_tests_action_assert_backoffice_field(pub): def test_workflow_tests_action_assert_backoffice_field(pub):
create_superuser(pub) create_superuser(pub)
@ -508,13 +664,13 @@ def test_workflow_tests_action_assert_webservice_call(pub):
response3.store() response3.store()
resp = app.get('/backoffice/forms/1/tests/%s/workflow/1/' % testdef.id) resp = app.get('/backoffice/forms/1/tests/%s/workflow/1/' % testdef.id)
assert resp.form['webservice_response_id'].options == [ assert resp.form['webservice_response_uuid'].options == [
(str(response.id), False, 'Fake response'), (str(response.uuid), False, 'Fake response'),
(str(response2.id), False, 'Fake response 2'), (str(response2.uuid), False, 'Fake response 2'),
] ]
assert resp.form['call_count'].value == '1' assert resp.form['call_count'].value == '1'
resp.form['webservice_response_id'] = 1 resp.form['webservice_response_uuid'] = response.uuid
resp.form['call_count'] = 2 resp.form['call_count'] = 2
resp = resp.form.submit().follow() resp = resp.form.submit().follow()
@ -522,7 +678,7 @@ def test_workflow_tests_action_assert_webservice_call(pub):
assert 'Broken' not in resp.text assert 'Broken' not in resp.text
assert_webservice_call = TestDef.get(testdef.id).workflow_tests.actions[0] assert_webservice_call = TestDef.get(testdef.id).workflow_tests.actions[0]
assert assert_webservice_call.webservice_response_id == '1' assert assert_webservice_call.webservice_response_uuid == response.uuid
assert assert_webservice_call.call_count == 2 assert assert_webservice_call.call_count == 2
response.remove_self() response.remove_self()

View File

@ -429,6 +429,9 @@ def test_backoffice_submission_formdef_list_search(pub, local_user, access, auth
resp = get_url('/api/formdefs/?backoffice-submission=on&q=test') resp = get_url('/api/formdefs/?backoffice-submission=on&q=test')
assert len(resp.json['data']) == 2 assert len(resp.json['data']) == 2
resp = get_url('/api/formdefs/?backoffice-submission=on&q=tes')
assert len(resp.json['data']) == 2
resp = get_url('/api/formdefs/?backoffice-submission=on&q=xyz') resp = get_url('/api/formdefs/?backoffice-submission=on&q=xyz')
assert len(resp.json['data']) == 0 assert len(resp.json['data']) == 0

View File

@ -398,6 +398,7 @@ def test_backoffice_card_item_link_id_template(pub):
resp = resp.form.submit('submit') resp = resp.form.submit('submit')
assert resp.location.endswith('/backoffice/data/foo/blah/') assert resp.location.endswith('/backoffice/data/foo/blah/')
resp = resp.follow() resp = resp.follow()
assert resp.pyquery('.breadcrumbs a')[-1].attrib['href'] == '/backoffice/data/foo/blah/'
resp = app.get('/backoffice/data/foo/') resp = app.get('/backoffice/data/foo/')
assert [x.attrib['href'] for x in resp.pyquery('table a')] == ['blah/', 'test/'] assert [x.attrib['href'] for x in resp.pyquery('table a')] == ['blah/', 'test/']
@ -681,6 +682,58 @@ def test_backoffice_cards_import_data_csv_no_backoffice_fields(pub):
assert carddef.data_class().count() == 2 assert carddef.data_class().count() == 2
def test_backoffice_cards_import_data_csv_custom_id_no_update(pub):
user = create_user(pub)
user.name_identifiers = [str(uuid.uuid4())]
user.store()
Workflow.wipe()
workflow = Workflow(name='form-title')
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
workflow.backoffice_fields_formdef.fields = [
fields.StringField(id='bo0', varname='foo_bovar', label='bo variable'),
]
workflow.add_status('st0')
workflow.store()
CardDef.wipe()
carddef = CardDef()
carddef.name = 'test'
carddef.fields = [
fields.StringField(id='1', label='String', varname='custom_id'),
fields.ItemField(id='2', label='List', items=['item1', 'item2']),
]
carddef.backoffice_submission_roles = user.roles
carddef.id_template = '{{form_var_custom_id}}'
carddef.workflow = workflow
carddef.store()
carddef.data_class().wipe()
card = carddef.data_class()()
card.data = {'1': 'plop', '2': 'test', '2_display': 'test', 'bo0': 'xxx'}
card.just_created()
card.store()
app = login(get_app(pub))
data = b'''\
"String","List"
"plop","item1"
"test","item2"
'''
resp = app.get('/backoffice/data/test/import-file')
resp.forms[0]['file'] = Upload('test.csv', data, 'text/csv')
resp.form['update_existing_cards'].checked = False
resp = resp.forms[0].submit().follow()
assert carddef.data_class().count() == 2
card.refresh_from_storage()
assert card.data == {'1': 'plop', '2': 'test', '2_display': 'test', 'bo0': 'xxx'} # no change
other_card = carddef.data_class().select(order_by='-receipt_time')[0]
assert other_card.data == {'1': 'test', '2': 'item2', '2_display': 'item2', 'bo0': None}
assert other_card.id_display == 'test'
def test_backoffice_cards_import_data_csv_custom_id_update(pub): def test_backoffice_cards_import_data_csv_custom_id_update(pub):
user = create_user(pub) user = create_user(pub)
user.name_identifiers = [str(uuid.uuid4())] user.name_identifiers = [str(uuid.uuid4())]
@ -721,6 +774,7 @@ def test_backoffice_cards_import_data_csv_custom_id_update(pub):
''' '''
resp = app.get('/backoffice/data/test/import-file') resp = app.get('/backoffice/data/test/import-file')
resp.forms[0]['file'] = Upload('test.csv', data, 'text/csv') resp.forms[0]['file'] = Upload('test.csv', data, 'text/csv')
resp.form['update_existing_cards'].checked = True
resp = resp.forms[0].submit().follow() resp = resp.forms[0].submit().follow()
assert carddef.data_class().count() == 2 assert carddef.data_class().count() == 2

View File

@ -22,6 +22,7 @@ from wcs.workflows import (
Workflow, Workflow,
WorkflowBackofficeFieldsFormDef, WorkflowBackofficeFieldsFormDef,
WorkflowCriticalityLevel, WorkflowCriticalityLevel,
WorkflowVariablesFieldsFormDef,
) )
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login
@ -1205,3 +1206,72 @@ def test_inspect_page_idp_role(pub):
resp.pyquery('[data-function-key="_receiver"] a').attr.href resp.pyquery('[data-function-key="_receiver"] a').attr.href
== 'https://idp.example.net/manage/roles/uuid:d4b59e1ffb204dfd99fd3760f4952999/' == 'https://idp.example.net/manage/roles/uuid:d4b59e1ffb204dfd99fd3760f4952999/'
) )
def test_inspect_page_form_option(pub):
create_user(pub, is_admin=True)
FormDef.wipe()
wf = Workflow(name='variables')
wf.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=wf)
wf.add_status('st1')
wf.store()
formdef = FormDef()
formdef.name = 'form title'
formdef.fields = []
formdef.workflow = wf
formdef.store()
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
app = login(get_app(pub))
resp = app.get('%sinspect' % formdata.get_url(backoffice=True), status=200)
assert 'form_option' not in resp.text
wf.variables_formdef.fields = [
fields.StringField(label='String test', varname='string_test'),
fields.DateField(label='Date test', varname='date_test'),
]
wf.store()
resp = app.get('%sinspect' % formdata.get_url(backoffice=True), status=200)
assert (
resp.pyquery('[title="form_option_string_test"]').parents('li').children('div.value span').text()
== 'None (no value)'
)
wf.variables_formdef.fields[0].default_value = 'xxx'
wf.variables_formdef.fields[1].default_value = '2024-03-20'
wf.store()
resp = app.get('%sinspect' % formdata.get_url(backoffice=True), status=200)
assert (
resp.pyquery('[title="form_option_string_test"]').parents('li').children('div.value span').text()
== 'xxx'
)
assert (
resp.pyquery('[title="form_option_date_test"]').parents('li').children('div.value span').text()
== '2024-03-20'
)
assert (
resp.pyquery('[title="form_option_date_test_year"]').parents('li').children('div.value span').text()
== '2024 (integer number)'
)
formdef.workflow_options = {'string_test': 'yyy', 'date_test': datetime.date(2024, 3, 21).timetuple()}
formdef.store()
resp = app.get('%sinspect' % formdata.get_url(backoffice=True), status=200)
assert (
resp.pyquery('[title="form_option_string_test"]').parents('li').children('div.value span').text()
== 'yyy'
)
assert (
resp.pyquery('[title="form_option_date_test"]').parents('li').children('div.value span').text()
== '2024-03-21'
)
assert (
resp.pyquery('[title="form_option_date_test_year"]').parents('li').children('div.value span').text()
== '2024 (integer number)'
)

View File

@ -414,6 +414,14 @@ def test_form_max_drafts(pub):
assert not formdef.data_class().has_key(drafts[0].id) # oldest draft was removed assert not formdef.data_class().has_key(drafts[0].id) # oldest draft was removed
formdef.drafts_max_per_user = '3'
formdef.store()
resp = app.get('/test/')
resp.form['f0'] = 'hello2'
resp = resp.form.submit('submit')
assert formdef.data_class().count([Equal('status', 'draft')]) == 4
def test_form_draft_temporary_access_url(pub): def test_form_draft_temporary_access_url(pub):
FormDef.wipe() FormDef.wipe()

View File

@ -1626,6 +1626,57 @@ def test_card_update_related_cascading_loop(pub):
assert carddata2.data['2_display'] == 'card1 card2 card1 None' assert carddata2.data['2_display'] == 'card1 card2 card1 None'
def test_card_update_related_items_relation(pub):
CardDef.wipe()
FormDef.wipe()
carddef = CardDef()
carddef.name = 'foo'
carddef.fields = [
StringField(id='1', label='Test', varname='foo'),
]
carddef.digest_templates = {'default': '{{ form_var_foo }}'}
carddef.store()
carddef.data_class().wipe()
carddata1 = carddef.data_class()()
carddata1.data = {'1': 'card1'}
carddata1.just_created()
carddata1.store()
carddata2 = carddef.data_class()()
carddata2.data = {'1': 'card2'}
carddata2.just_created()
carddata2.store()
formdef = FormDef()
formdef.name = 'foo'
formdef.fields = [
ItemField(id='1', label='Test', data_source={'type': 'carddef:foo'}),
ItemsField(id='2', label='Test2', data_source={'type': 'carddef:foo'}),
]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'1': '1', '2': ['1', '2']}
formdata.data['1_display'] = formdef.fields[0].store_display_value(formdata.data, formdef.fields[0].id)
formdata.data['2_display'] = formdef.fields[1].store_display_value(formdata.data, formdef.fields[1].id)
assert formdata.data['1_display'] == 'card1'
assert formdata.data['2_display'] == 'card1, card2'
formdata.just_created()
formdata.store()
pub.cleanup()
carddef = carddef.get(carddef.id)
carddata1 = carddef.data_class().get(carddata1.id)
carddata1.data = {'1': 'card1-change1'}
carddata1.store()
formdata.refresh_from_storage()
assert formdata.data['1_display'] == 'card1-change1'
assert formdata.data['2_display'] == 'card1-change1, card2'
def test_card_update_related_deleted(pub): def test_card_update_related_deleted(pub):
BlockDef.wipe() BlockDef.wipe()
CardDef.wipe() CardDef.wipe()

View File

@ -1997,6 +1997,31 @@ def test_lazy_formdata_queryset_filter_non_unique_varname(pub, variable_test_dat
assert tmpl.render(context) == '1' assert tmpl.render(context) == '1'
def test_filter_on_page_field(pub):
pub.loggederror_class.wipe()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.PageField(id='1', label='Page', varname='page'),
]
formdef.store()
data_class = formdef.data_class()
formdata = data_class()
formdata.just_created()
formdata.store()
context = pub.substitutions.get_context_variables(mode='lazy')
tmpl = Template('{{forms|objects:"test"|filter_by:"page"|filter_value:"100"}}')
tmpl.render(context)
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.summary == 'Invalid filter "page"'
def test_numeric_filter_on_string(pub): def test_numeric_filter_on_string(pub):
FormDef.wipe() FormDef.wipe()
formdef = FormDef() formdef = FormDef()

View File

@ -76,6 +76,12 @@ HOBO_JSON = {
}, },
], ],
}, },
{
'service-id': 'lingo',
'title': 'Lingo',
'base_url': 'http://payment.example.net/',
'secret_key': 'aaa',
},
], ],
'profile': { 'profile': {
'fields': [ 'fields': [
@ -293,6 +299,7 @@ def test_configure_site_options(setuptest, alt_tempdir):
assert pub.get_site_option('xxx', 'variables') == 'HELLO WORLD' assert pub.get_site_option('xxx', 'variables') == 'HELLO WORLD'
assert pub.get_site_option('portal_agent_url', 'variables') == 'http://agents.example.net/' assert pub.get_site_option('portal_agent_url', 'variables') == 'http://agents.example.net/'
assert pub.get_site_option('portal_url', 'variables') == 'http://portal.example.net/' assert pub.get_site_option('portal_url', 'variables') == 'http://portal.example.net/'
assert pub.get_site_option('lingo_url', 'variables') == 'http://payment.example.net/'
assert pub.get_site_option('test_wcs_url', 'variables') == 'http://wcs.example.net/' assert pub.get_site_option('test_wcs_url', 'variables') == 'http://wcs.example.net/'
assert pub.get_site_option('disable_cron_jobs', 'variables') == 'True' assert pub.get_site_option('disable_cron_jobs', 'variables') == 'True'
assert pub.get_site_option('maintenance_page', 'variables') == 'True' assert pub.get_site_option('maintenance_page', 'variables') == 'True'

View File

@ -1183,6 +1183,45 @@ def test_sql_criteria_fts(pub):
assert data_class.select([st.FtsMatch(formdata1.id_display)])[0].id_display == formdata1.id_display assert data_class.select([st.FtsMatch(formdata1.id_display)])[0].id_display == formdata1.id_display
def test_search_tokens_purge(pub):
_, cur = sql.get_connection_and_cursor()
# purge garbage from other tests
sql.purge_obsolete_search_tokens()
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
start = cur.fetchone()[0]
# define a new table
test_formdef = FormDef()
test_formdef.name = 'tableSelectFTStokens'
test_formdef.fields = [fields.StringField(id='3', label='string')]
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
assert cur.fetchone()[0] == start + 1
t = data_class()
t.data = {'3': 'foofortokensofcourse'}
t.just_created()
t.store()
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
assert cur.fetchone()[0] == start + 2
t.data = {'3': 'chaussettefortokensofcourse'}
t.store()
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
assert cur.fetchone()[0] == start + 3
sql.purge_obsolete_search_tokens()
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
assert cur.fetchone()[0] == start + 2
def table_exists(cur, table_name): def table_exists(cur, table_name):
cur.execute( cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables '''SELECT COUNT(*) FROM information_schema.tables
@ -1692,6 +1731,26 @@ def test_load_all_evolutions_on_any_formdata(pub):
assert len([x for x in objects if x._evolution is not None]) == 100 assert len([x for x in objects if x._evolution is not None]) == 100
def test_store_on_any_formdata(pub):
drop_formdef_tables()
formdef = FormDef()
formdef.name = 'test any store'
formdef.fields = []
formdef.store()
data_class = formdef.data_class(mode='sql')
formdata = data_class()
formdata.just_created()
formdata.receipt_time = localtime()
formdata.store()
objects = sql.AnyFormData.select()
assert len(objects) == 1
with pytest.raises(TypeError):
objects[0].store()
def test_geoloc_in_global_view(pub): def test_geoloc_in_global_view(pub):
drop_formdef_tables() drop_formdef_tables()

View File

@ -9,7 +9,12 @@ from wcs.qommon.http_request import HTTPRequest
from wcs.testdef import TestDef, WebserviceResponse from wcs.testdef import TestDef, WebserviceResponse
from wcs.wf.jump import JumpWorkflowStatusItem, _apply_timeouts from wcs.wf.jump import JumpWorkflowStatusItem, _apply_timeouts
from wcs.workflow_tests import WorkflowTestError from wcs.workflow_tests import WorkflowTestError
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowStatusItem from wcs.workflows import (
Workflow,
WorkflowBackofficeFieldsFormDef,
WorkflowCriticalityLevel,
WorkflowStatusItem,
)
from .backoffice_pages.test_all import create_user from .backoffice_pages.test_all import create_user
from .utilities import create_temporary_pub, get_app, login from .utilities import create_temporary_pub, get_app, login
@ -204,6 +209,65 @@ def test_workflow_tests_button_click(pub):
assert str(excinfo.value) == 'Button "Go to end status" is not displayed.' assert str(excinfo.value) == 'Button "Go to end status" is not displayed.'
def test_workflow_tests_button_click_global_action(pub):
role = pub.role_class(name='test role')
role.store()
user = pub.user_class(name='test user')
user.roles = [role.id]
user.store()
workflow = Workflow(name='Workflow One')
workflow.add_status(name='New status')
end_status = workflow.add_status(name='End status')
global_action = workflow.add_global_action('Go to end status')
global_action.triggers[0].roles = [role.id]
sendmail = global_action.add_action('sendmail')
sendmail.to = ['test@example.org']
sendmail.subject = 'In new status'
sendmail.body = 'xxx'
jump = global_action.add_action('jump')
jump.status = end_status.id
workflow.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = user.id
testdef.workflow_tests.actions = [
workflow_tests.AssertStatus(status_name='End status'),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Form should be in status "End status" but is in status "New status".'
testdef.workflow_tests.actions = [
workflow_tests.ButtonClick(button_name='Go to end status'),
workflow_tests.AssertEmail(),
workflow_tests.AssertStatus(status_name='End status'),
]
testdef.run(formdef)
# hide button from test user
user.roles = []
user.store()
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Button "Go to end status" is not displayed.'
def test_workflow_tests_button_click_who(pub): def test_workflow_tests_button_click_who(pub):
role = pub.role_class(name='test role') role = pub.role_class(name='test role')
role.store() role.store()
@ -603,6 +667,230 @@ def test_workflow_tests_sms(pub):
assert 'SMS body: "Hello"' in excinfo.value.details assert 'SMS body: "Hello"' in excinfo.value.details
def test_workflow_tests_anonymise(pub):
user = pub.user_class(name='test user')
user.store()
workflow = Workflow(name='Workflow One')
new_status = workflow.add_status(name='New status')
workflow.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = user.id
testdef.workflow_tests.actions = [
workflow_tests.AssertAnonymise(),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Form was not anonymised.'
anonymise_action = new_status.add_action('anonymise')
workflow.store()
formdef.refresh_from_storage()
testdef.run(formdef)
anonymise_action.mode = 'intermediate'
workflow.store()
formdef.refresh_from_storage()
testdef.run(formdef)
anonymise_action.mode = 'unlink_user'
workflow.store()
formdef.refresh_from_storage()
testdef.run(formdef)
def test_workflow_tests_redirect(pub):
user = pub.user_class(name='test user')
user.store()
workflow = Workflow(name='Workflow One')
new_status = workflow.add_status(name='New status')
workflow.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = user.id
testdef.workflow_tests.actions = [
workflow_tests.AssertRedirect(url='https://example.com/'),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'No redirection occured.'
redirect_action = new_status.add_action('redirect_to_url')
redirect_action.url = 'https://test.com/'
workflow.store()
formdef.refresh_from_storage()
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert (
str(excinfo.value)
== 'Expected redirection to https://example.com/ but was redirected to https://test.com/.'
)
testdef.workflow_tests.actions = [
workflow_tests.AssertRedirect(url='https://test.com/'),
]
testdef.run(formdef)
def test_workflow_tests_history_message(pub):
user = pub.user_class(name='test user')
user.store()
workflow = Workflow(name='Workflow One')
new_status = workflow.add_status(name='New status')
workflow.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = user.id
testdef.workflow_tests.actions = [
workflow_tests.AssertHistoryMessage(message='Hello 42'),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'No history message.'
register_comment = new_status.add_action('register-comment')
register_comment.comment = 'Hello {{ 41|add:1 }}'
workflow.store()
formdef.refresh_from_storage()
testdef.run(formdef)
testdef.workflow_tests.actions = [
workflow_tests.AssertHistoryMessage(message='Hello 43'),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Wrong history message content.'
assert 'Displayed history message: <div>Hello 42</div>' in excinfo.value.details
assert 'Expected history message: Hello 43' in excinfo.value.details
def test_workflow_tests_alert(pub):
user = pub.user_class(name='test user')
user.store()
workflow = Workflow(name='Workflow One')
new_status = workflow.add_status(name='New status')
workflow.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = user.id
testdef.workflow_tests.actions = [
workflow_tests.AssertAlert(message='Hello 42'),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'No alert matching message.'
assert 'Displayed alerts: None' in excinfo.value.details
assert 'Expected alert: Hello 42' in excinfo.value.details
alert = new_status.add_action('displaymsg')
alert.message = 'Hello {{ 41|add:1 }}'
workflow.store()
formdef.refresh_from_storage()
testdef.run(formdef)
testdef.workflow_tests.actions = [
workflow_tests.AssertAlert(message='Hello 43'),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'No alert matching message.'
assert 'Displayed alerts: <p>Hello 42</p>' in excinfo.value.details
assert 'Expected alert: Hello 43' in excinfo.value.details
def test_workflow_tests_criticality(pub):
user = pub.user_class(name='test user')
user.store()
workflow = Workflow(name='Workflow One')
new_status = workflow.add_status(name='New status')
green_level = WorkflowCriticalityLevel(name='green')
red_level = WorkflowCriticalityLevel(name='red')
workflow.criticality_levels = [green_level, red_level]
workflow.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = user.id
testdef.workflow_tests.actions = [
workflow_tests.AssertCriticality(level_id=red_level.id),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Form should have criticality level "red" but has level "green".'
new_status.add_action('modify_criticality')
workflow.store()
formdef.refresh_from_storage()
testdef.run(formdef)
workflow.criticality_levels = []
workflow.store()
formdef.refresh_from_storage()
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Broken, missing criticality level'
def test_workflow_tests_backoffice_fields(pub): def test_workflow_tests_backoffice_fields(pub):
user = pub.user_class(name='test user') user = pub.user_class(name='test user')
user.store() user.store()
@ -701,7 +989,7 @@ def test_workflow_tests_webservice(pub):
testdef.workflow_tests.actions = [ testdef.workflow_tests.actions = [
workflow_tests.AssertStatus(status_name='End status'), workflow_tests.AssertStatus(status_name='End status'),
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=1), workflow_tests.AssertWebserviceCall(webservice_response_uuid=response.uuid, call_count=1),
] ]
with pytest.raises(WorkflowTestError) as excinfo: with pytest.raises(WorkflowTestError) as excinfo:
@ -716,7 +1004,7 @@ def test_workflow_tests_webservice(pub):
assert str(excinfo.value) == 'Webservice response Fake response was used 2 times (instead of 1).' assert str(excinfo.value) == 'Webservice response Fake response was used 2 times (instead of 1).'
testdef.workflow_tests.actions = [ testdef.workflow_tests.actions = [
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=2), workflow_tests.AssertWebserviceCall(webservice_response_uuid=response.uuid, call_count=2),
] ]
testdef.run(formdef) testdef.run(formdef)
@ -733,8 +1021,8 @@ def test_workflow_tests_webservice(pub):
response2.store() response2.store()
testdef.workflow_tests.actions = [ testdef.workflow_tests.actions = [
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=1), workflow_tests.AssertWebserviceCall(webservice_response_uuid=response.uuid, call_count=1),
workflow_tests.AssertWebserviceCall(webservice_response_id=response2.id, call_count=1), workflow_tests.AssertWebserviceCall(webservice_response_uuid=response2.uuid, call_count=1),
] ]
testdef.run(formdef) testdef.run(formdef)
@ -744,8 +1032,8 @@ def test_workflow_tests_webservice(pub):
testdef.run(formdef) testdef.run(formdef)
testdef.workflow_tests.actions = [ testdef.workflow_tests.actions = [
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=1), workflow_tests.AssertWebserviceCall(webservice_response_uuid=response.uuid, call_count=1),
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=1), workflow_tests.AssertWebserviceCall(webservice_response_uuid=response.uuid, call_count=1),
] ]
with pytest.raises(WorkflowTestError) as excinfo: with pytest.raises(WorkflowTestError) as excinfo:
@ -753,7 +1041,7 @@ def test_workflow_tests_webservice(pub):
assert str(excinfo.value) == 'Webservice response Fake response was used 0 times (instead of 1).' assert str(excinfo.value) == 'Webservice response Fake response was used 0 times (instead of 1).'
testdef.workflow_tests.actions = [ testdef.workflow_tests.actions = [
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=0), workflow_tests.AssertWebserviceCall(webservice_response_uuid=response.uuid, call_count=0),
] ]
with pytest.raises(WorkflowTestError) as excinfo: with pytest.raises(WorkflowTestError) as excinfo:
@ -761,7 +1049,7 @@ def test_workflow_tests_webservice(pub):
assert str(excinfo.value) == 'Webservice response Fake response was used 1 times (instead of 0).' assert str(excinfo.value) == 'Webservice response Fake response was used 1 times (instead of 0).'
testdef.workflow_tests.actions = [ testdef.workflow_tests.actions = [
workflow_tests.AssertWebserviceCall(webservice_response_id='xxx', call_count=1), workflow_tests.AssertWebserviceCall(webservice_response_uuid='xxx', call_count=1),
] ]
with pytest.raises(WorkflowTestError) as excinfo: with pytest.raises(WorkflowTestError) as excinfo:
@ -834,6 +1122,7 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
status_with_timeout_jump = workflow.add_status('Status with timeout jump', 'status-with-timeout-jump') status_with_timeout_jump = workflow.add_status('Status with timeout jump', 'status-with-timeout-jump')
status_with_button = workflow.add_status('Status with button', 'status-with-button') status_with_button = workflow.add_status('Status with button', 'status-with-button')
transition_status = workflow.add_status('Transition status', 'transition-status') transition_status = workflow.add_status('Transition status', 'transition-status')
transition_status2 = workflow.add_status('Transition status 2', 'transition-status-2')
end_status = workflow.add_status('End status', 'end-status') end_status = workflow.add_status('End status', 'end-status')
jump = new_status.add_action('jump') jump = new_status.add_action('jump')
@ -864,7 +1153,24 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
sendsms.to = ['0123456789'] sendsms.to = ['0123456789']
sendsms.body = 'Hello' sendsms.body = 'Hello'
jump = transition_status.add_action('jump') anonymise_action = transition_status.add_action('anonymise')
anonymise_action.mode = 'intermediate'
redirect_action = transition_status.add_action('redirect_to_url')
redirect_action.url = 'https://test.com/'
register_comment = transition_status.add_action('register-comment')
register_comment.comment = 'Hello'
transition_status.add_action('modify_criticality')
global_action = workflow.add_global_action('Action 1')
global_action.triggers[0].roles = [role.id]
jump = global_action.add_action('jump')
jump.status = transition_status2.id
jump = transition_status2.add_action('jump')
jump.status = end_status.id jump.status = end_status.id
workflow.store() workflow.store()
@ -889,6 +1195,7 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
app = login(get_app(pub)) app = login(get_app(pub))
resp = app.get(formdata.get_url()) resp = app.get(formdata.get_url())
resp.form.submit('button1').follow() resp.form.submit('button1').follow()
resp.form.submit('button-action-1').follow()
formdata.refresh_from_storage() formdata.refresh_from_storage()
assert formdata.status == 'wf-end-status' assert formdata.status == 'wf-end-status'
@ -896,7 +1203,7 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
testdef.run(formdef) testdef.run(formdef)
actions = testdef.workflow_tests.actions actions = testdef.workflow_tests.actions
assert len(actions) == 9 assert len(actions) == 15
assert actions[0].key == 'assert-status' assert actions[0].key == 'assert-status'
assert actions[0].status_name == 'Status with timeout jump' assert actions[0].status_name == 'Status with timeout jump'
@ -914,6 +1221,16 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
assert actions[5].key == 'assert-email' assert actions[5].key == 'assert-email'
assert actions[6].key == 'assert-backoffice-field' assert actions[6].key == 'assert-backoffice-field'
assert actions[7].key == 'assert-sms' assert actions[7].key == 'assert-sms'
assert actions[8].key == 'assert-anonymise'
assert actions[9].key == 'assert-redirect'
assert actions[10].key == 'assert-history-message'
assert actions[11].key == 'assert-criticality'
assert actions[12].key == 'assert-status'
assert actions[12].status_name == 'Transition status'
assert actions[13].key == 'button-click'
assert actions[13].button_name == 'Action 1'
assert actions[-1].key == 'assert-status' assert actions[-1].key == 'assert-status'
assert actions[-1].status_name == 'End status' assert actions[-1].status_name == 'End status'

View File

@ -152,7 +152,7 @@ class BlockDirectory(FieldsDirectory):
'Save snapshot' 'Save snapshot'
) )
r += htmltext('<li><a class="button button-paragraph" rel="popup" href="overwrite">%s</a>') % _( r += htmltext('<li><a class="button button-paragraph" rel="popup" href="overwrite">%s</a>') % _(
'Overwrite' 'Overwrite with new import'
) )
r += htmltext('</ul>') r += htmltext('</ul>')
r += htmltext('<h3>%s</h3>') % _('Navigation') r += htmltext('<h3>%s</h3>') % _('Navigation')

View File

@ -196,7 +196,20 @@ class FieldDefPage(Directory):
to_be_deleted.reverse() to_be_deleted.reverse()
# add delete_fields checkbox only if the page has fields # add delete_fields checkbox only if the page has fields
if to_be_deleted: if to_be_deleted:
form.add(CheckboxWidget, 'delete_fields', title=_('Also remove all fields from the page')) form.add(
CheckboxWidget,
'delete_fields',
title=_('Also remove all fields from the page'),
attrs={'data-dynamic-display-parent': 'true'},
)
form.widgets.append(
HtmlWidget(
'<div class="warningnotice" '
'data-dynamic-display-child-of="delete_fields" '
'data-dynamic-display-checked="true">%s</div>'
% _('Warning: the page fields data will be permanently deleted.')
)
)
form.add_submit('delete', _('Delete')) form.add_submit('delete', _('Delete'))
form.add_submit('cancel', _('Cancel')) form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse(): if form.get_widget('cancel').parse():

View File

@ -30,6 +30,7 @@ from wcs.carddef import CardDef
from wcs.categories import Category from wcs.categories import Category
from wcs.formdef import ( from wcs.formdef import (
DRAFTS_DEFAULT_LIFESPAN, DRAFTS_DEFAULT_LIFESPAN,
DRAFTS_DEFAULT_MAX_PER_USER,
FormDef, FormDef,
FormdefImportError, FormdefImportError,
FormdefImportRecoverableError, FormdefImportRecoverableError,
@ -290,6 +291,23 @@ class OptionsDirectory(Directory):
widget.validation_function = check_lifespan widget.validation_function = check_lifespan
widget.validation_function_error_message = _('Lifespan must be between 2 and 100 days.') widget.validation_function_error_message = _('Lifespan must be between 2 and 100 days.')
widget = form.add(
WcsExtraStringWidget,
'drafts_max_per_user',
title=_('Maximum number of drafts per user (between 2 and 100)'),
value=self.formdef.drafts_max_per_user,
hint=_('%s drafts per user by default') % DRAFTS_DEFAULT_MAX_PER_USER,
)
def check_max_per_user(value):
try:
return bool(int(value) >= 2 and int(value) <= 100)
except (ValueError, TypeError):
return False
widget.validation_function = check_max_per_user
widget.validation_function_error_message = _('Maximum must be between 2 and 100 drafts.')
form.widgets.append(HtmlWidget(htmltext('<h3>%s</h3>') % _('Tracking Code'))) form.widgets.append(HtmlWidget(htmltext('<h3>%s</h3>') % _('Tracking Code')))
form.add( form.add(
CheckboxWidget, CheckboxWidget,
@ -495,6 +513,7 @@ class OptionsDirectory(Directory):
'id_template', 'id_template',
'submission_lateral_template', 'submission_lateral_template',
'drafts_lifespan', 'drafts_lifespan',
'drafts_max_per_user',
'user_support', 'user_support',
'management_sidebar_items', 'management_sidebar_items',
] ]

View File

@ -155,6 +155,7 @@ class TestPage(FormBackOfficeStatusPage):
self.testdef = TestDef.get(component) self.testdef = TestDef.get(component)
except KeyError: except KeyError:
raise TraversalError() raise TraversalError()
self.testdef.formdef = objectdef
filled = self.testdef.build_formdata(objectdef, include_fields=True) filled = self.testdef.build_formdata(objectdef, include_fields=True)
super().__init__(objectdef, filled) super().__init__(objectdef, filled)

View File

@ -228,10 +228,16 @@ class CardPage(FormPage):
form.add( form.add(
CheckboxWidget, CheckboxWidget,
'update_existing_cards', 'update_existing_cards',
title=_('Update existing cards (only for JSON imports)'), title=_('Update existing cards (only for JSON imports)')
if not self.formdef.id_template
else _('Update existing cards'),
hint=_('Cards will be matched using their unique identifier ("uuid" property).') hint=_('Cards will be matched using their unique identifier ("uuid" property).')
if not self.formdef.id_template if not self.formdef.id_template
else _('Cards will be matched using their custom identifier ("id" property).'), else _(
'Cards will be matched using their custom identifier ("id" property). '
'If this option is enabled cards with the same identifiers will be updated, '
'otherwise they will be skipped.'
),
value=False, value=False,
) )
form.add_submit('submit', _('Submit')) form.add_submit('submit', _('Submit'))
@ -241,19 +247,22 @@ class CardPage(FormPage):
if form.is_submitted() and not form.has_errors(): if form.is_submitted() and not form.has_errors():
file_content = form.get_widget('file').parse().fp.read() file_content = form.get_widget('file').parse().fp.read()
update_existing_cards = form.get_widget('update_existing_cards').parse()
try: try:
json_content = json.loads(file_content) json_content = json.loads(file_content)
except ValueError: except ValueError:
# not json -> CSV # not json -> CSV
try: try:
return self.import_csv_submit(file_content, submission_agent_id=get_request().user.id) return self.import_csv_submit(
file_content,
update_existing_cards=update_existing_cards,
submission_agent_id=get_request().user.id,
)
except ValueError as e: except ValueError as e:
form.set_error('file', e) form.set_error('file', e)
else: else:
try: try:
return self.import_json_submit( return self.import_json_submit(json_content, update_existing_cards=update_existing_cards)
json_content, update_existing_cards=form.get_widget('update_existing_cards').parse()
)
except ValueError as e: except ValueError as e:
form.set_error('file', e) form.set_error('file', e)
@ -275,7 +284,9 @@ class CardPage(FormPage):
impossible_fields.append(field.label) impossible_fields.append(field.label)
return impossible_fields return impossible_fields
def import_csv_submit(self, content, afterjob=True, api=False, submission_agent_id=None): def import_csv_submit(
self, content, afterjob=True, api=False, update_existing_cards=False, submission_agent_id=None
):
if b'\0' in content: if b'\0' in content:
raise ValueError(_('Invalid file format.')) raise ValueError(_('Invalid file format.'))
@ -328,7 +339,10 @@ class CardPage(FormPage):
raise ValueError(error_message) raise ValueError(error_message)
job = ImportFromCsvAfterJob( job = ImportFromCsvAfterJob(
carddef=self.formdef, data_lines=data_lines, submission_agent_id=submission_agent_id carddef=self.formdef,
data_lines=data_lines,
update_existing_cards=update_existing_cards,
submission_agent_id=submission_agent_id,
) )
if afterjob: if afterjob:
get_response().add_after_job(job) get_response().add_after_job(job)
@ -432,12 +446,13 @@ class CardBackOfficeStatusPage(FormBackOfficeStatusPage):
class ImportFromCsvAfterJob(AfterJob): class ImportFromCsvAfterJob(AfterJob):
def __init__(self, carddef, data_lines, submission_agent_id): def __init__(self, carddef, data_lines, update_existing_cards, submission_agent_id):
super().__init__( super().__init__(
label=_('Importing data into cards'), label=_('Importing data into cards'),
carddef_class=carddef.__class__, carddef_class=carddef.__class__,
carddef_id=carddef.id, carddef_id=carddef.id,
data_lines=data_lines, data_lines=data_lines,
update_existing_cards=update_existing_cards,
submission_agent_id=submission_agent_id, submission_agent_id=submission_agent_id,
) )
@ -448,6 +463,7 @@ class ImportFromCsvAfterJob(AfterJob):
def execute(self): def execute(self):
self.carddef = self.kwargs['carddef_class'].get(self.kwargs['carddef_id']) self.carddef = self.kwargs['carddef_class'].get(self.kwargs['carddef_id'])
update_existing_cards = self.kwargs['update_existing_cards']
carddata_class = self.carddef.data_class() carddata_class = self.carddef.data_class()
self.submission_agent_id = self.kwargs['submission_agent_id'] self.submission_agent_id = self.kwargs['submission_agent_id']
self.total_count = len(self.kwargs['data_lines']) self.total_count = len(self.kwargs['data_lines'])
@ -507,6 +523,9 @@ class ImportFromCsvAfterJob(AfterJob):
except KeyError: except KeyError:
pass # unique id, fine pass # unique id, fine
else: else:
if not update_existing_cards:
self.increment_count()
continue
# overwrite (only fields from CSV columns, not unsupported or backoffice fields) # overwrite (only fields from CSV columns, not unsupported or backoffice fields)
new_card = False new_card = False
orig_data = copy.copy(carddata_with_same_id.data) orig_data = copy.copy(carddata_with_same_id.data)

View File

@ -495,6 +495,10 @@ class Command(TenantCommand):
variables['portal_user_url'] = service_url variables['portal_user_url'] = service_url
variables['portal_user_title'] = service.get('title') variables['portal_user_title'] = service.get('title')
config.set('options', 'theme_skeleton_url', service.get('base_url') + '__skeleton__/') config.set('options', 'theme_skeleton_url', service.get('base_url') + '__skeleton__/')
if service.get('service-id') == 'lingo':
variables['lingo_url'] = urllib.parse.urljoin(service_url, '/')
for legacy_url in service.get('legacy_urls', []): for legacy_url in service.get('legacy_urls', []):
legacy_domain = urllib.parse.urlparse(legacy_url['base_url']).netloc.split(':')[0] legacy_domain = urllib.parse.urlparse(legacy_url['base_url']).netloc.split(':')[0]
legacy_urls[legacy_domain] = domain legacy_urls[legacy_domain] = domain

View File

@ -61,6 +61,7 @@ from .qommon.upload_storage import PicklableUpload
from .roles import logged_users_role from .roles import logged_users_role
DRAFTS_DEFAULT_LIFESPAN = 100 # days DRAFTS_DEFAULT_LIFESPAN = 100 # days
DRAFTS_DEFAULT_MAX_PER_USER = 5
if not hasattr(types, 'ClassType'): if not hasattr(types, 'ClassType'):
types.ClassType = type types.ClassType = type
@ -190,6 +191,7 @@ class FormDef(StorableObject):
submission_lateral_template = None submission_lateral_template = None
id_template = None id_template = None
drafts_lifespan = None drafts_lifespan = None
drafts_max_per_user = None
user_support = None user_support = None
geolocations = None geolocations = None
@ -220,6 +222,7 @@ class FormDef(StorableObject):
'submission_lateral_template', 'submission_lateral_template',
'id_template', 'id_template',
'drafts_lifespan', 'drafts_lifespan',
'drafts_max_per_user',
'user_support', 'user_support',
] ]
BOOLEAN_ATTRIBUTES = [ BOOLEAN_ATTRIBUTES = [
@ -573,9 +576,11 @@ class FormDef(StorableObject):
def get_all_fields(self): def get_all_fields(self):
return (self.fields or []) + self.workflow.get_backoffice_fields() return (self.fields or []) + self.workflow.get_backoffice_fields()
def iter_fields(self, include_block_fields=False, with_backoffice_fields=True): def iter_fields(self, include_block_fields=False, with_backoffice_fields=True, with_no_data_fields=True):
def _iter_fields(fields, block_field=None): def _iter_fields(fields, block_field=None):
for field in fields: for field in fields:
if with_no_data_fields is False and field.is_no_data_field:
continue
# add contextual_id/contextual_varname attributes # add contextual_id/contextual_varname attributes
# they are id/varname for normal fields # they are id/varname for normal fields
# but in case of blocks they are concatenation of block id/varname + field id/varname # but in case of blocks they are concatenation of block id/varname + field id/varname
@ -634,6 +639,9 @@ class FormDef(StorableObject):
def get_drafts_lifespan(self): def get_drafts_lifespan(self):
return int(self.drafts_lifespan or DRAFTS_DEFAULT_LIFESPAN) return int(self.drafts_lifespan or DRAFTS_DEFAULT_LIFESPAN)
def get_drafts_max_per_user(self):
return int(self.drafts_max_per_user or DRAFTS_DEFAULT_MAX_PER_USER)
_workflow = None _workflow = None
def get_workflow(self): def get_workflow(self):

View File

@ -1030,7 +1030,7 @@ class FormStatusPage(Directory, FormTemplateMixin):
return Directory._q_lookup(self, component) return Directory._q_lookup(self, component)
def _q_traverse(self, path): def _q_traverse(self, path):
get_response().breadcrumb.append((str(self.filled.id) + '/', self.filled.get_display_id())) get_response().breadcrumb.append((self.filled.identifier + '/', self.filled.get_display_id()))
return super()._q_traverse(path) return super()._q_traverse(path)
def wfedit(self, action_id): def wfedit(self, action_id):

View File

@ -1778,11 +1778,11 @@ class FormPage(Directory, TempfileDirectoryMixin, FormTemplateMixin):
if get_session().mark_anonymous_formdata(filled): if get_session().mark_anonymous_formdata(filled):
get_session().store() get_session().store()
elif new_draft: elif new_draft:
# keep at most 5 drafts per user # keep at most "max_per_user" drafts per user
data_class = self.formdef.data_class() data_class = self.formdef.data_class()
for id in data_class.get_sorted_ids( for id in data_class.get_sorted_ids(
'-last_update_time', [Equal('status', 'draft'), Equal('user_id', str(filled.user_id))] '-last_update_time', [Equal('status', 'draft'), Equal('user_id', str(filled.user_id))]
)[5:]: )[self.formdef.get_drafts_max_per_user() :]:
data_class.remove_object(id) data_class.remove_object(id)
if new_draft: if new_draft:

View File

@ -4,8 +4,8 @@ msgid ""
msgstr "" msgstr ""
"Project-Id-Version: wcs 0\n" "Project-Id-Version: wcs 0\n"
"Report-Msgid-Bugs-To: \n" "Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2024-03-18 14:22+0100\n" "POT-Creation-Date: 2024-03-21 19:00+0100\n"
"PO-Revision-Date: 2024-03-18 14:22+0100\n" "PO-Revision-Date: 2024-03-21 19:00+0100\n"
"Last-Translator: Thomas Noël <tnoel@entrouvert.com>\n" "Last-Translator: Thomas Noël <tnoel@entrouvert.com>\n"
"Language-Team: french\n" "Language-Team: french\n"
"Language: fr\n" "Language: fr\n"
@ -18,7 +18,7 @@ msgstr ""
#: admin/data_sources.py admin/forms.py admin/mail_templates.py admin/tests.py #: admin/data_sources.py admin/forms.py admin/mail_templates.py admin/tests.py
#: admin/users.py admin/workflows.py admin/wscalls.py backoffice/management.py #: admin/users.py admin/workflows.py admin/wscalls.py backoffice/management.py
#: fields/base.py qommon/ident/franceconnect.py #: fields/base.py qommon/ident/franceconnect.py
#: templates/wcs/backoffice/test-result.html wf/profile.py #: templates/wcs/backoffice/test-result.html wf/profile.py workflow_tests.py
msgid "Name" msgid "Name"
msgstr "Nom" msgstr "Nom"
@ -866,7 +866,7 @@ msgstr ""
#: admin/fields.py admin/settings.py admin/users.py backoffice/management.py #: admin/fields.py admin/settings.py admin/users.py backoffice/management.py
#: data_sources.py fields/base.py qommon/form.py qommon/ident/password.py #: data_sources.py fields/base.py qommon/form.py qommon/ident/password.py
#: statistics/views.py wf/create_formdata.py #: statistics/views.py wf/create_formdata.py workflow_tests.py
msgid "None" msgid "None"
msgstr "Aucun" msgstr "Aucun"
@ -931,6 +931,12 @@ msgstr "Vous allez supprimer le champ « %s »."
msgid "Also remove all fields from the page" msgid "Also remove all fields from the page"
msgstr "Supprimer tous les champs de la page" msgstr "Supprimer tous les champs de la page"
#: admin/fields.py
msgid "Warning: the page fields data will be permanently deleted."
msgstr ""
"Attention : les informations contenues dans les champs de la page seront "
"perdues de façon irréversible."
#: admin/fields.py #: admin/fields.py
#, python-format #, python-format
msgid "Deletion of field \"%s\"" msgid "Deletion of field \"%s\""
@ -1166,6 +1172,19 @@ msgstr "Par défaut les brouillons sont supprimés après %s jours."
msgid "Lifespan must be between 2 and 100 days." msgid "Lifespan must be between 2 and 100 days."
msgstr "La durée de vie doit être entre 2 et 100 jours." msgstr "La durée de vie doit être entre 2 et 100 jours."
#: admin/forms.py
msgid "Maximum number of drafts per user (between 2 and 100)"
msgstr "Nombre maximum de brouillons par utilisateur (entre 2 et 100)"
#: admin/forms.py
#, python-format
msgid "%s drafts per user by default"
msgstr "%s brouillons par utilisateur par défaut"
#: admin/forms.py
msgid "Maximum must be between 2 and 100 drafts."
msgstr "Le nombre maximum doit être entre 2 et 100 brouillons."
#: admin/forms.py backoffice/management.py backoffice/submission.py #: admin/forms.py backoffice/management.py backoffice/submission.py
#: forms/root.py #: forms/root.py
msgid "Tracking Code" msgid "Tracking Code"
@ -2369,7 +2388,7 @@ msgid "Sender (number or name)"
msgstr "Expéditeur (nom ou numéro)" msgstr "Expéditeur (nom ou numéro)"
#: admin/settings.py admin/tests.py wf/notification.py wf/redirect_to_url.py #: admin/settings.py admin/tests.py wf/notification.py wf/redirect_to_url.py
#: wf/wscall.py wscalls.py #: wf/wscall.py workflow_tests.py wscalls.py
msgid "URL" msgid "URL"
msgstr "URL" msgstr "URL"
@ -3940,6 +3959,10 @@ msgid "Update existing cards (only for JSON imports)"
msgstr "" msgstr ""
"Mettre à jour les fiches existantes (uniquement pour les fichiers JSON)" "Mettre à jour les fiches existantes (uniquement pour les fichiers JSON)"
#: backoffice/data_management.py
msgid "Update existing cards"
msgstr "Mettre à jour les fiches existantes"
#: backoffice/data_management.py #: backoffice/data_management.py
msgid "" msgid ""
"Cards will be matched using their unique identifier (\"uuid\" property)." "Cards will be matched using their unique identifier (\"uuid\" property)."
@ -3948,10 +3971,15 @@ msgstr ""
"identifiant unique (propriété « uuid »)." "identifiant unique (propriété « uuid »)."
#: backoffice/data_management.py #: backoffice/data_management.py
msgid "Cards will be matched using their custom identifier (\"id\" property)." msgid ""
"Cards will be matched using their custom identifier (\"id\" property). If "
"this option is enabled cards with the same identifiers will be updated, "
"otherwise they will be skipped."
msgstr "" msgstr ""
"La correspondance avec les fiches existantes se fera sur base de leur " "La correspondance avec les fiches existantes se fera sur base de leur "
"identifiant personnalisé (propriété « id »)." "identifiant personnalisé (propriété « id »). Si cette option est activée les "
"fiches avec un identifiant existant seront mises à jour, sinon elles seront "
"ignorées."
#: backoffice/data_management.py backoffice/i18n.py #: backoffice/data_management.py backoffice/i18n.py
#: templates/wcs/backoffice/card-data-import-form.html #: templates/wcs/backoffice/card-data-import-form.html
@ -9728,6 +9756,10 @@ msgstr "Contenu de la barre latéral pour le traitement"
msgid "Tracking codes" msgid "Tracking codes"
msgstr "Codes de suivi" msgstr "Codes de suivi"
#: templates/wcs/backoffice/formdef-inspect.html
msgid "Maximum number of drafts per user"
msgstr "Nombre maximum de brouillons par utilisateur"
#: templates/wcs/backoffice/formdef-inspect.html #: templates/wcs/backoffice/formdef-inspect.html
msgid "Redirection when disabled" msgid "Redirection when disabled"
msgstr "Redirection quand désactivé" msgstr "Redirection quand désactivé"
@ -11392,7 +11424,7 @@ msgstr "Erreur dans le gabarit du message de workflow (%s)"
msgid "Error rendering message." msgid "Error rendering message."
msgstr "Erreur de rendu du message." msgstr "Erreur de rendu du message."
#: wf/display_message.py wf/register_comment.py #: wf/display_message.py wf/register_comment.py workflow_tests.py
msgid "Message" msgid "Message"
msgstr "Message" msgstr "Message"
@ -12422,6 +12454,104 @@ msgstr "Numéros de téléphone"
msgid "Add phone number" msgid "Add phone number"
msgstr "Ajouter un numéro de téléphone" msgstr "Ajouter un numéro de téléphone"
#: workflow_tests.py
msgid "Assert anonymisation is performed"
msgstr "Vérifier que lanonymisation a lieu"
#: workflow_tests.py
msgid "Form was not anonymised."
msgstr "La demande na pas été anonymisée."
#: workflow_tests.py
msgid "Assert redirect is performed"
msgstr "Vérifier quune redirection a lieu"
#: workflow_tests.py
msgid "No redirection occured."
msgstr "Aucune redirection na eu lieu."
#: workflow_tests.py
#, python-format
msgid "Expected redirection to %(expected_url)s but was redirected to %(url)s."
msgstr ""
"Une redirection vers « %(expected_url)s » était attendue mais une "
"redirection vers « %(url)s » a eu lieu."
#: workflow_tests.py
msgid "Assert history message is displayed"
msgstr "Vérifier laffichage dun message dans lhistorique"
#: workflow_tests.py
msgid "No history message."
msgstr "Pas de message dans lhistorique."
#: workflow_tests.py
#, python-format
msgid "Displayed history message: %s"
msgstr "Message affiché dans lhistorique : %s"
#: workflow_tests.py
#, python-format
msgid "Expected history message: %s"
msgstr "Message attendu dans lhistorique : %s"
#: workflow_tests.py
msgid "Wrong history message content."
msgstr "Contenu du message dans lhistorique incorrect."
#: workflow_tests.py
msgid "Assertion will pass if the text is contained in history message."
msgstr "La vérification réussira si le texte est contenu dans le message."
#: workflow_tests.py
msgid "Assert alert is displayed"
msgstr "Vérifier laffichage dune alerte"
#: workflow_tests.py
#, python-format
msgid "Displayed alerts: %s"
msgstr "Alertes affichées : %s"
#: workflow_tests.py
#, python-format
msgid "Expected alert: %s"
msgstr "Alerte attendue : %s"
#: workflow_tests.py
msgid "No alert matching message."
msgstr "Pas dalerte correspondant au message attendu."
#: workflow_tests.py
msgid "Assertion will pass if the text is contained in alert message."
msgstr ""
"La vérification réussira si le texte est contenu dans le message dalerte."
#: workflow_tests.py
msgid "Assert criticality level"
msgstr "Vérifier le niveau de criticité"
#: workflow_tests.py
msgid "Workflow has no criticality levels."
msgstr "Le workflow na pas de niveaux de criticité."
#: workflow_tests.py
msgid "Broken, missing criticality level"
msgstr "Cassé, niveau de criticité manquant"
#: workflow_tests.py
#, python-format
msgid "Criticality is \"%s\""
msgstr "Le niveau de criticité est « %s »"
#: workflow_tests.py
#, python-format
msgid ""
"Form should have criticality level \"%(expected_level)s\" but has level "
"\"%(level)s\"."
msgstr ""
"La demande devrait avoir le niveau de criticité « %(expected_level)s » mais "
"elle a le niveau « %(level)s »."
#: workflow_traces.py #: workflow_traces.py
msgid "Created (by API)" msgid "Created (by API)"
msgstr "Création (par lAPI)" msgstr "Création (par lAPI)"

View File

@ -485,6 +485,7 @@ class WcsPublisher(QommonPublisher):
for _formdef in FormDef.select() + CardDef.select(): for _formdef in FormDef.select() + CardDef.select():
sql.do_formdef_tables(_formdef) sql.do_formdef_tables(_formdef)
sql.migrate_global_views(conn, cur) sql.migrate_global_views(conn, cur)
sql.init_search_tokens()
cur.close() cur.close()
def record_deprecated_usage(self, *args, **kwargs): def record_deprecated_usage(self, *args, **kwargs):

View File

@ -692,6 +692,11 @@ class QommonPublisher(Publisher):
for error in self.loggederror_class.select(clause=clauses): for error in self.loggederror_class.select(clause=clauses):
self.loggederror_class.remove_object(error.id) self.loggederror_class.remove_object(error.id)
def clean_search_tokens(self, **kwargs):
from wcs import sql
sql.purge_obsolete_search_tokens()
@classmethod @classmethod
def register_cronjobs(cls): def register_cronjobs(cls):
cls.register_cronjob(CronJob(cls.clean_sessions, minutes=[0], name='clean_sessions')) cls.register_cronjob(CronJob(cls.clean_sessions, minutes=[0], name='clean_sessions'))
@ -704,6 +709,9 @@ class QommonPublisher(Publisher):
cls.register_cronjob( cls.register_cronjob(
CronJob(cls.clean_loggederrors, hours=[3], minutes=[0], name='clean_loggederrors') CronJob(cls.clean_loggederrors, hours=[3], minutes=[0], name='clean_loggederrors')
) )
cls.register_cronjob(
CronJob(cls.clean_search_tokens, weekdays=[0], hours=[1], minutes=[0], name='clean_search_tokens')
)
_initialized = False _initialized = False

View File

@ -113,9 +113,26 @@ function init_sync_from_template_address() {
} }
} }
$(widget_selector).each(function(idx, elem) { $(widget_selector).each(function(idx, elem) {
// enable manual address mode if there is an error in one of the manual address fields. var $manual_checkbox = $(elem).find('input.wcs-manual-address');
if ($(elem).nextUntil('.template-address', '[data-geolocation].widget-with-error').length) { if ($(elem).nextUntil('.template-address', '[data-geolocation].widget-with-error').length) {
$(elem).find('input.wcs-manual-address').prop('checked', true).trigger('change'); // enable manual address mode if there is an error in one of the manual address fields.
$manual_checkbox.prop('checked', true).trigger('change');
} else {
// enable manual address mode if a manual field has data while the select is empty
// (typically when going back to a previous page)
var has_val = $(elem).find('select').val();
if (! has_val) {
var has_manual_var = false;
$(elem).nextUntil('.template-address', 'div[data-geolocation]').find('input').each(function(idx, manual_elem) {
if ($(manual_elem).val()) has_manual_var = true;
})
$(elem).nextUntil('.template-address', 'div[data-geolocation]').find('textarea').each(function(idx, manual_elem) {
if ($(manual_elem).val()) has_manual_var = true;
})
if (has_manual_var) {
$manual_checkbox.prop('checked', true).trigger('change');
}
}
} }
}); });
} }

View File

@ -188,7 +188,7 @@ REQUESTS_CERT = {}
DISABLE_CRON_JOBS = False DISABLE_CRON_JOBS = False
# w.c.s. can have very large forms, in backoffice and frontoffice # w.c.s. can have very large forms, in backoffice and frontoffice
DATA_UPLOAD_MAX_NUMBER_FIELDS = 2000 # Django default is 1000 DATA_UPLOAD_MAX_NUMBER_FIELDS = 3000 # Django default is 1000
# workalendar config # workalendar config
WORKING_DAY_CALENDAR = 'workalendar.europe.France' WORKING_DAY_CALENDAR = 'workalendar.europe.France'

View File

@ -96,6 +96,20 @@ SQL_TYPE_MAPPING = {
} }
def _table_exists(cur, table_name):
cur.execute('SELECT 1 FROM pg_class WHERE relname = %s', (table_name,))
rows = cur.fetchall()
return len(rows) > 0
def _trigger_exists(cur, table_name, trigger_name):
cur.execute(
'SELECT 1 FROM pg_trigger WHERE tgrelid = %s::regclass AND tgname = %s', (table_name, trigger_name)
)
rows = cur.fetchall()
return len(rows) > 0
class WcsPgConnection(psycopg2.extensions.connection): class WcsPgConnection(psycopg2.extensions.connection):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@ -1582,6 +1596,8 @@ def do_global_views(conn, cur):
% (name, category.id) % (name, category.id)
) )
init_search_tokens_triggers(cur)
def clean_global_views(conn, cur): def clean_global_views(conn, cur):
# Purge of any dead data # Purge of any dead data
@ -1674,11 +1690,178 @@ def init_global_table(conn=None, cur=None):
endpoint_status=endpoint_status_filter, endpoint_status=endpoint_status_filter,
) )
) )
init_search_tokens_data(cur)
if own_conn: if own_conn:
cur.close() cur.close()
def init_search_tokens(conn=None, cur=None):
"""Initialize the search_tokens mechanism.
It's based on three parts:
- a token table
- triggers to feed this table from the tsvectors used in the database
- a search function that will leverage these tokens to extend the search query.
So far, the sources used are wcs_all_forms and searchable_formdefs.
Example: let's say the sources texts are "Tarif d'école" and "La cantine".
This gives the following tsvectors: ('tarif', 'écol') and ('cantin')
Our tokens table will have these three words.
When the search function is launched, it splits the search query and will
replace unavailable tokens by those close, if available.
The search query 'tari' will be expanded to 'tarif'.
The search query 'collège' will remain unchanged (and return nothing)
If several tokens match or are close enough, the query will be expanded to
an OR.
"""
own_cur = False
if cur is None:
own_cur = True
conn, cur = get_connection_and_cursor()
# Create table
cur.execute('CREATE TABLE IF NOT EXISTS wcs_search_tokens(token TEXT PRIMARY KEY);')
# Create triggers
init_search_tokens_triggers(cur)
# Fill table
init_search_tokens_data(cur)
# Index at the end, small performance trick... not that useful, but it's free...
cur.execute('CREATE EXTENSION IF NOT EXISTS pg_trgm;')
cur.execute(
'CREATE INDEX IF NOT EXISTS wcs_search_tokens_trgm ON wcs_search_tokens USING gin(token gin_trgm_ops);'
)
# And last: functions to use this brand new table
# These two aggregates make the search query far simpler to write
cur.execute('CREATE OR REPLACE AGGREGATE tsquery_agg_or (tsquery) (sfunc=tsquery_or, stype=tsquery);')
cur.execute('CREATE OR REPLACE AGGREGATE tsquery_agg_and (tsquery) (sfunc=tsquery_and, stype=tsquery);')
cur.execute(
r"""CREATE OR REPLACE FUNCTION public.wcs_tsquery(text)
RETURNS tsquery
LANGUAGE sql
STABLE
AS $function$
WITH
tokenized AS (SELECT unnest(regexp_split_to_array($1, '\s+')) w),
super_tokenized AS (
-- perfect: tokens that are found as is in table, thus no OR required
-- partial: tokens found using distance search on tokens table (note: numbers are excluded here)
-- otherwise: token as is and likely no search result later
SELECT w,
coalesce((select plainto_tsquery(perfect.token) FROM wcs_search_tokens AS perfect WHERE perfect.token = plainto_tsquery(w)::text),
tsquery_agg_or(plainto_tsquery(partial.token) order by partial.token <-> w desc),
plainto_tsquery(w)) tokens
FROM tokenized
LEFT JOIN wcs_search_tokens AS partial ON partial.token % w AND w not similar to '%[0-9]{2,}%'
GROUP BY w)
SELECT tsquery_agg_and(tokens) FROM super_tokenized;
$function$;"""
)
if own_cur:
cur.close()
def init_search_tokens_triggers(cur):
# We define only appending triggers, ie on INSERT and UPDATE.
# It would be far heavier to maintain deletions here, and having extra data has
# no or marginal side effect on search performances, and absolutely no impact
# on search results.
# Instead, a weekly cron job will delete obsolete entries, thus making it sure no
# personal data is kept uselessly.
# First part: the appending function
cur.execute(
"""CREATE OR REPLACE FUNCTION wcs_search_tokens_trigger_fn ()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
INSERT INTO wcs_search_tokens SELECT unnest(tsvector_to_array(NEW.fts)) ON CONFLICT(token) DO NOTHING;
RETURN NEW;
END;
$function$;"""
)
if not (_table_exists(cur, 'wcs_search_tokens')):
# abort trigger creation if tokens table doesn't exist yet
return
if _table_exists(cur, 'wcs_all_forms') and not _trigger_exists(
cur, 'wcs_all_forms', 'wcs_all_forms_fts_trg_upd'
):
# Second part: insert and update triggers for wcs_all_forms
cur.execute(
"""CREATE TRIGGER wcs_all_forms_fts_trg_ins
AFTER INSERT ON wcs_all_forms
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
)
cur.execute(
"""CREATE TRIGGER wcs_all_forms_fts_trg_upd
AFTER UPDATE OF fts ON wcs_all_forms
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
)
if _table_exists(cur, 'searchable_formdefs') and not _trigger_exists(
cur, 'searchable_formdefs', 'searchable_formdefs_fts_trg_upd'
):
# Third part: insert and update triggers for searchable_formdefs
cur.execute(
"""CREATE TRIGGER searchable_formdefs_fts_trg_ins
AFTER INSERT ON searchable_formdefs
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
)
cur.execute(
"""CREATE TRIGGER searchable_formdefs_fts_trg_upd
AFTER UPDATE OF fts ON searchable_formdefs
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
)
def init_search_tokens_data(cur):
if not (_table_exists(cur, 'wcs_search_tokens')):
# abort table data initialization if tokens table doesn't exist yet
return
if _table_exists(cur, 'wcs_all_forms'):
cur.execute(
"""INSERT INTO wcs_search_tokens
SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms
ON CONFLICT(token) DO NOTHING;"""
)
if _table_exists(cur, 'searchable_formdefs'):
cur.execute(
"""INSERT INTO wcs_search_tokens
SELECT unnest(tsvector_to_array(fts)) FROM searchable_formdefs
ON CONFLICT(token) DO NOTHING;"""
)
def purge_obsolete_search_tokens(cur=None):
own_cur = False
if cur is None:
own_cur = True
_, cur = get_connection_and_cursor()
cur.execute(
"""DELETE FROM wcs_search_tokens
WHERE token NOT IN (SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms)
AND token NOT IN (SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms);"""
)
if own_cur:
cur.close()
class SqlMixin: class SqlMixin:
_table_name = None _table_name = None
_numerical_id = True _numerical_id = True
@ -4725,6 +4908,8 @@ class AnyFormData(SqlMixin):
# convert back unstructured geolocation to the 'native' formdata format. # convert back unstructured geolocation to the 'native' formdata format.
if o.geoloc_base_x is not None: if o.geoloc_base_x is not None:
o.geolocations = {'base': {'lon': o.geoloc_base_x, 'lat': o.geoloc_base_y}} o.geolocations = {'base': {'lon': o.geoloc_base_x, 'lat': o.geoloc_base_y}}
# do not allow storing those partial objects
o.store = None
return o return o
@classmethod @classmethod
@ -4809,7 +4994,6 @@ class SearchableFormDef(SqlMixin):
% (cls._table_name, cls._table_name) % (cls._table_name, cls._table_name)
) )
cls.do_indexes(cur) cls.do_indexes(cur)
cur.close()
from wcs.carddef import CardDef from wcs.carddef import CardDef
from wcs.formdef import FormDef from wcs.formdef import FormDef
@ -4818,6 +5002,8 @@ class SearchableFormDef(SqlMixin):
CardDef.select(ignore_errors=True), FormDef.select(ignore_errors=True) CardDef.select(ignore_errors=True), FormDef.select(ignore_errors=True)
): ):
cls.update(obj=objectdef) cls.update(obj=objectdef)
init_search_tokens(cur)
cur.close()
@classmethod @classmethod
def update(cls, obj=None, removed_obj_type=None, removed_obj_id=None): def update(cls, obj=None, removed_obj_type=None, removed_obj_id=None):
@ -4855,7 +5041,7 @@ class SearchableFormDef(SqlMixin):
def search(cls, obj_type, string): def search(cls, obj_type, string):
_, cur = get_connection_and_cursor() _, cur = get_connection_and_cursor()
cur.execute( cur.execute(
'SELECT object_id FROM searchable_formdefs WHERE fts @@ plainto_tsquery(%s)', 'SELECT object_id FROM searchable_formdefs WHERE fts @@ wcs_tsquery(%s)',
(FtsMatch.get_fts_value(string),), (FtsMatch.get_fts_value(string),),
) )
ids = [x[0] for x in cur.fetchall()] ids = [x[0] for x in cur.fetchall()]
@ -5120,7 +5306,7 @@ def get_period_total(
# latest migration, number + description (description is not used # latest migration, number + description (description is not used
# programmaticaly but will make sure git conflicts if two migrations are # programmaticaly but will make sure git conflicts if two migrations are
# separately added with the same number) # separately added with the same number)
SQL_LEVEL = (106, 'add context column to logged_errors table') SQL_LEVEL = (107, 'new fts mechanism with tokens table')
def migrate_global_views(conn, cur): def migrate_global_views(conn, cur):
@ -5454,6 +5640,10 @@ def migrate():
for formdef in FormDef.select() + CardDef.select(): for formdef in FormDef.select() + CardDef.select():
do_formdef_tables(formdef, rebuild_views=False, rebuild_global_views=False) do_formdef_tables(formdef, rebuild_views=False, rebuild_global_views=False)
if sql_level < 107:
# 107: new fts mechanism with tokens table
init_search_tokens()
if sql_level != SQL_LEVEL[0]: if sql_level != SQL_LEVEL[0]:
cur.execute( cur.execute(
'''UPDATE wcs_meta SET value = %s, updated_at=NOW() WHERE key = %s''', '''UPDATE wcs_meta SET value = %s, updated_at=NOW() WHERE key = %s''',

View File

@ -379,6 +379,11 @@ class FtsMatch(Criteria):
return 'fts @@ plainto_tsquery(%%(c%s)s)' % id(self.value) return 'fts @@ plainto_tsquery(%%(c%s)s)' % id(self.value)
class WcsFtsMatch(FtsMatch):
def as_sql(self):
return 'fts @@ wcs_tsquery(%%(c%s)s)' % id(self.value)
class ElementEqual(Criteria): class ElementEqual(Criteria):
def __init__(self, attribute, key, value, **kwargs): def __init__(self, attribute, key, value, **kwargs):
super().__init__(attribute, value) super().__init__(attribute, value)

View File

@ -67,6 +67,7 @@
<li><span class="parameter">{% trans "Fields to check after entering the tracking code" %}{% trans ":" %}</span> {{ tracking_code_verify_fields_labels|default:"-" }}</li> <li><span class="parameter">{% trans "Fields to check after entering the tracking code" %}{% trans ":" %}</span> {{ tracking_code_verify_fields_labels|default:"-" }}</li>
{% endif %} {% endif %}
<li><span class="parameter">{% trans "Lifespan of drafts (in days)" %}{% trans ":" %}</span> {{ formdef.get_drafts_lifespan }}</li> <li><span class="parameter">{% trans "Lifespan of drafts (in days)" %}{% trans ":" %}</span> {{ formdef.get_drafts_lifespan }}</li>
<li><span class="parameter">{% trans "Maximum number of drafts per user" %}{% trans ":" %}</span> {{ formdef.get_drafts_max_per_user }}</li>
<li><span class="parameter">{% trans "Templates" %}</span> <li><span class="parameter">{% trans "Templates" %}</span>
<ul> <ul>
<li><span class="parameter">{% trans "Digest" %}{% trans ":" %}</span> {{ formdef.default_digest_template|default:"-" }}</li> <li><span class="parameter">{% trans "Digest" %}{% trans ":" %}</span> {{ formdef.default_digest_template|default:"-" }}</li>

View File

@ -31,9 +31,11 @@
</span> </span>
</span> </span>
<p class="commands"> <p class="commands">
<span class="edit"> {% if action.editable %}
<a href="{{ action.id }}/" rel="popup" title="{% trans "Edit" %}">{% trans "Edit" %}</a> <span class="edit">
</span> <a href="{{ action.id }}/" rel="popup" title="{% trans "Edit" %}">{% trans "Edit" %}</a>
</span>
{% endif %}
<span class="duplicate"> <span class="duplicate">
<a href="{{ action.id }}/duplicate" title="{% trans "Duplicate" %}">{% trans "Duplicate" %}</a> <a href="{{ action.id }}/duplicate" title="{% trans "Duplicate" %}">{% trans "Duplicate" %}</a>
</span> </span>

View File

@ -21,6 +21,7 @@ import io
import json import json
import socket import socket
import urllib.parse import urllib.parse
import uuid
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from contextlib import contextmanager from contextlib import contextmanager
@ -75,8 +76,8 @@ class TestDefXmlProxy(XmlStorableObject):
} }
excluded_fields = ['id', 'object_type', 'object_id'] excluded_fields = ['id', 'object_type', 'object_id']
extra_fields = [ extra_fields = [
('workflow_tests', 'workflow_tests'),
('_webservice_responses', 'webservice_responses'), ('_webservice_responses', 'webservice_responses'),
('workflow_tests', 'workflow_tests'),
] ]
return [ return [
@ -674,6 +675,7 @@ class WebserviceResponse(XmlStorableObject):
_names = 'webservice-response' _names = 'webservice-response'
xml_root_node = 'webservice-response' xml_root_node = 'webservice-response'
uuid = None
testdef_id = None testdef_id = None
name = '' name = ''
payload = None payload = None
@ -684,6 +686,7 @@ class WebserviceResponse(XmlStorableObject):
post_data = None post_data = None
XML_NODES = [ XML_NODES = [
('uuid', 'str'),
('testdef_id', 'int'), ('testdef_id', 'int'),
('name', 'str'), ('name', 'str'),
('payload', 'str'), ('payload', 'str'),
@ -694,6 +697,10 @@ class WebserviceResponse(XmlStorableObject):
('post_data', 'kv_data'), ('post_data', 'kv_data'),
] ]
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.uuid = str(uuid.uuid4())
def __str__(self): def __str__(self):
return self.name return self.name

View File

@ -270,7 +270,7 @@ class LazyFormDefObjectsManager:
return self._clone(self._criterias + [self._formdef.get_by_id_criteria(str(value))]) return self._clone(self._criterias + [self._formdef.get_by_id_criteria(str(value))])
def get_fields(self, key): def get_fields(self, key):
for field in self._formdef.iter_fields(include_block_fields=True): for field in self._formdef.iter_fields(include_block_fields=True, with_no_data_fields=False):
if getattr(field, 'block_field', None): if getattr(field, 'block_field', None):
if field.key == 'items': if field.key == 'items':
# not yet # not yet
@ -757,8 +757,6 @@ class LazyFormDef:
@property @property
def option(self): def option(self):
if not self._formdef.workflow.variables_formdef:
return {}
return LazyFormDefOptions(self._formdef) return LazyFormDefOptions(self._formdef)
@property @property
@ -1989,14 +1987,20 @@ class LazyRequest:
class LazyFormDefOptions(LazyFormDataVar): class LazyFormDefOptions(LazyFormDataVar):
def __init__(self, formdef): def __init__(self, formdef):
self._formdef = formdef self._formdef = formdef
fields = self._formdef.workflow.variables_formdef.fields try:
fields = self._formdef.workflow.variables_formdef.fields
except AttributeError:
fields = []
data = self._formdef.workflow_options or {} data = self._formdef.workflow_options or {}
for field in fields: for field in fields:
# change field IDs as options are stored in data with their # change field IDs as options are stored in data with their
# varnames, not id. # varnames, not id.
field.id = field.varname or field.id field.id = field.varname or field.id
if hasattr(field, 'default_value') and data.get(field.varname) is None: if hasattr(field, 'default_value') and data.get(field.varname) is None:
data[field.varname] = field.default_value if isinstance(field.default_value, str):
data[field.varname] = field.convert_value_from_str(field.default_value)
else:
data[field.varname] = field.default_value
super().__init__(fields, data) super().__init__(fields, data)
def inspect_keys(self): def inspect_keys(self):

View File

@ -82,5 +82,15 @@ class AnonymiseWorkflowStatusItem(WorkflowStatusItem):
default_value=self.__class__.mode, default_value=self.__class__.mode,
) )
def get_workflow_test_action(self, formdata, *args, **kwargs):
original_perform = self.perform
def perform(formdata):
original_perform(formdata)
formdata.anonymisation_performed = True
setattr(self, 'perform', perform)
return self
register_item_class(AnonymiseWorkflowStatusItem) register_item_class(AnonymiseWorkflowStatusItem)

View File

@ -86,5 +86,8 @@ class ModifyCriticalityWorkflowStatusItem(WorkflowStatusItem):
elif self.mode == MODE_SET: elif self.mode == MODE_SET:
formdata.set_criticality_level(int(self.absolute_value)) formdata.set_criticality_level(int(self.absolute_value))
def get_workflow_test_action(self, *args, **kwargs):
return self
register_item_class(ModifyCriticalityWorkflowStatusItem) register_item_class(ModifyCriticalityWorkflowStatusItem)

View File

@ -169,5 +169,8 @@ class DisplayMessageWorkflowStatusItem(WorkflowStatusItem):
location = '%sitems/%s/' % (base_location, self.id) location = '%sitems/%s/' % (base_location, self.id)
yield location, None, self.message yield location, None, self.message
def get_workflow_test_action(self, *args, **kwargs):
return self
register_item_class(DisplayMessageWorkflowStatusItem) register_item_class(DisplayMessageWorkflowStatusItem)

View File

@ -63,5 +63,16 @@ class RedirectToUrlWorkflowStatusItem(WorkflowStatusItem):
return # don't redirect return # don't redirect
return url return url
def get_workflow_test_action(self, formdata, *args, **kwargs):
original_perform = self.perform
def perform(formdata):
url = original_perform(formdata)
formdata.redirect_to_url = url
return url
setattr(self, 'perform', perform)
return self
register_item_class(RedirectToUrlWorkflowStatusItem) register_item_class(RedirectToUrlWorkflowStatusItem)

View File

@ -228,20 +228,34 @@ class RegisterCommenterWorkflowStatusItem(WorkflowStatusItem):
# the comment can use attachments done above # the comment can use attachments done above
if comment: if comment:
try: part = self.get_journal_evolution_part(formdata, comment)
formdata.evolution[-1].add_part( if part:
JournalEvolutionPart(formdata, get_publisher().translate(comment), self.to, self.level) formdata.evolution[-1].add_part(part)
)
formdata.store() formdata.store()
except TemplateError as e:
get_publisher().record_error( def get_journal_evolution_part(self, formdata, comment):
_('Error in template, comment could not be generated'), formdata=formdata, exception=e try:
) return JournalEvolutionPart(formdata, get_publisher().translate(comment), self.to, self.level)
except TemplateError as e:
get_publisher().record_error(
_('Error in template, comment could not be generated'), formdata=formdata, exception=e
)
def i18n_scan(self, base_location): def i18n_scan(self, base_location):
location = '%sitems/%s/' % (base_location, self.id) location = '%sitems/%s/' % (base_location, self.id)
if not self.comment_template: if not self.comment_template:
yield location, None, self.comment yield location, None, self.comment
def get_workflow_test_action(self, formdata, *args, **kwargs):
original_get_journal_evolution_part = self.get_journal_evolution_part
def get_journal_evolution_part(formdata, comment):
part = original_get_journal_evolution_part(formdata, comment)
formdata.history_messages.append(part.content)
return part
setattr(self, 'get_journal_evolution_part', get_journal_evolution_part)
return self
register_item_class(RegisterCommenterWorkflowStatusItem) register_item_class(RegisterCommenterWorkflowStatusItem)

View File

@ -28,11 +28,12 @@ from wcs.qommon.form import (
RadiobuttonsWidget, RadiobuttonsWidget,
SingleSelectWidget, SingleSelectWidget,
StringWidget, StringWidget,
TextWidget,
WidgetList, WidgetList,
) )
from wcs.qommon.humantime import humanduration2seconds, seconds2humanduration, timewords from wcs.qommon.humantime import humanduration2seconds, seconds2humanduration, timewords
from wcs.qommon.xml_storage import XmlStorableObject from wcs.qommon.xml_storage import XmlStorableObject
from wcs.testdef import TestError, WebserviceResponse from wcs.testdef import TestError
from wcs.wf.backoffice_fields import SetBackofficeFieldRowWidget, SetBackofficeFieldsTableWidget from wcs.wf.backoffice_fields import SetBackofficeFieldRowWidget, SetBackofficeFieldsTableWidget
from wcs.wf.profile import FieldNode from wcs.wf.profile import FieldNode
@ -90,9 +91,7 @@ class WorkflowTests(XmlStorableObject):
formdata.workflow_test = True formdata.workflow_test = True
formdata.frozen_receipt_time = formdata.receipt_time formdata.frozen_receipt_time = formdata.receipt_time
formdata.sent_sms = [] self.reset_formdata_test_attributes(formdata)
formdata.sent_emails = []
formdata.used_webservice_responses = self.testdef.used_webservice_responses = []
formdata.perform_workflow() formdata.perform_workflow()
for action in self.actions: for action in self.actions:
@ -102,9 +101,7 @@ class WorkflowTests(XmlStorableObject):
continue continue
if not action.is_assertion: if not action.is_assertion:
formdata.sent_sms.clear() self.reset_formdata_test_attributes(formdata)
formdata.sent_emails.clear()
formdata.used_webservice_responses.clear()
try: try:
action.perform(formdata) action.perform(formdata)
@ -127,6 +124,14 @@ class WorkflowTests(XmlStorableObject):
formdata.store = lambda *args, **kwargs: None formdata.store = lambda *args, **kwargs: None
def reset_formdata_test_attributes(self, formdata):
formdata.sent_sms = []
formdata.sent_emails = []
formdata.used_webservice_responses = self.testdef.used_webservice_responses = []
formdata.anonymisation_performed = False
formdata.redirect_to_url = None
formdata.history_messages = []
def get_new_action_id(self): def get_new_action_id(self):
if not self.actions: if not self.actions:
return '1' return '1'
@ -145,7 +150,12 @@ class WorkflowTests(XmlStorableObject):
'webservice_call': AssertWebserviceCall, 'webservice_call': AssertWebserviceCall,
'set-backoffice-fields': AssertBackofficeFieldValues, 'set-backoffice-fields': AssertBackofficeFieldValues,
'button': ButtonClick, 'button': ButtonClick,
'global-action-button': ButtonClick,
'timeout-jump': SkipTime, 'timeout-jump': SkipTime,
'anonymise': AssertAnonymise,
'redirect_to_url': AssertRedirect,
'register-comment': AssertHistoryMessage,
'modify_criticality': AssertCriticality,
} }
previous_trace = None previous_trace = None
@ -196,6 +206,7 @@ class WorkflowTestAction(XmlStorableObject):
optional_fields = [] optional_fields = []
is_assertion = True is_assertion = True
editable = True
XML_NODES = [ XML_NODES = [
('id', 'str'), ('id', 'str'),
@ -266,14 +277,26 @@ class ButtonClick(WorkflowTestAction):
return _('Click on "%(button_name)s" by %(user)s') % {'button_name': self.button_name, 'user': user} return _('Click on "%(button_name)s" by %(user)s') % {'button_name': self.button_name, 'user': user}
def set_attributes_from_trace(self, formdef, trace, previous_trace=None): def set_attributes_from_trace(self, formdef, trace, previous_trace=None):
try: if 'action_item_id' in trace.event_args:
item = [ try:
x for x in self.get_all_choice_actions(formdef) if x.id == trace.event_args['action_item_id'] button_name = [
][0] x.label
except IndexError: for x in self.get_all_choice_actions(formdef)
return if x.id == trace.event_args['action_item_id']
][0]
except IndexError:
return
elif 'global_action_id' in trace.event_args:
try:
button_name = [
x.name
for x in self.get_all_global_actions(formdef)
if x.id == trace.event_args['global_action_id']
][0]
except IndexError:
return
self.button_name = item.label self.button_name = button_name
def perform(self, formdata): def perform(self, formdata):
if self.who == 'receiver': if self.who == 'receiver':
@ -306,8 +329,15 @@ class ButtonClick(WorkflowTestAction):
if isinstance(item, wf.choice.ChoiceWorkflowStatusItem) and item.status: if isinstance(item, wf.choice.ChoiceWorkflowStatusItem) and item.status:
yield item yield item
@staticmethod
def get_all_global_actions(formdef):
for action in formdef.workflow.global_actions or []:
if not action.is_interactive():
yield action
def fill_admin_form(self, form, formdef): def fill_admin_form(self, form, formdef):
possible_button_names = {x.label for x in self.get_all_choice_actions(formdef)} possible_button_names = {x.label for x in self.get_all_choice_actions(formdef)}
possible_button_names.update(action.name for action in self.get_all_global_actions(formdef))
if not possible_button_names: if not possible_button_names:
return return
@ -631,20 +661,22 @@ class AssertWebserviceCall(WorkflowTestAction):
label = _('Assert webservice call') label = _('Assert webservice call')
key = 'assert-webservice-call' key = 'assert-webservice-call'
webservice_response_id = None webservice_response_uuid = None
call_count = 1 call_count = 1
optional_fields = ['call_count'] optional_fields = ['call_count']
XML_NODES = WorkflowTestAction.XML_NODES + [ XML_NODES = WorkflowTestAction.XML_NODES + [
('webservice_response_id', 'str'), ('webservice_response_uuid', 'str'),
('call_count', 'int'), ('call_count', 'int'),
] ]
@property @property
def details_label(self): def details_label(self):
webservice_responses = [ webservice_responses = [
x for x in self.parent.testdef.get_webservice_responses() if x.id == self.webservice_response_id x
for x in self.parent.testdef.get_webservice_responses()
if x.uuid == self.webservice_response_uuid
] ]
if webservice_responses: if webservice_responses:
return webservice_responses[0].name return webservice_responses[0].name
@ -664,13 +696,17 @@ class AssertWebserviceCall(WorkflowTestAction):
def perform(self, formdata): def perform(self, formdata):
try: try:
response = WebserviceResponse.get(self.webservice_response_id) response = [
except KeyError: x
for x in self.parent.testdef.get_webservice_responses()
if x.uuid == self.webservice_response_uuid
][0]
except IndexError:
raise WorkflowTestError(_('Broken, missing webservice response')) raise WorkflowTestError(_('Broken, missing webservice response'))
call_count = 0 call_count = 0
for used_response in formdata.used_webservice_responses.copy(): for used_response in formdata.used_webservice_responses.copy():
if used_response.id == self.webservice_response_id: if used_response.uuid == self.webservice_response_uuid:
formdata.used_webservice_responses.remove(used_response) formdata.used_webservice_responses.remove(used_response)
call_count += 1 call_count += 1
@ -682,7 +718,7 @@ class AssertWebserviceCall(WorkflowTestAction):
def fill_admin_form(self, form, formdef): def fill_admin_form(self, form, formdef):
webservice_response_options = [ webservice_response_options = [
(response.id, response.name, response.id) (response.uuid, response.name, response.uuid)
for response in self.parent.testdef.get_webservice_responses() for response in self.parent.testdef.get_webservice_responses()
] ]
@ -691,11 +727,11 @@ class AssertWebserviceCall(WorkflowTestAction):
form.add( form.add(
SingleSelectWidget, SingleSelectWidget,
'webservice_response_id', 'webservice_response_uuid',
title=_('Webservice response'), title=_('Webservice response'),
options=webservice_response_options, options=webservice_response_options,
required=True, required=True,
value=self.webservice_response_id, value=self.webservice_response_uuid,
) )
form.add(IntWidget, 'call_count', title=_('Call count'), required=True, value=self.call_count) form.add(IntWidget, 'call_count', title=_('Call count'), required=True, value=self.call_count)
@ -760,3 +796,163 @@ class AssertSMS(WorkflowTestAction):
title=_('Body'), title=_('Body'),
value=self.body, value=self.body,
) )
class AssertAnonymise(WorkflowTestAction):
label = _('Assert anonymisation is performed')
key = 'assert-anonymise'
editable = False
details_label = ''
def perform(self, formdata):
if not formdata.anonymisation_performed:
raise WorkflowTestError(_('Form was not anonymised.'))
class AssertRedirect(WorkflowTestAction):
label = _('Assert redirect is performed')
key = 'assert-redirect'
url = None
XML_NODES = WorkflowTestAction.XML_NODES + [
('url', 'str'),
]
@property
def details_label(self):
return self.url
def perform(self, formdata):
if not formdata.redirect_to_url:
raise WorkflowTestError(_('No redirection occured.'))
if formdata.redirect_to_url != self.url:
raise WorkflowTestError(
_('Expected redirection to %(expected_url)s but was redirected to %(url)s.')
% {'expected_url': self.url, 'url': formdata.redirect_to_url}
)
def fill_admin_form(self, form, formdef):
form.add(
StringWidget,
'url',
title=_('URL'),
value=self.url,
)
class AssertHistoryMessage(WorkflowTestAction):
label = _('Assert history message is displayed')
details_label = ''
key = 'assert-history-message'
message = None
XML_NODES = WorkflowTestAction.XML_NODES + [
('message', 'str'),
]
def perform(self, formdata):
try:
message = formdata.history_messages.pop(0)
except IndexError:
raise WorkflowTestError(_('No history message.'))
if self.message not in message:
details = [
_('Displayed history message: %s') % message,
_('Expected history message: %s') % self.message,
]
raise WorkflowTestError(_('Wrong history message content.'), details=details)
def fill_admin_form(self, form, formdef):
form.add(
TextWidget,
'message',
title=_('Message'),
value=self.message,
hint=_('Assertion will pass if the text is contained in history message.'),
)
class AssertAlert(WorkflowTestAction):
label = _('Assert alert is displayed')
details_label = ''
key = 'assert-alert'
message = None
XML_NODES = WorkflowTestAction.XML_NODES + [
('message', 'str'),
]
def perform(self, formdata):
messages = formdata.get_workflow_messages()
for message in messages:
if self.message in message:
break
else:
details = [
_('Displayed alerts: %s') % (', '.join(messages) if messages else _('None')),
_('Expected alert: %s') % self.message,
]
raise WorkflowTestError(_('No alert matching message.'), details=details)
def fill_admin_form(self, form, formdef):
form.add(
TextWidget,
'message',
title=_('Message'),
value=self.message,
hint=_('Assertion will pass if the text is contained in alert message.'),
)
class AssertCriticality(WorkflowTestAction):
label = _('Assert criticality level')
empty_form_error = _('Workflow has no criticality levels.')
key = 'assert-criticality'
level_id = None
XML_NODES = WorkflowTestAction.XML_NODES + [
('level_id', 'str'),
]
@property
def details_label(self):
levels = [
x for x in self.parent.testdef.formdef.workflow.criticality_levels or [] if x.id == self.level_id
]
if not levels:
return _('Broken, missing criticality level')
return _('Criticality is "%s"') % levels[0].name
def perform(self, formdata):
levels = [x for x in formdata.formdef.workflow.criticality_levels or [] if x.id == self.level_id]
if not levels:
raise WorkflowTestError(_('Broken, missing criticality level'))
current_level = formdata.get_criticality_level_object()
if current_level.id != self.level_id:
raise WorkflowTestError(
_('Form should have criticality level "%(expected_level)s" but has level "%(level)s".')
% {'expected_level': levels[0].name, 'level': current_level.name}
)
def fill_admin_form(self, form, formdef):
if not formdef.workflow.criticality_levels:
return
form.add(
SingleSelectWidget,
'level_id',
title=_('Name'),
value=self.level_id,
options=[(x.id, x.name, x.id) for x in formdef.workflow.criticality_levels],
)

View File

@ -720,6 +720,9 @@ class WorkflowVariablesFieldsFormDef(FormDef):
base_url = get_publisher().get_backoffice_url() base_url = get_publisher().get_backoffice_url()
return '%s/workflows/%s/variables/fields/' % (base_url, self.workflow.id) return '%s/workflows/%s/variables/fields/' % (base_url, self.workflow.id)
def get_field_admin_url(self, field):
return self.get_admin_url() + '%s/' % field.id
def get_new_field_id(self): def get_new_field_id(self):
return str(uuid.uuid4()) return str(uuid.uuid4())
@ -2733,7 +2736,7 @@ class WorkflowStatus(SerieOfActionsMixin):
if check_replay and form.get('_ts') != str(filled.last_update_time.timestamp()): if check_replay and form.get('_ts') != str(filled.last_update_time.timestamp()):
raise ReplayException() raise ReplayException()
for action in filled.formdef.workflow.get_global_actions_for_user(filled, user): for action in filled.formdef.workflow.get_global_actions_for_user(filled, user):
if 'button-action-%s' % action.id in get_request().form: if form.get_submit() == 'button-action-%s' % action.id:
if action.is_interactive(): if action.is_interactive():
return action.get_global_interactive_form_url(formdef=filled.formdef, ids=[filled.id]) return action.get_global_interactive_form_url(formdef=filled.formdef, ids=[filled.id])
filled.record_workflow_event('global-action-button', global_action_id=action.id) filled.record_workflow_event('global-action-button', global_action_id=action.id)