Compare commits

...

30 Commits

Author SHA1 Message Date
Pierre Ducroquet ac48ebb70d sql: test purge of search tokens (#86527)
gitea/wcs/pipeline/head There was a failure building this commit Details
2024-03-27 16:42:22 +01:00
Pierre Ducroquet 2962b9dd3f wcs_search_tokens: new FTS mechanism with fuzzy-match (#86527)
introduce a new mechanism to implement FTS with fuzzy-match.
This is made possible by adding and maintaining a table of the
FTS tokens, wcs_search_tokens, fed with searchable_formdefs
and wcs_all_forms.
When a query is issued, its tokens are matched against the
tokens with a fuzzy match when no direct match is found, and
the query is then rebuilt.
2024-03-27 16:42:22 +01:00
Pierre Ducroquet 03015aa750 tests: add a test for new FTS on formdefs (#86527) 2024-03-27 16:42:22 +01:00
Valentin Deniaud dc473b7378 workflow_tests: preserve response of webservice assertion on test duplication (#88729)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-27 11:23:33 +01:00
Frédéric Péters d0358afa40 tests: check update of items relations (#88687)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-26 14:53:21 +01:00
Frédéric Péters 4d5b309986 misc: use custom id in breadcrumb (#88557)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-23 12:15:31 +01:00
Lauréline Guérin e0857ce653
admin: rename overwrite button for blockdef (#88502)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-22 10:51:45 +01:00
Frédéric Péters 8c3374e790 translation update
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 19:00:39 +01:00
Frédéric Péters 03435d40a6 backoffice: warn about data loss when removing a page fields (#87505)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 18:59:35 +01:00
Frédéric Péters 70b7087ad9 backoffice: add support for default value for date workflow options (#88346) 2024-03-21 18:59:28 +01:00
Valentin Deniaud 9afbbccb13 workflow_tests: add support for global action in button click action (#88311)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 15:07:55 +01:00
Corentin Sechet 0c225cf254 misc: increase DATA_UPLOAD_MAX_NUMBER_FIELDS (#88443)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 14:33:40 +01:00
Valentin Deniaud a23457fdbf tests: really fix results count in test_block_test_results (#88458)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 13:52:29 +01:00
Frédéric Péters 9c12c01712 misc: ignore no_data fields in filter_by (#88454)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 13:46:32 +01:00
Thomas NOËL 89b4d350ab translation update
gitea/wcs/pipeline/head There was a failure building this commit Details
2024-03-21 12:08:31 +01:00
Valentin Deniaud 721bdc4e44 translation update
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 12:05:59 +01:00
Thomas NOËL 955f012b3d forms: add option to control max number of drafts per user (#88237)
gitea/wcs/pipeline/head There was a failure building this commit Details
2024-03-21 11:41:47 +01:00
Valentin Deniaud 0ed9d5d0a0 tests: fix results count in test_block_test_results (#88445)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 11:08:35 +01:00
Valentin Deniaud 6fd4b87ff5 workflow_tests: allow testing criticality workflow action (#88108)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 10:44:06 +01:00
Valentin Deniaud d4c3e7dc4e workflow_tests: allow testing alert workflow action (#88108) 2024-03-21 10:44:06 +01:00
Valentin Deniaud 7199e84903 workflow_tests: allow testing history message workflow action (#88108) 2024-03-21 10:44:06 +01:00
Valentin Deniaud e76e33808b workflow_tests: move formdata test attributes to method (#88108) 2024-03-21 10:44:06 +01:00
Valentin Deniaud 0d82f03e59 workflow_tests: allow testing redirect_to_url workflow action (#88108) 2024-03-21 10:44:06 +01:00
Valentin Deniaud 03669bb847 workflow_tests: allow testing anonymise workflow action (#88108) 2024-03-21 10:44:06 +01:00
Frédéric Péters c0d2d36b3c translation update
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 09:20:33 +01:00
Frédéric Péters bdb24e21e9 workflows: give correct URL for variable fields URLs (#88435)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 08:59:53 +01:00
Frédéric Péters 3a4b8c9cc7 misc: always declare lingo_url if lingo is deployed (#88419)
gitea/wcs/pipeline/head Build queued... Details
2024-03-21 08:59:44 +01:00
Frédéric Péters 083f3cf3dd misc: check "manual address" box when a manual address has been entered (#88332)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-21 08:59:38 +01:00
Frédéric Péters 520e52d1a7 cards: extend "update cards" checkbox to CSV imports (#88294)
gitea/wcs/pipeline/head Build queued... Details
2024-03-21 08:59:29 +01:00
Frédéric Péters 69249df789 misc: do not allow storing AnyFormData objects (#88338)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-20 17:07:33 +01:00
42 changed files with 1542 additions and 88 deletions

View File

@ -386,6 +386,7 @@ Une API existe pour récupérer le schéma de données dun modèle de fiches.
"disabled_redirection" : null,
"discussion" : false,
"drafts_lifespan" : null,
"drafts_max_per_user" : null,
"enable_tracking_codes" : false,
"expiration_date" : null,
"fields" : [

View File

@ -642,6 +642,8 @@ def test_block_field_statistics_data_update(pub):
def test_block_test_results(pub):
create_superuser(pub)
TestDef.wipe()
TestResult.wipe()
BlockDef.wipe()
block = BlockDef()
block.name = 'foobar'

View File

@ -23,7 +23,12 @@ from wcs.wf.geolocate import GeolocateWorkflowStatusItem
from wcs.wf.jump import JumpWorkflowStatusItem
from wcs.wf.notification import SendNotificationWorkflowStatusItem
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowImportError
from wcs.workflows import (
Workflow,
WorkflowBackofficeFieldsFormDef,
WorkflowImportError,
WorkflowVariablesFieldsFormDef,
)
from wcs.wscalls import NamedWsCall, NamedWsCallImportError
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login
@ -102,6 +107,10 @@ def test_deprecations(pub):
workflow.backoffice_fields_formdef.fields = [
fields.TableField(id='bo1', label='table field'),
]
workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow)
workflow.variables_formdef.fields = [
fields.TableField(id='wfvar1', label='other table field'),
]
st0 = workflow.add_status('Status0', 'st0')
display = st0.add_action('displaymsg')
@ -266,6 +275,7 @@ def test_deprecations(pub):
assert [x.text for x in resp.pyquery('.section--fields li a')] == [
'foobar / Field "table field"',
'foobar / Field "ranked field"',
'Options of workflow "test" / Field "other table field"',
'Backoffice fields of workflow "test" / Field "table field"',
]
assert [x.text for x in resp.pyquery('.section--actions li a')] == [

View File

@ -284,6 +284,7 @@ def test_forms_edit_tracking_code(pub, formdef):
resp = resp.click('Form Tracking')
assert resp.forms[0]['drafts_lifespan'].value == ''
assert resp.forms[0]['drafts_max_per_user'].value == ''
resp = resp.forms[0].submit().follow() # check empty value is ok
resp = resp.click('Form Tracking')
@ -297,6 +298,20 @@ def test_forms_edit_tracking_code(pub, formdef):
resp = resp.forms[0].submit().follow()
assert FormDef.get(1).drafts_lifespan == '5'
resp = resp.click('Form Tracking')
resp.forms[0]['drafts_max_per_user'].value = 'xxx'
resp = resp.forms[0].submit()
assert 'Maximum must be between 2 and 100 drafts.' in resp
resp.forms[0]['drafts_max_per_user'].value = '120'
resp = resp.forms[0].submit()
assert 'Maximum must be between 2 and 100 drafts.' in resp
resp.forms[0]['drafts_max_per_user'].value = '1'
resp = resp.forms[0].submit()
assert 'Maximum must be between 2 and 100 drafts.' in resp
resp.forms[0]['drafts_max_per_user'].value = '3'
resp = resp.forms[0].submit().follow()
assert FormDef.get(1).drafts_max_per_user == '3'
formdef.fields = [
fields.StringField(id='1', label='VerifyString'),
fields.DateField(id='2', label='VerifyDate'),

View File

@ -1167,6 +1167,11 @@ def test_tests_duplicate(pub):
response.name = 'Response xxx'
response.store()
testdef.workflow_tests.actions.append(
workflow_tests.AssertWebserviceCall(id='3', webservice_response_uuid=response.uuid),
)
testdef.store()
app = login(get_app(pub))
assert TestDef.count() == 1
@ -1196,6 +1201,8 @@ def test_tests_duplicate(pub):
assert testdef2.workflow_tests.actions[0].button_name == 'Go to end status'
assert testdef1.get_webservice_responses()[0].name == 'Changed'
assert testdef2.get_webservice_responses()[0].name == 'Response xxx'
assert testdef1.workflow_tests.actions[2].details_label == 'Changed'
assert testdef2.workflow_tests.actions[2].details_label == 'Response xxx'
resp = app.get('/backoffice/forms/1/tests/%s/' % testdef.id)
resp = resp.click('Duplicate')

View File

@ -9,7 +9,7 @@ from wcs import workflow_tests
from wcs.formdef import FormDef, fields
from wcs.qommon.http_request import HTTPRequest
from wcs.testdef import TestDef, WebserviceResponse
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowCriticalityLevel
from ..utilities import create_temporary_pub, get_app, login
from .test_all import create_superuser
@ -237,10 +237,16 @@ def test_workflow_tests_action_button_click(pub):
jump = new_status.add_action('choice')
jump.label = 'Button no target status'
workflow.add_global_action('Action 1')
interactive_action = workflow.add_global_action('Interactive action (should not be shown)')
interactive_action.add_action('form')
workflow.store()
resp = app.get('/backoffice/forms/1/tests/%s/workflow/1/' % testdef.id)
assert resp.form['button_name'].options == [
('Action 1', False, 'Action 1'),
('Button 1', False, 'Button 1'),
('Button 2', False, 'Button 2'),
('Button 4 (not available)', True, 'Button 4 (not available)'),
@ -418,6 +424,156 @@ def test_workflow_tests_action_assert_sms(pub):
assert escape('SMS to 0123456789 (+2)') in resp.text
def test_workflow_tests_action_assert_anonymise(pub):
create_superuser(pub)
formdef = FormDef()
formdef.name = 'test title'
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.workflow_tests.actions = [
workflow_tests.AssertAnonymise(id='1'),
]
testdef.store()
app = login(get_app(pub))
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
assert 'Edit' not in resp.text
def test_workflow_tests_action_assert_redirect(pub):
create_superuser(pub)
formdef = FormDef()
formdef.name = 'test title'
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.workflow_tests.actions = [
workflow_tests.AssertRedirect(id='1'),
]
testdef.store()
app = login(get_app(pub))
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
assert 'not configured' in resp.text
resp = resp.click('Edit')
resp.form['url'] = 'http://example.com'
resp = resp.form.submit().follow()
assert 'not configured' not in resp.text
assert 'http://example.com' in resp.text
def test_workflow_tests_action_assert_history_message(pub):
create_superuser(pub)
formdef = FormDef()
formdef.name = 'test title'
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.workflow_tests.actions = [
workflow_tests.AssertHistoryMessage(id='1'),
]
testdef.store()
app = login(get_app(pub))
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
assert 'not configured' in resp.text
resp = resp.click('Edit')
resp.form['message'] = 'Hello'
resp = resp.form.submit().follow()
assert 'not configured' not in resp.text
def test_workflow_tests_action_assert_alert(pub):
create_superuser(pub)
formdef = FormDef()
formdef.name = 'test title'
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.workflow_tests.actions = [
workflow_tests.AssertAlert(id='1'),
]
testdef.store()
app = login(get_app(pub))
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
assert 'not configured' in resp.text
resp = resp.click('Edit')
resp.form['message'] = 'Hello'
resp = resp.form.submit().follow()
assert 'not configured' not in resp.text
def test_workflow_tests_action_assert_criticality(pub):
create_superuser(pub)
workflow = Workflow(name='Workflow One')
workflow.add_status(name='New status')
workflow.store()
formdef = FormDef()
formdef.workflow_id = workflow.id
formdef.name = 'test title'
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.workflow_tests.actions = [
workflow_tests.AssertCriticality(id='1'),
]
testdef.store()
app = login(get_app(pub))
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
assert 'not configured' in resp.text
resp = resp.click('Edit')
assert 'Workflow has no criticality levels.' in resp.text
workflow.criticality_levels = [
WorkflowCriticalityLevel(name='green'),
WorkflowCriticalityLevel(name='red'),
]
workflow.store()
resp = app.get('/backoffice/forms/1/tests/%s/workflow/1/' % testdef.id)
resp.form['level_id'].select(text='green')
resp = resp.form.submit().follow()
assert 'not configured' not in resp.text
assert escape('Criticality is "green"') in resp.text
def test_workflow_tests_action_assert_backoffice_field(pub):
create_superuser(pub)
@ -508,13 +664,13 @@ def test_workflow_tests_action_assert_webservice_call(pub):
response3.store()
resp = app.get('/backoffice/forms/1/tests/%s/workflow/1/' % testdef.id)
assert resp.form['webservice_response_id'].options == [
(str(response.id), False, 'Fake response'),
(str(response2.id), False, 'Fake response 2'),
assert resp.form['webservice_response_uuid'].options == [
(str(response.uuid), False, 'Fake response'),
(str(response2.uuid), False, 'Fake response 2'),
]
assert resp.form['call_count'].value == '1'
resp.form['webservice_response_id'] = 1
resp.form['webservice_response_uuid'] = response.uuid
resp.form['call_count'] = 2
resp = resp.form.submit().follow()
@ -522,7 +678,7 @@ def test_workflow_tests_action_assert_webservice_call(pub):
assert 'Broken' not in resp.text
assert_webservice_call = TestDef.get(testdef.id).workflow_tests.actions[0]
assert assert_webservice_call.webservice_response_id == '1'
assert assert_webservice_call.webservice_response_uuid == response.uuid
assert assert_webservice_call.call_count == 2
response.remove_self()

View File

@ -429,6 +429,9 @@ def test_backoffice_submission_formdef_list_search(pub, local_user, access, auth
resp = get_url('/api/formdefs/?backoffice-submission=on&q=test')
assert len(resp.json['data']) == 2
resp = get_url('/api/formdefs/?backoffice-submission=on&q=tes')
assert len(resp.json['data']) == 2
resp = get_url('/api/formdefs/?backoffice-submission=on&q=xyz')
assert len(resp.json['data']) == 0

View File

@ -398,6 +398,7 @@ def test_backoffice_card_item_link_id_template(pub):
resp = resp.form.submit('submit')
assert resp.location.endswith('/backoffice/data/foo/blah/')
resp = resp.follow()
assert resp.pyquery('.breadcrumbs a')[-1].attrib['href'] == '/backoffice/data/foo/blah/'
resp = app.get('/backoffice/data/foo/')
assert [x.attrib['href'] for x in resp.pyquery('table a')] == ['blah/', 'test/']
@ -681,6 +682,58 @@ def test_backoffice_cards_import_data_csv_no_backoffice_fields(pub):
assert carddef.data_class().count() == 2
def test_backoffice_cards_import_data_csv_custom_id_no_update(pub):
user = create_user(pub)
user.name_identifiers = [str(uuid.uuid4())]
user.store()
Workflow.wipe()
workflow = Workflow(name='form-title')
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
workflow.backoffice_fields_formdef.fields = [
fields.StringField(id='bo0', varname='foo_bovar', label='bo variable'),
]
workflow.add_status('st0')
workflow.store()
CardDef.wipe()
carddef = CardDef()
carddef.name = 'test'
carddef.fields = [
fields.StringField(id='1', label='String', varname='custom_id'),
fields.ItemField(id='2', label='List', items=['item1', 'item2']),
]
carddef.backoffice_submission_roles = user.roles
carddef.id_template = '{{form_var_custom_id}}'
carddef.workflow = workflow
carddef.store()
carddef.data_class().wipe()
card = carddef.data_class()()
card.data = {'1': 'plop', '2': 'test', '2_display': 'test', 'bo0': 'xxx'}
card.just_created()
card.store()
app = login(get_app(pub))
data = b'''\
"String","List"
"plop","item1"
"test","item2"
'''
resp = app.get('/backoffice/data/test/import-file')
resp.forms[0]['file'] = Upload('test.csv', data, 'text/csv')
resp.form['update_existing_cards'].checked = False
resp = resp.forms[0].submit().follow()
assert carddef.data_class().count() == 2
card.refresh_from_storage()
assert card.data == {'1': 'plop', '2': 'test', '2_display': 'test', 'bo0': 'xxx'} # no change
other_card = carddef.data_class().select(order_by='-receipt_time')[0]
assert other_card.data == {'1': 'test', '2': 'item2', '2_display': 'item2', 'bo0': None}
assert other_card.id_display == 'test'
def test_backoffice_cards_import_data_csv_custom_id_update(pub):
user = create_user(pub)
user.name_identifiers = [str(uuid.uuid4())]
@ -721,6 +774,7 @@ def test_backoffice_cards_import_data_csv_custom_id_update(pub):
'''
resp = app.get('/backoffice/data/test/import-file')
resp.forms[0]['file'] = Upload('test.csv', data, 'text/csv')
resp.form['update_existing_cards'].checked = True
resp = resp.forms[0].submit().follow()
assert carddef.data_class().count() == 2

View File

@ -22,6 +22,7 @@ from wcs.workflows import (
Workflow,
WorkflowBackofficeFieldsFormDef,
WorkflowCriticalityLevel,
WorkflowVariablesFieldsFormDef,
)
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login
@ -1205,3 +1206,72 @@ def test_inspect_page_idp_role(pub):
resp.pyquery('[data-function-key="_receiver"] a').attr.href
== 'https://idp.example.net/manage/roles/uuid:d4b59e1ffb204dfd99fd3760f4952999/'
)
def test_inspect_page_form_option(pub):
create_user(pub, is_admin=True)
FormDef.wipe()
wf = Workflow(name='variables')
wf.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=wf)
wf.add_status('st1')
wf.store()
formdef = FormDef()
formdef.name = 'form title'
formdef.fields = []
formdef.workflow = wf
formdef.store()
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
app = login(get_app(pub))
resp = app.get('%sinspect' % formdata.get_url(backoffice=True), status=200)
assert 'form_option' not in resp.text
wf.variables_formdef.fields = [
fields.StringField(label='String test', varname='string_test'),
fields.DateField(label='Date test', varname='date_test'),
]
wf.store()
resp = app.get('%sinspect' % formdata.get_url(backoffice=True), status=200)
assert (
resp.pyquery('[title="form_option_string_test"]').parents('li').children('div.value span').text()
== 'None (no value)'
)
wf.variables_formdef.fields[0].default_value = 'xxx'
wf.variables_formdef.fields[1].default_value = '2024-03-20'
wf.store()
resp = app.get('%sinspect' % formdata.get_url(backoffice=True), status=200)
assert (
resp.pyquery('[title="form_option_string_test"]').parents('li').children('div.value span').text()
== 'xxx'
)
assert (
resp.pyquery('[title="form_option_date_test"]').parents('li').children('div.value span').text()
== '2024-03-20'
)
assert (
resp.pyquery('[title="form_option_date_test_year"]').parents('li').children('div.value span').text()
== '2024 (integer number)'
)
formdef.workflow_options = {'string_test': 'yyy', 'date_test': datetime.date(2024, 3, 21).timetuple()}
formdef.store()
resp = app.get('%sinspect' % formdata.get_url(backoffice=True), status=200)
assert (
resp.pyquery('[title="form_option_string_test"]').parents('li').children('div.value span').text()
== 'yyy'
)
assert (
resp.pyquery('[title="form_option_date_test"]').parents('li').children('div.value span').text()
== '2024-03-21'
)
assert (
resp.pyquery('[title="form_option_date_test_year"]').parents('li').children('div.value span').text()
== '2024 (integer number)'
)

View File

@ -414,6 +414,14 @@ def test_form_max_drafts(pub):
assert not formdef.data_class().has_key(drafts[0].id) # oldest draft was removed
formdef.drafts_max_per_user = '3'
formdef.store()
resp = app.get('/test/')
resp.form['f0'] = 'hello2'
resp = resp.form.submit('submit')
assert formdef.data_class().count([Equal('status', 'draft')]) == 4
def test_form_draft_temporary_access_url(pub):
FormDef.wipe()

View File

@ -1626,6 +1626,57 @@ def test_card_update_related_cascading_loop(pub):
assert carddata2.data['2_display'] == 'card1 card2 card1 None'
def test_card_update_related_items_relation(pub):
CardDef.wipe()
FormDef.wipe()
carddef = CardDef()
carddef.name = 'foo'
carddef.fields = [
StringField(id='1', label='Test', varname='foo'),
]
carddef.digest_templates = {'default': '{{ form_var_foo }}'}
carddef.store()
carddef.data_class().wipe()
carddata1 = carddef.data_class()()
carddata1.data = {'1': 'card1'}
carddata1.just_created()
carddata1.store()
carddata2 = carddef.data_class()()
carddata2.data = {'1': 'card2'}
carddata2.just_created()
carddata2.store()
formdef = FormDef()
formdef.name = 'foo'
formdef.fields = [
ItemField(id='1', label='Test', data_source={'type': 'carddef:foo'}),
ItemsField(id='2', label='Test2', data_source={'type': 'carddef:foo'}),
]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'1': '1', '2': ['1', '2']}
formdata.data['1_display'] = formdef.fields[0].store_display_value(formdata.data, formdef.fields[0].id)
formdata.data['2_display'] = formdef.fields[1].store_display_value(formdata.data, formdef.fields[1].id)
assert formdata.data['1_display'] == 'card1'
assert formdata.data['2_display'] == 'card1, card2'
formdata.just_created()
formdata.store()
pub.cleanup()
carddef = carddef.get(carddef.id)
carddata1 = carddef.data_class().get(carddata1.id)
carddata1.data = {'1': 'card1-change1'}
carddata1.store()
formdata.refresh_from_storage()
assert formdata.data['1_display'] == 'card1-change1'
assert formdata.data['2_display'] == 'card1-change1, card2'
def test_card_update_related_deleted(pub):
BlockDef.wipe()
CardDef.wipe()

View File

@ -1997,6 +1997,31 @@ def test_lazy_formdata_queryset_filter_non_unique_varname(pub, variable_test_dat
assert tmpl.render(context) == '1'
def test_filter_on_page_field(pub):
pub.loggederror_class.wipe()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.PageField(id='1', label='Page', varname='page'),
]
formdef.store()
data_class = formdef.data_class()
formdata = data_class()
formdata.just_created()
formdata.store()
context = pub.substitutions.get_context_variables(mode='lazy')
tmpl = Template('{{forms|objects:"test"|filter_by:"page"|filter_value:"100"}}')
tmpl.render(context)
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.summary == 'Invalid filter "page"'
def test_numeric_filter_on_string(pub):
FormDef.wipe()
formdef = FormDef()

View File

@ -76,6 +76,12 @@ HOBO_JSON = {
},
],
},
{
'service-id': 'lingo',
'title': 'Lingo',
'base_url': 'http://payment.example.net/',
'secret_key': 'aaa',
},
],
'profile': {
'fields': [
@ -293,6 +299,7 @@ def test_configure_site_options(setuptest, alt_tempdir):
assert pub.get_site_option('xxx', 'variables') == 'HELLO WORLD'
assert pub.get_site_option('portal_agent_url', 'variables') == 'http://agents.example.net/'
assert pub.get_site_option('portal_url', 'variables') == 'http://portal.example.net/'
assert pub.get_site_option('lingo_url', 'variables') == 'http://payment.example.net/'
assert pub.get_site_option('test_wcs_url', 'variables') == 'http://wcs.example.net/'
assert pub.get_site_option('disable_cron_jobs', 'variables') == 'True'
assert pub.get_site_option('maintenance_page', 'variables') == 'True'

View File

@ -1183,6 +1183,45 @@ def test_sql_criteria_fts(pub):
assert data_class.select([st.FtsMatch(formdata1.id_display)])[0].id_display == formdata1.id_display
def test_search_tokens_purge(pub):
_, cur = sql.get_connection_and_cursor()
# purge garbage from other tests
sql.purge_obsolete_search_tokens()
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
start = cur.fetchone()[0]
# define a new table
test_formdef = FormDef()
test_formdef.name = 'tableSelectFTStokens'
test_formdef.fields = [fields.StringField(id='3', label='string')]
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
assert cur.fetchone()[0] == start + 1
t = data_class()
t.data = {'3': 'foofortokensofcourse'}
t.just_created()
t.store()
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
assert cur.fetchone()[0] == start + 2
t.data = {'3': 'chaussettefortokensofcourse'}
t.store()
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
assert cur.fetchone()[0] == start + 3
sql.purge_obsolete_search_tokens()
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
assert cur.fetchone()[0] == start + 2
def table_exists(cur, table_name):
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
@ -1692,6 +1731,26 @@ def test_load_all_evolutions_on_any_formdata(pub):
assert len([x for x in objects if x._evolution is not None]) == 100
def test_store_on_any_formdata(pub):
drop_formdef_tables()
formdef = FormDef()
formdef.name = 'test any store'
formdef.fields = []
formdef.store()
data_class = formdef.data_class(mode='sql')
formdata = data_class()
formdata.just_created()
formdata.receipt_time = localtime()
formdata.store()
objects = sql.AnyFormData.select()
assert len(objects) == 1
with pytest.raises(TypeError):
objects[0].store()
def test_geoloc_in_global_view(pub):
drop_formdef_tables()

View File

@ -9,7 +9,12 @@ from wcs.qommon.http_request import HTTPRequest
from wcs.testdef import TestDef, WebserviceResponse
from wcs.wf.jump import JumpWorkflowStatusItem, _apply_timeouts
from wcs.workflow_tests import WorkflowTestError
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowStatusItem
from wcs.workflows import (
Workflow,
WorkflowBackofficeFieldsFormDef,
WorkflowCriticalityLevel,
WorkflowStatusItem,
)
from .backoffice_pages.test_all import create_user
from .utilities import create_temporary_pub, get_app, login
@ -204,6 +209,65 @@ def test_workflow_tests_button_click(pub):
assert str(excinfo.value) == 'Button "Go to end status" is not displayed.'
def test_workflow_tests_button_click_global_action(pub):
role = pub.role_class(name='test role')
role.store()
user = pub.user_class(name='test user')
user.roles = [role.id]
user.store()
workflow = Workflow(name='Workflow One')
workflow.add_status(name='New status')
end_status = workflow.add_status(name='End status')
global_action = workflow.add_global_action('Go to end status')
global_action.triggers[0].roles = [role.id]
sendmail = global_action.add_action('sendmail')
sendmail.to = ['test@example.org']
sendmail.subject = 'In new status'
sendmail.body = 'xxx'
jump = global_action.add_action('jump')
jump.status = end_status.id
workflow.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = user.id
testdef.workflow_tests.actions = [
workflow_tests.AssertStatus(status_name='End status'),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Form should be in status "End status" but is in status "New status".'
testdef.workflow_tests.actions = [
workflow_tests.ButtonClick(button_name='Go to end status'),
workflow_tests.AssertEmail(),
workflow_tests.AssertStatus(status_name='End status'),
]
testdef.run(formdef)
# hide button from test user
user.roles = []
user.store()
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Button "Go to end status" is not displayed.'
def test_workflow_tests_button_click_who(pub):
role = pub.role_class(name='test role')
role.store()
@ -603,6 +667,230 @@ def test_workflow_tests_sms(pub):
assert 'SMS body: "Hello"' in excinfo.value.details
def test_workflow_tests_anonymise(pub):
user = pub.user_class(name='test user')
user.store()
workflow = Workflow(name='Workflow One')
new_status = workflow.add_status(name='New status')
workflow.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = user.id
testdef.workflow_tests.actions = [
workflow_tests.AssertAnonymise(),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Form was not anonymised.'
anonymise_action = new_status.add_action('anonymise')
workflow.store()
formdef.refresh_from_storage()
testdef.run(formdef)
anonymise_action.mode = 'intermediate'
workflow.store()
formdef.refresh_from_storage()
testdef.run(formdef)
anonymise_action.mode = 'unlink_user'
workflow.store()
formdef.refresh_from_storage()
testdef.run(formdef)
def test_workflow_tests_redirect(pub):
user = pub.user_class(name='test user')
user.store()
workflow = Workflow(name='Workflow One')
new_status = workflow.add_status(name='New status')
workflow.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = user.id
testdef.workflow_tests.actions = [
workflow_tests.AssertRedirect(url='https://example.com/'),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'No redirection occured.'
redirect_action = new_status.add_action('redirect_to_url')
redirect_action.url = 'https://test.com/'
workflow.store()
formdef.refresh_from_storage()
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert (
str(excinfo.value)
== 'Expected redirection to https://example.com/ but was redirected to https://test.com/.'
)
testdef.workflow_tests.actions = [
workflow_tests.AssertRedirect(url='https://test.com/'),
]
testdef.run(formdef)
def test_workflow_tests_history_message(pub):
user = pub.user_class(name='test user')
user.store()
workflow = Workflow(name='Workflow One')
new_status = workflow.add_status(name='New status')
workflow.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = user.id
testdef.workflow_tests.actions = [
workflow_tests.AssertHistoryMessage(message='Hello 42'),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'No history message.'
register_comment = new_status.add_action('register-comment')
register_comment.comment = 'Hello {{ 41|add:1 }}'
workflow.store()
formdef.refresh_from_storage()
testdef.run(formdef)
testdef.workflow_tests.actions = [
workflow_tests.AssertHistoryMessage(message='Hello 43'),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Wrong history message content.'
assert 'Displayed history message: <div>Hello 42</div>' in excinfo.value.details
assert 'Expected history message: Hello 43' in excinfo.value.details
def test_workflow_tests_alert(pub):
user = pub.user_class(name='test user')
user.store()
workflow = Workflow(name='Workflow One')
new_status = workflow.add_status(name='New status')
workflow.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = user.id
testdef.workflow_tests.actions = [
workflow_tests.AssertAlert(message='Hello 42'),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'No alert matching message.'
assert 'Displayed alerts: None' in excinfo.value.details
assert 'Expected alert: Hello 42' in excinfo.value.details
alert = new_status.add_action('displaymsg')
alert.message = 'Hello {{ 41|add:1 }}'
workflow.store()
formdef.refresh_from_storage()
testdef.run(formdef)
testdef.workflow_tests.actions = [
workflow_tests.AssertAlert(message='Hello 43'),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'No alert matching message.'
assert 'Displayed alerts: <p>Hello 42</p>' in excinfo.value.details
assert 'Expected alert: Hello 43' in excinfo.value.details
def test_workflow_tests_criticality(pub):
user = pub.user_class(name='test user')
user.store()
workflow = Workflow(name='Workflow One')
new_status = workflow.add_status(name='New status')
green_level = WorkflowCriticalityLevel(name='green')
red_level = WorkflowCriticalityLevel(name='red')
workflow.criticality_levels = [green_level, red_level]
workflow.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = user.id
testdef.workflow_tests.actions = [
workflow_tests.AssertCriticality(level_id=red_level.id),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Form should have criticality level "red" but has level "green".'
new_status.add_action('modify_criticality')
workflow.store()
formdef.refresh_from_storage()
testdef.run(formdef)
workflow.criticality_levels = []
workflow.store()
formdef.refresh_from_storage()
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Broken, missing criticality level'
def test_workflow_tests_backoffice_fields(pub):
user = pub.user_class(name='test user')
user.store()
@ -701,7 +989,7 @@ def test_workflow_tests_webservice(pub):
testdef.workflow_tests.actions = [
workflow_tests.AssertStatus(status_name='End status'),
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=1),
workflow_tests.AssertWebserviceCall(webservice_response_uuid=response.uuid, call_count=1),
]
with pytest.raises(WorkflowTestError) as excinfo:
@ -716,7 +1004,7 @@ def test_workflow_tests_webservice(pub):
assert str(excinfo.value) == 'Webservice response Fake response was used 2 times (instead of 1).'
testdef.workflow_tests.actions = [
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=2),
workflow_tests.AssertWebserviceCall(webservice_response_uuid=response.uuid, call_count=2),
]
testdef.run(formdef)
@ -733,8 +1021,8 @@ def test_workflow_tests_webservice(pub):
response2.store()
testdef.workflow_tests.actions = [
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=1),
workflow_tests.AssertWebserviceCall(webservice_response_id=response2.id, call_count=1),
workflow_tests.AssertWebserviceCall(webservice_response_uuid=response.uuid, call_count=1),
workflow_tests.AssertWebserviceCall(webservice_response_uuid=response2.uuid, call_count=1),
]
testdef.run(formdef)
@ -744,8 +1032,8 @@ def test_workflow_tests_webservice(pub):
testdef.run(formdef)
testdef.workflow_tests.actions = [
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=1),
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=1),
workflow_tests.AssertWebserviceCall(webservice_response_uuid=response.uuid, call_count=1),
workflow_tests.AssertWebserviceCall(webservice_response_uuid=response.uuid, call_count=1),
]
with pytest.raises(WorkflowTestError) as excinfo:
@ -753,7 +1041,7 @@ def test_workflow_tests_webservice(pub):
assert str(excinfo.value) == 'Webservice response Fake response was used 0 times (instead of 1).'
testdef.workflow_tests.actions = [
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=0),
workflow_tests.AssertWebserviceCall(webservice_response_uuid=response.uuid, call_count=0),
]
with pytest.raises(WorkflowTestError) as excinfo:
@ -761,7 +1049,7 @@ def test_workflow_tests_webservice(pub):
assert str(excinfo.value) == 'Webservice response Fake response was used 1 times (instead of 0).'
testdef.workflow_tests.actions = [
workflow_tests.AssertWebserviceCall(webservice_response_id='xxx', call_count=1),
workflow_tests.AssertWebserviceCall(webservice_response_uuid='xxx', call_count=1),
]
with pytest.raises(WorkflowTestError) as excinfo:
@ -834,6 +1122,7 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
status_with_timeout_jump = workflow.add_status('Status with timeout jump', 'status-with-timeout-jump')
status_with_button = workflow.add_status('Status with button', 'status-with-button')
transition_status = workflow.add_status('Transition status', 'transition-status')
transition_status2 = workflow.add_status('Transition status 2', 'transition-status-2')
end_status = workflow.add_status('End status', 'end-status')
jump = new_status.add_action('jump')
@ -864,7 +1153,24 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
sendsms.to = ['0123456789']
sendsms.body = 'Hello'
jump = transition_status.add_action('jump')
anonymise_action = transition_status.add_action('anonymise')
anonymise_action.mode = 'intermediate'
redirect_action = transition_status.add_action('redirect_to_url')
redirect_action.url = 'https://test.com/'
register_comment = transition_status.add_action('register-comment')
register_comment.comment = 'Hello'
transition_status.add_action('modify_criticality')
global_action = workflow.add_global_action('Action 1')
global_action.triggers[0].roles = [role.id]
jump = global_action.add_action('jump')
jump.status = transition_status2.id
jump = transition_status2.add_action('jump')
jump.status = end_status.id
workflow.store()
@ -889,6 +1195,7 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
app = login(get_app(pub))
resp = app.get(formdata.get_url())
resp.form.submit('button1').follow()
resp.form.submit('button-action-1').follow()
formdata.refresh_from_storage()
assert formdata.status == 'wf-end-status'
@ -896,7 +1203,7 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
testdef.run(formdef)
actions = testdef.workflow_tests.actions
assert len(actions) == 9
assert len(actions) == 15
assert actions[0].key == 'assert-status'
assert actions[0].status_name == 'Status with timeout jump'
@ -914,6 +1221,16 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
assert actions[5].key == 'assert-email'
assert actions[6].key == 'assert-backoffice-field'
assert actions[7].key == 'assert-sms'
assert actions[8].key == 'assert-anonymise'
assert actions[9].key == 'assert-redirect'
assert actions[10].key == 'assert-history-message'
assert actions[11].key == 'assert-criticality'
assert actions[12].key == 'assert-status'
assert actions[12].status_name == 'Transition status'
assert actions[13].key == 'button-click'
assert actions[13].button_name == 'Action 1'
assert actions[-1].key == 'assert-status'
assert actions[-1].status_name == 'End status'

View File

@ -152,7 +152,7 @@ class BlockDirectory(FieldsDirectory):
'Save snapshot'
)
r += htmltext('<li><a class="button button-paragraph" rel="popup" href="overwrite">%s</a>') % _(
'Overwrite'
'Overwrite with new import'
)
r += htmltext('</ul>')
r += htmltext('<h3>%s</h3>') % _('Navigation')

View File

@ -196,7 +196,20 @@ class FieldDefPage(Directory):
to_be_deleted.reverse()
# add delete_fields checkbox only if the page has fields
if to_be_deleted:
form.add(CheckboxWidget, 'delete_fields', title=_('Also remove all fields from the page'))
form.add(
CheckboxWidget,
'delete_fields',
title=_('Also remove all fields from the page'),
attrs={'data-dynamic-display-parent': 'true'},
)
form.widgets.append(
HtmlWidget(
'<div class="warningnotice" '
'data-dynamic-display-child-of="delete_fields" '
'data-dynamic-display-checked="true">%s</div>'
% _('Warning: the page fields data will be permanently deleted.')
)
)
form.add_submit('delete', _('Delete'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():

View File

@ -30,6 +30,7 @@ from wcs.carddef import CardDef
from wcs.categories import Category
from wcs.formdef import (
DRAFTS_DEFAULT_LIFESPAN,
DRAFTS_DEFAULT_MAX_PER_USER,
FormDef,
FormdefImportError,
FormdefImportRecoverableError,
@ -290,6 +291,23 @@ class OptionsDirectory(Directory):
widget.validation_function = check_lifespan
widget.validation_function_error_message = _('Lifespan must be between 2 and 100 days.')
widget = form.add(
WcsExtraStringWidget,
'drafts_max_per_user',
title=_('Maximum number of drafts per user (between 2 and 100)'),
value=self.formdef.drafts_max_per_user,
hint=_('%s drafts per user by default') % DRAFTS_DEFAULT_MAX_PER_USER,
)
def check_max_per_user(value):
try:
return bool(int(value) >= 2 and int(value) <= 100)
except (ValueError, TypeError):
return False
widget.validation_function = check_max_per_user
widget.validation_function_error_message = _('Maximum must be between 2 and 100 drafts.')
form.widgets.append(HtmlWidget(htmltext('<h3>%s</h3>') % _('Tracking Code')))
form.add(
CheckboxWidget,
@ -495,6 +513,7 @@ class OptionsDirectory(Directory):
'id_template',
'submission_lateral_template',
'drafts_lifespan',
'drafts_max_per_user',
'user_support',
'management_sidebar_items',
]

View File

@ -155,6 +155,7 @@ class TestPage(FormBackOfficeStatusPage):
self.testdef = TestDef.get(component)
except KeyError:
raise TraversalError()
self.testdef.formdef = objectdef
filled = self.testdef.build_formdata(objectdef, include_fields=True)
super().__init__(objectdef, filled)

View File

@ -228,10 +228,16 @@ class CardPage(FormPage):
form.add(
CheckboxWidget,
'update_existing_cards',
title=_('Update existing cards (only for JSON imports)'),
title=_('Update existing cards (only for JSON imports)')
if not self.formdef.id_template
else _('Update existing cards'),
hint=_('Cards will be matched using their unique identifier ("uuid" property).')
if not self.formdef.id_template
else _('Cards will be matched using their custom identifier ("id" property).'),
else _(
'Cards will be matched using their custom identifier ("id" property). '
'If this option is enabled cards with the same identifiers will be updated, '
'otherwise they will be skipped.'
),
value=False,
)
form.add_submit('submit', _('Submit'))
@ -241,19 +247,22 @@ class CardPage(FormPage):
if form.is_submitted() and not form.has_errors():
file_content = form.get_widget('file').parse().fp.read()
update_existing_cards = form.get_widget('update_existing_cards').parse()
try:
json_content = json.loads(file_content)
except ValueError:
# not json -> CSV
try:
return self.import_csv_submit(file_content, submission_agent_id=get_request().user.id)
return self.import_csv_submit(
file_content,
update_existing_cards=update_existing_cards,
submission_agent_id=get_request().user.id,
)
except ValueError as e:
form.set_error('file', e)
else:
try:
return self.import_json_submit(
json_content, update_existing_cards=form.get_widget('update_existing_cards').parse()
)
return self.import_json_submit(json_content, update_existing_cards=update_existing_cards)
except ValueError as e:
form.set_error('file', e)
@ -275,7 +284,9 @@ class CardPage(FormPage):
impossible_fields.append(field.label)
return impossible_fields
def import_csv_submit(self, content, afterjob=True, api=False, submission_agent_id=None):
def import_csv_submit(
self, content, afterjob=True, api=False, update_existing_cards=False, submission_agent_id=None
):
if b'\0' in content:
raise ValueError(_('Invalid file format.'))
@ -328,7 +339,10 @@ class CardPage(FormPage):
raise ValueError(error_message)
job = ImportFromCsvAfterJob(
carddef=self.formdef, data_lines=data_lines, submission_agent_id=submission_agent_id
carddef=self.formdef,
data_lines=data_lines,
update_existing_cards=update_existing_cards,
submission_agent_id=submission_agent_id,
)
if afterjob:
get_response().add_after_job(job)
@ -432,12 +446,13 @@ class CardBackOfficeStatusPage(FormBackOfficeStatusPage):
class ImportFromCsvAfterJob(AfterJob):
def __init__(self, carddef, data_lines, submission_agent_id):
def __init__(self, carddef, data_lines, update_existing_cards, submission_agent_id):
super().__init__(
label=_('Importing data into cards'),
carddef_class=carddef.__class__,
carddef_id=carddef.id,
data_lines=data_lines,
update_existing_cards=update_existing_cards,
submission_agent_id=submission_agent_id,
)
@ -448,6 +463,7 @@ class ImportFromCsvAfterJob(AfterJob):
def execute(self):
self.carddef = self.kwargs['carddef_class'].get(self.kwargs['carddef_id'])
update_existing_cards = self.kwargs['update_existing_cards']
carddata_class = self.carddef.data_class()
self.submission_agent_id = self.kwargs['submission_agent_id']
self.total_count = len(self.kwargs['data_lines'])
@ -507,6 +523,9 @@ class ImportFromCsvAfterJob(AfterJob):
except KeyError:
pass # unique id, fine
else:
if not update_existing_cards:
self.increment_count()
continue
# overwrite (only fields from CSV columns, not unsupported or backoffice fields)
new_card = False
orig_data = copy.copy(carddata_with_same_id.data)

View File

@ -495,6 +495,10 @@ class Command(TenantCommand):
variables['portal_user_url'] = service_url
variables['portal_user_title'] = service.get('title')
config.set('options', 'theme_skeleton_url', service.get('base_url') + '__skeleton__/')
if service.get('service-id') == 'lingo':
variables['lingo_url'] = urllib.parse.urljoin(service_url, '/')
for legacy_url in service.get('legacy_urls', []):
legacy_domain = urllib.parse.urlparse(legacy_url['base_url']).netloc.split(':')[0]
legacy_urls[legacy_domain] = domain

View File

@ -61,6 +61,7 @@ from .qommon.upload_storage import PicklableUpload
from .roles import logged_users_role
DRAFTS_DEFAULT_LIFESPAN = 100 # days
DRAFTS_DEFAULT_MAX_PER_USER = 5
if not hasattr(types, 'ClassType'):
types.ClassType = type
@ -190,6 +191,7 @@ class FormDef(StorableObject):
submission_lateral_template = None
id_template = None
drafts_lifespan = None
drafts_max_per_user = None
user_support = None
geolocations = None
@ -220,6 +222,7 @@ class FormDef(StorableObject):
'submission_lateral_template',
'id_template',
'drafts_lifespan',
'drafts_max_per_user',
'user_support',
]
BOOLEAN_ATTRIBUTES = [
@ -573,9 +576,11 @@ class FormDef(StorableObject):
def get_all_fields(self):
return (self.fields or []) + self.workflow.get_backoffice_fields()
def iter_fields(self, include_block_fields=False, with_backoffice_fields=True):
def iter_fields(self, include_block_fields=False, with_backoffice_fields=True, with_no_data_fields=True):
def _iter_fields(fields, block_field=None):
for field in fields:
if with_no_data_fields is False and field.is_no_data_field:
continue
# add contextual_id/contextual_varname attributes
# they are id/varname for normal fields
# but in case of blocks they are concatenation of block id/varname + field id/varname
@ -634,6 +639,9 @@ class FormDef(StorableObject):
def get_drafts_lifespan(self):
return int(self.drafts_lifespan or DRAFTS_DEFAULT_LIFESPAN)
def get_drafts_max_per_user(self):
return int(self.drafts_max_per_user or DRAFTS_DEFAULT_MAX_PER_USER)
_workflow = None
def get_workflow(self):

View File

@ -1030,7 +1030,7 @@ class FormStatusPage(Directory, FormTemplateMixin):
return Directory._q_lookup(self, component)
def _q_traverse(self, path):
get_response().breadcrumb.append((str(self.filled.id) + '/', self.filled.get_display_id()))
get_response().breadcrumb.append((self.filled.identifier + '/', self.filled.get_display_id()))
return super()._q_traverse(path)
def wfedit(self, action_id):

View File

@ -1778,11 +1778,11 @@ class FormPage(Directory, TempfileDirectoryMixin, FormTemplateMixin):
if get_session().mark_anonymous_formdata(filled):
get_session().store()
elif new_draft:
# keep at most 5 drafts per user
# keep at most "max_per_user" drafts per user
data_class = self.formdef.data_class()
for id in data_class.get_sorted_ids(
'-last_update_time', [Equal('status', 'draft'), Equal('user_id', str(filled.user_id))]
)[5:]:
)[self.formdef.get_drafts_max_per_user() :]:
data_class.remove_object(id)
if new_draft:

View File

@ -4,8 +4,8 @@ msgid ""
msgstr ""
"Project-Id-Version: wcs 0\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2024-03-18 14:22+0100\n"
"PO-Revision-Date: 2024-03-18 14:22+0100\n"
"POT-Creation-Date: 2024-03-21 19:00+0100\n"
"PO-Revision-Date: 2024-03-21 19:00+0100\n"
"Last-Translator: Thomas Noël <tnoel@entrouvert.com>\n"
"Language-Team: french\n"
"Language: fr\n"
@ -18,7 +18,7 @@ msgstr ""
#: admin/data_sources.py admin/forms.py admin/mail_templates.py admin/tests.py
#: admin/users.py admin/workflows.py admin/wscalls.py backoffice/management.py
#: fields/base.py qommon/ident/franceconnect.py
#: templates/wcs/backoffice/test-result.html wf/profile.py
#: templates/wcs/backoffice/test-result.html wf/profile.py workflow_tests.py
msgid "Name"
msgstr "Nom"
@ -866,7 +866,7 @@ msgstr ""
#: admin/fields.py admin/settings.py admin/users.py backoffice/management.py
#: data_sources.py fields/base.py qommon/form.py qommon/ident/password.py
#: statistics/views.py wf/create_formdata.py
#: statistics/views.py wf/create_formdata.py workflow_tests.py
msgid "None"
msgstr "Aucun"
@ -931,6 +931,12 @@ msgstr "Vous allez supprimer le champ « %s »."
msgid "Also remove all fields from the page"
msgstr "Supprimer tous les champs de la page"
#: admin/fields.py
msgid "Warning: the page fields data will be permanently deleted."
msgstr ""
"Attention : les informations contenues dans les champs de la page seront "
"perdues de façon irréversible."
#: admin/fields.py
#, python-format
msgid "Deletion of field \"%s\""
@ -1166,6 +1172,19 @@ msgstr "Par défaut les brouillons sont supprimés après %s jours."
msgid "Lifespan must be between 2 and 100 days."
msgstr "La durée de vie doit être entre 2 et 100 jours."
#: admin/forms.py
msgid "Maximum number of drafts per user (between 2 and 100)"
msgstr "Nombre maximum de brouillons par utilisateur (entre 2 et 100)"
#: admin/forms.py
#, python-format
msgid "%s drafts per user by default"
msgstr "%s brouillons par utilisateur par défaut"
#: admin/forms.py
msgid "Maximum must be between 2 and 100 drafts."
msgstr "Le nombre maximum doit être entre 2 et 100 brouillons."
#: admin/forms.py backoffice/management.py backoffice/submission.py
#: forms/root.py
msgid "Tracking Code"
@ -2369,7 +2388,7 @@ msgid "Sender (number or name)"
msgstr "Expéditeur (nom ou numéro)"
#: admin/settings.py admin/tests.py wf/notification.py wf/redirect_to_url.py
#: wf/wscall.py wscalls.py
#: wf/wscall.py workflow_tests.py wscalls.py
msgid "URL"
msgstr "URL"
@ -3940,6 +3959,10 @@ msgid "Update existing cards (only for JSON imports)"
msgstr ""
"Mettre à jour les fiches existantes (uniquement pour les fichiers JSON)"
#: backoffice/data_management.py
msgid "Update existing cards"
msgstr "Mettre à jour les fiches existantes"
#: backoffice/data_management.py
msgid ""
"Cards will be matched using their unique identifier (\"uuid\" property)."
@ -3948,10 +3971,15 @@ msgstr ""
"identifiant unique (propriété « uuid »)."
#: backoffice/data_management.py
msgid "Cards will be matched using their custom identifier (\"id\" property)."
msgid ""
"Cards will be matched using their custom identifier (\"id\" property). If "
"this option is enabled cards with the same identifiers will be updated, "
"otherwise they will be skipped."
msgstr ""
"La correspondance avec les fiches existantes se fera sur base de leur "
"identifiant personnalisé (propriété « id »)."
"identifiant personnalisé (propriété « id »). Si cette option est activée les "
"fiches avec un identifiant existant seront mises à jour, sinon elles seront "
"ignorées."
#: backoffice/data_management.py backoffice/i18n.py
#: templates/wcs/backoffice/card-data-import-form.html
@ -9728,6 +9756,10 @@ msgstr "Contenu de la barre latéral pour le traitement"
msgid "Tracking codes"
msgstr "Codes de suivi"
#: templates/wcs/backoffice/formdef-inspect.html
msgid "Maximum number of drafts per user"
msgstr "Nombre maximum de brouillons par utilisateur"
#: templates/wcs/backoffice/formdef-inspect.html
msgid "Redirection when disabled"
msgstr "Redirection quand désactivé"
@ -11392,7 +11424,7 @@ msgstr "Erreur dans le gabarit du message de workflow (%s)"
msgid "Error rendering message."
msgstr "Erreur de rendu du message."
#: wf/display_message.py wf/register_comment.py
#: wf/display_message.py wf/register_comment.py workflow_tests.py
msgid "Message"
msgstr "Message"
@ -12422,6 +12454,104 @@ msgstr "Numéros de téléphone"
msgid "Add phone number"
msgstr "Ajouter un numéro de téléphone"
#: workflow_tests.py
msgid "Assert anonymisation is performed"
msgstr "Vérifier que lanonymisation a lieu"
#: workflow_tests.py
msgid "Form was not anonymised."
msgstr "La demande na pas été anonymisée."
#: workflow_tests.py
msgid "Assert redirect is performed"
msgstr "Vérifier quune redirection a lieu"
#: workflow_tests.py
msgid "No redirection occured."
msgstr "Aucune redirection na eu lieu."
#: workflow_tests.py
#, python-format
msgid "Expected redirection to %(expected_url)s but was redirected to %(url)s."
msgstr ""
"Une redirection vers « %(expected_url)s » était attendue mais une "
"redirection vers « %(url)s » a eu lieu."
#: workflow_tests.py
msgid "Assert history message is displayed"
msgstr "Vérifier laffichage dun message dans lhistorique"
#: workflow_tests.py
msgid "No history message."
msgstr "Pas de message dans lhistorique."
#: workflow_tests.py
#, python-format
msgid "Displayed history message: %s"
msgstr "Message affiché dans lhistorique : %s"
#: workflow_tests.py
#, python-format
msgid "Expected history message: %s"
msgstr "Message attendu dans lhistorique : %s"
#: workflow_tests.py
msgid "Wrong history message content."
msgstr "Contenu du message dans lhistorique incorrect."
#: workflow_tests.py
msgid "Assertion will pass if the text is contained in history message."
msgstr "La vérification réussira si le texte est contenu dans le message."
#: workflow_tests.py
msgid "Assert alert is displayed"
msgstr "Vérifier laffichage dune alerte"
#: workflow_tests.py
#, python-format
msgid "Displayed alerts: %s"
msgstr "Alertes affichées : %s"
#: workflow_tests.py
#, python-format
msgid "Expected alert: %s"
msgstr "Alerte attendue : %s"
#: workflow_tests.py
msgid "No alert matching message."
msgstr "Pas dalerte correspondant au message attendu."
#: workflow_tests.py
msgid "Assertion will pass if the text is contained in alert message."
msgstr ""
"La vérification réussira si le texte est contenu dans le message dalerte."
#: workflow_tests.py
msgid "Assert criticality level"
msgstr "Vérifier le niveau de criticité"
#: workflow_tests.py
msgid "Workflow has no criticality levels."
msgstr "Le workflow na pas de niveaux de criticité."
#: workflow_tests.py
msgid "Broken, missing criticality level"
msgstr "Cassé, niveau de criticité manquant"
#: workflow_tests.py
#, python-format
msgid "Criticality is \"%s\""
msgstr "Le niveau de criticité est « %s »"
#: workflow_tests.py
#, python-format
msgid ""
"Form should have criticality level \"%(expected_level)s\" but has level "
"\"%(level)s\"."
msgstr ""
"La demande devrait avoir le niveau de criticité « %(expected_level)s » mais "
"elle a le niveau « %(level)s »."
#: workflow_traces.py
msgid "Created (by API)"
msgstr "Création (par lAPI)"

View File

@ -485,6 +485,7 @@ class WcsPublisher(QommonPublisher):
for _formdef in FormDef.select() + CardDef.select():
sql.do_formdef_tables(_formdef)
sql.migrate_global_views(conn, cur)
sql.init_search_tokens()
cur.close()
def record_deprecated_usage(self, *args, **kwargs):

View File

@ -692,6 +692,11 @@ class QommonPublisher(Publisher):
for error in self.loggederror_class.select(clause=clauses):
self.loggederror_class.remove_object(error.id)
def clean_search_tokens(self, **kwargs):
from wcs import sql
sql.purge_obsolete_search_tokens()
@classmethod
def register_cronjobs(cls):
cls.register_cronjob(CronJob(cls.clean_sessions, minutes=[0], name='clean_sessions'))
@ -704,6 +709,9 @@ class QommonPublisher(Publisher):
cls.register_cronjob(
CronJob(cls.clean_loggederrors, hours=[3], minutes=[0], name='clean_loggederrors')
)
cls.register_cronjob(
CronJob(cls.clean_search_tokens, weekdays=[0], hours=[1], minutes=[0], name='clean_search_tokens')
)
_initialized = False

View File

@ -113,9 +113,26 @@ function init_sync_from_template_address() {
}
}
$(widget_selector).each(function(idx, elem) {
// enable manual address mode if there is an error in one of the manual address fields.
var $manual_checkbox = $(elem).find('input.wcs-manual-address');
if ($(elem).nextUntil('.template-address', '[data-geolocation].widget-with-error').length) {
$(elem).find('input.wcs-manual-address').prop('checked', true).trigger('change');
// enable manual address mode if there is an error in one of the manual address fields.
$manual_checkbox.prop('checked', true).trigger('change');
} else {
// enable manual address mode if a manual field has data while the select is empty
// (typically when going back to a previous page)
var has_val = $(elem).find('select').val();
if (! has_val) {
var has_manual_var = false;
$(elem).nextUntil('.template-address', 'div[data-geolocation]').find('input').each(function(idx, manual_elem) {
if ($(manual_elem).val()) has_manual_var = true;
})
$(elem).nextUntil('.template-address', 'div[data-geolocation]').find('textarea').each(function(idx, manual_elem) {
if ($(manual_elem).val()) has_manual_var = true;
})
if (has_manual_var) {
$manual_checkbox.prop('checked', true).trigger('change');
}
}
}
});
}

View File

@ -188,7 +188,7 @@ REQUESTS_CERT = {}
DISABLE_CRON_JOBS = False
# w.c.s. can have very large forms, in backoffice and frontoffice
DATA_UPLOAD_MAX_NUMBER_FIELDS = 2000 # Django default is 1000
DATA_UPLOAD_MAX_NUMBER_FIELDS = 3000 # Django default is 1000
# workalendar config
WORKING_DAY_CALENDAR = 'workalendar.europe.France'

View File

@ -96,6 +96,20 @@ SQL_TYPE_MAPPING = {
}
def _table_exists(cur, table_name):
cur.execute('SELECT 1 FROM pg_class WHERE relname = %s', (table_name,))
rows = cur.fetchall()
return len(rows) > 0
def _trigger_exists(cur, table_name, trigger_name):
cur.execute(
'SELECT 1 FROM pg_trigger WHERE tgrelid = %s::regclass AND tgname = %s', (table_name, trigger_name)
)
rows = cur.fetchall()
return len(rows) > 0
class WcsPgConnection(psycopg2.extensions.connection):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@ -1582,6 +1596,8 @@ def do_global_views(conn, cur):
% (name, category.id)
)
init_search_tokens_triggers(cur)
def clean_global_views(conn, cur):
# Purge of any dead data
@ -1674,11 +1690,178 @@ def init_global_table(conn=None, cur=None):
endpoint_status=endpoint_status_filter,
)
)
init_search_tokens_data(cur)
if own_conn:
cur.close()
def init_search_tokens(conn=None, cur=None):
"""Initialize the search_tokens mechanism.
It's based on three parts:
- a token table
- triggers to feed this table from the tsvectors used in the database
- a search function that will leverage these tokens to extend the search query.
So far, the sources used are wcs_all_forms and searchable_formdefs.
Example: let's say the sources texts are "Tarif d'école" and "La cantine".
This gives the following tsvectors: ('tarif', 'écol') and ('cantin')
Our tokens table will have these three words.
When the search function is launched, it splits the search query and will
replace unavailable tokens by those close, if available.
The search query 'tari' will be expanded to 'tarif'.
The search query 'collège' will remain unchanged (and return nothing)
If several tokens match or are close enough, the query will be expanded to
an OR.
"""
own_cur = False
if cur is None:
own_cur = True
conn, cur = get_connection_and_cursor()
# Create table
cur.execute('CREATE TABLE IF NOT EXISTS wcs_search_tokens(token TEXT PRIMARY KEY);')
# Create triggers
init_search_tokens_triggers(cur)
# Fill table
init_search_tokens_data(cur)
# Index at the end, small performance trick... not that useful, but it's free...
cur.execute('CREATE EXTENSION IF NOT EXISTS pg_trgm;')
cur.execute(
'CREATE INDEX IF NOT EXISTS wcs_search_tokens_trgm ON wcs_search_tokens USING gin(token gin_trgm_ops);'
)
# And last: functions to use this brand new table
# These two aggregates make the search query far simpler to write
cur.execute('CREATE OR REPLACE AGGREGATE tsquery_agg_or (tsquery) (sfunc=tsquery_or, stype=tsquery);')
cur.execute('CREATE OR REPLACE AGGREGATE tsquery_agg_and (tsquery) (sfunc=tsquery_and, stype=tsquery);')
cur.execute(
r"""CREATE OR REPLACE FUNCTION public.wcs_tsquery(text)
RETURNS tsquery
LANGUAGE sql
STABLE
AS $function$
WITH
tokenized AS (SELECT unnest(regexp_split_to_array($1, '\s+')) w),
super_tokenized AS (
-- perfect: tokens that are found as is in table, thus no OR required
-- partial: tokens found using distance search on tokens table (note: numbers are excluded here)
-- otherwise: token as is and likely no search result later
SELECT w,
coalesce((select plainto_tsquery(perfect.token) FROM wcs_search_tokens AS perfect WHERE perfect.token = plainto_tsquery(w)::text),
tsquery_agg_or(plainto_tsquery(partial.token) order by partial.token <-> w desc),
plainto_tsquery(w)) tokens
FROM tokenized
LEFT JOIN wcs_search_tokens AS partial ON partial.token % w AND w not similar to '%[0-9]{2,}%'
GROUP BY w)
SELECT tsquery_agg_and(tokens) FROM super_tokenized;
$function$;"""
)
if own_cur:
cur.close()
def init_search_tokens_triggers(cur):
# We define only appending triggers, ie on INSERT and UPDATE.
# It would be far heavier to maintain deletions here, and having extra data has
# no or marginal side effect on search performances, and absolutely no impact
# on search results.
# Instead, a weekly cron job will delete obsolete entries, thus making it sure no
# personal data is kept uselessly.
# First part: the appending function
cur.execute(
"""CREATE OR REPLACE FUNCTION wcs_search_tokens_trigger_fn ()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
INSERT INTO wcs_search_tokens SELECT unnest(tsvector_to_array(NEW.fts)) ON CONFLICT(token) DO NOTHING;
RETURN NEW;
END;
$function$;"""
)
if not (_table_exists(cur, 'wcs_search_tokens')):
# abort trigger creation if tokens table doesn't exist yet
return
if _table_exists(cur, 'wcs_all_forms') and not _trigger_exists(
cur, 'wcs_all_forms', 'wcs_all_forms_fts_trg_upd'
):
# Second part: insert and update triggers for wcs_all_forms
cur.execute(
"""CREATE TRIGGER wcs_all_forms_fts_trg_ins
AFTER INSERT ON wcs_all_forms
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
)
cur.execute(
"""CREATE TRIGGER wcs_all_forms_fts_trg_upd
AFTER UPDATE OF fts ON wcs_all_forms
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
)
if _table_exists(cur, 'searchable_formdefs') and not _trigger_exists(
cur, 'searchable_formdefs', 'searchable_formdefs_fts_trg_upd'
):
# Third part: insert and update triggers for searchable_formdefs
cur.execute(
"""CREATE TRIGGER searchable_formdefs_fts_trg_ins
AFTER INSERT ON searchable_formdefs
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
)
cur.execute(
"""CREATE TRIGGER searchable_formdefs_fts_trg_upd
AFTER UPDATE OF fts ON searchable_formdefs
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
)
def init_search_tokens_data(cur):
if not (_table_exists(cur, 'wcs_search_tokens')):
# abort table data initialization if tokens table doesn't exist yet
return
if _table_exists(cur, 'wcs_all_forms'):
cur.execute(
"""INSERT INTO wcs_search_tokens
SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms
ON CONFLICT(token) DO NOTHING;"""
)
if _table_exists(cur, 'searchable_formdefs'):
cur.execute(
"""INSERT INTO wcs_search_tokens
SELECT unnest(tsvector_to_array(fts)) FROM searchable_formdefs
ON CONFLICT(token) DO NOTHING;"""
)
def purge_obsolete_search_tokens(cur=None):
own_cur = False
if cur is None:
own_cur = True
_, cur = get_connection_and_cursor()
cur.execute(
"""DELETE FROM wcs_search_tokens
WHERE token NOT IN (SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms)
AND token NOT IN (SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms);"""
)
if own_cur:
cur.close()
class SqlMixin:
_table_name = None
_numerical_id = True
@ -4725,6 +4908,8 @@ class AnyFormData(SqlMixin):
# convert back unstructured geolocation to the 'native' formdata format.
if o.geoloc_base_x is not None:
o.geolocations = {'base': {'lon': o.geoloc_base_x, 'lat': o.geoloc_base_y}}
# do not allow storing those partial objects
o.store = None
return o
@classmethod
@ -4809,7 +4994,6 @@ class SearchableFormDef(SqlMixin):
% (cls._table_name, cls._table_name)
)
cls.do_indexes(cur)
cur.close()
from wcs.carddef import CardDef
from wcs.formdef import FormDef
@ -4818,6 +5002,8 @@ class SearchableFormDef(SqlMixin):
CardDef.select(ignore_errors=True), FormDef.select(ignore_errors=True)
):
cls.update(obj=objectdef)
init_search_tokens(cur)
cur.close()
@classmethod
def update(cls, obj=None, removed_obj_type=None, removed_obj_id=None):
@ -4855,7 +5041,7 @@ class SearchableFormDef(SqlMixin):
def search(cls, obj_type, string):
_, cur = get_connection_and_cursor()
cur.execute(
'SELECT object_id FROM searchable_formdefs WHERE fts @@ plainto_tsquery(%s)',
'SELECT object_id FROM searchable_formdefs WHERE fts @@ wcs_tsquery(%s)',
(FtsMatch.get_fts_value(string),),
)
ids = [x[0] for x in cur.fetchall()]
@ -5120,7 +5306,7 @@ def get_period_total(
# latest migration, number + description (description is not used
# programmaticaly but will make sure git conflicts if two migrations are
# separately added with the same number)
SQL_LEVEL = (106, 'add context column to logged_errors table')
SQL_LEVEL = (107, 'new fts mechanism with tokens table')
def migrate_global_views(conn, cur):
@ -5454,6 +5640,10 @@ def migrate():
for formdef in FormDef.select() + CardDef.select():
do_formdef_tables(formdef, rebuild_views=False, rebuild_global_views=False)
if sql_level < 107:
# 107: new fts mechanism with tokens table
init_search_tokens()
if sql_level != SQL_LEVEL[0]:
cur.execute(
'''UPDATE wcs_meta SET value = %s, updated_at=NOW() WHERE key = %s''',

View File

@ -379,6 +379,11 @@ class FtsMatch(Criteria):
return 'fts @@ plainto_tsquery(%%(c%s)s)' % id(self.value)
class WcsFtsMatch(FtsMatch):
def as_sql(self):
return 'fts @@ wcs_tsquery(%%(c%s)s)' % id(self.value)
class ElementEqual(Criteria):
def __init__(self, attribute, key, value, **kwargs):
super().__init__(attribute, value)

View File

@ -67,6 +67,7 @@
<li><span class="parameter">{% trans "Fields to check after entering the tracking code" %}{% trans ":" %}</span> {{ tracking_code_verify_fields_labels|default:"-" }}</li>
{% endif %}
<li><span class="parameter">{% trans "Lifespan of drafts (in days)" %}{% trans ":" %}</span> {{ formdef.get_drafts_lifespan }}</li>
<li><span class="parameter">{% trans "Maximum number of drafts per user" %}{% trans ":" %}</span> {{ formdef.get_drafts_max_per_user }}</li>
<li><span class="parameter">{% trans "Templates" %}</span>
<ul>
<li><span class="parameter">{% trans "Digest" %}{% trans ":" %}</span> {{ formdef.default_digest_template|default:"-" }}</li>

View File

@ -31,9 +31,11 @@
</span>
</span>
<p class="commands">
<span class="edit">
<a href="{{ action.id }}/" rel="popup" title="{% trans "Edit" %}">{% trans "Edit" %}</a>
</span>
{% if action.editable %}
<span class="edit">
<a href="{{ action.id }}/" rel="popup" title="{% trans "Edit" %}">{% trans "Edit" %}</a>
</span>
{% endif %}
<span class="duplicate">
<a href="{{ action.id }}/duplicate" title="{% trans "Duplicate" %}">{% trans "Duplicate" %}</a>
</span>

View File

@ -21,6 +21,7 @@ import io
import json
import socket
import urllib.parse
import uuid
import xml.etree.ElementTree as ET
from contextlib import contextmanager
@ -75,8 +76,8 @@ class TestDefXmlProxy(XmlStorableObject):
}
excluded_fields = ['id', 'object_type', 'object_id']
extra_fields = [
('workflow_tests', 'workflow_tests'),
('_webservice_responses', 'webservice_responses'),
('workflow_tests', 'workflow_tests'),
]
return [
@ -674,6 +675,7 @@ class WebserviceResponse(XmlStorableObject):
_names = 'webservice-response'
xml_root_node = 'webservice-response'
uuid = None
testdef_id = None
name = ''
payload = None
@ -684,6 +686,7 @@ class WebserviceResponse(XmlStorableObject):
post_data = None
XML_NODES = [
('uuid', 'str'),
('testdef_id', 'int'),
('name', 'str'),
('payload', 'str'),
@ -694,6 +697,10 @@ class WebserviceResponse(XmlStorableObject):
('post_data', 'kv_data'),
]
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.uuid = str(uuid.uuid4())
def __str__(self):
return self.name

View File

@ -270,7 +270,7 @@ class LazyFormDefObjectsManager:
return self._clone(self._criterias + [self._formdef.get_by_id_criteria(str(value))])
def get_fields(self, key):
for field in self._formdef.iter_fields(include_block_fields=True):
for field in self._formdef.iter_fields(include_block_fields=True, with_no_data_fields=False):
if getattr(field, 'block_field', None):
if field.key == 'items':
# not yet
@ -757,8 +757,6 @@ class LazyFormDef:
@property
def option(self):
if not self._formdef.workflow.variables_formdef:
return {}
return LazyFormDefOptions(self._formdef)
@property
@ -1989,14 +1987,20 @@ class LazyRequest:
class LazyFormDefOptions(LazyFormDataVar):
def __init__(self, formdef):
self._formdef = formdef
fields = self._formdef.workflow.variables_formdef.fields
try:
fields = self._formdef.workflow.variables_formdef.fields
except AttributeError:
fields = []
data = self._formdef.workflow_options or {}
for field in fields:
# change field IDs as options are stored in data with their
# varnames, not id.
field.id = field.varname or field.id
if hasattr(field, 'default_value') and data.get(field.varname) is None:
data[field.varname] = field.default_value
if isinstance(field.default_value, str):
data[field.varname] = field.convert_value_from_str(field.default_value)
else:
data[field.varname] = field.default_value
super().__init__(fields, data)
def inspect_keys(self):

View File

@ -82,5 +82,15 @@ class AnonymiseWorkflowStatusItem(WorkflowStatusItem):
default_value=self.__class__.mode,
)
def get_workflow_test_action(self, formdata, *args, **kwargs):
original_perform = self.perform
def perform(formdata):
original_perform(formdata)
formdata.anonymisation_performed = True
setattr(self, 'perform', perform)
return self
register_item_class(AnonymiseWorkflowStatusItem)

View File

@ -86,5 +86,8 @@ class ModifyCriticalityWorkflowStatusItem(WorkflowStatusItem):
elif self.mode == MODE_SET:
formdata.set_criticality_level(int(self.absolute_value))
def get_workflow_test_action(self, *args, **kwargs):
return self
register_item_class(ModifyCriticalityWorkflowStatusItem)

View File

@ -169,5 +169,8 @@ class DisplayMessageWorkflowStatusItem(WorkflowStatusItem):
location = '%sitems/%s/' % (base_location, self.id)
yield location, None, self.message
def get_workflow_test_action(self, *args, **kwargs):
return self
register_item_class(DisplayMessageWorkflowStatusItem)

View File

@ -63,5 +63,16 @@ class RedirectToUrlWorkflowStatusItem(WorkflowStatusItem):
return # don't redirect
return url
def get_workflow_test_action(self, formdata, *args, **kwargs):
original_perform = self.perform
def perform(formdata):
url = original_perform(formdata)
formdata.redirect_to_url = url
return url
setattr(self, 'perform', perform)
return self
register_item_class(RedirectToUrlWorkflowStatusItem)

View File

@ -228,20 +228,34 @@ class RegisterCommenterWorkflowStatusItem(WorkflowStatusItem):
# the comment can use attachments done above
if comment:
try:
formdata.evolution[-1].add_part(
JournalEvolutionPart(formdata, get_publisher().translate(comment), self.to, self.level)
)
part = self.get_journal_evolution_part(formdata, comment)
if part:
formdata.evolution[-1].add_part(part)
formdata.store()
except TemplateError as e:
get_publisher().record_error(
_('Error in template, comment could not be generated'), formdata=formdata, exception=e
)
def get_journal_evolution_part(self, formdata, comment):
try:
return JournalEvolutionPart(formdata, get_publisher().translate(comment), self.to, self.level)
except TemplateError as e:
get_publisher().record_error(
_('Error in template, comment could not be generated'), formdata=formdata, exception=e
)
def i18n_scan(self, base_location):
location = '%sitems/%s/' % (base_location, self.id)
if not self.comment_template:
yield location, None, self.comment
def get_workflow_test_action(self, formdata, *args, **kwargs):
original_get_journal_evolution_part = self.get_journal_evolution_part
def get_journal_evolution_part(formdata, comment):
part = original_get_journal_evolution_part(formdata, comment)
formdata.history_messages.append(part.content)
return part
setattr(self, 'get_journal_evolution_part', get_journal_evolution_part)
return self
register_item_class(RegisterCommenterWorkflowStatusItem)

View File

@ -28,11 +28,12 @@ from wcs.qommon.form import (
RadiobuttonsWidget,
SingleSelectWidget,
StringWidget,
TextWidget,
WidgetList,
)
from wcs.qommon.humantime import humanduration2seconds, seconds2humanduration, timewords
from wcs.qommon.xml_storage import XmlStorableObject
from wcs.testdef import TestError, WebserviceResponse
from wcs.testdef import TestError
from wcs.wf.backoffice_fields import SetBackofficeFieldRowWidget, SetBackofficeFieldsTableWidget
from wcs.wf.profile import FieldNode
@ -90,9 +91,7 @@ class WorkflowTests(XmlStorableObject):
formdata.workflow_test = True
formdata.frozen_receipt_time = formdata.receipt_time
formdata.sent_sms = []
formdata.sent_emails = []
formdata.used_webservice_responses = self.testdef.used_webservice_responses = []
self.reset_formdata_test_attributes(formdata)
formdata.perform_workflow()
for action in self.actions:
@ -102,9 +101,7 @@ class WorkflowTests(XmlStorableObject):
continue
if not action.is_assertion:
formdata.sent_sms.clear()
formdata.sent_emails.clear()
formdata.used_webservice_responses.clear()
self.reset_formdata_test_attributes(formdata)
try:
action.perform(formdata)
@ -127,6 +124,14 @@ class WorkflowTests(XmlStorableObject):
formdata.store = lambda *args, **kwargs: None
def reset_formdata_test_attributes(self, formdata):
formdata.sent_sms = []
formdata.sent_emails = []
formdata.used_webservice_responses = self.testdef.used_webservice_responses = []
formdata.anonymisation_performed = False
formdata.redirect_to_url = None
formdata.history_messages = []
def get_new_action_id(self):
if not self.actions:
return '1'
@ -145,7 +150,12 @@ class WorkflowTests(XmlStorableObject):
'webservice_call': AssertWebserviceCall,
'set-backoffice-fields': AssertBackofficeFieldValues,
'button': ButtonClick,
'global-action-button': ButtonClick,
'timeout-jump': SkipTime,
'anonymise': AssertAnonymise,
'redirect_to_url': AssertRedirect,
'register-comment': AssertHistoryMessage,
'modify_criticality': AssertCriticality,
}
previous_trace = None
@ -196,6 +206,7 @@ class WorkflowTestAction(XmlStorableObject):
optional_fields = []
is_assertion = True
editable = True
XML_NODES = [
('id', 'str'),
@ -266,14 +277,26 @@ class ButtonClick(WorkflowTestAction):
return _('Click on "%(button_name)s" by %(user)s') % {'button_name': self.button_name, 'user': user}
def set_attributes_from_trace(self, formdef, trace, previous_trace=None):
try:
item = [
x for x in self.get_all_choice_actions(formdef) if x.id == trace.event_args['action_item_id']
][0]
except IndexError:
return
if 'action_item_id' in trace.event_args:
try:
button_name = [
x.label
for x in self.get_all_choice_actions(formdef)
if x.id == trace.event_args['action_item_id']
][0]
except IndexError:
return
elif 'global_action_id' in trace.event_args:
try:
button_name = [
x.name
for x in self.get_all_global_actions(formdef)
if x.id == trace.event_args['global_action_id']
][0]
except IndexError:
return
self.button_name = item.label
self.button_name = button_name
def perform(self, formdata):
if self.who == 'receiver':
@ -306,8 +329,15 @@ class ButtonClick(WorkflowTestAction):
if isinstance(item, wf.choice.ChoiceWorkflowStatusItem) and item.status:
yield item
@staticmethod
def get_all_global_actions(formdef):
for action in formdef.workflow.global_actions or []:
if not action.is_interactive():
yield action
def fill_admin_form(self, form, formdef):
possible_button_names = {x.label for x in self.get_all_choice_actions(formdef)}
possible_button_names.update(action.name for action in self.get_all_global_actions(formdef))
if not possible_button_names:
return
@ -631,20 +661,22 @@ class AssertWebserviceCall(WorkflowTestAction):
label = _('Assert webservice call')
key = 'assert-webservice-call'
webservice_response_id = None
webservice_response_uuid = None
call_count = 1
optional_fields = ['call_count']
XML_NODES = WorkflowTestAction.XML_NODES + [
('webservice_response_id', 'str'),
('webservice_response_uuid', 'str'),
('call_count', 'int'),
]
@property
def details_label(self):
webservice_responses = [
x for x in self.parent.testdef.get_webservice_responses() if x.id == self.webservice_response_id
x
for x in self.parent.testdef.get_webservice_responses()
if x.uuid == self.webservice_response_uuid
]
if webservice_responses:
return webservice_responses[0].name
@ -664,13 +696,17 @@ class AssertWebserviceCall(WorkflowTestAction):
def perform(self, formdata):
try:
response = WebserviceResponse.get(self.webservice_response_id)
except KeyError:
response = [
x
for x in self.parent.testdef.get_webservice_responses()
if x.uuid == self.webservice_response_uuid
][0]
except IndexError:
raise WorkflowTestError(_('Broken, missing webservice response'))
call_count = 0
for used_response in formdata.used_webservice_responses.copy():
if used_response.id == self.webservice_response_id:
if used_response.uuid == self.webservice_response_uuid:
formdata.used_webservice_responses.remove(used_response)
call_count += 1
@ -682,7 +718,7 @@ class AssertWebserviceCall(WorkflowTestAction):
def fill_admin_form(self, form, formdef):
webservice_response_options = [
(response.id, response.name, response.id)
(response.uuid, response.name, response.uuid)
for response in self.parent.testdef.get_webservice_responses()
]
@ -691,11 +727,11 @@ class AssertWebserviceCall(WorkflowTestAction):
form.add(
SingleSelectWidget,
'webservice_response_id',
'webservice_response_uuid',
title=_('Webservice response'),
options=webservice_response_options,
required=True,
value=self.webservice_response_id,
value=self.webservice_response_uuid,
)
form.add(IntWidget, 'call_count', title=_('Call count'), required=True, value=self.call_count)
@ -760,3 +796,163 @@ class AssertSMS(WorkflowTestAction):
title=_('Body'),
value=self.body,
)
class AssertAnonymise(WorkflowTestAction):
label = _('Assert anonymisation is performed')
key = 'assert-anonymise'
editable = False
details_label = ''
def perform(self, formdata):
if not formdata.anonymisation_performed:
raise WorkflowTestError(_('Form was not anonymised.'))
class AssertRedirect(WorkflowTestAction):
label = _('Assert redirect is performed')
key = 'assert-redirect'
url = None
XML_NODES = WorkflowTestAction.XML_NODES + [
('url', 'str'),
]
@property
def details_label(self):
return self.url
def perform(self, formdata):
if not formdata.redirect_to_url:
raise WorkflowTestError(_('No redirection occured.'))
if formdata.redirect_to_url != self.url:
raise WorkflowTestError(
_('Expected redirection to %(expected_url)s but was redirected to %(url)s.')
% {'expected_url': self.url, 'url': formdata.redirect_to_url}
)
def fill_admin_form(self, form, formdef):
form.add(
StringWidget,
'url',
title=_('URL'),
value=self.url,
)
class AssertHistoryMessage(WorkflowTestAction):
label = _('Assert history message is displayed')
details_label = ''
key = 'assert-history-message'
message = None
XML_NODES = WorkflowTestAction.XML_NODES + [
('message', 'str'),
]
def perform(self, formdata):
try:
message = formdata.history_messages.pop(0)
except IndexError:
raise WorkflowTestError(_('No history message.'))
if self.message not in message:
details = [
_('Displayed history message: %s') % message,
_('Expected history message: %s') % self.message,
]
raise WorkflowTestError(_('Wrong history message content.'), details=details)
def fill_admin_form(self, form, formdef):
form.add(
TextWidget,
'message',
title=_('Message'),
value=self.message,
hint=_('Assertion will pass if the text is contained in history message.'),
)
class AssertAlert(WorkflowTestAction):
label = _('Assert alert is displayed')
details_label = ''
key = 'assert-alert'
message = None
XML_NODES = WorkflowTestAction.XML_NODES + [
('message', 'str'),
]
def perform(self, formdata):
messages = formdata.get_workflow_messages()
for message in messages:
if self.message in message:
break
else:
details = [
_('Displayed alerts: %s') % (', '.join(messages) if messages else _('None')),
_('Expected alert: %s') % self.message,
]
raise WorkflowTestError(_('No alert matching message.'), details=details)
def fill_admin_form(self, form, formdef):
form.add(
TextWidget,
'message',
title=_('Message'),
value=self.message,
hint=_('Assertion will pass if the text is contained in alert message.'),
)
class AssertCriticality(WorkflowTestAction):
label = _('Assert criticality level')
empty_form_error = _('Workflow has no criticality levels.')
key = 'assert-criticality'
level_id = None
XML_NODES = WorkflowTestAction.XML_NODES + [
('level_id', 'str'),
]
@property
def details_label(self):
levels = [
x for x in self.parent.testdef.formdef.workflow.criticality_levels or [] if x.id == self.level_id
]
if not levels:
return _('Broken, missing criticality level')
return _('Criticality is "%s"') % levels[0].name
def perform(self, formdata):
levels = [x for x in formdata.formdef.workflow.criticality_levels or [] if x.id == self.level_id]
if not levels:
raise WorkflowTestError(_('Broken, missing criticality level'))
current_level = formdata.get_criticality_level_object()
if current_level.id != self.level_id:
raise WorkflowTestError(
_('Form should have criticality level "%(expected_level)s" but has level "%(level)s".')
% {'expected_level': levels[0].name, 'level': current_level.name}
)
def fill_admin_form(self, form, formdef):
if not formdef.workflow.criticality_levels:
return
form.add(
SingleSelectWidget,
'level_id',
title=_('Name'),
value=self.level_id,
options=[(x.id, x.name, x.id) for x in formdef.workflow.criticality_levels],
)

View File

@ -720,6 +720,9 @@ class WorkflowVariablesFieldsFormDef(FormDef):
base_url = get_publisher().get_backoffice_url()
return '%s/workflows/%s/variables/fields/' % (base_url, self.workflow.id)
def get_field_admin_url(self, field):
return self.get_admin_url() + '%s/' % field.id
def get_new_field_id(self):
return str(uuid.uuid4())
@ -2733,7 +2736,7 @@ class WorkflowStatus(SerieOfActionsMixin):
if check_replay and form.get('_ts') != str(filled.last_update_time.timestamp()):
raise ReplayException()
for action in filled.formdef.workflow.get_global_actions_for_user(filled, user):
if 'button-action-%s' % action.id in get_request().form:
if form.get_submit() == 'button-action-%s' % action.id:
if action.is_interactive():
return action.get_global_interactive_form_url(formdef=filled.formdef, ids=[filled.id])
filled.record_workflow_event('global-action-button', global_action_id=action.id)