Compare commits
30 Commits
af28be9910
...
ac48ebb70d
Author | SHA1 | Date |
---|---|---|
Pierre Ducroquet | ac48ebb70d | |
Pierre Ducroquet | 2962b9dd3f | |
Pierre Ducroquet | 03015aa750 | |
Valentin Deniaud | dc473b7378 | |
Frédéric Péters | d0358afa40 | |
Frédéric Péters | 4d5b309986 | |
Lauréline Guérin | e0857ce653 | |
Frédéric Péters | 8c3374e790 | |
Frédéric Péters | 03435d40a6 | |
Frédéric Péters | 70b7087ad9 | |
Valentin Deniaud | 9afbbccb13 | |
Corentin Sechet | 0c225cf254 | |
Valentin Deniaud | a23457fdbf | |
Frédéric Péters | 9c12c01712 | |
Thomas NOËL | 89b4d350ab | |
Valentin Deniaud | 721bdc4e44 | |
Thomas NOËL | 955f012b3d | |
Valentin Deniaud | 0ed9d5d0a0 | |
Valentin Deniaud | 6fd4b87ff5 | |
Valentin Deniaud | d4c3e7dc4e | |
Valentin Deniaud | 7199e84903 | |
Valentin Deniaud | e76e33808b | |
Valentin Deniaud | 0d82f03e59 | |
Valentin Deniaud | 03669bb847 | |
Frédéric Péters | c0d2d36b3c | |
Frédéric Péters | bdb24e21e9 | |
Frédéric Péters | 3a4b8c9cc7 | |
Frédéric Péters | 083f3cf3dd | |
Frédéric Péters | 520e52d1a7 | |
Frédéric Péters | 69249df789 |
|
@ -386,6 +386,7 @@ Une API existe pour récupérer le schéma de données d’un modèle de fiches.
|
|||
"disabled_redirection" : null,
|
||||
"discussion" : false,
|
||||
"drafts_lifespan" : null,
|
||||
"drafts_max_per_user" : null,
|
||||
"enable_tracking_codes" : false,
|
||||
"expiration_date" : null,
|
||||
"fields" : [
|
||||
|
|
|
@ -642,6 +642,8 @@ def test_block_field_statistics_data_update(pub):
|
|||
|
||||
def test_block_test_results(pub):
|
||||
create_superuser(pub)
|
||||
TestDef.wipe()
|
||||
TestResult.wipe()
|
||||
BlockDef.wipe()
|
||||
block = BlockDef()
|
||||
block.name = 'foobar'
|
||||
|
|
|
@ -23,7 +23,12 @@ from wcs.wf.geolocate import GeolocateWorkflowStatusItem
|
|||
from wcs.wf.jump import JumpWorkflowStatusItem
|
||||
from wcs.wf.notification import SendNotificationWorkflowStatusItem
|
||||
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
|
||||
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowImportError
|
||||
from wcs.workflows import (
|
||||
Workflow,
|
||||
WorkflowBackofficeFieldsFormDef,
|
||||
WorkflowImportError,
|
||||
WorkflowVariablesFieldsFormDef,
|
||||
)
|
||||
from wcs.wscalls import NamedWsCall, NamedWsCallImportError
|
||||
|
||||
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login
|
||||
|
@ -102,6 +107,10 @@ def test_deprecations(pub):
|
|||
workflow.backoffice_fields_formdef.fields = [
|
||||
fields.TableField(id='bo1', label='table field'),
|
||||
]
|
||||
workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow)
|
||||
workflow.variables_formdef.fields = [
|
||||
fields.TableField(id='wfvar1', label='other table field'),
|
||||
]
|
||||
st0 = workflow.add_status('Status0', 'st0')
|
||||
|
||||
display = st0.add_action('displaymsg')
|
||||
|
@ -266,6 +275,7 @@ def test_deprecations(pub):
|
|||
assert [x.text for x in resp.pyquery('.section--fields li a')] == [
|
||||
'foobar / Field "table field"',
|
||||
'foobar / Field "ranked field"',
|
||||
'Options of workflow "test" / Field "other table field"',
|
||||
'Backoffice fields of workflow "test" / Field "table field"',
|
||||
]
|
||||
assert [x.text for x in resp.pyquery('.section--actions li a')] == [
|
||||
|
|
|
@ -284,6 +284,7 @@ def test_forms_edit_tracking_code(pub, formdef):
|
|||
|
||||
resp = resp.click('Form Tracking')
|
||||
assert resp.forms[0]['drafts_lifespan'].value == ''
|
||||
assert resp.forms[0]['drafts_max_per_user'].value == ''
|
||||
resp = resp.forms[0].submit().follow() # check empty value is ok
|
||||
|
||||
resp = resp.click('Form Tracking')
|
||||
|
@ -297,6 +298,20 @@ def test_forms_edit_tracking_code(pub, formdef):
|
|||
resp = resp.forms[0].submit().follow()
|
||||
assert FormDef.get(1).drafts_lifespan == '5'
|
||||
|
||||
resp = resp.click('Form Tracking')
|
||||
resp.forms[0]['drafts_max_per_user'].value = 'xxx'
|
||||
resp = resp.forms[0].submit()
|
||||
assert 'Maximum must be between 2 and 100 drafts.' in resp
|
||||
resp.forms[0]['drafts_max_per_user'].value = '120'
|
||||
resp = resp.forms[0].submit()
|
||||
assert 'Maximum must be between 2 and 100 drafts.' in resp
|
||||
resp.forms[0]['drafts_max_per_user'].value = '1'
|
||||
resp = resp.forms[0].submit()
|
||||
assert 'Maximum must be between 2 and 100 drafts.' in resp
|
||||
resp.forms[0]['drafts_max_per_user'].value = '3'
|
||||
resp = resp.forms[0].submit().follow()
|
||||
assert FormDef.get(1).drafts_max_per_user == '3'
|
||||
|
||||
formdef.fields = [
|
||||
fields.StringField(id='1', label='VerifyString'),
|
||||
fields.DateField(id='2', label='VerifyDate'),
|
||||
|
|
|
@ -1167,6 +1167,11 @@ def test_tests_duplicate(pub):
|
|||
response.name = 'Response xxx'
|
||||
response.store()
|
||||
|
||||
testdef.workflow_tests.actions.append(
|
||||
workflow_tests.AssertWebserviceCall(id='3', webservice_response_uuid=response.uuid),
|
||||
)
|
||||
testdef.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
|
||||
assert TestDef.count() == 1
|
||||
|
@ -1196,6 +1201,8 @@ def test_tests_duplicate(pub):
|
|||
assert testdef2.workflow_tests.actions[0].button_name == 'Go to end status'
|
||||
assert testdef1.get_webservice_responses()[0].name == 'Changed'
|
||||
assert testdef2.get_webservice_responses()[0].name == 'Response xxx'
|
||||
assert testdef1.workflow_tests.actions[2].details_label == 'Changed'
|
||||
assert testdef2.workflow_tests.actions[2].details_label == 'Response xxx'
|
||||
|
||||
resp = app.get('/backoffice/forms/1/tests/%s/' % testdef.id)
|
||||
resp = resp.click('Duplicate')
|
||||
|
|
|
@ -9,7 +9,7 @@ from wcs import workflow_tests
|
|||
from wcs.formdef import FormDef, fields
|
||||
from wcs.qommon.http_request import HTTPRequest
|
||||
from wcs.testdef import TestDef, WebserviceResponse
|
||||
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef
|
||||
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowCriticalityLevel
|
||||
|
||||
from ..utilities import create_temporary_pub, get_app, login
|
||||
from .test_all import create_superuser
|
||||
|
@ -237,10 +237,16 @@ def test_workflow_tests_action_button_click(pub):
|
|||
jump = new_status.add_action('choice')
|
||||
jump.label = 'Button no target status'
|
||||
|
||||
workflow.add_global_action('Action 1')
|
||||
|
||||
interactive_action = workflow.add_global_action('Interactive action (should not be shown)')
|
||||
interactive_action.add_action('form')
|
||||
|
||||
workflow.store()
|
||||
|
||||
resp = app.get('/backoffice/forms/1/tests/%s/workflow/1/' % testdef.id)
|
||||
assert resp.form['button_name'].options == [
|
||||
('Action 1', False, 'Action 1'),
|
||||
('Button 1', False, 'Button 1'),
|
||||
('Button 2', False, 'Button 2'),
|
||||
('Button 4 (not available)', True, 'Button 4 (not available)'),
|
||||
|
@ -418,6 +424,156 @@ def test_workflow_tests_action_assert_sms(pub):
|
|||
assert escape('SMS to 0123456789 (+2)') in resp.text
|
||||
|
||||
|
||||
def test_workflow_tests_action_assert_anonymise(pub):
|
||||
create_superuser(pub)
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test title'
|
||||
formdef.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
|
||||
testdef = TestDef.create_from_formdata(formdef, formdata)
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertAnonymise(id='1'),
|
||||
]
|
||||
testdef.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
|
||||
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
|
||||
assert 'Edit' not in resp.text
|
||||
|
||||
|
||||
def test_workflow_tests_action_assert_redirect(pub):
|
||||
create_superuser(pub)
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test title'
|
||||
formdef.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
|
||||
testdef = TestDef.create_from_formdata(formdef, formdata)
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertRedirect(id='1'),
|
||||
]
|
||||
testdef.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
|
||||
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
|
||||
assert 'not configured' in resp.text
|
||||
|
||||
resp = resp.click('Edit')
|
||||
resp.form['url'] = 'http://example.com'
|
||||
resp = resp.form.submit().follow()
|
||||
|
||||
assert 'not configured' not in resp.text
|
||||
assert 'http://example.com' in resp.text
|
||||
|
||||
|
||||
def test_workflow_tests_action_assert_history_message(pub):
|
||||
create_superuser(pub)
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test title'
|
||||
formdef.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
|
||||
testdef = TestDef.create_from_formdata(formdef, formdata)
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertHistoryMessage(id='1'),
|
||||
]
|
||||
testdef.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
|
||||
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
|
||||
assert 'not configured' in resp.text
|
||||
|
||||
resp = resp.click('Edit')
|
||||
resp.form['message'] = 'Hello'
|
||||
resp = resp.form.submit().follow()
|
||||
|
||||
assert 'not configured' not in resp.text
|
||||
|
||||
|
||||
def test_workflow_tests_action_assert_alert(pub):
|
||||
create_superuser(pub)
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test title'
|
||||
formdef.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
|
||||
testdef = TestDef.create_from_formdata(formdef, formdata)
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertAlert(id='1'),
|
||||
]
|
||||
testdef.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
|
||||
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
|
||||
assert 'not configured' in resp.text
|
||||
|
||||
resp = resp.click('Edit')
|
||||
resp.form['message'] = 'Hello'
|
||||
resp = resp.form.submit().follow()
|
||||
|
||||
assert 'not configured' not in resp.text
|
||||
|
||||
|
||||
def test_workflow_tests_action_assert_criticality(pub):
|
||||
create_superuser(pub)
|
||||
|
||||
workflow = Workflow(name='Workflow One')
|
||||
workflow.add_status(name='New status')
|
||||
workflow.store()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.workflow_id = workflow.id
|
||||
formdef.name = 'test title'
|
||||
formdef.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
|
||||
testdef = TestDef.create_from_formdata(formdef, formdata)
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertCriticality(id='1'),
|
||||
]
|
||||
testdef.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
|
||||
resp = app.get('/backoffice/forms/1/tests/%s/workflow/' % testdef.id)
|
||||
assert 'not configured' in resp.text
|
||||
|
||||
resp = resp.click('Edit')
|
||||
assert 'Workflow has no criticality levels.' in resp.text
|
||||
|
||||
workflow.criticality_levels = [
|
||||
WorkflowCriticalityLevel(name='green'),
|
||||
WorkflowCriticalityLevel(name='red'),
|
||||
]
|
||||
workflow.store()
|
||||
|
||||
resp = app.get('/backoffice/forms/1/tests/%s/workflow/1/' % testdef.id)
|
||||
resp.form['level_id'].select(text='green')
|
||||
resp = resp.form.submit().follow()
|
||||
|
||||
assert 'not configured' not in resp.text
|
||||
assert escape('Criticality is "green"') in resp.text
|
||||
|
||||
|
||||
def test_workflow_tests_action_assert_backoffice_field(pub):
|
||||
create_superuser(pub)
|
||||
|
||||
|
@ -508,13 +664,13 @@ def test_workflow_tests_action_assert_webservice_call(pub):
|
|||
response3.store()
|
||||
|
||||
resp = app.get('/backoffice/forms/1/tests/%s/workflow/1/' % testdef.id)
|
||||
assert resp.form['webservice_response_id'].options == [
|
||||
(str(response.id), False, 'Fake response'),
|
||||
(str(response2.id), False, 'Fake response 2'),
|
||||
assert resp.form['webservice_response_uuid'].options == [
|
||||
(str(response.uuid), False, 'Fake response'),
|
||||
(str(response2.uuid), False, 'Fake response 2'),
|
||||
]
|
||||
assert resp.form['call_count'].value == '1'
|
||||
|
||||
resp.form['webservice_response_id'] = 1
|
||||
resp.form['webservice_response_uuid'] = response.uuid
|
||||
resp.form['call_count'] = 2
|
||||
resp = resp.form.submit().follow()
|
||||
|
||||
|
@ -522,7 +678,7 @@ def test_workflow_tests_action_assert_webservice_call(pub):
|
|||
assert 'Broken' not in resp.text
|
||||
|
||||
assert_webservice_call = TestDef.get(testdef.id).workflow_tests.actions[0]
|
||||
assert assert_webservice_call.webservice_response_id == '1'
|
||||
assert assert_webservice_call.webservice_response_uuid == response.uuid
|
||||
assert assert_webservice_call.call_count == 2
|
||||
|
||||
response.remove_self()
|
||||
|
|
|
@ -429,6 +429,9 @@ def test_backoffice_submission_formdef_list_search(pub, local_user, access, auth
|
|||
resp = get_url('/api/formdefs/?backoffice-submission=on&q=test')
|
||||
assert len(resp.json['data']) == 2
|
||||
|
||||
resp = get_url('/api/formdefs/?backoffice-submission=on&q=tes')
|
||||
assert len(resp.json['data']) == 2
|
||||
|
||||
resp = get_url('/api/formdefs/?backoffice-submission=on&q=xyz')
|
||||
assert len(resp.json['data']) == 0
|
||||
|
||||
|
|
|
@ -398,6 +398,7 @@ def test_backoffice_card_item_link_id_template(pub):
|
|||
resp = resp.form.submit('submit')
|
||||
assert resp.location.endswith('/backoffice/data/foo/blah/')
|
||||
resp = resp.follow()
|
||||
assert resp.pyquery('.breadcrumbs a')[-1].attrib['href'] == '/backoffice/data/foo/blah/'
|
||||
resp = app.get('/backoffice/data/foo/')
|
||||
assert [x.attrib['href'] for x in resp.pyquery('table a')] == ['blah/', 'test/']
|
||||
|
||||
|
@ -681,6 +682,58 @@ def test_backoffice_cards_import_data_csv_no_backoffice_fields(pub):
|
|||
assert carddef.data_class().count() == 2
|
||||
|
||||
|
||||
def test_backoffice_cards_import_data_csv_custom_id_no_update(pub):
|
||||
user = create_user(pub)
|
||||
user.name_identifiers = [str(uuid.uuid4())]
|
||||
user.store()
|
||||
|
||||
Workflow.wipe()
|
||||
workflow = Workflow(name='form-title')
|
||||
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
|
||||
workflow.backoffice_fields_formdef.fields = [
|
||||
fields.StringField(id='bo0', varname='foo_bovar', label='bo variable'),
|
||||
]
|
||||
workflow.add_status('st0')
|
||||
workflow.store()
|
||||
|
||||
CardDef.wipe()
|
||||
carddef = CardDef()
|
||||
carddef.name = 'test'
|
||||
carddef.fields = [
|
||||
fields.StringField(id='1', label='String', varname='custom_id'),
|
||||
fields.ItemField(id='2', label='List', items=['item1', 'item2']),
|
||||
]
|
||||
carddef.backoffice_submission_roles = user.roles
|
||||
carddef.id_template = '{{form_var_custom_id}}'
|
||||
carddef.workflow = workflow
|
||||
carddef.store()
|
||||
carddef.data_class().wipe()
|
||||
|
||||
card = carddef.data_class()()
|
||||
card.data = {'1': 'plop', '2': 'test', '2_display': 'test', 'bo0': 'xxx'}
|
||||
card.just_created()
|
||||
card.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
data = b'''\
|
||||
"String","List"
|
||||
"plop","item1"
|
||||
"test","item2"
|
||||
'''
|
||||
resp = app.get('/backoffice/data/test/import-file')
|
||||
resp.forms[0]['file'] = Upload('test.csv', data, 'text/csv')
|
||||
resp.form['update_existing_cards'].checked = False
|
||||
resp = resp.forms[0].submit().follow()
|
||||
assert carddef.data_class().count() == 2
|
||||
|
||||
card.refresh_from_storage()
|
||||
assert card.data == {'1': 'plop', '2': 'test', '2_display': 'test', 'bo0': 'xxx'} # no change
|
||||
|
||||
other_card = carddef.data_class().select(order_by='-receipt_time')[0]
|
||||
assert other_card.data == {'1': 'test', '2': 'item2', '2_display': 'item2', 'bo0': None}
|
||||
assert other_card.id_display == 'test'
|
||||
|
||||
|
||||
def test_backoffice_cards_import_data_csv_custom_id_update(pub):
|
||||
user = create_user(pub)
|
||||
user.name_identifiers = [str(uuid.uuid4())]
|
||||
|
@ -721,6 +774,7 @@ def test_backoffice_cards_import_data_csv_custom_id_update(pub):
|
|||
'''
|
||||
resp = app.get('/backoffice/data/test/import-file')
|
||||
resp.forms[0]['file'] = Upload('test.csv', data, 'text/csv')
|
||||
resp.form['update_existing_cards'].checked = True
|
||||
resp = resp.forms[0].submit().follow()
|
||||
assert carddef.data_class().count() == 2
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ from wcs.workflows import (
|
|||
Workflow,
|
||||
WorkflowBackofficeFieldsFormDef,
|
||||
WorkflowCriticalityLevel,
|
||||
WorkflowVariablesFieldsFormDef,
|
||||
)
|
||||
|
||||
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login
|
||||
|
@ -1205,3 +1206,72 @@ def test_inspect_page_idp_role(pub):
|
|||
resp.pyquery('[data-function-key="_receiver"] a').attr.href
|
||||
== 'https://idp.example.net/manage/roles/uuid:d4b59e1ffb204dfd99fd3760f4952999/'
|
||||
)
|
||||
|
||||
|
||||
def test_inspect_page_form_option(pub):
|
||||
create_user(pub, is_admin=True)
|
||||
FormDef.wipe()
|
||||
|
||||
wf = Workflow(name='variables')
|
||||
wf.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=wf)
|
||||
wf.add_status('st1')
|
||||
wf.store()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'form title'
|
||||
formdef.fields = []
|
||||
formdef.workflow = wf
|
||||
formdef.store()
|
||||
formdef.data_class().wipe()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
formdata.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
resp = app.get('%sinspect' % formdata.get_url(backoffice=True), status=200)
|
||||
assert 'form_option' not in resp.text
|
||||
|
||||
wf.variables_formdef.fields = [
|
||||
fields.StringField(label='String test', varname='string_test'),
|
||||
fields.DateField(label='Date test', varname='date_test'),
|
||||
]
|
||||
wf.store()
|
||||
resp = app.get('%sinspect' % formdata.get_url(backoffice=True), status=200)
|
||||
assert (
|
||||
resp.pyquery('[title="form_option_string_test"]').parents('li').children('div.value span').text()
|
||||
== 'None (no value)'
|
||||
)
|
||||
|
||||
wf.variables_formdef.fields[0].default_value = 'xxx'
|
||||
wf.variables_formdef.fields[1].default_value = '2024-03-20'
|
||||
wf.store()
|
||||
resp = app.get('%sinspect' % formdata.get_url(backoffice=True), status=200)
|
||||
assert (
|
||||
resp.pyquery('[title="form_option_string_test"]').parents('li').children('div.value span').text()
|
||||
== 'xxx'
|
||||
)
|
||||
assert (
|
||||
resp.pyquery('[title="form_option_date_test"]').parents('li').children('div.value span').text()
|
||||
== '2024-03-20'
|
||||
)
|
||||
assert (
|
||||
resp.pyquery('[title="form_option_date_test_year"]').parents('li').children('div.value span').text()
|
||||
== '2024 (integer number)'
|
||||
)
|
||||
|
||||
formdef.workflow_options = {'string_test': 'yyy', 'date_test': datetime.date(2024, 3, 21).timetuple()}
|
||||
formdef.store()
|
||||
resp = app.get('%sinspect' % formdata.get_url(backoffice=True), status=200)
|
||||
assert (
|
||||
resp.pyquery('[title="form_option_string_test"]').parents('li').children('div.value span').text()
|
||||
== 'yyy'
|
||||
)
|
||||
assert (
|
||||
resp.pyquery('[title="form_option_date_test"]').parents('li').children('div.value span').text()
|
||||
== '2024-03-21'
|
||||
)
|
||||
assert (
|
||||
resp.pyquery('[title="form_option_date_test_year"]').parents('li').children('div.value span').text()
|
||||
== '2024 (integer number)'
|
||||
)
|
||||
|
|
|
@ -414,6 +414,14 @@ def test_form_max_drafts(pub):
|
|||
|
||||
assert not formdef.data_class().has_key(drafts[0].id) # oldest draft was removed
|
||||
|
||||
formdef.drafts_max_per_user = '3'
|
||||
formdef.store()
|
||||
|
||||
resp = app.get('/test/')
|
||||
resp.form['f0'] = 'hello2'
|
||||
resp = resp.form.submit('submit')
|
||||
assert formdef.data_class().count([Equal('status', 'draft')]) == 4
|
||||
|
||||
|
||||
def test_form_draft_temporary_access_url(pub):
|
||||
FormDef.wipe()
|
||||
|
|
|
@ -1626,6 +1626,57 @@ def test_card_update_related_cascading_loop(pub):
|
|||
assert carddata2.data['2_display'] == 'card1 card2 card1 None'
|
||||
|
||||
|
||||
def test_card_update_related_items_relation(pub):
|
||||
CardDef.wipe()
|
||||
FormDef.wipe()
|
||||
|
||||
carddef = CardDef()
|
||||
carddef.name = 'foo'
|
||||
carddef.fields = [
|
||||
StringField(id='1', label='Test', varname='foo'),
|
||||
]
|
||||
carddef.digest_templates = {'default': '{{ form_var_foo }}'}
|
||||
carddef.store()
|
||||
carddef.data_class().wipe()
|
||||
|
||||
carddata1 = carddef.data_class()()
|
||||
carddata1.data = {'1': 'card1'}
|
||||
carddata1.just_created()
|
||||
carddata1.store()
|
||||
|
||||
carddata2 = carddef.data_class()()
|
||||
carddata2.data = {'1': 'card2'}
|
||||
carddata2.just_created()
|
||||
carddata2.store()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'foo'
|
||||
formdef.fields = [
|
||||
ItemField(id='1', label='Test', data_source={'type': 'carddef:foo'}),
|
||||
ItemsField(id='2', label='Test2', data_source={'type': 'carddef:foo'}),
|
||||
]
|
||||
formdef.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.data = {'1': '1', '2': ['1', '2']}
|
||||
formdata.data['1_display'] = formdef.fields[0].store_display_value(formdata.data, formdef.fields[0].id)
|
||||
formdata.data['2_display'] = formdef.fields[1].store_display_value(formdata.data, formdef.fields[1].id)
|
||||
assert formdata.data['1_display'] == 'card1'
|
||||
assert formdata.data['2_display'] == 'card1, card2'
|
||||
formdata.just_created()
|
||||
formdata.store()
|
||||
|
||||
pub.cleanup()
|
||||
carddef = carddef.get(carddef.id)
|
||||
carddata1 = carddef.data_class().get(carddata1.id)
|
||||
carddata1.data = {'1': 'card1-change1'}
|
||||
carddata1.store()
|
||||
|
||||
formdata.refresh_from_storage()
|
||||
assert formdata.data['1_display'] == 'card1-change1'
|
||||
assert formdata.data['2_display'] == 'card1-change1, card2'
|
||||
|
||||
|
||||
def test_card_update_related_deleted(pub):
|
||||
BlockDef.wipe()
|
||||
CardDef.wipe()
|
||||
|
|
|
@ -1997,6 +1997,31 @@ def test_lazy_formdata_queryset_filter_non_unique_varname(pub, variable_test_dat
|
|||
assert tmpl.render(context) == '1'
|
||||
|
||||
|
||||
def test_filter_on_page_field(pub):
|
||||
pub.loggederror_class.wipe()
|
||||
|
||||
FormDef.wipe()
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test'
|
||||
formdef.fields = [
|
||||
fields.PageField(id='1', label='Page', varname='page'),
|
||||
]
|
||||
formdef.store()
|
||||
|
||||
data_class = formdef.data_class()
|
||||
formdata = data_class()
|
||||
formdata.just_created()
|
||||
formdata.store()
|
||||
|
||||
context = pub.substitutions.get_context_variables(mode='lazy')
|
||||
|
||||
tmpl = Template('{{forms|objects:"test"|filter_by:"page"|filter_value:"100"}}')
|
||||
tmpl.render(context)
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'Invalid filter "page"'
|
||||
|
||||
|
||||
def test_numeric_filter_on_string(pub):
|
||||
FormDef.wipe()
|
||||
formdef = FormDef()
|
||||
|
|
|
@ -76,6 +76,12 @@ HOBO_JSON = {
|
|||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
'service-id': 'lingo',
|
||||
'title': 'Lingo',
|
||||
'base_url': 'http://payment.example.net/',
|
||||
'secret_key': 'aaa',
|
||||
},
|
||||
],
|
||||
'profile': {
|
||||
'fields': [
|
||||
|
@ -293,6 +299,7 @@ def test_configure_site_options(setuptest, alt_tempdir):
|
|||
assert pub.get_site_option('xxx', 'variables') == 'HELLO WORLD'
|
||||
assert pub.get_site_option('portal_agent_url', 'variables') == 'http://agents.example.net/'
|
||||
assert pub.get_site_option('portal_url', 'variables') == 'http://portal.example.net/'
|
||||
assert pub.get_site_option('lingo_url', 'variables') == 'http://payment.example.net/'
|
||||
assert pub.get_site_option('test_wcs_url', 'variables') == 'http://wcs.example.net/'
|
||||
assert pub.get_site_option('disable_cron_jobs', 'variables') == 'True'
|
||||
assert pub.get_site_option('maintenance_page', 'variables') == 'True'
|
||||
|
|
|
@ -1183,6 +1183,45 @@ def test_sql_criteria_fts(pub):
|
|||
assert data_class.select([st.FtsMatch(formdata1.id_display)])[0].id_display == formdata1.id_display
|
||||
|
||||
|
||||
def test_search_tokens_purge(pub):
|
||||
_, cur = sql.get_connection_and_cursor()
|
||||
|
||||
# purge garbage from other tests
|
||||
sql.purge_obsolete_search_tokens()
|
||||
|
||||
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
|
||||
start = cur.fetchone()[0]
|
||||
|
||||
# define a new table
|
||||
test_formdef = FormDef()
|
||||
test_formdef.name = 'tableSelectFTStokens'
|
||||
test_formdef.fields = [fields.StringField(id='3', label='string')]
|
||||
test_formdef.store()
|
||||
data_class = test_formdef.data_class(mode='sql')
|
||||
|
||||
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
|
||||
assert cur.fetchone()[0] == start + 1
|
||||
|
||||
t = data_class()
|
||||
t.data = {'3': 'foofortokensofcourse'}
|
||||
t.just_created()
|
||||
t.store()
|
||||
|
||||
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
|
||||
assert cur.fetchone()[0] == start + 2
|
||||
|
||||
t.data = {'3': 'chaussettefortokensofcourse'}
|
||||
t.store()
|
||||
|
||||
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
|
||||
assert cur.fetchone()[0] == start + 3
|
||||
|
||||
sql.purge_obsolete_search_tokens()
|
||||
|
||||
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
|
||||
assert cur.fetchone()[0] == start + 2
|
||||
|
||||
|
||||
def table_exists(cur, table_name):
|
||||
cur.execute(
|
||||
'''SELECT COUNT(*) FROM information_schema.tables
|
||||
|
@ -1692,6 +1731,26 @@ def test_load_all_evolutions_on_any_formdata(pub):
|
|||
assert len([x for x in objects if x._evolution is not None]) == 100
|
||||
|
||||
|
||||
def test_store_on_any_formdata(pub):
|
||||
drop_formdef_tables()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test any store'
|
||||
formdef.fields = []
|
||||
formdef.store()
|
||||
|
||||
data_class = formdef.data_class(mode='sql')
|
||||
formdata = data_class()
|
||||
formdata.just_created()
|
||||
formdata.receipt_time = localtime()
|
||||
formdata.store()
|
||||
|
||||
objects = sql.AnyFormData.select()
|
||||
assert len(objects) == 1
|
||||
with pytest.raises(TypeError):
|
||||
objects[0].store()
|
||||
|
||||
|
||||
def test_geoloc_in_global_view(pub):
|
||||
drop_formdef_tables()
|
||||
|
||||
|
|
|
@ -9,7 +9,12 @@ from wcs.qommon.http_request import HTTPRequest
|
|||
from wcs.testdef import TestDef, WebserviceResponse
|
||||
from wcs.wf.jump import JumpWorkflowStatusItem, _apply_timeouts
|
||||
from wcs.workflow_tests import WorkflowTestError
|
||||
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowStatusItem
|
||||
from wcs.workflows import (
|
||||
Workflow,
|
||||
WorkflowBackofficeFieldsFormDef,
|
||||
WorkflowCriticalityLevel,
|
||||
WorkflowStatusItem,
|
||||
)
|
||||
|
||||
from .backoffice_pages.test_all import create_user
|
||||
from .utilities import create_temporary_pub, get_app, login
|
||||
|
@ -204,6 +209,65 @@ def test_workflow_tests_button_click(pub):
|
|||
assert str(excinfo.value) == 'Button "Go to end status" is not displayed.'
|
||||
|
||||
|
||||
def test_workflow_tests_button_click_global_action(pub):
|
||||
role = pub.role_class(name='test role')
|
||||
role.store()
|
||||
user = pub.user_class(name='test user')
|
||||
user.roles = [role.id]
|
||||
user.store()
|
||||
|
||||
workflow = Workflow(name='Workflow One')
|
||||
workflow.add_status(name='New status')
|
||||
end_status = workflow.add_status(name='End status')
|
||||
|
||||
global_action = workflow.add_global_action('Go to end status')
|
||||
global_action.triggers[0].roles = [role.id]
|
||||
|
||||
sendmail = global_action.add_action('sendmail')
|
||||
sendmail.to = ['test@example.org']
|
||||
sendmail.subject = 'In new status'
|
||||
sendmail.body = 'xxx'
|
||||
|
||||
jump = global_action.add_action('jump')
|
||||
jump.status = end_status.id
|
||||
|
||||
workflow.store()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test title'
|
||||
formdef.workflow_id = workflow.id
|
||||
formdef.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
|
||||
testdef = TestDef.create_from_formdata(formdef, formdata)
|
||||
testdef.agent_id = user.id
|
||||
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertStatus(status_name='End status'),
|
||||
]
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
testdef.run(formdef)
|
||||
assert str(excinfo.value) == 'Form should be in status "End status" but is in status "New status".'
|
||||
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.ButtonClick(button_name='Go to end status'),
|
||||
workflow_tests.AssertEmail(),
|
||||
workflow_tests.AssertStatus(status_name='End status'),
|
||||
]
|
||||
testdef.run(formdef)
|
||||
|
||||
# hide button from test user
|
||||
user.roles = []
|
||||
user.store()
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
testdef.run(formdef)
|
||||
assert str(excinfo.value) == 'Button "Go to end status" is not displayed.'
|
||||
|
||||
|
||||
def test_workflow_tests_button_click_who(pub):
|
||||
role = pub.role_class(name='test role')
|
||||
role.store()
|
||||
|
@ -603,6 +667,230 @@ def test_workflow_tests_sms(pub):
|
|||
assert 'SMS body: "Hello"' in excinfo.value.details
|
||||
|
||||
|
||||
def test_workflow_tests_anonymise(pub):
|
||||
user = pub.user_class(name='test user')
|
||||
user.store()
|
||||
|
||||
workflow = Workflow(name='Workflow One')
|
||||
new_status = workflow.add_status(name='New status')
|
||||
workflow.store()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test title'
|
||||
formdef.workflow_id = workflow.id
|
||||
formdef.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
|
||||
testdef = TestDef.create_from_formdata(formdef, formdata)
|
||||
testdef.agent_id = user.id
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertAnonymise(),
|
||||
]
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
testdef.run(formdef)
|
||||
assert str(excinfo.value) == 'Form was not anonymised.'
|
||||
|
||||
anonymise_action = new_status.add_action('anonymise')
|
||||
workflow.store()
|
||||
formdef.refresh_from_storage()
|
||||
|
||||
testdef.run(formdef)
|
||||
|
||||
anonymise_action.mode = 'intermediate'
|
||||
workflow.store()
|
||||
formdef.refresh_from_storage()
|
||||
|
||||
testdef.run(formdef)
|
||||
|
||||
anonymise_action.mode = 'unlink_user'
|
||||
workflow.store()
|
||||
formdef.refresh_from_storage()
|
||||
|
||||
testdef.run(formdef)
|
||||
|
||||
|
||||
def test_workflow_tests_redirect(pub):
|
||||
user = pub.user_class(name='test user')
|
||||
user.store()
|
||||
|
||||
workflow = Workflow(name='Workflow One')
|
||||
new_status = workflow.add_status(name='New status')
|
||||
workflow.store()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test title'
|
||||
formdef.workflow_id = workflow.id
|
||||
formdef.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
|
||||
testdef = TestDef.create_from_formdata(formdef, formdata)
|
||||
testdef.agent_id = user.id
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertRedirect(url='https://example.com/'),
|
||||
]
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
testdef.run(formdef)
|
||||
assert str(excinfo.value) == 'No redirection occured.'
|
||||
|
||||
redirect_action = new_status.add_action('redirect_to_url')
|
||||
redirect_action.url = 'https://test.com/'
|
||||
workflow.store()
|
||||
formdef.refresh_from_storage()
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
testdef.run(formdef)
|
||||
assert (
|
||||
str(excinfo.value)
|
||||
== 'Expected redirection to https://example.com/ but was redirected to https://test.com/.'
|
||||
)
|
||||
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertRedirect(url='https://test.com/'),
|
||||
]
|
||||
|
||||
testdef.run(formdef)
|
||||
|
||||
|
||||
def test_workflow_tests_history_message(pub):
|
||||
user = pub.user_class(name='test user')
|
||||
user.store()
|
||||
|
||||
workflow = Workflow(name='Workflow One')
|
||||
new_status = workflow.add_status(name='New status')
|
||||
workflow.store()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test title'
|
||||
formdef.workflow_id = workflow.id
|
||||
formdef.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
|
||||
testdef = TestDef.create_from_formdata(formdef, formdata)
|
||||
testdef.agent_id = user.id
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertHistoryMessage(message='Hello 42'),
|
||||
]
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
testdef.run(formdef)
|
||||
assert str(excinfo.value) == 'No history message.'
|
||||
|
||||
register_comment = new_status.add_action('register-comment')
|
||||
register_comment.comment = 'Hello {{ 41|add:1 }}'
|
||||
workflow.store()
|
||||
formdef.refresh_from_storage()
|
||||
|
||||
testdef.run(formdef)
|
||||
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertHistoryMessage(message='Hello 43'),
|
||||
]
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
testdef.run(formdef)
|
||||
assert str(excinfo.value) == 'Wrong history message content.'
|
||||
assert 'Displayed history message: <div>Hello 42</div>' in excinfo.value.details
|
||||
assert 'Expected history message: Hello 43' in excinfo.value.details
|
||||
|
||||
|
||||
def test_workflow_tests_alert(pub):
|
||||
user = pub.user_class(name='test user')
|
||||
user.store()
|
||||
|
||||
workflow = Workflow(name='Workflow One')
|
||||
new_status = workflow.add_status(name='New status')
|
||||
workflow.store()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test title'
|
||||
formdef.workflow_id = workflow.id
|
||||
formdef.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
|
||||
testdef = TestDef.create_from_formdata(formdef, formdata)
|
||||
testdef.agent_id = user.id
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertAlert(message='Hello 42'),
|
||||
]
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
testdef.run(formdef)
|
||||
assert str(excinfo.value) == 'No alert matching message.'
|
||||
assert 'Displayed alerts: None' in excinfo.value.details
|
||||
assert 'Expected alert: Hello 42' in excinfo.value.details
|
||||
|
||||
alert = new_status.add_action('displaymsg')
|
||||
alert.message = 'Hello {{ 41|add:1 }}'
|
||||
workflow.store()
|
||||
formdef.refresh_from_storage()
|
||||
|
||||
testdef.run(formdef)
|
||||
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertAlert(message='Hello 43'),
|
||||
]
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
testdef.run(formdef)
|
||||
assert str(excinfo.value) == 'No alert matching message.'
|
||||
assert 'Displayed alerts: <p>Hello 42</p>' in excinfo.value.details
|
||||
assert 'Expected alert: Hello 43' in excinfo.value.details
|
||||
|
||||
|
||||
def test_workflow_tests_criticality(pub):
|
||||
user = pub.user_class(name='test user')
|
||||
user.store()
|
||||
|
||||
workflow = Workflow(name='Workflow One')
|
||||
new_status = workflow.add_status(name='New status')
|
||||
green_level = WorkflowCriticalityLevel(name='green')
|
||||
red_level = WorkflowCriticalityLevel(name='red')
|
||||
workflow.criticality_levels = [green_level, red_level]
|
||||
workflow.store()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test title'
|
||||
formdef.workflow_id = workflow.id
|
||||
formdef.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
|
||||
testdef = TestDef.create_from_formdata(formdef, formdata)
|
||||
testdef.agent_id = user.id
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertCriticality(level_id=red_level.id),
|
||||
]
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
testdef.run(formdef)
|
||||
assert str(excinfo.value) == 'Form should have criticality level "red" but has level "green".'
|
||||
|
||||
new_status.add_action('modify_criticality')
|
||||
workflow.store()
|
||||
formdef.refresh_from_storage()
|
||||
|
||||
testdef.run(formdef)
|
||||
|
||||
workflow.criticality_levels = []
|
||||
workflow.store()
|
||||
formdef.refresh_from_storage()
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
testdef.run(formdef)
|
||||
assert str(excinfo.value) == 'Broken, missing criticality level'
|
||||
|
||||
|
||||
def test_workflow_tests_backoffice_fields(pub):
|
||||
user = pub.user_class(name='test user')
|
||||
user.store()
|
||||
|
@ -701,7 +989,7 @@ def test_workflow_tests_webservice(pub):
|
|||
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertStatus(status_name='End status'),
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=1),
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_uuid=response.uuid, call_count=1),
|
||||
]
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
|
@ -716,7 +1004,7 @@ def test_workflow_tests_webservice(pub):
|
|||
assert str(excinfo.value) == 'Webservice response Fake response was used 2 times (instead of 1).'
|
||||
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=2),
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_uuid=response.uuid, call_count=2),
|
||||
]
|
||||
|
||||
testdef.run(formdef)
|
||||
|
@ -733,8 +1021,8 @@ def test_workflow_tests_webservice(pub):
|
|||
response2.store()
|
||||
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=1),
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_id=response2.id, call_count=1),
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_uuid=response.uuid, call_count=1),
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_uuid=response2.uuid, call_count=1),
|
||||
]
|
||||
|
||||
testdef.run(formdef)
|
||||
|
@ -744,8 +1032,8 @@ def test_workflow_tests_webservice(pub):
|
|||
testdef.run(formdef)
|
||||
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=1),
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=1),
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_uuid=response.uuid, call_count=1),
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_uuid=response.uuid, call_count=1),
|
||||
]
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
|
@ -753,7 +1041,7 @@ def test_workflow_tests_webservice(pub):
|
|||
assert str(excinfo.value) == 'Webservice response Fake response was used 0 times (instead of 1).'
|
||||
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=0),
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_uuid=response.uuid, call_count=0),
|
||||
]
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
|
@ -761,7 +1049,7 @@ def test_workflow_tests_webservice(pub):
|
|||
assert str(excinfo.value) == 'Webservice response Fake response was used 1 times (instead of 0).'
|
||||
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_id='xxx', call_count=1),
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_uuid='xxx', call_count=1),
|
||||
]
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
|
@ -834,6 +1122,7 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
|
|||
status_with_timeout_jump = workflow.add_status('Status with timeout jump', 'status-with-timeout-jump')
|
||||
status_with_button = workflow.add_status('Status with button', 'status-with-button')
|
||||
transition_status = workflow.add_status('Transition status', 'transition-status')
|
||||
transition_status2 = workflow.add_status('Transition status 2', 'transition-status-2')
|
||||
end_status = workflow.add_status('End status', 'end-status')
|
||||
|
||||
jump = new_status.add_action('jump')
|
||||
|
@ -864,7 +1153,24 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
|
|||
sendsms.to = ['0123456789']
|
||||
sendsms.body = 'Hello'
|
||||
|
||||
jump = transition_status.add_action('jump')
|
||||
anonymise_action = transition_status.add_action('anonymise')
|
||||
anonymise_action.mode = 'intermediate'
|
||||
|
||||
redirect_action = transition_status.add_action('redirect_to_url')
|
||||
redirect_action.url = 'https://test.com/'
|
||||
|
||||
register_comment = transition_status.add_action('register-comment')
|
||||
register_comment.comment = 'Hello'
|
||||
|
||||
transition_status.add_action('modify_criticality')
|
||||
|
||||
global_action = workflow.add_global_action('Action 1')
|
||||
global_action.triggers[0].roles = [role.id]
|
||||
|
||||
jump = global_action.add_action('jump')
|
||||
jump.status = transition_status2.id
|
||||
|
||||
jump = transition_status2.add_action('jump')
|
||||
jump.status = end_status.id
|
||||
|
||||
workflow.store()
|
||||
|
@ -889,6 +1195,7 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
|
|||
app = login(get_app(pub))
|
||||
resp = app.get(formdata.get_url())
|
||||
resp.form.submit('button1').follow()
|
||||
resp.form.submit('button-action-1').follow()
|
||||
formdata.refresh_from_storage()
|
||||
assert formdata.status == 'wf-end-status'
|
||||
|
||||
|
@ -896,7 +1203,7 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
|
|||
testdef.run(formdef)
|
||||
|
||||
actions = testdef.workflow_tests.actions
|
||||
assert len(actions) == 9
|
||||
assert len(actions) == 15
|
||||
|
||||
assert actions[0].key == 'assert-status'
|
||||
assert actions[0].status_name == 'Status with timeout jump'
|
||||
|
@ -914,6 +1221,16 @@ def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
|
|||
assert actions[5].key == 'assert-email'
|
||||
assert actions[6].key == 'assert-backoffice-field'
|
||||
assert actions[7].key == 'assert-sms'
|
||||
assert actions[8].key == 'assert-anonymise'
|
||||
assert actions[9].key == 'assert-redirect'
|
||||
assert actions[10].key == 'assert-history-message'
|
||||
assert actions[11].key == 'assert-criticality'
|
||||
|
||||
assert actions[12].key == 'assert-status'
|
||||
assert actions[12].status_name == 'Transition status'
|
||||
|
||||
assert actions[13].key == 'button-click'
|
||||
assert actions[13].button_name == 'Action 1'
|
||||
|
||||
assert actions[-1].key == 'assert-status'
|
||||
assert actions[-1].status_name == 'End status'
|
||||
|
|
|
@ -152,7 +152,7 @@ class BlockDirectory(FieldsDirectory):
|
|||
'Save snapshot'
|
||||
)
|
||||
r += htmltext('<li><a class="button button-paragraph" rel="popup" href="overwrite">%s</a>') % _(
|
||||
'Overwrite'
|
||||
'Overwrite with new import'
|
||||
)
|
||||
r += htmltext('</ul>')
|
||||
r += htmltext('<h3>%s</h3>') % _('Navigation')
|
||||
|
|
|
@ -196,7 +196,20 @@ class FieldDefPage(Directory):
|
|||
to_be_deleted.reverse()
|
||||
# add delete_fields checkbox only if the page has fields
|
||||
if to_be_deleted:
|
||||
form.add(CheckboxWidget, 'delete_fields', title=_('Also remove all fields from the page'))
|
||||
form.add(
|
||||
CheckboxWidget,
|
||||
'delete_fields',
|
||||
title=_('Also remove all fields from the page'),
|
||||
attrs={'data-dynamic-display-parent': 'true'},
|
||||
)
|
||||
form.widgets.append(
|
||||
HtmlWidget(
|
||||
'<div class="warningnotice" '
|
||||
'data-dynamic-display-child-of="delete_fields" '
|
||||
'data-dynamic-display-checked="true">%s</div>'
|
||||
% _('Warning: the page fields data will be permanently deleted.')
|
||||
)
|
||||
)
|
||||
form.add_submit('delete', _('Delete'))
|
||||
form.add_submit('cancel', _('Cancel'))
|
||||
if form.get_widget('cancel').parse():
|
||||
|
|
|
@ -30,6 +30,7 @@ from wcs.carddef import CardDef
|
|||
from wcs.categories import Category
|
||||
from wcs.formdef import (
|
||||
DRAFTS_DEFAULT_LIFESPAN,
|
||||
DRAFTS_DEFAULT_MAX_PER_USER,
|
||||
FormDef,
|
||||
FormdefImportError,
|
||||
FormdefImportRecoverableError,
|
||||
|
@ -290,6 +291,23 @@ class OptionsDirectory(Directory):
|
|||
widget.validation_function = check_lifespan
|
||||
widget.validation_function_error_message = _('Lifespan must be between 2 and 100 days.')
|
||||
|
||||
widget = form.add(
|
||||
WcsExtraStringWidget,
|
||||
'drafts_max_per_user',
|
||||
title=_('Maximum number of drafts per user (between 2 and 100)'),
|
||||
value=self.formdef.drafts_max_per_user,
|
||||
hint=_('%s drafts per user by default') % DRAFTS_DEFAULT_MAX_PER_USER,
|
||||
)
|
||||
|
||||
def check_max_per_user(value):
|
||||
try:
|
||||
return bool(int(value) >= 2 and int(value) <= 100)
|
||||
except (ValueError, TypeError):
|
||||
return False
|
||||
|
||||
widget.validation_function = check_max_per_user
|
||||
widget.validation_function_error_message = _('Maximum must be between 2 and 100 drafts.')
|
||||
|
||||
form.widgets.append(HtmlWidget(htmltext('<h3>%s</h3>') % _('Tracking Code')))
|
||||
form.add(
|
||||
CheckboxWidget,
|
||||
|
@ -495,6 +513,7 @@ class OptionsDirectory(Directory):
|
|||
'id_template',
|
||||
'submission_lateral_template',
|
||||
'drafts_lifespan',
|
||||
'drafts_max_per_user',
|
||||
'user_support',
|
||||
'management_sidebar_items',
|
||||
]
|
||||
|
|
|
@ -155,6 +155,7 @@ class TestPage(FormBackOfficeStatusPage):
|
|||
self.testdef = TestDef.get(component)
|
||||
except KeyError:
|
||||
raise TraversalError()
|
||||
self.testdef.formdef = objectdef
|
||||
|
||||
filled = self.testdef.build_formdata(objectdef, include_fields=True)
|
||||
super().__init__(objectdef, filled)
|
||||
|
|
|
@ -228,10 +228,16 @@ class CardPage(FormPage):
|
|||
form.add(
|
||||
CheckboxWidget,
|
||||
'update_existing_cards',
|
||||
title=_('Update existing cards (only for JSON imports)'),
|
||||
title=_('Update existing cards (only for JSON imports)')
|
||||
if not self.formdef.id_template
|
||||
else _('Update existing cards'),
|
||||
hint=_('Cards will be matched using their unique identifier ("uuid" property).')
|
||||
if not self.formdef.id_template
|
||||
else _('Cards will be matched using their custom identifier ("id" property).'),
|
||||
else _(
|
||||
'Cards will be matched using their custom identifier ("id" property). '
|
||||
'If this option is enabled cards with the same identifiers will be updated, '
|
||||
'otherwise they will be skipped.'
|
||||
),
|
||||
value=False,
|
||||
)
|
||||
form.add_submit('submit', _('Submit'))
|
||||
|
@ -241,19 +247,22 @@ class CardPage(FormPage):
|
|||
|
||||
if form.is_submitted() and not form.has_errors():
|
||||
file_content = form.get_widget('file').parse().fp.read()
|
||||
update_existing_cards = form.get_widget('update_existing_cards').parse()
|
||||
try:
|
||||
json_content = json.loads(file_content)
|
||||
except ValueError:
|
||||
# not json -> CSV
|
||||
try:
|
||||
return self.import_csv_submit(file_content, submission_agent_id=get_request().user.id)
|
||||
return self.import_csv_submit(
|
||||
file_content,
|
||||
update_existing_cards=update_existing_cards,
|
||||
submission_agent_id=get_request().user.id,
|
||||
)
|
||||
except ValueError as e:
|
||||
form.set_error('file', e)
|
||||
else:
|
||||
try:
|
||||
return self.import_json_submit(
|
||||
json_content, update_existing_cards=form.get_widget('update_existing_cards').parse()
|
||||
)
|
||||
return self.import_json_submit(json_content, update_existing_cards=update_existing_cards)
|
||||
except ValueError as e:
|
||||
form.set_error('file', e)
|
||||
|
||||
|
@ -275,7 +284,9 @@ class CardPage(FormPage):
|
|||
impossible_fields.append(field.label)
|
||||
return impossible_fields
|
||||
|
||||
def import_csv_submit(self, content, afterjob=True, api=False, submission_agent_id=None):
|
||||
def import_csv_submit(
|
||||
self, content, afterjob=True, api=False, update_existing_cards=False, submission_agent_id=None
|
||||
):
|
||||
if b'\0' in content:
|
||||
raise ValueError(_('Invalid file format.'))
|
||||
|
||||
|
@ -328,7 +339,10 @@ class CardPage(FormPage):
|
|||
raise ValueError(error_message)
|
||||
|
||||
job = ImportFromCsvAfterJob(
|
||||
carddef=self.formdef, data_lines=data_lines, submission_agent_id=submission_agent_id
|
||||
carddef=self.formdef,
|
||||
data_lines=data_lines,
|
||||
update_existing_cards=update_existing_cards,
|
||||
submission_agent_id=submission_agent_id,
|
||||
)
|
||||
if afterjob:
|
||||
get_response().add_after_job(job)
|
||||
|
@ -432,12 +446,13 @@ class CardBackOfficeStatusPage(FormBackOfficeStatusPage):
|
|||
|
||||
|
||||
class ImportFromCsvAfterJob(AfterJob):
|
||||
def __init__(self, carddef, data_lines, submission_agent_id):
|
||||
def __init__(self, carddef, data_lines, update_existing_cards, submission_agent_id):
|
||||
super().__init__(
|
||||
label=_('Importing data into cards'),
|
||||
carddef_class=carddef.__class__,
|
||||
carddef_id=carddef.id,
|
||||
data_lines=data_lines,
|
||||
update_existing_cards=update_existing_cards,
|
||||
submission_agent_id=submission_agent_id,
|
||||
)
|
||||
|
||||
|
@ -448,6 +463,7 @@ class ImportFromCsvAfterJob(AfterJob):
|
|||
|
||||
def execute(self):
|
||||
self.carddef = self.kwargs['carddef_class'].get(self.kwargs['carddef_id'])
|
||||
update_existing_cards = self.kwargs['update_existing_cards']
|
||||
carddata_class = self.carddef.data_class()
|
||||
self.submission_agent_id = self.kwargs['submission_agent_id']
|
||||
self.total_count = len(self.kwargs['data_lines'])
|
||||
|
@ -507,6 +523,9 @@ class ImportFromCsvAfterJob(AfterJob):
|
|||
except KeyError:
|
||||
pass # unique id, fine
|
||||
else:
|
||||
if not update_existing_cards:
|
||||
self.increment_count()
|
||||
continue
|
||||
# overwrite (only fields from CSV columns, not unsupported or backoffice fields)
|
||||
new_card = False
|
||||
orig_data = copy.copy(carddata_with_same_id.data)
|
||||
|
|
|
@ -495,6 +495,10 @@ class Command(TenantCommand):
|
|||
variables['portal_user_url'] = service_url
|
||||
variables['portal_user_title'] = service.get('title')
|
||||
config.set('options', 'theme_skeleton_url', service.get('base_url') + '__skeleton__/')
|
||||
|
||||
if service.get('service-id') == 'lingo':
|
||||
variables['lingo_url'] = urllib.parse.urljoin(service_url, '/')
|
||||
|
||||
for legacy_url in service.get('legacy_urls', []):
|
||||
legacy_domain = urllib.parse.urlparse(legacy_url['base_url']).netloc.split(':')[0]
|
||||
legacy_urls[legacy_domain] = domain
|
||||
|
|
|
@ -61,6 +61,7 @@ from .qommon.upload_storage import PicklableUpload
|
|||
from .roles import logged_users_role
|
||||
|
||||
DRAFTS_DEFAULT_LIFESPAN = 100 # days
|
||||
DRAFTS_DEFAULT_MAX_PER_USER = 5
|
||||
|
||||
if not hasattr(types, 'ClassType'):
|
||||
types.ClassType = type
|
||||
|
@ -190,6 +191,7 @@ class FormDef(StorableObject):
|
|||
submission_lateral_template = None
|
||||
id_template = None
|
||||
drafts_lifespan = None
|
||||
drafts_max_per_user = None
|
||||
user_support = None
|
||||
|
||||
geolocations = None
|
||||
|
@ -220,6 +222,7 @@ class FormDef(StorableObject):
|
|||
'submission_lateral_template',
|
||||
'id_template',
|
||||
'drafts_lifespan',
|
||||
'drafts_max_per_user',
|
||||
'user_support',
|
||||
]
|
||||
BOOLEAN_ATTRIBUTES = [
|
||||
|
@ -573,9 +576,11 @@ class FormDef(StorableObject):
|
|||
def get_all_fields(self):
|
||||
return (self.fields or []) + self.workflow.get_backoffice_fields()
|
||||
|
||||
def iter_fields(self, include_block_fields=False, with_backoffice_fields=True):
|
||||
def iter_fields(self, include_block_fields=False, with_backoffice_fields=True, with_no_data_fields=True):
|
||||
def _iter_fields(fields, block_field=None):
|
||||
for field in fields:
|
||||
if with_no_data_fields is False and field.is_no_data_field:
|
||||
continue
|
||||
# add contextual_id/contextual_varname attributes
|
||||
# they are id/varname for normal fields
|
||||
# but in case of blocks they are concatenation of block id/varname + field id/varname
|
||||
|
@ -634,6 +639,9 @@ class FormDef(StorableObject):
|
|||
def get_drafts_lifespan(self):
|
||||
return int(self.drafts_lifespan or DRAFTS_DEFAULT_LIFESPAN)
|
||||
|
||||
def get_drafts_max_per_user(self):
|
||||
return int(self.drafts_max_per_user or DRAFTS_DEFAULT_MAX_PER_USER)
|
||||
|
||||
_workflow = None
|
||||
|
||||
def get_workflow(self):
|
||||
|
|
|
@ -1030,7 +1030,7 @@ class FormStatusPage(Directory, FormTemplateMixin):
|
|||
return Directory._q_lookup(self, component)
|
||||
|
||||
def _q_traverse(self, path):
|
||||
get_response().breadcrumb.append((str(self.filled.id) + '/', self.filled.get_display_id()))
|
||||
get_response().breadcrumb.append((self.filled.identifier + '/', self.filled.get_display_id()))
|
||||
return super()._q_traverse(path)
|
||||
|
||||
def wfedit(self, action_id):
|
||||
|
|
|
@ -1778,11 +1778,11 @@ class FormPage(Directory, TempfileDirectoryMixin, FormTemplateMixin):
|
|||
if get_session().mark_anonymous_formdata(filled):
|
||||
get_session().store()
|
||||
elif new_draft:
|
||||
# keep at most 5 drafts per user
|
||||
# keep at most "max_per_user" drafts per user
|
||||
data_class = self.formdef.data_class()
|
||||
for id in data_class.get_sorted_ids(
|
||||
'-last_update_time', [Equal('status', 'draft'), Equal('user_id', str(filled.user_id))]
|
||||
)[5:]:
|
||||
)[self.formdef.get_drafts_max_per_user() :]:
|
||||
data_class.remove_object(id)
|
||||
|
||||
if new_draft:
|
||||
|
|
|
@ -4,8 +4,8 @@ msgid ""
|
|||
msgstr ""
|
||||
"Project-Id-Version: wcs 0\n"
|
||||
"Report-Msgid-Bugs-To: \n"
|
||||
"POT-Creation-Date: 2024-03-18 14:22+0100\n"
|
||||
"PO-Revision-Date: 2024-03-18 14:22+0100\n"
|
||||
"POT-Creation-Date: 2024-03-21 19:00+0100\n"
|
||||
"PO-Revision-Date: 2024-03-21 19:00+0100\n"
|
||||
"Last-Translator: Thomas Noël <tnoel@entrouvert.com>\n"
|
||||
"Language-Team: french\n"
|
||||
"Language: fr\n"
|
||||
|
@ -18,7 +18,7 @@ msgstr ""
|
|||
#: admin/data_sources.py admin/forms.py admin/mail_templates.py admin/tests.py
|
||||
#: admin/users.py admin/workflows.py admin/wscalls.py backoffice/management.py
|
||||
#: fields/base.py qommon/ident/franceconnect.py
|
||||
#: templates/wcs/backoffice/test-result.html wf/profile.py
|
||||
#: templates/wcs/backoffice/test-result.html wf/profile.py workflow_tests.py
|
||||
msgid "Name"
|
||||
msgstr "Nom"
|
||||
|
||||
|
@ -866,7 +866,7 @@ msgstr ""
|
|||
|
||||
#: admin/fields.py admin/settings.py admin/users.py backoffice/management.py
|
||||
#: data_sources.py fields/base.py qommon/form.py qommon/ident/password.py
|
||||
#: statistics/views.py wf/create_formdata.py
|
||||
#: statistics/views.py wf/create_formdata.py workflow_tests.py
|
||||
msgid "None"
|
||||
msgstr "Aucun"
|
||||
|
||||
|
@ -931,6 +931,12 @@ msgstr "Vous allez supprimer le champ « %s »."
|
|||
msgid "Also remove all fields from the page"
|
||||
msgstr "Supprimer tous les champs de la page"
|
||||
|
||||
#: admin/fields.py
|
||||
msgid "Warning: the page fields data will be permanently deleted."
|
||||
msgstr ""
|
||||
"Attention : les informations contenues dans les champs de la page seront "
|
||||
"perdues de façon irréversible."
|
||||
|
||||
#: admin/fields.py
|
||||
#, python-format
|
||||
msgid "Deletion of field \"%s\""
|
||||
|
@ -1166,6 +1172,19 @@ msgstr "Par défaut les brouillons sont supprimés après %s jours."
|
|||
msgid "Lifespan must be between 2 and 100 days."
|
||||
msgstr "La durée de vie doit être entre 2 et 100 jours."
|
||||
|
||||
#: admin/forms.py
|
||||
msgid "Maximum number of drafts per user (between 2 and 100)"
|
||||
msgstr "Nombre maximum de brouillons par utilisateur (entre 2 et 100)"
|
||||
|
||||
#: admin/forms.py
|
||||
#, python-format
|
||||
msgid "%s drafts per user by default"
|
||||
msgstr "%s brouillons par utilisateur par défaut"
|
||||
|
||||
#: admin/forms.py
|
||||
msgid "Maximum must be between 2 and 100 drafts."
|
||||
msgstr "Le nombre maximum doit être entre 2 et 100 brouillons."
|
||||
|
||||
#: admin/forms.py backoffice/management.py backoffice/submission.py
|
||||
#: forms/root.py
|
||||
msgid "Tracking Code"
|
||||
|
@ -2369,7 +2388,7 @@ msgid "Sender (number or name)"
|
|||
msgstr "Expéditeur (nom ou numéro)"
|
||||
|
||||
#: admin/settings.py admin/tests.py wf/notification.py wf/redirect_to_url.py
|
||||
#: wf/wscall.py wscalls.py
|
||||
#: wf/wscall.py workflow_tests.py wscalls.py
|
||||
msgid "URL"
|
||||
msgstr "URL"
|
||||
|
||||
|
@ -3940,6 +3959,10 @@ msgid "Update existing cards (only for JSON imports)"
|
|||
msgstr ""
|
||||
"Mettre à jour les fiches existantes (uniquement pour les fichiers JSON)"
|
||||
|
||||
#: backoffice/data_management.py
|
||||
msgid "Update existing cards"
|
||||
msgstr "Mettre à jour les fiches existantes"
|
||||
|
||||
#: backoffice/data_management.py
|
||||
msgid ""
|
||||
"Cards will be matched using their unique identifier (\"uuid\" property)."
|
||||
|
@ -3948,10 +3971,15 @@ msgstr ""
|
|||
"identifiant unique (propriété « uuid »)."
|
||||
|
||||
#: backoffice/data_management.py
|
||||
msgid "Cards will be matched using their custom identifier (\"id\" property)."
|
||||
msgid ""
|
||||
"Cards will be matched using their custom identifier (\"id\" property). If "
|
||||
"this option is enabled cards with the same identifiers will be updated, "
|
||||
"otherwise they will be skipped."
|
||||
msgstr ""
|
||||
"La correspondance avec les fiches existantes se fera sur base de leur "
|
||||
"identifiant personnalisé (propriété « id »)."
|
||||
"identifiant personnalisé (propriété « id »). Si cette option est activée les "
|
||||
"fiches avec un identifiant existant seront mises à jour, sinon elles seront "
|
||||
"ignorées."
|
||||
|
||||
#: backoffice/data_management.py backoffice/i18n.py
|
||||
#: templates/wcs/backoffice/card-data-import-form.html
|
||||
|
@ -9728,6 +9756,10 @@ msgstr "Contenu de la barre latéral pour le traitement"
|
|||
msgid "Tracking codes"
|
||||
msgstr "Codes de suivi"
|
||||
|
||||
#: templates/wcs/backoffice/formdef-inspect.html
|
||||
msgid "Maximum number of drafts per user"
|
||||
msgstr "Nombre maximum de brouillons par utilisateur"
|
||||
|
||||
#: templates/wcs/backoffice/formdef-inspect.html
|
||||
msgid "Redirection when disabled"
|
||||
msgstr "Redirection quand désactivé"
|
||||
|
@ -11392,7 +11424,7 @@ msgstr "Erreur dans le gabarit du message de workflow (%s)"
|
|||
msgid "Error rendering message."
|
||||
msgstr "Erreur de rendu du message."
|
||||
|
||||
#: wf/display_message.py wf/register_comment.py
|
||||
#: wf/display_message.py wf/register_comment.py workflow_tests.py
|
||||
msgid "Message"
|
||||
msgstr "Message"
|
||||
|
||||
|
@ -12422,6 +12454,104 @@ msgstr "Numéros de téléphone"
|
|||
msgid "Add phone number"
|
||||
msgstr "Ajouter un numéro de téléphone"
|
||||
|
||||
#: workflow_tests.py
|
||||
msgid "Assert anonymisation is performed"
|
||||
msgstr "Vérifier que l’anonymisation a lieu"
|
||||
|
||||
#: workflow_tests.py
|
||||
msgid "Form was not anonymised."
|
||||
msgstr "La demande n’a pas été anonymisée."
|
||||
|
||||
#: workflow_tests.py
|
||||
msgid "Assert redirect is performed"
|
||||
msgstr "Vérifier qu’une redirection a lieu"
|
||||
|
||||
#: workflow_tests.py
|
||||
msgid "No redirection occured."
|
||||
msgstr "Aucune redirection n’a eu lieu."
|
||||
|
||||
#: workflow_tests.py
|
||||
#, python-format
|
||||
msgid "Expected redirection to %(expected_url)s but was redirected to %(url)s."
|
||||
msgstr ""
|
||||
"Une redirection vers « %(expected_url)s » était attendue mais une "
|
||||
"redirection vers « %(url)s » a eu lieu."
|
||||
|
||||
#: workflow_tests.py
|
||||
msgid "Assert history message is displayed"
|
||||
msgstr "Vérifier l’affichage d’un message dans l’historique"
|
||||
|
||||
#: workflow_tests.py
|
||||
msgid "No history message."
|
||||
msgstr "Pas de message dans l’historique."
|
||||
|
||||
#: workflow_tests.py
|
||||
#, python-format
|
||||
msgid "Displayed history message: %s"
|
||||
msgstr "Message affiché dans l’historique : %s"
|
||||
|
||||
#: workflow_tests.py
|
||||
#, python-format
|
||||
msgid "Expected history message: %s"
|
||||
msgstr "Message attendu dans l’historique : %s"
|
||||
|
||||
#: workflow_tests.py
|
||||
msgid "Wrong history message content."
|
||||
msgstr "Contenu du message dans l’historique incorrect."
|
||||
|
||||
#: workflow_tests.py
|
||||
msgid "Assertion will pass if the text is contained in history message."
|
||||
msgstr "La vérification réussira si le texte est contenu dans le message."
|
||||
|
||||
#: workflow_tests.py
|
||||
msgid "Assert alert is displayed"
|
||||
msgstr "Vérifier l’affichage d’une alerte"
|
||||
|
||||
#: workflow_tests.py
|
||||
#, python-format
|
||||
msgid "Displayed alerts: %s"
|
||||
msgstr "Alertes affichées : %s"
|
||||
|
||||
#: workflow_tests.py
|
||||
#, python-format
|
||||
msgid "Expected alert: %s"
|
||||
msgstr "Alerte attendue : %s"
|
||||
|
||||
#: workflow_tests.py
|
||||
msgid "No alert matching message."
|
||||
msgstr "Pas d’alerte correspondant au message attendu."
|
||||
|
||||
#: workflow_tests.py
|
||||
msgid "Assertion will pass if the text is contained in alert message."
|
||||
msgstr ""
|
||||
"La vérification réussira si le texte est contenu dans le message d’alerte."
|
||||
|
||||
#: workflow_tests.py
|
||||
msgid "Assert criticality level"
|
||||
msgstr "Vérifier le niveau de criticité"
|
||||
|
||||
#: workflow_tests.py
|
||||
msgid "Workflow has no criticality levels."
|
||||
msgstr "Le workflow n’a pas de niveaux de criticité."
|
||||
|
||||
#: workflow_tests.py
|
||||
msgid "Broken, missing criticality level"
|
||||
msgstr "Cassé, niveau de criticité manquant"
|
||||
|
||||
#: workflow_tests.py
|
||||
#, python-format
|
||||
msgid "Criticality is \"%s\""
|
||||
msgstr "Le niveau de criticité est « %s »"
|
||||
|
||||
#: workflow_tests.py
|
||||
#, python-format
|
||||
msgid ""
|
||||
"Form should have criticality level \"%(expected_level)s\" but has level "
|
||||
"\"%(level)s\"."
|
||||
msgstr ""
|
||||
"La demande devrait avoir le niveau de criticité « %(expected_level)s » mais "
|
||||
"elle a le niveau « %(level)s »."
|
||||
|
||||
#: workflow_traces.py
|
||||
msgid "Created (by API)"
|
||||
msgstr "Création (par l’API)"
|
||||
|
|
|
@ -485,6 +485,7 @@ class WcsPublisher(QommonPublisher):
|
|||
for _formdef in FormDef.select() + CardDef.select():
|
||||
sql.do_formdef_tables(_formdef)
|
||||
sql.migrate_global_views(conn, cur)
|
||||
sql.init_search_tokens()
|
||||
cur.close()
|
||||
|
||||
def record_deprecated_usage(self, *args, **kwargs):
|
||||
|
|
|
@ -692,6 +692,11 @@ class QommonPublisher(Publisher):
|
|||
for error in self.loggederror_class.select(clause=clauses):
|
||||
self.loggederror_class.remove_object(error.id)
|
||||
|
||||
def clean_search_tokens(self, **kwargs):
|
||||
from wcs import sql
|
||||
|
||||
sql.purge_obsolete_search_tokens()
|
||||
|
||||
@classmethod
|
||||
def register_cronjobs(cls):
|
||||
cls.register_cronjob(CronJob(cls.clean_sessions, minutes=[0], name='clean_sessions'))
|
||||
|
@ -704,6 +709,9 @@ class QommonPublisher(Publisher):
|
|||
cls.register_cronjob(
|
||||
CronJob(cls.clean_loggederrors, hours=[3], minutes=[0], name='clean_loggederrors')
|
||||
)
|
||||
cls.register_cronjob(
|
||||
CronJob(cls.clean_search_tokens, weekdays=[0], hours=[1], minutes=[0], name='clean_search_tokens')
|
||||
)
|
||||
|
||||
_initialized = False
|
||||
|
||||
|
|
|
@ -113,9 +113,26 @@ function init_sync_from_template_address() {
|
|||
}
|
||||
}
|
||||
$(widget_selector).each(function(idx, elem) {
|
||||
// enable manual address mode if there is an error in one of the manual address fields.
|
||||
var $manual_checkbox = $(elem).find('input.wcs-manual-address');
|
||||
if ($(elem).nextUntil('.template-address', '[data-geolocation].widget-with-error').length) {
|
||||
$(elem).find('input.wcs-manual-address').prop('checked', true).trigger('change');
|
||||
// enable manual address mode if there is an error in one of the manual address fields.
|
||||
$manual_checkbox.prop('checked', true).trigger('change');
|
||||
} else {
|
||||
// enable manual address mode if a manual field has data while the select is empty
|
||||
// (typically when going back to a previous page)
|
||||
var has_val = $(elem).find('select').val();
|
||||
if (! has_val) {
|
||||
var has_manual_var = false;
|
||||
$(elem).nextUntil('.template-address', 'div[data-geolocation]').find('input').each(function(idx, manual_elem) {
|
||||
if ($(manual_elem).val()) has_manual_var = true;
|
||||
})
|
||||
$(elem).nextUntil('.template-address', 'div[data-geolocation]').find('textarea').each(function(idx, manual_elem) {
|
||||
if ($(manual_elem).val()) has_manual_var = true;
|
||||
})
|
||||
if (has_manual_var) {
|
||||
$manual_checkbox.prop('checked', true).trigger('change');
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -188,7 +188,7 @@ REQUESTS_CERT = {}
|
|||
DISABLE_CRON_JOBS = False
|
||||
|
||||
# w.c.s. can have very large forms, in backoffice and frontoffice
|
||||
DATA_UPLOAD_MAX_NUMBER_FIELDS = 2000 # Django default is 1000
|
||||
DATA_UPLOAD_MAX_NUMBER_FIELDS = 3000 # Django default is 1000
|
||||
|
||||
# workalendar config
|
||||
WORKING_DAY_CALENDAR = 'workalendar.europe.France'
|
||||
|
|
196
wcs/sql.py
196
wcs/sql.py
|
@ -96,6 +96,20 @@ SQL_TYPE_MAPPING = {
|
|||
}
|
||||
|
||||
|
||||
def _table_exists(cur, table_name):
|
||||
cur.execute('SELECT 1 FROM pg_class WHERE relname = %s', (table_name,))
|
||||
rows = cur.fetchall()
|
||||
return len(rows) > 0
|
||||
|
||||
|
||||
def _trigger_exists(cur, table_name, trigger_name):
|
||||
cur.execute(
|
||||
'SELECT 1 FROM pg_trigger WHERE tgrelid = %s::regclass AND tgname = %s', (table_name, trigger_name)
|
||||
)
|
||||
rows = cur.fetchall()
|
||||
return len(rows) > 0
|
||||
|
||||
|
||||
class WcsPgConnection(psycopg2.extensions.connection):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
@ -1582,6 +1596,8 @@ def do_global_views(conn, cur):
|
|||
% (name, category.id)
|
||||
)
|
||||
|
||||
init_search_tokens_triggers(cur)
|
||||
|
||||
|
||||
def clean_global_views(conn, cur):
|
||||
# Purge of any dead data
|
||||
|
@ -1674,11 +1690,178 @@ def init_global_table(conn=None, cur=None):
|
|||
endpoint_status=endpoint_status_filter,
|
||||
)
|
||||
)
|
||||
init_search_tokens_data(cur)
|
||||
|
||||
if own_conn:
|
||||
cur.close()
|
||||
|
||||
|
||||
def init_search_tokens(conn=None, cur=None):
|
||||
"""Initialize the search_tokens mechanism.
|
||||
|
||||
It's based on three parts:
|
||||
- a token table
|
||||
- triggers to feed this table from the tsvectors used in the database
|
||||
- a search function that will leverage these tokens to extend the search query.
|
||||
|
||||
So far, the sources used are wcs_all_forms and searchable_formdefs.
|
||||
|
||||
Example: let's say the sources texts are "Tarif d'école" and "La cantine".
|
||||
This gives the following tsvectors: ('tarif', 'écol') and ('cantin')
|
||||
Our tokens table will have these three words.
|
||||
When the search function is launched, it splits the search query and will
|
||||
replace unavailable tokens by those close, if available.
|
||||
The search query 'tari' will be expanded to 'tarif'.
|
||||
The search query 'collège' will remain unchanged (and return nothing)
|
||||
If several tokens match or are close enough, the query will be expanded to
|
||||
an OR.
|
||||
"""
|
||||
|
||||
own_cur = False
|
||||
if cur is None:
|
||||
own_cur = True
|
||||
conn, cur = get_connection_and_cursor()
|
||||
|
||||
# Create table
|
||||
cur.execute('CREATE TABLE IF NOT EXISTS wcs_search_tokens(token TEXT PRIMARY KEY);')
|
||||
|
||||
# Create triggers
|
||||
init_search_tokens_triggers(cur)
|
||||
|
||||
# Fill table
|
||||
init_search_tokens_data(cur)
|
||||
|
||||
# Index at the end, small performance trick... not that useful, but it's free...
|
||||
cur.execute('CREATE EXTENSION IF NOT EXISTS pg_trgm;')
|
||||
cur.execute(
|
||||
'CREATE INDEX IF NOT EXISTS wcs_search_tokens_trgm ON wcs_search_tokens USING gin(token gin_trgm_ops);'
|
||||
)
|
||||
|
||||
# And last: functions to use this brand new table
|
||||
# These two aggregates make the search query far simpler to write
|
||||
cur.execute('CREATE OR REPLACE AGGREGATE tsquery_agg_or (tsquery) (sfunc=tsquery_or, stype=tsquery);')
|
||||
cur.execute('CREATE OR REPLACE AGGREGATE tsquery_agg_and (tsquery) (sfunc=tsquery_and, stype=tsquery);')
|
||||
cur.execute(
|
||||
r"""CREATE OR REPLACE FUNCTION public.wcs_tsquery(text)
|
||||
RETURNS tsquery
|
||||
LANGUAGE sql
|
||||
STABLE
|
||||
AS $function$
|
||||
WITH
|
||||
tokenized AS (SELECT unnest(regexp_split_to_array($1, '\s+')) w),
|
||||
super_tokenized AS (
|
||||
-- perfect: tokens that are found as is in table, thus no OR required
|
||||
-- partial: tokens found using distance search on tokens table (note: numbers are excluded here)
|
||||
-- otherwise: token as is and likely no search result later
|
||||
SELECT w,
|
||||
coalesce((select plainto_tsquery(perfect.token) FROM wcs_search_tokens AS perfect WHERE perfect.token = plainto_tsquery(w)::text),
|
||||
tsquery_agg_or(plainto_tsquery(partial.token) order by partial.token <-> w desc),
|
||||
plainto_tsquery(w)) tokens
|
||||
FROM tokenized
|
||||
LEFT JOIN wcs_search_tokens AS partial ON partial.token % w AND w not similar to '%[0-9]{2,}%'
|
||||
GROUP BY w)
|
||||
SELECT tsquery_agg_and(tokens) FROM super_tokenized;
|
||||
$function$;"""
|
||||
)
|
||||
|
||||
if own_cur:
|
||||
cur.close()
|
||||
|
||||
|
||||
def init_search_tokens_triggers(cur):
|
||||
# We define only appending triggers, ie on INSERT and UPDATE.
|
||||
# It would be far heavier to maintain deletions here, and having extra data has
|
||||
# no or marginal side effect on search performances, and absolutely no impact
|
||||
# on search results.
|
||||
# Instead, a weekly cron job will delete obsolete entries, thus making it sure no
|
||||
# personal data is kept uselessly.
|
||||
|
||||
# First part: the appending function
|
||||
cur.execute(
|
||||
"""CREATE OR REPLACE FUNCTION wcs_search_tokens_trigger_fn ()
|
||||
RETURNS trigger
|
||||
LANGUAGE plpgsql
|
||||
AS $function$
|
||||
BEGIN
|
||||
INSERT INTO wcs_search_tokens SELECT unnest(tsvector_to_array(NEW.fts)) ON CONFLICT(token) DO NOTHING;
|
||||
RETURN NEW;
|
||||
END;
|
||||
$function$;"""
|
||||
)
|
||||
|
||||
if not (_table_exists(cur, 'wcs_search_tokens')):
|
||||
# abort trigger creation if tokens table doesn't exist yet
|
||||
return
|
||||
|
||||
if _table_exists(cur, 'wcs_all_forms') and not _trigger_exists(
|
||||
cur, 'wcs_all_forms', 'wcs_all_forms_fts_trg_upd'
|
||||
):
|
||||
# Second part: insert and update triggers for wcs_all_forms
|
||||
cur.execute(
|
||||
"""CREATE TRIGGER wcs_all_forms_fts_trg_ins
|
||||
AFTER INSERT ON wcs_all_forms
|
||||
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
|
||||
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
|
||||
)
|
||||
cur.execute(
|
||||
"""CREATE TRIGGER wcs_all_forms_fts_trg_upd
|
||||
AFTER UPDATE OF fts ON wcs_all_forms
|
||||
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
|
||||
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
|
||||
)
|
||||
|
||||
if _table_exists(cur, 'searchable_formdefs') and not _trigger_exists(
|
||||
cur, 'searchable_formdefs', 'searchable_formdefs_fts_trg_upd'
|
||||
):
|
||||
# Third part: insert and update triggers for searchable_formdefs
|
||||
cur.execute(
|
||||
"""CREATE TRIGGER searchable_formdefs_fts_trg_ins
|
||||
AFTER INSERT ON searchable_formdefs
|
||||
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
|
||||
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
|
||||
)
|
||||
cur.execute(
|
||||
"""CREATE TRIGGER searchable_formdefs_fts_trg_upd
|
||||
AFTER UPDATE OF fts ON searchable_formdefs
|
||||
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
|
||||
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
|
||||
)
|
||||
|
||||
|
||||
def init_search_tokens_data(cur):
|
||||
if not (_table_exists(cur, 'wcs_search_tokens')):
|
||||
# abort table data initialization if tokens table doesn't exist yet
|
||||
return
|
||||
|
||||
if _table_exists(cur, 'wcs_all_forms'):
|
||||
cur.execute(
|
||||
"""INSERT INTO wcs_search_tokens
|
||||
SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms
|
||||
ON CONFLICT(token) DO NOTHING;"""
|
||||
)
|
||||
if _table_exists(cur, 'searchable_formdefs'):
|
||||
cur.execute(
|
||||
"""INSERT INTO wcs_search_tokens
|
||||
SELECT unnest(tsvector_to_array(fts)) FROM searchable_formdefs
|
||||
ON CONFLICT(token) DO NOTHING;"""
|
||||
)
|
||||
|
||||
|
||||
def purge_obsolete_search_tokens(cur=None):
|
||||
own_cur = False
|
||||
if cur is None:
|
||||
own_cur = True
|
||||
_, cur = get_connection_and_cursor()
|
||||
|
||||
cur.execute(
|
||||
"""DELETE FROM wcs_search_tokens
|
||||
WHERE token NOT IN (SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms)
|
||||
AND token NOT IN (SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms);"""
|
||||
)
|
||||
if own_cur:
|
||||
cur.close()
|
||||
|
||||
|
||||
class SqlMixin:
|
||||
_table_name = None
|
||||
_numerical_id = True
|
||||
|
@ -4725,6 +4908,8 @@ class AnyFormData(SqlMixin):
|
|||
# convert back unstructured geolocation to the 'native' formdata format.
|
||||
if o.geoloc_base_x is not None:
|
||||
o.geolocations = {'base': {'lon': o.geoloc_base_x, 'lat': o.geoloc_base_y}}
|
||||
# do not allow storing those partial objects
|
||||
o.store = None
|
||||
return o
|
||||
|
||||
@classmethod
|
||||
|
@ -4809,7 +4994,6 @@ class SearchableFormDef(SqlMixin):
|
|||
% (cls._table_name, cls._table_name)
|
||||
)
|
||||
cls.do_indexes(cur)
|
||||
cur.close()
|
||||
|
||||
from wcs.carddef import CardDef
|
||||
from wcs.formdef import FormDef
|
||||
|
@ -4818,6 +5002,8 @@ class SearchableFormDef(SqlMixin):
|
|||
CardDef.select(ignore_errors=True), FormDef.select(ignore_errors=True)
|
||||
):
|
||||
cls.update(obj=objectdef)
|
||||
init_search_tokens(cur)
|
||||
cur.close()
|
||||
|
||||
@classmethod
|
||||
def update(cls, obj=None, removed_obj_type=None, removed_obj_id=None):
|
||||
|
@ -4855,7 +5041,7 @@ class SearchableFormDef(SqlMixin):
|
|||
def search(cls, obj_type, string):
|
||||
_, cur = get_connection_and_cursor()
|
||||
cur.execute(
|
||||
'SELECT object_id FROM searchable_formdefs WHERE fts @@ plainto_tsquery(%s)',
|
||||
'SELECT object_id FROM searchable_formdefs WHERE fts @@ wcs_tsquery(%s)',
|
||||
(FtsMatch.get_fts_value(string),),
|
||||
)
|
||||
ids = [x[0] for x in cur.fetchall()]
|
||||
|
@ -5120,7 +5306,7 @@ def get_period_total(
|
|||
# latest migration, number + description (description is not used
|
||||
# programmaticaly but will make sure git conflicts if two migrations are
|
||||
# separately added with the same number)
|
||||
SQL_LEVEL = (106, 'add context column to logged_errors table')
|
||||
SQL_LEVEL = (107, 'new fts mechanism with tokens table')
|
||||
|
||||
|
||||
def migrate_global_views(conn, cur):
|
||||
|
@ -5454,6 +5640,10 @@ def migrate():
|
|||
for formdef in FormDef.select() + CardDef.select():
|
||||
do_formdef_tables(formdef, rebuild_views=False, rebuild_global_views=False)
|
||||
|
||||
if sql_level < 107:
|
||||
# 107: new fts mechanism with tokens table
|
||||
init_search_tokens()
|
||||
|
||||
if sql_level != SQL_LEVEL[0]:
|
||||
cur.execute(
|
||||
'''UPDATE wcs_meta SET value = %s, updated_at=NOW() WHERE key = %s''',
|
||||
|
|
|
@ -379,6 +379,11 @@ class FtsMatch(Criteria):
|
|||
return 'fts @@ plainto_tsquery(%%(c%s)s)' % id(self.value)
|
||||
|
||||
|
||||
class WcsFtsMatch(FtsMatch):
|
||||
def as_sql(self):
|
||||
return 'fts @@ wcs_tsquery(%%(c%s)s)' % id(self.value)
|
||||
|
||||
|
||||
class ElementEqual(Criteria):
|
||||
def __init__(self, attribute, key, value, **kwargs):
|
||||
super().__init__(attribute, value)
|
||||
|
|
|
@ -67,6 +67,7 @@
|
|||
<li><span class="parameter">{% trans "Fields to check after entering the tracking code" %}{% trans ":" %}</span> {{ tracking_code_verify_fields_labels|default:"-" }}</li>
|
||||
{% endif %}
|
||||
<li><span class="parameter">{% trans "Lifespan of drafts (in days)" %}{% trans ":" %}</span> {{ formdef.get_drafts_lifespan }}</li>
|
||||
<li><span class="parameter">{% trans "Maximum number of drafts per user" %}{% trans ":" %}</span> {{ formdef.get_drafts_max_per_user }}</li>
|
||||
<li><span class="parameter">{% trans "Templates" %}</span>
|
||||
<ul>
|
||||
<li><span class="parameter">{% trans "Digest" %}{% trans ":" %}</span> {{ formdef.default_digest_template|default:"-" }}</li>
|
||||
|
|
|
@ -31,9 +31,11 @@
|
|||
</span>
|
||||
</span>
|
||||
<p class="commands">
|
||||
<span class="edit">
|
||||
<a href="{{ action.id }}/" rel="popup" title="{% trans "Edit" %}">{% trans "Edit" %}</a>
|
||||
</span>
|
||||
{% if action.editable %}
|
||||
<span class="edit">
|
||||
<a href="{{ action.id }}/" rel="popup" title="{% trans "Edit" %}">{% trans "Edit" %}</a>
|
||||
</span>
|
||||
{% endif %}
|
||||
<span class="duplicate">
|
||||
<a href="{{ action.id }}/duplicate" title="{% trans "Duplicate" %}">{% trans "Duplicate" %}</a>
|
||||
</span>
|
||||
|
|
|
@ -21,6 +21,7 @@ import io
|
|||
import json
|
||||
import socket
|
||||
import urllib.parse
|
||||
import uuid
|
||||
import xml.etree.ElementTree as ET
|
||||
from contextlib import contextmanager
|
||||
|
||||
|
@ -75,8 +76,8 @@ class TestDefXmlProxy(XmlStorableObject):
|
|||
}
|
||||
excluded_fields = ['id', 'object_type', 'object_id']
|
||||
extra_fields = [
|
||||
('workflow_tests', 'workflow_tests'),
|
||||
('_webservice_responses', 'webservice_responses'),
|
||||
('workflow_tests', 'workflow_tests'),
|
||||
]
|
||||
|
||||
return [
|
||||
|
@ -674,6 +675,7 @@ class WebserviceResponse(XmlStorableObject):
|
|||
_names = 'webservice-response'
|
||||
xml_root_node = 'webservice-response'
|
||||
|
||||
uuid = None
|
||||
testdef_id = None
|
||||
name = ''
|
||||
payload = None
|
||||
|
@ -684,6 +686,7 @@ class WebserviceResponse(XmlStorableObject):
|
|||
post_data = None
|
||||
|
||||
XML_NODES = [
|
||||
('uuid', 'str'),
|
||||
('testdef_id', 'int'),
|
||||
('name', 'str'),
|
||||
('payload', 'str'),
|
||||
|
@ -694,6 +697,10 @@ class WebserviceResponse(XmlStorableObject):
|
|||
('post_data', 'kv_data'),
|
||||
]
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.uuid = str(uuid.uuid4())
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
|
|
|
@ -270,7 +270,7 @@ class LazyFormDefObjectsManager:
|
|||
return self._clone(self._criterias + [self._formdef.get_by_id_criteria(str(value))])
|
||||
|
||||
def get_fields(self, key):
|
||||
for field in self._formdef.iter_fields(include_block_fields=True):
|
||||
for field in self._formdef.iter_fields(include_block_fields=True, with_no_data_fields=False):
|
||||
if getattr(field, 'block_field', None):
|
||||
if field.key == 'items':
|
||||
# not yet
|
||||
|
@ -757,8 +757,6 @@ class LazyFormDef:
|
|||
|
||||
@property
|
||||
def option(self):
|
||||
if not self._formdef.workflow.variables_formdef:
|
||||
return {}
|
||||
return LazyFormDefOptions(self._formdef)
|
||||
|
||||
@property
|
||||
|
@ -1989,14 +1987,20 @@ class LazyRequest:
|
|||
class LazyFormDefOptions(LazyFormDataVar):
|
||||
def __init__(self, formdef):
|
||||
self._formdef = formdef
|
||||
fields = self._formdef.workflow.variables_formdef.fields
|
||||
try:
|
||||
fields = self._formdef.workflow.variables_formdef.fields
|
||||
except AttributeError:
|
||||
fields = []
|
||||
data = self._formdef.workflow_options or {}
|
||||
for field in fields:
|
||||
# change field IDs as options are stored in data with their
|
||||
# varnames, not id.
|
||||
field.id = field.varname or field.id
|
||||
if hasattr(field, 'default_value') and data.get(field.varname) is None:
|
||||
data[field.varname] = field.default_value
|
||||
if isinstance(field.default_value, str):
|
||||
data[field.varname] = field.convert_value_from_str(field.default_value)
|
||||
else:
|
||||
data[field.varname] = field.default_value
|
||||
super().__init__(fields, data)
|
||||
|
||||
def inspect_keys(self):
|
||||
|
|
|
@ -82,5 +82,15 @@ class AnonymiseWorkflowStatusItem(WorkflowStatusItem):
|
|||
default_value=self.__class__.mode,
|
||||
)
|
||||
|
||||
def get_workflow_test_action(self, formdata, *args, **kwargs):
|
||||
original_perform = self.perform
|
||||
|
||||
def perform(formdata):
|
||||
original_perform(formdata)
|
||||
formdata.anonymisation_performed = True
|
||||
|
||||
setattr(self, 'perform', perform)
|
||||
return self
|
||||
|
||||
|
||||
register_item_class(AnonymiseWorkflowStatusItem)
|
||||
|
|
|
@ -86,5 +86,8 @@ class ModifyCriticalityWorkflowStatusItem(WorkflowStatusItem):
|
|||
elif self.mode == MODE_SET:
|
||||
formdata.set_criticality_level(int(self.absolute_value))
|
||||
|
||||
def get_workflow_test_action(self, *args, **kwargs):
|
||||
return self
|
||||
|
||||
|
||||
register_item_class(ModifyCriticalityWorkflowStatusItem)
|
||||
|
|
|
@ -169,5 +169,8 @@ class DisplayMessageWorkflowStatusItem(WorkflowStatusItem):
|
|||
location = '%sitems/%s/' % (base_location, self.id)
|
||||
yield location, None, self.message
|
||||
|
||||
def get_workflow_test_action(self, *args, **kwargs):
|
||||
return self
|
||||
|
||||
|
||||
register_item_class(DisplayMessageWorkflowStatusItem)
|
||||
|
|
|
@ -63,5 +63,16 @@ class RedirectToUrlWorkflowStatusItem(WorkflowStatusItem):
|
|||
return # don't redirect
|
||||
return url
|
||||
|
||||
def get_workflow_test_action(self, formdata, *args, **kwargs):
|
||||
original_perform = self.perform
|
||||
|
||||
def perform(formdata):
|
||||
url = original_perform(formdata)
|
||||
formdata.redirect_to_url = url
|
||||
return url
|
||||
|
||||
setattr(self, 'perform', perform)
|
||||
return self
|
||||
|
||||
|
||||
register_item_class(RedirectToUrlWorkflowStatusItem)
|
||||
|
|
|
@ -228,20 +228,34 @@ class RegisterCommenterWorkflowStatusItem(WorkflowStatusItem):
|
|||
|
||||
# the comment can use attachments done above
|
||||
if comment:
|
||||
try:
|
||||
formdata.evolution[-1].add_part(
|
||||
JournalEvolutionPart(formdata, get_publisher().translate(comment), self.to, self.level)
|
||||
)
|
||||
part = self.get_journal_evolution_part(formdata, comment)
|
||||
if part:
|
||||
formdata.evolution[-1].add_part(part)
|
||||
formdata.store()
|
||||
except TemplateError as e:
|
||||
get_publisher().record_error(
|
||||
_('Error in template, comment could not be generated'), formdata=formdata, exception=e
|
||||
)
|
||||
|
||||
def get_journal_evolution_part(self, formdata, comment):
|
||||
try:
|
||||
return JournalEvolutionPart(formdata, get_publisher().translate(comment), self.to, self.level)
|
||||
except TemplateError as e:
|
||||
get_publisher().record_error(
|
||||
_('Error in template, comment could not be generated'), formdata=formdata, exception=e
|
||||
)
|
||||
|
||||
def i18n_scan(self, base_location):
|
||||
location = '%sitems/%s/' % (base_location, self.id)
|
||||
if not self.comment_template:
|
||||
yield location, None, self.comment
|
||||
|
||||
def get_workflow_test_action(self, formdata, *args, **kwargs):
|
||||
original_get_journal_evolution_part = self.get_journal_evolution_part
|
||||
|
||||
def get_journal_evolution_part(formdata, comment):
|
||||
part = original_get_journal_evolution_part(formdata, comment)
|
||||
formdata.history_messages.append(part.content)
|
||||
return part
|
||||
|
||||
setattr(self, 'get_journal_evolution_part', get_journal_evolution_part)
|
||||
return self
|
||||
|
||||
|
||||
register_item_class(RegisterCommenterWorkflowStatusItem)
|
||||
|
|
|
@ -28,11 +28,12 @@ from wcs.qommon.form import (
|
|||
RadiobuttonsWidget,
|
||||
SingleSelectWidget,
|
||||
StringWidget,
|
||||
TextWidget,
|
||||
WidgetList,
|
||||
)
|
||||
from wcs.qommon.humantime import humanduration2seconds, seconds2humanduration, timewords
|
||||
from wcs.qommon.xml_storage import XmlStorableObject
|
||||
from wcs.testdef import TestError, WebserviceResponse
|
||||
from wcs.testdef import TestError
|
||||
from wcs.wf.backoffice_fields import SetBackofficeFieldRowWidget, SetBackofficeFieldsTableWidget
|
||||
from wcs.wf.profile import FieldNode
|
||||
|
||||
|
@ -90,9 +91,7 @@ class WorkflowTests(XmlStorableObject):
|
|||
formdata.workflow_test = True
|
||||
|
||||
formdata.frozen_receipt_time = formdata.receipt_time
|
||||
formdata.sent_sms = []
|
||||
formdata.sent_emails = []
|
||||
formdata.used_webservice_responses = self.testdef.used_webservice_responses = []
|
||||
self.reset_formdata_test_attributes(formdata)
|
||||
|
||||
formdata.perform_workflow()
|
||||
for action in self.actions:
|
||||
|
@ -102,9 +101,7 @@ class WorkflowTests(XmlStorableObject):
|
|||
continue
|
||||
|
||||
if not action.is_assertion:
|
||||
formdata.sent_sms.clear()
|
||||
formdata.sent_emails.clear()
|
||||
formdata.used_webservice_responses.clear()
|
||||
self.reset_formdata_test_attributes(formdata)
|
||||
|
||||
try:
|
||||
action.perform(formdata)
|
||||
|
@ -127,6 +124,14 @@ class WorkflowTests(XmlStorableObject):
|
|||
|
||||
formdata.store = lambda *args, **kwargs: None
|
||||
|
||||
def reset_formdata_test_attributes(self, formdata):
|
||||
formdata.sent_sms = []
|
||||
formdata.sent_emails = []
|
||||
formdata.used_webservice_responses = self.testdef.used_webservice_responses = []
|
||||
formdata.anonymisation_performed = False
|
||||
formdata.redirect_to_url = None
|
||||
formdata.history_messages = []
|
||||
|
||||
def get_new_action_id(self):
|
||||
if not self.actions:
|
||||
return '1'
|
||||
|
@ -145,7 +150,12 @@ class WorkflowTests(XmlStorableObject):
|
|||
'webservice_call': AssertWebserviceCall,
|
||||
'set-backoffice-fields': AssertBackofficeFieldValues,
|
||||
'button': ButtonClick,
|
||||
'global-action-button': ButtonClick,
|
||||
'timeout-jump': SkipTime,
|
||||
'anonymise': AssertAnonymise,
|
||||
'redirect_to_url': AssertRedirect,
|
||||
'register-comment': AssertHistoryMessage,
|
||||
'modify_criticality': AssertCriticality,
|
||||
}
|
||||
|
||||
previous_trace = None
|
||||
|
@ -196,6 +206,7 @@ class WorkflowTestAction(XmlStorableObject):
|
|||
|
||||
optional_fields = []
|
||||
is_assertion = True
|
||||
editable = True
|
||||
|
||||
XML_NODES = [
|
||||
('id', 'str'),
|
||||
|
@ -266,14 +277,26 @@ class ButtonClick(WorkflowTestAction):
|
|||
return _('Click on "%(button_name)s" by %(user)s') % {'button_name': self.button_name, 'user': user}
|
||||
|
||||
def set_attributes_from_trace(self, formdef, trace, previous_trace=None):
|
||||
try:
|
||||
item = [
|
||||
x for x in self.get_all_choice_actions(formdef) if x.id == trace.event_args['action_item_id']
|
||||
][0]
|
||||
except IndexError:
|
||||
return
|
||||
if 'action_item_id' in trace.event_args:
|
||||
try:
|
||||
button_name = [
|
||||
x.label
|
||||
for x in self.get_all_choice_actions(formdef)
|
||||
if x.id == trace.event_args['action_item_id']
|
||||
][0]
|
||||
except IndexError:
|
||||
return
|
||||
elif 'global_action_id' in trace.event_args:
|
||||
try:
|
||||
button_name = [
|
||||
x.name
|
||||
for x in self.get_all_global_actions(formdef)
|
||||
if x.id == trace.event_args['global_action_id']
|
||||
][0]
|
||||
except IndexError:
|
||||
return
|
||||
|
||||
self.button_name = item.label
|
||||
self.button_name = button_name
|
||||
|
||||
def perform(self, formdata):
|
||||
if self.who == 'receiver':
|
||||
|
@ -306,8 +329,15 @@ class ButtonClick(WorkflowTestAction):
|
|||
if isinstance(item, wf.choice.ChoiceWorkflowStatusItem) and item.status:
|
||||
yield item
|
||||
|
||||
@staticmethod
|
||||
def get_all_global_actions(formdef):
|
||||
for action in formdef.workflow.global_actions or []:
|
||||
if not action.is_interactive():
|
||||
yield action
|
||||
|
||||
def fill_admin_form(self, form, formdef):
|
||||
possible_button_names = {x.label for x in self.get_all_choice_actions(formdef)}
|
||||
possible_button_names.update(action.name for action in self.get_all_global_actions(formdef))
|
||||
|
||||
if not possible_button_names:
|
||||
return
|
||||
|
@ -631,20 +661,22 @@ class AssertWebserviceCall(WorkflowTestAction):
|
|||
label = _('Assert webservice call')
|
||||
|
||||
key = 'assert-webservice-call'
|
||||
webservice_response_id = None
|
||||
webservice_response_uuid = None
|
||||
call_count = 1
|
||||
|
||||
optional_fields = ['call_count']
|
||||
|
||||
XML_NODES = WorkflowTestAction.XML_NODES + [
|
||||
('webservice_response_id', 'str'),
|
||||
('webservice_response_uuid', 'str'),
|
||||
('call_count', 'int'),
|
||||
]
|
||||
|
||||
@property
|
||||
def details_label(self):
|
||||
webservice_responses = [
|
||||
x for x in self.parent.testdef.get_webservice_responses() if x.id == self.webservice_response_id
|
||||
x
|
||||
for x in self.parent.testdef.get_webservice_responses()
|
||||
if x.uuid == self.webservice_response_uuid
|
||||
]
|
||||
if webservice_responses:
|
||||
return webservice_responses[0].name
|
||||
|
@ -664,13 +696,17 @@ class AssertWebserviceCall(WorkflowTestAction):
|
|||
|
||||
def perform(self, formdata):
|
||||
try:
|
||||
response = WebserviceResponse.get(self.webservice_response_id)
|
||||
except KeyError:
|
||||
response = [
|
||||
x
|
||||
for x in self.parent.testdef.get_webservice_responses()
|
||||
if x.uuid == self.webservice_response_uuid
|
||||
][0]
|
||||
except IndexError:
|
||||
raise WorkflowTestError(_('Broken, missing webservice response'))
|
||||
|
||||
call_count = 0
|
||||
for used_response in formdata.used_webservice_responses.copy():
|
||||
if used_response.id == self.webservice_response_id:
|
||||
if used_response.uuid == self.webservice_response_uuid:
|
||||
formdata.used_webservice_responses.remove(used_response)
|
||||
call_count += 1
|
||||
|
||||
|
@ -682,7 +718,7 @@ class AssertWebserviceCall(WorkflowTestAction):
|
|||
|
||||
def fill_admin_form(self, form, formdef):
|
||||
webservice_response_options = [
|
||||
(response.id, response.name, response.id)
|
||||
(response.uuid, response.name, response.uuid)
|
||||
for response in self.parent.testdef.get_webservice_responses()
|
||||
]
|
||||
|
||||
|
@ -691,11 +727,11 @@ class AssertWebserviceCall(WorkflowTestAction):
|
|||
|
||||
form.add(
|
||||
SingleSelectWidget,
|
||||
'webservice_response_id',
|
||||
'webservice_response_uuid',
|
||||
title=_('Webservice response'),
|
||||
options=webservice_response_options,
|
||||
required=True,
|
||||
value=self.webservice_response_id,
|
||||
value=self.webservice_response_uuid,
|
||||
)
|
||||
form.add(IntWidget, 'call_count', title=_('Call count'), required=True, value=self.call_count)
|
||||
|
||||
|
@ -760,3 +796,163 @@ class AssertSMS(WorkflowTestAction):
|
|||
title=_('Body'),
|
||||
value=self.body,
|
||||
)
|
||||
|
||||
|
||||
class AssertAnonymise(WorkflowTestAction):
|
||||
label = _('Assert anonymisation is performed')
|
||||
|
||||
key = 'assert-anonymise'
|
||||
|
||||
editable = False
|
||||
details_label = ''
|
||||
|
||||
def perform(self, formdata):
|
||||
if not formdata.anonymisation_performed:
|
||||
raise WorkflowTestError(_('Form was not anonymised.'))
|
||||
|
||||
|
||||
class AssertRedirect(WorkflowTestAction):
|
||||
label = _('Assert redirect is performed')
|
||||
|
||||
key = 'assert-redirect'
|
||||
url = None
|
||||
|
||||
XML_NODES = WorkflowTestAction.XML_NODES + [
|
||||
('url', 'str'),
|
||||
]
|
||||
|
||||
@property
|
||||
def details_label(self):
|
||||
return self.url
|
||||
|
||||
def perform(self, formdata):
|
||||
if not formdata.redirect_to_url:
|
||||
raise WorkflowTestError(_('No redirection occured.'))
|
||||
|
||||
if formdata.redirect_to_url != self.url:
|
||||
raise WorkflowTestError(
|
||||
_('Expected redirection to %(expected_url)s but was redirected to %(url)s.')
|
||||
% {'expected_url': self.url, 'url': formdata.redirect_to_url}
|
||||
)
|
||||
|
||||
def fill_admin_form(self, form, formdef):
|
||||
form.add(
|
||||
StringWidget,
|
||||
'url',
|
||||
title=_('URL'),
|
||||
value=self.url,
|
||||
)
|
||||
|
||||
|
||||
class AssertHistoryMessage(WorkflowTestAction):
|
||||
label = _('Assert history message is displayed')
|
||||
details_label = ''
|
||||
|
||||
key = 'assert-history-message'
|
||||
message = None
|
||||
|
||||
XML_NODES = WorkflowTestAction.XML_NODES + [
|
||||
('message', 'str'),
|
||||
]
|
||||
|
||||
def perform(self, formdata):
|
||||
try:
|
||||
message = formdata.history_messages.pop(0)
|
||||
except IndexError:
|
||||
raise WorkflowTestError(_('No history message.'))
|
||||
|
||||
if self.message not in message:
|
||||
details = [
|
||||
_('Displayed history message: %s') % message,
|
||||
_('Expected history message: %s') % self.message,
|
||||
]
|
||||
raise WorkflowTestError(_('Wrong history message content.'), details=details)
|
||||
|
||||
def fill_admin_form(self, form, formdef):
|
||||
form.add(
|
||||
TextWidget,
|
||||
'message',
|
||||
title=_('Message'),
|
||||
value=self.message,
|
||||
hint=_('Assertion will pass if the text is contained in history message.'),
|
||||
)
|
||||
|
||||
|
||||
class AssertAlert(WorkflowTestAction):
|
||||
label = _('Assert alert is displayed')
|
||||
details_label = ''
|
||||
|
||||
key = 'assert-alert'
|
||||
message = None
|
||||
|
||||
XML_NODES = WorkflowTestAction.XML_NODES + [
|
||||
('message', 'str'),
|
||||
]
|
||||
|
||||
def perform(self, formdata):
|
||||
messages = formdata.get_workflow_messages()
|
||||
|
||||
for message in messages:
|
||||
if self.message in message:
|
||||
break
|
||||
else:
|
||||
details = [
|
||||
_('Displayed alerts: %s') % (', '.join(messages) if messages else _('None')),
|
||||
_('Expected alert: %s') % self.message,
|
||||
]
|
||||
raise WorkflowTestError(_('No alert matching message.'), details=details)
|
||||
|
||||
def fill_admin_form(self, form, formdef):
|
||||
form.add(
|
||||
TextWidget,
|
||||
'message',
|
||||
title=_('Message'),
|
||||
value=self.message,
|
||||
hint=_('Assertion will pass if the text is contained in alert message.'),
|
||||
)
|
||||
|
||||
|
||||
class AssertCriticality(WorkflowTestAction):
|
||||
label = _('Assert criticality level')
|
||||
empty_form_error = _('Workflow has no criticality levels.')
|
||||
|
||||
key = 'assert-criticality'
|
||||
level_id = None
|
||||
|
||||
XML_NODES = WorkflowTestAction.XML_NODES + [
|
||||
('level_id', 'str'),
|
||||
]
|
||||
|
||||
@property
|
||||
def details_label(self):
|
||||
levels = [
|
||||
x for x in self.parent.testdef.formdef.workflow.criticality_levels or [] if x.id == self.level_id
|
||||
]
|
||||
if not levels:
|
||||
return _('Broken, missing criticality level')
|
||||
|
||||
return _('Criticality is "%s"') % levels[0].name
|
||||
|
||||
def perform(self, formdata):
|
||||
levels = [x for x in formdata.formdef.workflow.criticality_levels or [] if x.id == self.level_id]
|
||||
if not levels:
|
||||
raise WorkflowTestError(_('Broken, missing criticality level'))
|
||||
|
||||
current_level = formdata.get_criticality_level_object()
|
||||
if current_level.id != self.level_id:
|
||||
raise WorkflowTestError(
|
||||
_('Form should have criticality level "%(expected_level)s" but has level "%(level)s".')
|
||||
% {'expected_level': levels[0].name, 'level': current_level.name}
|
||||
)
|
||||
|
||||
def fill_admin_form(self, form, formdef):
|
||||
if not formdef.workflow.criticality_levels:
|
||||
return
|
||||
|
||||
form.add(
|
||||
SingleSelectWidget,
|
||||
'level_id',
|
||||
title=_('Name'),
|
||||
value=self.level_id,
|
||||
options=[(x.id, x.name, x.id) for x in formdef.workflow.criticality_levels],
|
||||
)
|
||||
|
|
|
@ -720,6 +720,9 @@ class WorkflowVariablesFieldsFormDef(FormDef):
|
|||
base_url = get_publisher().get_backoffice_url()
|
||||
return '%s/workflows/%s/variables/fields/' % (base_url, self.workflow.id)
|
||||
|
||||
def get_field_admin_url(self, field):
|
||||
return self.get_admin_url() + '%s/' % field.id
|
||||
|
||||
def get_new_field_id(self):
|
||||
return str(uuid.uuid4())
|
||||
|
||||
|
@ -2733,7 +2736,7 @@ class WorkflowStatus(SerieOfActionsMixin):
|
|||
if check_replay and form.get('_ts') != str(filled.last_update_time.timestamp()):
|
||||
raise ReplayException()
|
||||
for action in filled.formdef.workflow.get_global_actions_for_user(filled, user):
|
||||
if 'button-action-%s' % action.id in get_request().form:
|
||||
if form.get_submit() == 'button-action-%s' % action.id:
|
||||
if action.is_interactive():
|
||||
return action.get_global_interactive_form_url(formdef=filled.formdef, ids=[filled.id])
|
||||
filled.record_workflow_event('global-action-button', global_action_id=action.id)
|
||||
|
|
Loading…
Reference in New Issue