Compare commits

...

32 Commits

Author SHA1 Message Date
Pierre Ducroquet 56fb3fb2fc sql: test purge of search tokens (#86527)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-14 16:34:33 +01:00
Pierre Ducroquet e153d74089 wcs_search_tokens: new FTS mechanism with fuzzy-match (#86527)
introduce a new mechanism to implement FTS with fuzzy-match.
This is made possible by adding and maintaining a table of the
FTS tokens, wcs_search_tokens, fed with searchable_formdefs
and wcs_all_forms.
When a query is issued, its tokens are matched against the
tokens with a fuzzy match when no direct match is found, and
the query is then rebuilt.
2024-03-14 16:34:33 +01:00
Pierre Ducroquet 09a8dccebf tests: add a test for new FTS on formdefs (#86527) 2024-03-14 16:34:33 +01:00
Valentin Deniaud ad2e64880e workflow_tests: fix new test action id computation (#88066)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-12 15:56:41 +01:00
Thomas NOËL 4ea852afe8 help: fix typo in include-anonymised example URL (#87766)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-12 14:17:43 +01:00
Benjamin Dauvergne 96bfaea4a7 misc: use only pathname to build image live preview URL (#88053)
gitea/wcs/pipeline/head This commit looks good Details
Change introduced by #33301 used the full URL which may also contain a
query string or a fragment identifier making the resulting URL invalid.
2024-03-12 12:24:55 +01:00
Frédéric Péters a130f1d862 translation update
gitea/wcs/pipeline/head This commit looks good Details
2024-03-12 09:34:38 +01:00
Frédéric Péters 1b976a36a7 workflows: consider custom id when looking for carddata (#88024)
gitea/wcs/pipeline/head Build queued... Details
2024-03-12 09:33:44 +01:00
Frédéric Péters 4ba3ebf6b4 blocks: don't fail on creating a prefill block with unknown card (#88027)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-12 07:07:21 +01:00
Frédéric Péters f6e228b438 tests: check item prefill with custom id (#88024) 2024-03-12 07:07:21 +01:00
Frédéric Péters 977cdb0019 admin: filter errors when cleaning up from subpage (#87946)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-08 17:05:34 +01:00
Frédéric Péters 99d1f4c21f misc: assert complex data context manager is used (#87925)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-08 12:14:21 +01:00
Frédéric Péters 89f17153bf wscalls: add missing context manager to get complex query string (#87925) 2024-03-08 12:14:21 +01:00
Frédéric Péters c72f9aed9d translation update
gitea/wcs/pipeline/head This commit looks good Details
2024-03-08 11:14:42 +01:00
Valentin Deniaud 16e844a049 admin: allow creating workflow tests from formdata (#87545)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-08 11:07:42 +01:00
Frédéric Péters bf442ecf44 misc: do not enable quixote form token if there may be multiple pages (#87781)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-08 11:07:26 +01:00
Frédéric Péters 658aff404e backoffice: support existing criteria when filtering statistics (#87909) 2024-03-08 11:07:18 +01:00
Frédéric Péters 06d6487bb3 translation update
gitea/wcs/pipeline/head This commit looks good Details
2024-03-08 10:21:42 +01:00
Frédéric Péters eade8f8dda api: never include non-data fields in json export (#87918)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-08 09:07:11 +01:00
Frédéric Péters c24bac7837 fields: make numeric fields required by default (like others) (#87846)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-07 19:27:35 +01:00
Frédéric Péters de292cc399 templatetags: add support for lazy variables to make_public_url (#87817) 2024-03-07 19:27:28 +01:00
Frédéric Péters 23e66ec078 misc: keep track of user that changed a workflow in snapshot (#87843)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-07 19:27:19 +01:00
Frédéric Péters 36e1f16a31 backoffice: make form preview scroll if too wide (#87858)
gitea/wcs/pipeline/head Build queued... Details
2024-03-07 19:27:06 +01:00
Serghei Mihai 1476d21ce1 static: add support for vertical display of items with images (#87241)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-07 18:08:01 +01:00
Frédéric Péters 59d8b91f8d translation update
gitea/wcs/pipeline/head This commit looks good Details
2024-03-06 19:16:38 +01:00
Frédéric Péters 8c26581924 wscalls: do not log/notify about app errors anymore (#87554)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-06 18:10:03 +01:00
Valentin Deniaud 3d42d456d9 workflow_tests: allow asserting webservice was not called (#87824)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-06 11:08:52 +01:00
Valentin Deniaud 0c0807a0d4 workflow_tests: fail webservice call assertion if response is missing (#87824) 2024-03-06 11:00:42 +01:00
Valentin Deniaud 58df7bf7bb workflow_tests: do not perform unconfigured action (#87824) 2024-03-06 10:48:53 +01:00
Frédéric Péters 728afe97b6 misc: quote filename in link in ODS export (#87789)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-06 09:22:37 +01:00
Frédéric Péters c03d216a70 misc: include attachments in grepped strings (#87783)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-05 16:32:27 +01:00
Thomas NOËL 00584bff12 help: add notes about include-anonymised parameter (#87766)
gitea/wcs/pipeline/head This commit looks good Details
2024-03-05 12:54:19 +01:00
45 changed files with 1154 additions and 113 deletions

View File

@ -361,6 +361,16 @@ nest pas nécessaire de préciser lidentifiant dun utilisateur.
<input>GET https://www.example.net/api/forms/inscriptions/10/?anonymise</input>
</screen>
<p>
Par ailleurs, lAPI « Liste de formulaires » accepte un paramètre
<code>include-anonymised</code> permettant dinclure (<code>on</code>) ou non
(<code>off</code>) les demandes anonymisées dans la liste :
</p>
<screen>
<input>GET https://www.example.net/api/forms/inscriptions/list?include-anonymised=on</input>
</screen>
</section>
<section id="global-data">

View File

@ -765,7 +765,7 @@ def test_form_workflow_link(pub):
def test_form_workflow_remapping(pub):
AfterJob.wipe()
create_superuser(pub)
user = create_superuser(pub)
create_role(pub)
FormDef.wipe()
@ -872,6 +872,8 @@ def test_form_workflow_remapping(pub):
resp = resp.follow() # -> to job processing page
resp = resp.click('Back')
assert resp.pyquery('[href="workflow"] .offset').text() == 'Workflow Three'
assert pub.snapshot_class.select_object_history(formdef)[0].comment == 'Workflow change'
assert pub.snapshot_class.select_object_history(formdef)[0].user_id == str(user.id)
# run a SQL SELECT and we known all columns are defined.
FormDef.get(formdef.id).data_class().select()

View File

@ -390,3 +390,66 @@ def test_logged_error_cleanup(pub):
'types$elementcarddef',
'types$elementothers',
]
def test_logged_error_cleanup_from_filtered_page(pub):
create_superuser(pub)
FormDef.wipe()
CardDef.wipe()
Workflow.wipe()
pub.loggederror_class.wipe()
formdef = FormDef()
formdef.name = 'foo'
formdef.store()
carddef = CardDef()
carddef.name = 'bar'
carddef.store()
workflow = Workflow()
workflow.name = 'blah'
workflow.store()
# FormDef error
error1 = pub.loggederror_class()
error1.summary = 'LoggedError'
error1.formdef_class = 'FormDef'
error1.formdef_id = formdef.id
error1.first_occurence_timestamp = error1.latest_occurence_timestamp = datetime.datetime.now()
error1.store()
# CardDef error
error2 = pub.loggederror_class()
error2.summary = 'LoggedError'
error2.formdef_class = 'CardDef'
error2.formdef_id = carddef.id
error2.first_occurence_timestamp = error2.latest_occurence_timestamp = datetime.datetime.now()
error2.store()
# workflow-only error
error3 = pub.loggederror_class()
error3.summary = 'LoggedError'
error3.workflow_id = workflow.id
error3.first_occurence_timestamp = error3.latest_occurence_timestamp = datetime.datetime.now()
error3.store()
app = login(get_app(pub))
resp = app.get(formdef.get_admin_url() + 'logged-errors/')
resp = resp.click('Cleanup')
resp.form['latest_occurence'] = (datetime.datetime.now() + datetime.timedelta(days=1)).strftime(
'%Y-%m-%d'
)
resp = resp.form.submit('submit')
assert not pub.loggederror_class.has_key(error1.id)
assert pub.loggederror_class.has_key(error2.id)
assert pub.loggederror_class.has_key(error3.id)
resp = app.get(workflow.get_admin_url() + 'logged-errors/')
resp = resp.click('Cleanup')
resp.form['latest_occurence'] = (datetime.datetime.now() + datetime.timedelta(days=1)).strftime(
'%Y-%m-%d'
)
resp = resp.form.submit('submit')
assert not pub.loggederror_class.has_key(error1.id)
assert pub.loggederror_class.has_key(error2.id)
assert not pub.loggederror_class.has_key(error3.id)

View File

@ -1,7 +1,9 @@
import datetime
import os
import pytest
from django.utils.html import escape
from django.utils.timezone import make_aware
from wcs import workflow_tests
from wcs.formdef import FormDef, fields
@ -582,3 +584,41 @@ def test_workflow_tests_run(pub):
assert 'Form status when error occured: New status' in resp.text
assert 'Email body: \nabc' in resp.text
assert resp.pyquery('li#test-action').text() == 'Test action: Assert email is sent'
def test_workfow_tests_creation_from_formdata(pub):
create_superuser(pub)
workflow = Workflow(name='Workflow One')
new_status = workflow.add_status(name='New status')
end_status = workflow.add_status(name='End status')
jump = new_status.add_action('jump')
jump.status = end_status.id
workflow.store()
formdef = FormDef()
formdef.workflow_id = workflow.id
formdef.name = 'test title'
formdef.store()
app = login(get_app(pub))
formdata = formdef.data_class()()
formdata.just_created()
formdata.receipt_time = make_aware(datetime.datetime(2022, 1, 1, 0, 0))
formdata.store()
formdata.perform_workflow()
formdata.store()
resp = app.get('/backoffice/forms/%s/tests/new' % formdef.id)
resp.form['name'] = 'First test'
resp.form['creation_mode'] = 'formdata-wf'
resp.form['formdata'].select(text='1-1 - Unknown User - 2022-01-01 00:00')
resp = resp.form.submit().follow()
testdef = TestDef.select()[0]
assert len(testdef.workflow_tests.actions) == 1
assert testdef.workflow_tests.actions[0].key == 'assert-status'
assert testdef.workflow_tests.actions[0].status_name == 'End status'

View File

@ -163,6 +163,7 @@ def test_formdata(pub, local_user, user, auth):
block = BlockDef()
block.name = 'foobar'
block.fields = [
fields.TitleField(id='dsd', label='Title'),
fields.StringField(id='abc', label='Foo', varname='foo'),
fields.ItemField(id='xyz', label='Test', data_source={'type': 'foobar'}, varname='bar'),
]

View File

@ -429,6 +429,9 @@ def test_backoffice_submission_formdef_list_search(pub, local_user, access, auth
resp = get_url('/api/formdefs/?backoffice-submission=on&q=test')
assert len(resp.json['data']) == 2
resp = get_url('/api/formdefs/?backoffice-submission=on&q=tes')
assert len(resp.json['data']) == 2
resp = get_url('/api/formdefs/?backoffice-submission=on&q=xyz')
assert len(resp.json['data']) == 0

View File

@ -148,6 +148,12 @@ def test_backoffice_statistics_status_select(pub):
assert 'Total number of records: 26' in resp.text
assert resp.pyquery('ul.resolution-times.status-wf-new li')[0].text == 'Count: 9'
resp.forms['listing-settings']['filter-%s-value' % field1.id].value = 'baz'
resp.forms['listing-settings']['filter-%s-operator' % field1.id].value = 'existing'
resp = resp.forms['listing-settings'].submit()
assert 'Total number of records: 50' in resp.text
assert resp.pyquery('ul.resolution-times.status-wf-new li')[0].text == 'Count: 17'
resp.forms['listing-settings']['filter-%s-value' % field1.id].value = 'foo'
resp.forms['listing-settings']['filter-%s-operator' % field1.id].value = 'eq'
resp = resp.forms['listing-settings'].submit()

View File

@ -6073,3 +6073,25 @@ def test_form_submit_no_csrf(pub):
# simulate call from remote/attacker site (magictokens prevents this)
resp = app.post(formdef.get_url(), params=form_data, status=302)
assert resp.location == formdef.get_url()
def test_form_submit_no_csrf_suddenly_single_page(pub):
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.PageField(id='1', label='page1'),
fields.ComputedField(id='2', label='computed', varname='plop', value_template='{{ "plop" }}'),
fields.PageField(
id='3', label='page2', condition={'type': 'django', 'value': 'form_var_plop != "plop"'}
),
]
formdef.confirmation = False
formdef.store()
formdef.data_class().wipe()
create_user(pub)
app = get_app(pub)
login(app, username='foo', password='foo')
resp = app.get(formdef.get_url())
resp = resp.form.submit('submit').follow()
assert formdef.data_class().select()[0].status == 'wf-new'

View File

@ -2652,6 +2652,92 @@ def test_block_prefill_full_block_email(pub):
}
def test_block_prefill_full_block_card_item(pub):
FormDef.wipe()
BlockDef.wipe()
CardDef.wipe()
create_user(pub)
carddef = CardDef()
carddef.name = 'Test'
carddef.fields = [
fields.StringField(id='0', label='blah', varname='blah'),
]
carddef.digest_templates = {'default': '{{ form_var_blah|upper }}'}
carddef.store()
carddef.data_class().wipe()
carddata1 = carddef.data_class()()
carddata1.data = {'0': 'bar'}
carddata1.just_created()
carddata1.store()
block = BlockDef()
block.name = 'foobar'
block.fields = [
fields.ItemField(
id='123',
required=False,
hint='-----',
label='Test',
varname='plop',
data_source={'type': 'carddef:test'},
),
]
block.digest_template = '{{block_var_plop}}'
block.store()
formdef = FormDef()
formdef.name = 'form title'
formdef.fields = [
fields.PageField(id='0', label='1st page'),
fields.PageField(id='2', label='2nd page'),
fields.BlockField(
id='1',
label='test',
block_slug='foobar',
max_items=5,
prefill={
'type': 'string',
'value': '{% block_value plop="1" %}',
},
),
]
formdef.store()
formdef.data_class().wipe()
app = get_app(pub)
login(app, username='foo', password='foo')
resp = app.get(formdef.get_url())
resp = resp.form.submit('submit') # -> page 2
assert resp.form['f1$element0$f123'].value == '1'
resp = resp.form.submit('submit') # validation
resp = resp.form.submit('submit') # done
assert formdef.data_class().select()[0].data == {
'1': {
'data': [
{'123': '1', '123_display': 'BAR', '123_structured': {'blah': 'bar', 'id': 1, 'text': 'BAR'}}
],
'schema': {'123': 'item'},
},
'1_display': 'BAR',
}
# prefill with unknown value
pub.loggederror_class.wipe()
formdef.fields[2].prefill['value'] = '{% block_value plop="123" %}'
formdef.store()
formdef.data_class().wipe()
resp = app.get(formdef.get_url())
resp = resp.form.submit('submit') # -> page 2
assert not resp.form['f1$element0$f123'].value
assert pub.loggederror_class.count() == 1
assert (
pub.loggederror_class.select()[0].summary
== 'invalid value when creating block: unknown card value (\'123\')'
)
def test_block_titles_and_empty_block_on_summary_page(pub, emails):
FormDef.wipe()
BlockDef.wipe()

View File

@ -631,6 +631,107 @@ def test_form_page_item_with_variable_data_source_prefill(pub):
assert not resp.pyquery('#form_error_f2').text()
def test_form_page_item_with_card_with_custom_id_prefill(pub):
create_user(pub)
CardDef.wipe()
carddef = CardDef()
carddef.name = 'Test'
carddef.fields = [
fields.StringField(id='0', label='blah', varname='blah'),
]
carddef.digest_templates = {'default': '{{ form_var_blah|upper }}'}
carddef.id_template = '{{ form_var_blah }}'
carddef.store()
carddef.data_class().wipe()
carddata1 = carddef.data_class()()
carddata1.data = {'0': 'bar'}
carddata1.just_created()
carddata1.store()
carddata2 = carddef.data_class()()
carddata2.data = {'0': 'foo'}
carddata2.just_created()
carddata2.store()
formdef = create_formdef()
formdef.data_class().wipe()
formdef.fields = [
fields.ItemField(
id='2',
label='item',
varname='item',
required=False,
data_source={'type': 'carddef:test'},
prefill={'type': 'string', 'value': 'foo'},
),
]
formdef.store()
resp = get_app(pub).get('/test/')
assert [x.attrib['value'] for x in resp.pyquery('#form_f2 option')] == ['bar', 'foo']
assert resp.form['f2'].value == 'foo'
assert not resp.pyquery('#form_error_f2').text()
def test_form_page_block_with_item_with_card_with_custom_id_prefill(pub):
create_user(pub)
CardDef.wipe()
carddef = CardDef()
carddef.name = 'Test'
carddef.fields = [
fields.StringField(id='0', label='blah', varname='blah'),
]
carddef.digest_templates = {'default': 'card {{ form_var_blah }}'}
carddef.id_template = '{{ form_var_blah }}'
carddef.store()
carddef.data_class().wipe()
carddata1 = carddef.data_class()()
carddata1.data = {'0': 'bar'}
carddata1.just_created()
carddata1.store()
carddata2 = carddef.data_class()()
carddata2.data = {'0': 'foo'}
carddata2.just_created()
carddata2.store()
BlockDef.wipe()
block = BlockDef()
block.name = 'foobar'
block.fields = [
fields.ItemField(
id='123',
label='item',
varname='item',
required=False,
data_source={'type': 'carddef:test'},
),
]
block.store()
formdef = create_formdef()
formdef.data_class().wipe()
formdef.fields = [
fields.BlockField(
id='2',
label='test',
block_slug='foobar',
varname='foobar',
prefill={'type': 'string', 'value': '{% block_value item="foo" %}'},
),
]
formdef.store()
resp = get_app(pub).get('/test/')
assert [x.attrib['value'] for x in resp.pyquery('#form_f2__element0__f123 option')] == ['bar', 'foo']
assert resp.form['f2$element0$f123'].value == 'foo'
assert not resp.pyquery('.widget-with-error')
def test_form_page_item_with_computed_field_variable_data_source_prefill(pub):
create_user(pub)
formdef = create_formdef()

View File

@ -1183,6 +1183,45 @@ def test_sql_criteria_fts(pub):
assert data_class.select([st.FtsMatch(formdata1.id_display)])[0].id_display == formdata1.id_display
def test_search_tokens_purge(pub):
_, cur = sql.get_connection_and_cursor()
# purge garbage from other tests
sql.purge_obsolete_search_tokens()
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
start = cur.fetchone()[0]
# define a new table
test_formdef = FormDef()
test_formdef.name = 'tableSelectFTStokens'
test_formdef.fields = [fields.StringField(id='3', label='string')]
test_formdef.store()
data_class = test_formdef.data_class(mode='sql')
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
assert cur.fetchone()[0] == start + 1
t = data_class()
t.data = {'3': 'foofortokensofcourse'}
t.just_created()
t.store()
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
assert cur.fetchone()[0] == start + 2
t.data = {'3': 'chaussettefortokensofcourse'}
t.store()
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
assert cur.fetchone()[0] == start + 3
sql.purge_obsolete_search_tokens()
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
assert cur.fetchone()[0] == start + 2
def table_exists(cur, table_name):
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables

View File

@ -36,6 +36,8 @@ def pub():
pub.substitutions.feed(pub)
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
pub.set_app_dir(req)
pub._set_request(req)
req.session = pub.session_manager.session_class(id='1')
pub.site_options.set('options', 'working_day_calendar', '')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
@ -1636,10 +1638,25 @@ def test_json_dumps(pub):
)
def test_empty_make_public_url(pub):
def test_make_public_url(pub):
# empty value
context = {'value': None}
assert Template('{% make_public_url url=value %}').render(context) == ''
# lazy value
FormDef.wipe()
formdef = FormDef()
formdef.name = 'lazy'
formdef.fields = [fields.StringField(id='0', label='string', varname='foo')]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'0': 'https://example.net'}
formdata.store()
context = CompatibilityNamesDict({'form': LazyFormData(formdata)})
assert (
Template('{% make_public_url url=form_var_foo %}').render(context).startswith('/api/sign-url-token/')
)
def test_with_auth(pub):
context = {'service_url': 'https://www.example.net/api/whatever?x=y'}

View File

@ -1,3 +1,4 @@
import datetime
from unittest import mock
import pytest
@ -6,11 +7,12 @@ from wcs import workflow_tests
from wcs.formdef import FormDef, fields
from wcs.qommon.http_request import HTTPRequest
from wcs.testdef import TestDef, WebserviceResponse
from wcs.wf.jump import JumpWorkflowStatusItem
from wcs.wf.jump import JumpWorkflowStatusItem, _apply_timeouts
from wcs.workflow_tests import WorkflowTestError
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowStatusItem
from .utilities import create_temporary_pub
from .backoffice_pages.test_all import create_user
from .utilities import create_temporary_pub, get_app, login
@pytest.fixture
@ -19,6 +21,7 @@ def pub():
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
pub.set_app_dir(req)
pub.cfg['identification'] = {'methods': ['password']}
pub.write_cfg()
pub.user_class.wipe()
@ -90,6 +93,51 @@ def test_workflow_tests_no_actions(pub):
mocked_run.assert_not_called()
def test_workflow_tests_action_not_configured(pub):
user = pub.user_class(name='test user')
user.store()
workflow = Workflow(name='Workflow One')
workflow.add_status(name='New status')
workflow.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.agent_id = user.id
testdef.workflow_tests.actions = [
workflow_tests.ButtonClick(),
]
with mock.patch('wcs.workflow_tests.ButtonClick.perform') as mocked_perform:
testdef.run(formdef)
mocked_perform.assert_not_called()
testdef.workflow_tests.actions = [
workflow_tests.ButtonClick(button_name='xxx'),
]
with mock.patch('wcs.workflow_tests.ButtonClick.perform') as mocked_perform:
testdef.run(formdef)
mocked_perform.assert_called_once()
def test_workflow_tests_new_action_id(pub):
wf_tests = workflow_tests.WorkflowTests()
for i in range(15):
wf_tests.add_action(workflow_tests.ButtonClick)
assert [x.id for x in wf_tests.actions] == [str(i) for i in range(1, 16)]
def test_workflow_tests_button_click(pub):
role = pub.role_class(name='test role')
role.store()
@ -544,6 +592,22 @@ def test_workflow_tests_webservice(pub):
testdef.run(formdef)
assert str(excinfo.value) == 'Webservice response Fake response was used 0 times (instead of 1).'
testdef.workflow_tests.actions = [
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=0),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Webservice response Fake response was used 1 times (instead of 0).'
testdef.workflow_tests.actions = [
workflow_tests.AssertWebserviceCall(webservice_response_id='xxx', call_count=1),
]
with pytest.raises(WorkflowTestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Broken, missing webservice response'
def test_workflow_tests_webservice_status_jump(pub):
user = pub.user_class(name='test user')
@ -593,3 +657,98 @@ def test_workflow_tests_webservice_status_jump(pub):
workflow_tests.AssertStatus(status_name='End status'),
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=1),
]
def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
role = pub.role_class(name='test role')
role.store()
user = create_user(pub, is_admin=True)
user.roles = [role.id]
user.store()
workflow = Workflow(name='Workflow One')
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
workflow.backoffice_fields_formdef.fields = [fields.StringField(id='bo1', label='Text')]
new_status = workflow.add_status('New status', 'new-status')
status_with_timeout_jump = workflow.add_status('Status with timeout jump', 'status-with-timeout-jump')
status_with_button = workflow.add_status('Status with button', 'status-with-button')
transition_status = workflow.add_status('Transition status', 'transition-status')
end_status = workflow.add_status('End status', 'end-status')
jump = new_status.add_action('jump')
jump.status = status_with_timeout_jump.id
jump = status_with_timeout_jump.add_action('jump')
jump.status = status_with_button.id
jump.timeout = '{{ 1 }} day'
choice = status_with_button.add_action('choice')
choice.label = 'Accept'
choice.status = transition_status.id
choice.by = [role.id]
wscall = transition_status.add_action('webservice_call')
wscall.url = 'http://remote.example.net/json'
wscall.varname = 'test_webservice'
sendmail = transition_status.add_action('sendmail')
sendmail.to = ['test@example.org']
sendmail.subject = 'In new status'
sendmail.body = 'xxx'
set_backoffice_fields = transition_status.add_action('set-backoffice-fields')
set_backoffice_fields.fields = [{'field_id': 'bo1', 'value': 'xxx'}]
jump = transition_status.add_action('jump')
jump.status = end_status.id
workflow.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.user_id = user.id
formdata.just_created()
formdata.store()
formdata.record_workflow_event('frontoffice-created')
formdata.perform_workflow()
formdata.store()
freezer.tick(datetime.timedelta(days=2))
_apply_timeouts(pub)
app = login(get_app(pub))
resp = app.get(formdata.get_url())
resp.form.submit('button1').follow()
formdata.refresh_from_storage()
assert formdata.status == 'wf-end-status'
testdef = TestDef.create_from_formdata(formdef, formdata, add_workflow_tests=True)
testdef.run(formdef)
actions = testdef.workflow_tests.actions
assert len(actions) == 8
assert actions[0].key == 'assert-status'
assert actions[0].status_name == 'Status with timeout jump'
assert actions[1].key == 'skip-time'
assert actions[1].seconds == 172800
assert actions[2].key == 'assert-status'
assert actions[2].status_name == 'Status with button'
assert actions[3].key == 'button-click'
assert actions[3].button_name == 'Accept'
assert actions[4].key == 'assert-webservice-call'
assert actions[5].key == 'assert-email'
assert actions[6].key == 'assert-backoffice-field'
assert actions[-1].key == 'assert-status'
assert actions[-1].status_name == 'End status'

View File

@ -241,12 +241,12 @@ def test_webservice_on_error(http_requests, emails, notify_on_errors, record_on_
'500': '500 Internal Server Error',
'json-err0': None,
'json-err0int': None,
'json-err1': 'err: 1',
'json-err1int': 'err: 1',
'json-err1-with-desc': 'err: 1, err_desc: :(',
'json-errstr': 'err: bug',
'json-errheader1': 'err: 1',
'json-errheaderstr': 'err: bug',
'json-err1': None,
'json-err1int': None,
'json-err1-with-desc': None,
'json-errstr': None,
'json-errheader1': None,
'json-errheaderstr': None,
}
wscall.request = {'url': 'http://remote.example.net/%s' % url_part}
wscall.store()

View File

@ -1119,6 +1119,117 @@ def test_edit_carddata_targeting_itself(pub):
assert carddata.status == 'wf-%s' % st2.id
def test_edit_carddata_auto_targeting_custom_id(pub):
CardDef.wipe()
carddef = CardDef()
carddef.name = 'Foo Card'
carddef.fields = [
StringField(id='0', label='foo', varname='foo'),
StringField(id='1', label='slug', varname='slug'),
]
carddef.id_template = 'card_{{form_var_slug}}'
carddef.store()
carddef.data_class().wipe()
carddata = carddef.data_class()()
carddata.data = {'0': 'foo', '1': 'foo'}
carddata.store()
carddata.just_created()
carddata.store()
assert carddata.identifier == 'card_foo'
carddef2 = CardDef()
carddef2.name = 'Bar Card'
carddef2.fields = [
ItemField(id='1', label='card', varname='card', data_source={'type': 'carddef:%s' % carddef.url_name})
]
carddef2.store()
card_wf = Workflow(name='Card workflow')
st1 = card_wf.add_status('Status1')
st2 = card_wf.add_status('Status2')
edit = st1.add_action('edit_carddata', id='_edit')
edit.formdef_slug = carddef.url_name
edit.target_mode = 'all'
edit.mappings = [Mapping(field_id='0', expression='bar')]
jump = st1.add_action('jump', '_jump')
jump.status = st2.id
card_wf.store()
carddef2.workflow = card_wf
carddef2.store()
carddata2 = carddef2.data_class()()
carddata2.data = {
'1': 'card_foo',
}
carddata2.store()
carddata2.just_created()
carddata2.store()
carddata2.perform_workflow()
carddata.refresh_from_storage()
assert carddata.data['0'] == 'bar'
def test_edit_carddata_manual_targeting_custom_id(pub):
CardDef.wipe()
carddef = CardDef()
carddef.name = 'Foo Card'
carddef.fields = [
StringField(id='0', label='foo', varname='foo'),
StringField(id='1', label='slug', varname='slug'),
]
carddef.id_template = 'card_{{form_var_slug}}'
carddef.store()
carddef.data_class().wipe()
carddata = carddef.data_class()()
carddata.data = {'0': 'foo', '1': 'foo'}
carddata.store()
carddata.just_created()
carddata.store()
assert carddata.identifier == 'card_foo'
carddef2 = CardDef()
carddef2.name = 'Bar Card'
carddef2.fields = []
carddef2.store()
card_wf = Workflow(name='Card workflow')
st1 = card_wf.add_status('Status1')
st2 = card_wf.add_status('Status2')
edit = st1.add_action('edit_carddata', id='_edit')
edit.formdef_slug = carddef.url_name
edit.target_mode = 'manual'
edit.target_id = 'card_foo'
edit.mappings = [Mapping(field_id='0', expression='bar')]
jump = st1.add_action('jump', '_jump')
jump.status = st2.id
card_wf.store()
carddef2.workflow = card_wf
carddef2.store()
carddata2 = carddef2.data_class()()
carddata2.data = {}
carddata2.store()
carddata2.just_created()
carddata2.store()
carddata2.perform_workflow()
carddata.refresh_from_storage()
assert carddata.data['0'] == 'bar'
def test_edit_carddata_from_created_object(pub):
FormDef.wipe()
CardDef.wipe()

View File

@ -7,7 +7,8 @@ import pytest
import responses
from quixote import cleanup, get_publisher
from wcs.fields import BoolField, FileField, ItemField, ItemsField, StringField
from wcs.blocks import BlockDef
from wcs.fields import BlockField, BoolField, FileField, ItemField, ItemsField, StringField
from wcs.formdef import FormDef
from wcs.qommon.http_request import HTTPRequest
from wcs.wf.wscall import JournalWsCallErrorPart, WebserviceCallStatusItem
@ -1065,6 +1066,12 @@ def test_webservice_with_complex_data_in_query_string(http_requests, pub):
),
}
BlockDef.wipe()
block = BlockDef()
block.name = 'foobar'
block.fields = [StringField(id='1', label='String', varname='string')]
block.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'baz'
@ -1075,6 +1082,7 @@ def test_webservice_with_complex_data_in_query_string(http_requests, pub):
StringField(id='4', label='4th field', varname='empty_str'),
StringField(id='5', label='5th field', varname='none'),
BoolField(id='6', label='6th field', varname='bool'),
BlockField(id='7', label='7th field', varname='block', block_slug=block.slug, max_items=3),
]
formdef.workflow_id = wf.id
formdef.store()
@ -1091,6 +1099,17 @@ def test_webservice_with_complex_data_in_query_string(http_requests, pub):
formdata.data['4'] = 'empty_str'
formdata.data['5'] = None
formdata.data['6'] = False
formdata.data['7'] = {
'data': [
{
'1': 'plop',
},
{
'1': 'poulpe',
},
],
'schema': {},
}
formdata.just_created()
formdata.store()
@ -1115,11 +1134,12 @@ def test_webservice_with_complex_data_in_query_string(http_requests, pub):
'none': '{{ form_var_none }}',
'bool': '{{ form_var_bool_raw }}',
'time': '{{ "13:12"|time }}',
'block_template': '{% for b in form_var_block %}{{ b.string }}{% endfor %}',
}
pub.substitutions.feed(formdata)
with get_publisher().complex_data():
item.perform(formdata)
item.perform(formdata)
assert sorted(urllib.parse.parse_qsl(urllib.parse.urlparse(http_requests.get_last('url')).query)) == [
('block_template', 'ploppoulpe'),
('bool', 'False'),
('decimal', '1E+3'),
('decimal2', '1000.1'),

View File

@ -1140,7 +1140,12 @@ class FormDefPage(Directory, TempfileDirectoryMixin):
# there are existing formdata, status will have to be mapped
return redirect('workflow-status-remapping?new=%s' % workflow_id)
job = WorkflowChangeJob(formdef=self.formdef, new_workflow_id=workflow_id, status_mapping={})
job = WorkflowChangeJob(
formdef=self.formdef,
new_workflow_id=workflow_id,
status_mapping={},
user_id=get_session().user,
)
job.store()
get_response().add_after_job(job)
return redirect(job.get_processing_url())
@ -1230,7 +1235,10 @@ class FormDefPage(Directory, TempfileDirectoryMixin):
return self.workflow_status_remapping()
job = WorkflowChangeJob(
formdef=self.formdef, new_workflow_id=new_workflow.id, status_mapping=status_mapping
formdef=self.formdef,
new_workflow_id=new_workflow.id,
status_mapping=status_mapping,
user_id=get_session().user,
)
job.store()
get_response().add_after_job(job)
@ -2042,19 +2050,20 @@ class FormsDirectory(AccessControlled, Directory):
class WorkflowChangeJob(AfterJob):
def __init__(self, formdef, new_workflow_id, status_mapping):
def __init__(self, formdef, new_workflow_id, status_mapping, user_id):
super().__init__(
label=_('Updating data for new workflow'),
formdef_class=formdef.__class__,
formdef_id=formdef.id,
new_workflow_id=new_workflow_id,
status_mapping=status_mapping,
user_id=user_id,
)
def execute(self):
formdef = self.kwargs['formdef_class'].get(self.kwargs['formdef_id'])
workflow = Workflow.get(self.kwargs['new_workflow_id'])
formdef.change_workflow(workflow, self.kwargs['status_mapping'])
formdef.change_workflow(workflow, self.kwargs['status_mapping'], user_id=self.kwargs.get('user_id'))
def done_action_url(self):
formdef = self.kwargs['formdef_class'].get(self.kwargs['formdef_id'])

View File

@ -253,14 +253,15 @@ class LoggedErrorsDirectory(AccessControlled, Directory):
options.append(('carddef', _('Card Models'), 'carddef'))
if backoffice_root.is_accessible('workflows'):
options.append(('others', _('Others'), 'others'))
form.add(
CheckboxesWidget,
'types',
title=_('Error types'),
value=[x[0] for x in options], # check all by default
options=options,
required=True,
)
if not (self.formdef_id or self.workflow_id):
form.add(
CheckboxesWidget,
'types',
title=_('Error types'),
value=[x[0] for x in options], # check all by default
options=options,
required=True,
)
form.add(
DateWidget,
'latest_occurence',
@ -275,17 +276,22 @@ class LoggedErrorsDirectory(AccessControlled, Directory):
return redirect('.')
if form.get_submit() == 'submit' and not form.has_errors():
type_criterias = []
if 'formdef' in form.get_widget('types').parse():
type_criterias.append(Equal('formdef_class', 'FormDef'))
if 'carddef' in form.get_widget('types').parse():
type_criterias.append(Equal('formdef_class', 'CardDef'))
if 'others' in form.get_widget('types').parse():
type_criterias.append(Null('formdef_class'))
criterias = [
Less('latest_occurence_timestamp', form.get_widget('latest_occurence').parse()),
Or(type_criterias),
]
criterias = []
if self.formdef_id and self.formdef_class:
criterias.append(Equal('formdef_id', self.formdef_id))
criterias.append(Equal('formdef_class', self.formdef_class.__name__))
elif self.workflow_id:
criterias.append(Equal('workflow_id', self.workflow_id))
else:
if 'formdef' in form.get_widget('types').parse():
criterias.append(Equal('formdef_class', 'FormDef'))
if 'carddef' in form.get_widget('types').parse():
criterias.append(Equal('formdef_class', 'CardDef'))
if 'others' in form.get_widget('types').parse():
criterias.append(Null('formdef_class'))
criterias = [Or(criterias)]
criterias.append(Less('latest_occurence_timestamp', form.get_widget('latest_occurence').parse()))
get_publisher().loggederror_class.wipe(clause=criterias)
return redirect('.')

View File

@ -344,13 +344,18 @@ class TestsDirectory(Directory):
]
if formdata_options:
creation_options = [
('empty', _('Fill data manually'), 'empty'),
('formdata', _('Import data from form'), 'formdata'),
]
if get_publisher().has_site_option('enable-workflow-tests'):
creation_options.append(
('formdata-wf', _('Import data from form (and initialise workflow tests)'), 'formdata-wf')
)
form.add(
RadiobuttonsWidget,
'creation_mode',
options=[
('empty', _('Fill data manually'), 'empty'),
('formdata', _('Import data from form'), 'formdata'),
],
options=creation_options,
value='empty',
attrs={'data-dynamic-display-parent': 'true'},
)
@ -362,7 +367,7 @@ class TestsDirectory(Directory):
hint=_('Form is only used for initial data alimentation, no link is kept with created test.'),
attrs={
'data-dynamic-display-child-of': 'creation_mode',
'data-dynamic-display-value-in': 'formdata',
'data-dynamic-display-value-in': 'formdata|formdata-wf',
},
**{'data-autocomplete': 'true'},
)
@ -392,7 +397,11 @@ class TestsDirectory(Directory):
formdata_id = form.get_widget('formdata').parse()
formdata = self.objectdef.data_class().get(formdata_id)
testdef = TestDef.create_from_formdata(self.objectdef, formdata)
testdef = TestDef.create_from_formdata(
self.objectdef,
formdata,
add_workflow_tests=bool(creation_mode_widget.parse() == 'formdata-wf'),
)
testdef.name = form.get_widget('name').parse()
testdef.agent_id = get_session().user
testdef.store()

View File

@ -51,6 +51,17 @@ class NamedWsCallUI:
)
form.add(WsCallRequestWidget, 'request', value=self.wscall.request, title=_('Request'), required=True)
form.widgets.append(
HtmlWidget(
'<div class="infonotice"><p>%s</p></div>'
% _(
'This tab is about connection, payload, and HTTP errors. '
'Application errors ("err" property in response different than zero) '
'are always silent.'
),
tab=('error', _('Error Handling')),
)
)
form.add(
CheckboxWidget,
'notify_on_errors',
@ -63,6 +74,7 @@ class NamedWsCallUI:
'record_on_errors',
title=_('Record on errors'),
value=self.wscall.record_on_errors if self.wscall.slug else True,
default_value=True,
tab=('error', _('Error Handling')),
)
if not self.wscall.is_readonly():

View File

@ -19,7 +19,7 @@ from quixote import get_publisher, get_request, get_session
from wcs.formdata import FormData
from .qommon import _
from .sql_criterias import Equal, Null, StrictNotEqual
from .sql_criterias import Equal
class CardData(FormData):
@ -39,20 +39,6 @@ class CardData(FormData):
formdef = property(get_formdef)
@classmethod
def get_by_id(cls, value):
try:
return cls.select(
[
StrictNotEqual('status', 'draft'),
Null('anonymised'),
cls._formdef.get_by_id_criteria(value),
],
limit=1,
)[0]
except IndexError:
raise KeyError(value)
def get_data_source_structured_item(
self, digest_key='default', group_by=None, with_related_urls=False, with_files_urls=False
):

View File

@ -28,6 +28,7 @@ from wcs.qommon.ods import NS as OD_NS
from wcs.qommon.ods import clean_text as od_clean_text
from .base import SetValueError, WidgetField
from .item import UnknownCardValueError
class MissingBlockFieldError(Exception):
@ -67,7 +68,11 @@ class BlockRowValue:
sub_field.set_value(row_data, sub_value)
return row_data
row_data = make_row_data(self.attributes)
try:
row_data = make_row_data(self.attributes)
except UnknownCardValueError as e:
get_publisher().record_error(_('invalid value when creating block: %s') % str(e), exception=e)
return None
current_block_value = data.get(field.id)
if not self.check_current_value(current_block_value):

View File

@ -35,7 +35,6 @@ class NumericField(WidgetField):
use_live_server_validation = True
widget_class = NumericWidget
required = False
validation = None
restrict_to_integers = True

View File

@ -32,7 +32,7 @@ from quixote.errors import RequestError
from quixote.html import htmltext
from quixote.http_request import Upload
from wcs.sql_criterias import And, Contains, Equal, Intersects
from wcs.sql_criterias import And, Contains, Equal, Intersects, Null, StrictNotEqual
from .qommon import _, misc
from .qommon.evalutils import make_datetime
@ -365,7 +365,10 @@ class FormData(StorableObject):
def get_by_id(cls, value):
if cls._formdef.id_template:
try:
return cls.select([Equal('id_display', str(value))], limit=1)[0]
return cls.select(
[StrictNotEqual('status', 'draft'), Null('anonymised'), Equal('id_display', str(value))],
limit=1,
)[0]
except IndexError:
raise KeyError(value)
return cls.get(value)
@ -1488,6 +1491,8 @@ class FormData(StorableObject):
for field in fields:
if anonymise and field.anonymise == 'final':
continue
if field.is_no_data_field:
continue
if not field.varname and not include_unnamed_fields:
continue
if field.varname in seen:
@ -1930,7 +1935,7 @@ class FormData(StorableObject):
if object_type:
# workflow action
try:
yield objectdef.data_class().get(target_id)
yield objectdef.data_class().get_by_id(target_id)
except KeyError:
# linked object may be missing
pass
@ -1950,7 +1955,7 @@ class FormData(StorableObject):
_('%s - not found') % origin,
)
else:
yield (_objectdef.data_class().get(target_id), origin)
yield (_objectdef.data_class().get_by_id(target_id), origin)
except ValueError:
pass
except KeyError:

View File

@ -1967,7 +1967,7 @@ class FormDef(StorableObject):
# chunk contains the fields.
return pickle.dumps(object, protocol=2) + pickle.dumps(object.fields, protocol=2)
def change_workflow(self, new_workflow, status_mapping=None):
def change_workflow(self, new_workflow, status_mapping=None, user_id=None):
old_workflow = self.get_workflow()
formdata_count = self.data_class().count()
@ -2003,7 +2003,7 @@ class FormDef(StorableObject):
if function_key not in new_workflow.roles:
del self.workflow_roles[function_key]
removed_functions.add(function_key)
self.store(comment=_('Workflow change'))
self.store(comment=_('Workflow change'), snapshot_store_user=user_id)
if formdata_count:
# instruct formdef to update its security rules
self.data_class().rebuild_security()

View File

@ -37,7 +37,7 @@ from quixote.util import randbytes
from wcs.carddef import CardDef
from wcs.categories import Category
from wcs.fields import MissingBlockFieldError, SetValueError
from wcs.fields import MissingBlockFieldError, PageField, SetValueError
from wcs.formdata import Evolution, FormData
from wcs.formdef import FormDef
from wcs.forms.common import FormStatusPage, FormTemplateMixin, TempfileDirectoryMixin
@ -1006,9 +1006,12 @@ class FormPage(Directory, TempfileDirectoryMixin, FormTemplateMixin):
def create_form(self, *args, **kwargs):
form = self.formdef.create_form(*args, **kwargs)
if len(self.pages) == 1 and not self.formdef.confirmation:
# if there's a form with a single page, no confirmation, add native quixote
# CSRF protection.
if (
len([x for x in self.formdef.fields if isinstance(x, PageField)]) < 2
and not self.formdef.confirmation
):
# if there's a form with a single page (at all, not as the result of conditions),
# and no confirmation page, add native quixote CSRF protection.
form.add(FormTokenWidget, form.TOKEN_NAME)
form.attrs['data-live-url'] = self.formdef.get_url(language=get_publisher().current_language) + 'live'
form.attrs['data-live-validation-url'] = (

View File

@ -4,8 +4,8 @@ msgid ""
msgstr ""
"Project-Id-Version: wcs 0\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2024-03-04 17:49+0100\n"
"PO-Revision-Date: 2024-03-04 17:49+0100\n"
"POT-Creation-Date: 2024-03-12 09:34+0100\n"
"PO-Revision-Date: 2024-03-12 09:34+0100\n"
"Last-Translator: Thomas Noël <tnoel@entrouvert.com>\n"
"Language-Team: french\n"
"Language: fr\n"
@ -2568,6 +2568,12 @@ msgstr "Saisir les données manuellement"
msgid "Import data from form"
msgstr "Importer les données depuis une demande"
#: admin/tests.py
msgid "Import data from form (and initialise workflow tests)"
msgstr ""
"Importer les données depuis une demande (et initialiser les tests de "
"workflow)"
#: admin/tests.py
msgid ""
"Form is only used for initial data alimentation, no link is kept with "
@ -3483,6 +3489,15 @@ msgstr "Attention, ce changement est risqué"
msgid "Request"
msgstr "Requête"
#: admin/wscalls.py
msgid ""
"This tab is about connection, payload, and HTTP errors. Application errors "
"(\"err\" property in response different than zero) are always silent."
msgstr ""
"Cet onglet concerne les erreurs de connexion et de message à transmettre, "
"ainsi qque les erreurs HTTP. Les erreurs applicatives (quand lattribut "
 err » dans les réponses est différent de zéro) sont toujours silencieuses."
#: admin/wscalls.py wf/wscall.py
msgid "Error Handling"
msgstr "Gestion des erreurs"
@ -5433,6 +5448,11 @@ msgstr "La valeur doit contenir un ou plusieurs noms valides."
msgid "Missing block field: %s"
msgstr "Bloc de champ manquant : %s"
#: fields/block.py
#, python-format
msgid "invalid value when creating block: %s"
msgstr "valeur invalide pour la création du bloc : %s"
#: fields/block.py
#, python-format
msgid "Field Block (%s)"
@ -11343,7 +11363,7 @@ msgstr "Valeur actuelle"
#: wf/export_to_model.py
msgid "Template to obtain model file"
msgstr "Gabarit pour obtenir le ficher servant de modèle"
msgstr "Gabarit pour obtenir le fichier servant de modèle"
#: wf/export_to_model.py
msgid "Convert generated file to PDF"
@ -11884,7 +11904,7 @@ msgid ""
"additional POST data in an additional \"extra\" key. It is often not "
"necessary."
msgstr ""
"Attention : cette option envoie lintégralité des donneés de la demande ou "
"Attention : cette option envoie lintégralité des données de la demande ou "
"fiche concernée, avec les données POST supplémentaires dans une clé "
 extra ». Cette option ne devrait généralement pas être utilisée."

View File

@ -455,6 +455,7 @@ class WcsPublisher(QommonPublisher):
for _formdef in FormDef.select() + CardDef.select():
sql.do_formdef_tables(_formdef)
sql.migrate_global_views(conn, cur)
sql.init_search_tokens()
cur.close()
def record_deprecated_usage(self, *args, **kwargs):

View File

@ -565,6 +565,7 @@ class HtmlWidget:
self.attrs = {}
self.string = string
self.title = title
self.tab = kwargs.pop('tab', None)
def render(self):
return self.render_content()

View File

@ -15,6 +15,7 @@
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import re
import urllib.parse
import xml.etree.ElementTree as ET
import zipfile
@ -252,7 +253,7 @@ class WorkCell:
url = '%sfiles/%s/%s' % (
self.formdata.get_url(backoffice=True),
self.data_field.id,
self.native_value,
urllib.parse.quote(self.native_value),
)
a = ET.SubElement(p, '{%s}a' % NS['text'])
a.attrib['{%s}href' % NS['xlink']] = url

View File

@ -690,6 +690,11 @@ class QommonPublisher(Publisher):
for error in self.loggederror_class.select(clause=clauses):
self.loggederror_class.remove_object(error.id)
def clean_search_tokens(self, **kwargs):
from wcs import sql
sql.purge_obsolete_search_tokens()
@classmethod
def register_cronjobs(cls):
cls.register_cronjob(CronJob(cls.clean_sessions, minutes=[0], name='clean_sessions'))
@ -702,6 +707,9 @@ class QommonPublisher(Publisher):
cls.register_cronjob(
CronJob(cls.clean_loggederrors, hours=[3], minutes=[0], name='clean_loggederrors')
)
cls.register_cronjob(
CronJob(cls.clean_search_tokens, weekdays=[0], hours=[1], minutes=[0], name='clean_search_tokens')
)
_initialized = False

View File

@ -677,6 +677,11 @@ div.form-preview > br {
clear: both; /* stop grid */
}
div.form-preview {
overflow-y: auto;
clear: both;
}
div.widget.no-bottom-margin {
margin-bottom: 0.2rem;
}

View File

@ -11,6 +11,24 @@
outline: 1px dashed var(--primary-color, #bbb);
}
}
&.pk-vertical-items {
.content {
flex-direction: column;
}
.item-with-image {
padding: 0;
grid-template-areas: 'input picture label';
justify-items: flex-start;
align-items: center;
grid-template-columns: auto var(--image-size) 1fr;
grid-column-gap: 0.7em;
grid-template-rows: auto;
flex: 0 0 auto;
&--input {
margin-right: 0;
}
}
}
}
.item-with-image {

View File

@ -461,7 +461,7 @@ $.WcsFileUpload = {
image_preview: function(base_widget, img_token) {
var file_button = base_widget.find('.file-button');
if(file_button.hasClass("file-image")) {
file_button[0].style.setProperty('--image-preview-url', `url(${window.location.href}tempfile?t=${img_token}&thumbnail=1)`);
file_button[0].style.setProperty('--image-preview-url', `url(${window.location.pathname}tempfile?t=${img_token}&thumbnail=1)`);
}
}
}

View File

@ -1331,6 +1331,7 @@ def json_dumps(value):
@register.simple_tag(takes_context=True)
def make_public_url(context, url=None):
url = unlazy(url)
if not url:
return ''
token = get_session().create_token('sign-url-token', {'url': url})

View File

@ -150,13 +150,21 @@ class Snapshot:
]
@classmethod
def snap(cls, instance, comment=None, label=None, store_user=True, application=None):
def snap(cls, instance, comment=None, label=None, store_user=None, application=None):
obj = cls()
obj.object_type = instance.xml_root_node
obj.object_id = instance.id
obj.timestamp = now()
if get_session() and store_user:
obj.user_id = get_session().user
# store_user:
# None/True: get user from active session
# False: do not store user
# any value: consider it as user id
# (store_user is explicitely checked to be a boolean, to avoid the "1" integer being treated as True)
if store_user is None or (isinstance(store_user, bool) and store_user is True):
if get_session():
obj.user_id = get_session().user
elif store_user:
obj.user_id = store_user
tree = instance.export_to_xml(include_id=True)
# remove position for categories

View File

@ -96,6 +96,20 @@ SQL_TYPE_MAPPING = {
}
def _table_exists(cur, table_name):
cur.execute('SELECT 1 FROM pg_class WHERE relname = %s;', (table_name,))
rows = cur.fetchall()
return len(rows) > 0
def _trigger_exists(cur, table_name, trigger_name):
cur.execute(
'SELECT 1 FROM pg_trigger WHERE tgrelid = %s::regclass AND tgname = %s;', (table_name, trigger_name)
)
rows = cur.fetchall()
return len(rows) > 0
class WcsPgConnection(psycopg2.extensions.connection):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@ -1579,6 +1593,8 @@ def do_global_views(conn, cur):
% (name, category.id)
)
init_search_tokens_triggers(cur)
def clean_global_views(conn, cur):
# Purge of any dead data
@ -1671,11 +1687,154 @@ def init_global_table(conn=None, cur=None):
endpoint_status=endpoint_status_filter,
)
)
init_search_tokens_data(cur)
if own_conn:
cur.close()
def init_search_tokens(conn=None, cur=None):
own_cur = False
if cur is None:
own_cur = True
conn, cur = get_connection_and_cursor()
# Create table
cur.execute('CREATE TABLE IF NOT EXISTS wcs_search_tokens(token TEXT PRIMARY KEY);')
# Create triggers
init_search_tokens_triggers(cur)
# Fill table
init_search_tokens_data(cur)
# Index at the end, small performance trick... not that useful, but it's free...
cur.execute('CREATE EXTENSION IF NOT EXISTS pg_trgm;')
cur.execute(
'CREATE INDEX IF NOT EXISTS wcs_search_tokens_trgm ON wcs_search_tokens USING gin(token gin_trgm_ops);'
)
# And last: functions to use this brand new table
cur.execute('CREATE OR REPLACE AGGREGATE tsquery_agg_or (tsquery) (sfunc=tsquery_or, stype=tsquery);')
cur.execute('CREATE OR REPLACE AGGREGATE tsquery_agg_and (tsquery) (sfunc=tsquery_and, stype=tsquery);')
cur.execute(
r"""CREATE OR REPLACE FUNCTION public.wcs_tsquery(text)
RETURNS tsquery
LANGUAGE sql
STABLE
AS $function$
with
tokenized as (select unnest(regexp_split_to_array($1, '\s+')) w),
super_tokenized as (
select w,
coalesce((select plainto_tsquery(perfect.token) from wcs_search_tokens perfect where perfect.token = plainto_tsquery(w)::text),
tsquery_agg_or(plainto_tsquery(partial.token) order by partial.token <-> w desc),
plainto_tsquery(w)) tokens
from tokenized
left join wcs_search_tokens partial on partial.token % w and w not similar to '%[0-9]{2,}%'
group by w)
select tsquery_agg_and(tokens) from super_tokenized;
$function$;"""
)
if own_cur:
cur.close()
def init_search_tokens_triggers(cur):
# We define only appending triggers, ie on INSERT and UPDATE.
# It would be far heavier to maintain deletions here, and having extra data has
# no or marginal side effect on search performances, and absolutely no impact
# on search results.
# Instead, a weekly cron job will delete obsolete entries, thus making it sure no
# personal data is kept uselessly.
# First part: the appending function
cur.execute(
"""CREATE OR REPLACE FUNCTION wcs_search_tokens_trigger_fn ()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
INSERT INTO wcs_search_tokens SELECT unnest(tsvector_to_array(NEW.fts)) ON CONFLICT(token) DO NOTHING;
RETURN NEW;
END;
$function$;"""
)
if not (_table_exists(cur, 'wcs_search_tokens')):
# abort trigger creation if tokens table doesn't exist yet
return
if _table_exists(cur, 'wcs_all_forms') and not _trigger_exists(
cur, 'wcs_all_forms', 'wcs_all_forms_fts_trg_upd'
):
# Second part: insert and update triggers for wcs_all_forms
cur.execute(
"""CREATE TRIGGER wcs_all_forms_fts_trg_ins
AFTER INSERT ON wcs_all_forms
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
)
cur.execute(
"""CREATE TRIGGER wcs_all_forms_fts_trg_upd
AFTER UPDATE OF fts ON wcs_all_forms
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
)
if _table_exists(cur, 'searchable_formdefs') and not _trigger_exists(
cur, 'searchable_formdefs', 'searchable_formdefs_fts_trg_upd'
):
# Third part: insert and update triggers for searchable_formdefs
cur.execute(
"""CREATE TRIGGER searchable_formdefs_fts_trg_ins
AFTER INSERT ON searchable_formdefs
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
)
cur.execute(
"""CREATE TRIGGER searchable_formdefs_fts_trg_upd
AFTER UPDATE OF fts ON searchable_formdefs
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
)
def init_search_tokens_data(cur):
if not (_table_exists(cur, 'wcs_search_tokens')):
# abort table data initialization if tokens table doesn't exist yet
return
if _table_exists(cur, 'wcs_all_forms'):
cur.execute(
"""INSERT INTO wcs_search_tokens
SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms
ON CONFLICT(token) DO NOTHING;"""
)
if _table_exists(cur, 'searchable_formdefs'):
cur.execute(
"""INSERT INTO wcs_search_tokens
SELECT unnest(tsvector_to_array(fts)) FROM searchable_formdefs
ON CONFLICT(token) DO NOTHING;"""
)
def purge_obsolete_search_tokens(cur=None):
own_cur = False
if cur is None:
own_cur = True
_, cur = get_connection_and_cursor()
cur.execute(
"""DELETE FROM wcs_search_tokens
WHERE token NOT IN (SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms)
AND token NOT IN (SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms);"""
)
if own_cur:
cur.close()
class SqlMixin:
_table_name = None
_numerical_id = True
@ -2355,13 +2514,20 @@ class SqlDataMixin(SqlMixin):
if period_end:
criterias.append(Less('f.receipt_time', period_end))
if extra_criterias:
for criteria in extra_criterias:
def alter_criteria(criteria):
# change attributes to point to the formdata table (f)
if hasattr(criteria, 'attribute'):
criteria.attribute = f'f.{criteria.attribute}'
elif hasattr(criteria, 'criteria'): # Not()
alter_criteria(criteria.criteria)
elif hasattr(criteria, 'criterias'): # Or()
for c in criteria.criterias:
alter_criteria(c)
for criteria in extra_criterias:
altered_criteria = copy.deepcopy(criteria)
if isinstance(criteria, Not):
altered_criteria.criteria.attribute = f'f.{criteria.criteria.attribute}'
else:
altered_criteria.attribute = f'f.{criteria.attribute}'
alter_criteria(altered_criteria)
criterias.append(altered_criteria)
where_clauses, params, dummy = parse_clause(criterias)
@ -4789,7 +4955,6 @@ class SearchableFormDef(SqlMixin):
% (cls._table_name, cls._table_name)
)
cls.do_indexes(cur)
cur.close()
from wcs.carddef import CardDef
from wcs.formdef import FormDef
@ -4798,6 +4963,8 @@ class SearchableFormDef(SqlMixin):
CardDef.select(ignore_errors=True), FormDef.select(ignore_errors=True)
):
cls.update(obj=objectdef)
init_search_tokens(cur)
cur.close()
@classmethod
def update(cls, obj=None, removed_obj_type=None, removed_obj_id=None):
@ -4835,7 +5002,7 @@ class SearchableFormDef(SqlMixin):
def search(cls, obj_type, string):
_, cur = get_connection_and_cursor()
cur.execute(
'SELECT object_id FROM searchable_formdefs WHERE fts @@ plainto_tsquery(%s)',
'SELECT object_id FROM searchable_formdefs WHERE fts @@ wcs_tsquery(%s)',
(FtsMatch.get_fts_value(string),),
)
ids = [x[0] for x in cur.fetchall()]
@ -5100,7 +5267,7 @@ def get_period_total(
# latest migration, number + description (description is not used
# programmaticaly but will make sure git conflicts if two migrations are
# separately added with the same number)
SQL_LEVEL = (105, 'change test result json structure')
SQL_LEVEL = (106, 'improved fts method')
def migrate_global_views(conn, cur):
@ -5433,6 +5600,10 @@ def migrate():
for formdef in FormDef.select() + CardDef.select():
do_formdef_tables(formdef, rebuild_views=False, rebuild_global_views=False)
if sql_level < 106:
# 106: new fts mechanism with tokens table
init_search_tokens()
if sql_level != SQL_LEVEL[0]:
cur.execute(
'''UPDATE wcs_meta SET value = %s, updated_at=NOW() WHERE key = %s''',

View File

@ -373,6 +373,11 @@ class FtsMatch(Criteria):
return 'fts @@ plainto_tsquery(%%(c%s)s)' % id(self.value)
class WcsFtsMatch(FtsMatch):
def as_sql(self):
return 'fts @@ wcs_tsquery(%%(c%s)s)' % id(self.value)
class ElementEqual(Criteria):
def __init__(self, attribute, key, value, **kwargs):
super().__init__(attribute, value)

View File

@ -185,7 +185,7 @@ class TestDef(sql.TestDef):
)
@classmethod
def create_from_formdata(cls, formdef, formdata):
def create_from_formdata(cls, formdef, formdata, add_workflow_tests=False):
testdef = cls()
testdef.object_type = formdef.get_table_name()
testdef.object_id = formdef.id
@ -213,6 +213,10 @@ class TestDef(sql.TestDef):
'fields': field_data,
'user': formdata.user.get_json_export_dict() if formdata.user else None,
}
if add_workflow_tests:
testdef.workflow_tests.add_actions_from_formdata(formdata)
return testdef
def build_formdata(self, objectdef, include_fields=False):

View File

@ -267,7 +267,7 @@ class ExternalWorkflowGlobalAction(WorkflowStatusItem):
return
try:
yield objectdef.data_class().get(target_id)
yield objectdef.data_class().get_by_id(target_id)
except KeyError as e:
# use custom error message depending on target type
get_publisher().record_error(

View File

@ -184,6 +184,7 @@ class RegisterCommenterWorkflowStatusItem(WorkflowStatusItem):
yield from super().get_computed_strings()
if not self.comment_template:
yield self.comment
yield from (self.attachments or [])
def migrate(self):
changed = super().migrate()

View File

@ -238,6 +238,7 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem):
yield self.body
if self.to:
yield from self.to
yield from (self.attachments or [])
def perform(self, formdata, ignore_i18n=False):
if not self.to:

View File

@ -77,6 +77,9 @@ class WorkflowTests(XmlStorableObject):
for action in self.actions:
status = formdata.get_status()
if not action.is_configured:
continue
if not action.is_assertion:
formdata.sent_emails.clear()
formdata.used_webservice_responses.clear()
@ -106,10 +109,43 @@ class WorkflowTests(XmlStorableObject):
if not self.actions:
return '1'
return str(int(max(x.id for x in self.actions)) + 1)
return str(max(int(x.id) for x in self.actions) + 1)
def add_action(self, action_class):
self.actions.append(action_class(id=self.get_new_action_id()))
action = action_class(id=self.get_new_action_id())
self.actions.append(action)
return action
def add_actions_from_formdata(self, formdata):
test_action_class_by_trace_id = {
'sendmail': AssertEmail,
'webservice_call': AssertWebserviceCall,
'set-backoffice-fields': AssertBackofficeFieldValues,
'button': ButtonClick,
'timeout-jump': SkipTime,
}
previous_trace = None
workflow_traces = formdata.get_workflow_traces()
for trace in workflow_traces:
trace_id = trace.event or trace.action_item_key
if trace_id not in test_action_class_by_trace_id:
previous_trace = trace
continue
if trace.event:
action = self.add_action(AssertStatus)
action.set_attributes_from_trace(formdata.formdef, trace)
action = self.add_action(test_action_class_by_trace_id[trace_id])
action.set_attributes_from_trace(formdata.formdef, trace, previous_trace)
previous_trace = trace
if workflow_traces:
action = self.add_action(AssertStatus)
action.set_attributes_from_trace(formdata.formdef, workflow_traces[-1])
def store(self, *args, **kwargs):
super().store(*args, **kwargs)
@ -162,10 +198,20 @@ class WorkflowTestAction(XmlStorableObject):
def __str__(self):
return str(self.label)
@property
def is_configured(self):
return not any(
field
for field, _ in self.XML_NODES
if field != 'id' and field not in self.optional_fields and not getattr(self, field)
)
def set_attributes_from_trace(self, *args, **kwargs):
pass
def render_as_line(self):
for field, dummy in self.XML_NODES:
if field not in self.optional_fields and not getattr(self, field):
return _('not configured')
if not self.is_configured:
return _('not configured')
return self.details_label
@ -187,6 +233,16 @@ class ButtonClick(WorkflowTestAction):
def details_label(self):
return _('Click on "%s"') % self.button_name
def set_attributes_from_trace(self, formdef, trace, previous_trace=None):
try:
item = [
x for x in self.get_all_choice_actions(formdef) if x.id == trace.event_args['action_item_id']
][0]
except IndexError:
return
self.button_name = item.label
def perform(self, formdata, user):
status = formdata.get_status()
form = status.get_action_form(formdata, user)
@ -198,11 +254,14 @@ class ButtonClick(WorkflowTestAction):
form.get_submit = lambda: button_widget.name
status.handle_form(form, formdata, user, check_replay=False)
def fill_admin_form(self, form, formdef):
possible_button_names = set()
@staticmethod
def get_all_choice_actions(formdef):
for item in formdef.workflow.get_all_items():
if isinstance(item, wf.choice.ChoiceWorkflowStatusItem) and item.status:
possible_button_names.add(item.label)
yield item
def fill_admin_form(self, form, formdef):
possible_button_names = {x.label for x in self.get_all_choice_actions(formdef)}
if not possible_button_names:
return
@ -238,6 +297,14 @@ class AssertStatus(WorkflowTestAction):
def details_label(self):
return _('Status is "%s"') % self.status_name
def set_attributes_from_trace(self, formdef, trace, previous_trace=None):
try:
status = formdef.workflow.get_status(trace.status_id)
except KeyError:
return
self.status_name = status.name
def perform(self, formdata, user):
status = formdata.get_status()
if status.name != self.status_name:
@ -365,6 +432,10 @@ class SkipTime(WorkflowTestAction):
def details_label(self):
return seconds2humanduration(self.seconds)
def set_attributes_from_trace(self, formdef, trace, previous_trace=None):
if previous_trace:
self.seconds = (trace.timestamp - previous_trace.timestamp).total_seconds()
def rewind(self, formdata):
def rewind_time(timestamp):
return timestamp - datetime.timedelta(seconds=self.seconds)
@ -492,6 +563,8 @@ class AssertWebserviceCall(WorkflowTestAction):
webservice_response_id = None
call_count = 1
optional_fields = ['call_count']
XML_NODES = WorkflowTestAction.XML_NODES + [
('webservice_response_id', 'str'),
('call_count', 'int'),
@ -519,14 +592,18 @@ class AssertWebserviceCall(WorkflowTestAction):
return r
def perform(self, formdata, user):
try:
response = WebserviceResponse.get(self.webservice_response_id)
except KeyError:
raise WorkflowTestError(_('Broken, missing webservice response'))
call_count = 0
for response in formdata.used_webservice_responses.copy():
if response.id == self.webservice_response_id:
formdata.used_webservice_responses.remove(response)
for used_response in formdata.used_webservice_responses.copy():
if used_response.id == self.webservice_response_id:
formdata.used_webservice_responses.remove(used_response)
call_count += 1
if call_count != self.call_count:
response = WebserviceResponse.get(self.webservice_response_id)
raise WorkflowTestError(
_('Webservice response %(name)s was used %(count)s times (instead of %(expected_count)s).')
% {'name': response.name, 'count': call_count, 'expected_count': self.call_count}

View File

@ -3314,6 +3314,10 @@ class WorkflowStatusItem(XmlSerialisable):
if expression['type'] == 'template':
old_allow_complex_value = vars.get('allow_complex')
vars['allow_complex'] = allow_complex
# make sure complex data context manager is used
assert (
not vars['allow_complex'] or get_publisher().complex_data_cache is not None
), 'missing complex_data context manager'
try:
return Template(expression['value'], raises=raises, autoescape=False).render(vars)
except TemplateError as e:

View File

@ -114,18 +114,19 @@ def call_webservice(
if qs_data: # merge qs_data into url
qs = list(urllib.parse.parse_qsl(parsed.query))
for key, value in qs_data.items():
try:
value = WorkflowStatusItem.compute(value, allow_complex=True, raises=True)
except Exception as e:
get_publisher().record_error(exception=e, notify=True)
else:
if value:
value = get_publisher().get_cached_complex_data(value)
if isinstance(value, (tuple, list, set)):
qs.extend((key, x) for x in value)
with get_publisher().complex_data():
try:
value = WorkflowStatusItem.compute(value, allow_complex=True, raises=True)
except Exception as e:
get_publisher().record_error(exception=e, notify=True)
else:
value = str(value) if value is not None else ''
qs.append((key, value))
if value:
value = get_publisher().get_cached_complex_data(value)
if isinstance(value, (tuple, list, set)):
qs.extend((key, x) for x in value)
else:
value = str(value) if value is not None else ''
qs.append((key, value))
qs = urllib.parse.urlencode(qs)
url = urllib.parse.urlunparse(parsed[:4] + (qs,) + parsed[5:6])
@ -205,8 +206,8 @@ def call_webservice(
)
return (None, None, None)
app_error_code = get_app_error_code(response, data, 'json')
if (app_error_code != 0 or status >= 400) and (notify_on_errors or record_on_errors):
if status >= 400 and (notify_on_errors or record_on_errors):
app_error_code = get_app_error_code(response, data, 'json')
record_wscall_error(status, data, response, app_error_code, notify_on_errors, record_on_errors)
return (response, status, data)