Compare commits
32 Commits
627f5a6f12
...
56fb3fb2fc
Author | SHA1 | Date |
---|---|---|
Pierre Ducroquet | 56fb3fb2fc | |
Pierre Ducroquet | e153d74089 | |
Pierre Ducroquet | 09a8dccebf | |
Valentin Deniaud | ad2e64880e | |
Thomas NOËL | 4ea852afe8 | |
Benjamin Dauvergne | 96bfaea4a7 | |
Frédéric Péters | a130f1d862 | |
Frédéric Péters | 1b976a36a7 | |
Frédéric Péters | 4ba3ebf6b4 | |
Frédéric Péters | f6e228b438 | |
Frédéric Péters | 977cdb0019 | |
Frédéric Péters | 99d1f4c21f | |
Frédéric Péters | 89f17153bf | |
Frédéric Péters | c72f9aed9d | |
Valentin Deniaud | 16e844a049 | |
Frédéric Péters | bf442ecf44 | |
Frédéric Péters | 658aff404e | |
Frédéric Péters | 06d6487bb3 | |
Frédéric Péters | eade8f8dda | |
Frédéric Péters | c24bac7837 | |
Frédéric Péters | de292cc399 | |
Frédéric Péters | 23e66ec078 | |
Frédéric Péters | 36e1f16a31 | |
Serghei Mihai | 1476d21ce1 | |
Frédéric Péters | 59d8b91f8d | |
Frédéric Péters | 8c26581924 | |
Valentin Deniaud | 3d42d456d9 | |
Valentin Deniaud | 0c0807a0d4 | |
Valentin Deniaud | 58df7bf7bb | |
Frédéric Péters | 728afe97b6 | |
Frédéric Péters | c03d216a70 | |
Thomas NOËL | 00584bff12 |
|
@ -361,6 +361,16 @@ n’est pas nécessaire de préciser l’identifiant d’un utilisateur.
|
|||
<input>GET https://www.example.net/api/forms/inscriptions/10/?anonymise</input>
|
||||
</screen>
|
||||
|
||||
<p>
|
||||
Par ailleurs, l’API « Liste de formulaires » accepte un paramètre
|
||||
<code>include-anonymised</code> permettant d’inclure (<code>on</code>) ou non
|
||||
(<code>off</code>) les demandes anonymisées dans la liste :
|
||||
</p>
|
||||
|
||||
<screen>
|
||||
<input>GET https://www.example.net/api/forms/inscriptions/list?include-anonymised=on</input>
|
||||
</screen>
|
||||
|
||||
</section>
|
||||
|
||||
<section id="global-data">
|
||||
|
|
|
@ -765,7 +765,7 @@ def test_form_workflow_link(pub):
|
|||
|
||||
def test_form_workflow_remapping(pub):
|
||||
AfterJob.wipe()
|
||||
create_superuser(pub)
|
||||
user = create_superuser(pub)
|
||||
create_role(pub)
|
||||
|
||||
FormDef.wipe()
|
||||
|
@ -872,6 +872,8 @@ def test_form_workflow_remapping(pub):
|
|||
resp = resp.follow() # -> to job processing page
|
||||
resp = resp.click('Back')
|
||||
assert resp.pyquery('[href="workflow"] .offset').text() == 'Workflow Three'
|
||||
assert pub.snapshot_class.select_object_history(formdef)[0].comment == 'Workflow change'
|
||||
assert pub.snapshot_class.select_object_history(formdef)[0].user_id == str(user.id)
|
||||
|
||||
# run a SQL SELECT and we known all columns are defined.
|
||||
FormDef.get(formdef.id).data_class().select()
|
||||
|
|
|
@ -390,3 +390,66 @@ def test_logged_error_cleanup(pub):
|
|||
'types$elementcarddef',
|
||||
'types$elementothers',
|
||||
]
|
||||
|
||||
|
||||
def test_logged_error_cleanup_from_filtered_page(pub):
|
||||
create_superuser(pub)
|
||||
|
||||
FormDef.wipe()
|
||||
CardDef.wipe()
|
||||
Workflow.wipe()
|
||||
pub.loggederror_class.wipe()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'foo'
|
||||
formdef.store()
|
||||
carddef = CardDef()
|
||||
carddef.name = 'bar'
|
||||
carddef.store()
|
||||
workflow = Workflow()
|
||||
workflow.name = 'blah'
|
||||
workflow.store()
|
||||
|
||||
# FormDef error
|
||||
error1 = pub.loggederror_class()
|
||||
error1.summary = 'LoggedError'
|
||||
error1.formdef_class = 'FormDef'
|
||||
error1.formdef_id = formdef.id
|
||||
error1.first_occurence_timestamp = error1.latest_occurence_timestamp = datetime.datetime.now()
|
||||
error1.store()
|
||||
|
||||
# CardDef error
|
||||
error2 = pub.loggederror_class()
|
||||
error2.summary = 'LoggedError'
|
||||
error2.formdef_class = 'CardDef'
|
||||
error2.formdef_id = carddef.id
|
||||
error2.first_occurence_timestamp = error2.latest_occurence_timestamp = datetime.datetime.now()
|
||||
error2.store()
|
||||
|
||||
# workflow-only error
|
||||
error3 = pub.loggederror_class()
|
||||
error3.summary = 'LoggedError'
|
||||
error3.workflow_id = workflow.id
|
||||
error3.first_occurence_timestamp = error3.latest_occurence_timestamp = datetime.datetime.now()
|
||||
error3.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
resp = app.get(formdef.get_admin_url() + 'logged-errors/')
|
||||
resp = resp.click('Cleanup')
|
||||
resp.form['latest_occurence'] = (datetime.datetime.now() + datetime.timedelta(days=1)).strftime(
|
||||
'%Y-%m-%d'
|
||||
)
|
||||
resp = resp.form.submit('submit')
|
||||
assert not pub.loggederror_class.has_key(error1.id)
|
||||
assert pub.loggederror_class.has_key(error2.id)
|
||||
assert pub.loggederror_class.has_key(error3.id)
|
||||
|
||||
resp = app.get(workflow.get_admin_url() + 'logged-errors/')
|
||||
resp = resp.click('Cleanup')
|
||||
resp.form['latest_occurence'] = (datetime.datetime.now() + datetime.timedelta(days=1)).strftime(
|
||||
'%Y-%m-%d'
|
||||
)
|
||||
resp = resp.form.submit('submit')
|
||||
assert not pub.loggederror_class.has_key(error1.id)
|
||||
assert pub.loggederror_class.has_key(error2.id)
|
||||
assert not pub.loggederror_class.has_key(error3.id)
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
import datetime
|
||||
import os
|
||||
|
||||
import pytest
|
||||
from django.utils.html import escape
|
||||
from django.utils.timezone import make_aware
|
||||
|
||||
from wcs import workflow_tests
|
||||
from wcs.formdef import FormDef, fields
|
||||
|
@ -582,3 +584,41 @@ def test_workflow_tests_run(pub):
|
|||
assert 'Form status when error occured: New status' in resp.text
|
||||
assert 'Email body: \nabc' in resp.text
|
||||
assert resp.pyquery('li#test-action').text() == 'Test action: Assert email is sent'
|
||||
|
||||
|
||||
def test_workfow_tests_creation_from_formdata(pub):
|
||||
create_superuser(pub)
|
||||
|
||||
workflow = Workflow(name='Workflow One')
|
||||
new_status = workflow.add_status(name='New status')
|
||||
end_status = workflow.add_status(name='End status')
|
||||
|
||||
jump = new_status.add_action('jump')
|
||||
jump.status = end_status.id
|
||||
|
||||
workflow.store()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.workflow_id = workflow.id
|
||||
formdef.name = 'test title'
|
||||
formdef.store()
|
||||
|
||||
app = login(get_app(pub))
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
formdata.receipt_time = make_aware(datetime.datetime(2022, 1, 1, 0, 0))
|
||||
formdata.store()
|
||||
formdata.perform_workflow()
|
||||
formdata.store()
|
||||
|
||||
resp = app.get('/backoffice/forms/%s/tests/new' % formdef.id)
|
||||
resp.form['name'] = 'First test'
|
||||
resp.form['creation_mode'] = 'formdata-wf'
|
||||
resp.form['formdata'].select(text='1-1 - Unknown User - 2022-01-01 00:00')
|
||||
resp = resp.form.submit().follow()
|
||||
|
||||
testdef = TestDef.select()[0]
|
||||
assert len(testdef.workflow_tests.actions) == 1
|
||||
assert testdef.workflow_tests.actions[0].key == 'assert-status'
|
||||
assert testdef.workflow_tests.actions[0].status_name == 'End status'
|
||||
|
|
|
@ -163,6 +163,7 @@ def test_formdata(pub, local_user, user, auth):
|
|||
block = BlockDef()
|
||||
block.name = 'foobar'
|
||||
block.fields = [
|
||||
fields.TitleField(id='dsd', label='Title'),
|
||||
fields.StringField(id='abc', label='Foo', varname='foo'),
|
||||
fields.ItemField(id='xyz', label='Test', data_source={'type': 'foobar'}, varname='bar'),
|
||||
]
|
||||
|
|
|
@ -429,6 +429,9 @@ def test_backoffice_submission_formdef_list_search(pub, local_user, access, auth
|
|||
resp = get_url('/api/formdefs/?backoffice-submission=on&q=test')
|
||||
assert len(resp.json['data']) == 2
|
||||
|
||||
resp = get_url('/api/formdefs/?backoffice-submission=on&q=tes')
|
||||
assert len(resp.json['data']) == 2
|
||||
|
||||
resp = get_url('/api/formdefs/?backoffice-submission=on&q=xyz')
|
||||
assert len(resp.json['data']) == 0
|
||||
|
||||
|
|
|
@ -148,6 +148,12 @@ def test_backoffice_statistics_status_select(pub):
|
|||
assert 'Total number of records: 26' in resp.text
|
||||
assert resp.pyquery('ul.resolution-times.status-wf-new li')[0].text == 'Count: 9'
|
||||
|
||||
resp.forms['listing-settings']['filter-%s-value' % field1.id].value = 'baz'
|
||||
resp.forms['listing-settings']['filter-%s-operator' % field1.id].value = 'existing'
|
||||
resp = resp.forms['listing-settings'].submit()
|
||||
assert 'Total number of records: 50' in resp.text
|
||||
assert resp.pyquery('ul.resolution-times.status-wf-new li')[0].text == 'Count: 17'
|
||||
|
||||
resp.forms['listing-settings']['filter-%s-value' % field1.id].value = 'foo'
|
||||
resp.forms['listing-settings']['filter-%s-operator' % field1.id].value = 'eq'
|
||||
resp = resp.forms['listing-settings'].submit()
|
||||
|
|
|
@ -6073,3 +6073,25 @@ def test_form_submit_no_csrf(pub):
|
|||
# simulate call from remote/attacker site (magictokens prevents this)
|
||||
resp = app.post(formdef.get_url(), params=form_data, status=302)
|
||||
assert resp.location == formdef.get_url()
|
||||
|
||||
|
||||
def test_form_submit_no_csrf_suddenly_single_page(pub):
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test'
|
||||
formdef.fields = [
|
||||
fields.PageField(id='1', label='page1'),
|
||||
fields.ComputedField(id='2', label='computed', varname='plop', value_template='{{ "plop" }}'),
|
||||
fields.PageField(
|
||||
id='3', label='page2', condition={'type': 'django', 'value': 'form_var_plop != "plop"'}
|
||||
),
|
||||
]
|
||||
formdef.confirmation = False
|
||||
formdef.store()
|
||||
formdef.data_class().wipe()
|
||||
|
||||
create_user(pub)
|
||||
app = get_app(pub)
|
||||
login(app, username='foo', password='foo')
|
||||
resp = app.get(formdef.get_url())
|
||||
resp = resp.form.submit('submit').follow()
|
||||
assert formdef.data_class().select()[0].status == 'wf-new'
|
||||
|
|
|
@ -2652,6 +2652,92 @@ def test_block_prefill_full_block_email(pub):
|
|||
}
|
||||
|
||||
|
||||
def test_block_prefill_full_block_card_item(pub):
|
||||
FormDef.wipe()
|
||||
BlockDef.wipe()
|
||||
CardDef.wipe()
|
||||
create_user(pub)
|
||||
|
||||
carddef = CardDef()
|
||||
carddef.name = 'Test'
|
||||
carddef.fields = [
|
||||
fields.StringField(id='0', label='blah', varname='blah'),
|
||||
]
|
||||
carddef.digest_templates = {'default': '{{ form_var_blah|upper }}'}
|
||||
carddef.store()
|
||||
carddef.data_class().wipe()
|
||||
|
||||
carddata1 = carddef.data_class()()
|
||||
carddata1.data = {'0': 'bar'}
|
||||
carddata1.just_created()
|
||||
carddata1.store()
|
||||
|
||||
block = BlockDef()
|
||||
block.name = 'foobar'
|
||||
block.fields = [
|
||||
fields.ItemField(
|
||||
id='123',
|
||||
required=False,
|
||||
hint='-----',
|
||||
label='Test',
|
||||
varname='plop',
|
||||
data_source={'type': 'carddef:test'},
|
||||
),
|
||||
]
|
||||
block.digest_template = '{{block_var_plop}}'
|
||||
block.store()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'form title'
|
||||
formdef.fields = [
|
||||
fields.PageField(id='0', label='1st page'),
|
||||
fields.PageField(id='2', label='2nd page'),
|
||||
fields.BlockField(
|
||||
id='1',
|
||||
label='test',
|
||||
block_slug='foobar',
|
||||
max_items=5,
|
||||
prefill={
|
||||
'type': 'string',
|
||||
'value': '{% block_value plop="1" %}',
|
||||
},
|
||||
),
|
||||
]
|
||||
formdef.store()
|
||||
formdef.data_class().wipe()
|
||||
|
||||
app = get_app(pub)
|
||||
login(app, username='foo', password='foo')
|
||||
resp = app.get(formdef.get_url())
|
||||
resp = resp.form.submit('submit') # -> page 2
|
||||
assert resp.form['f1$element0$f123'].value == '1'
|
||||
resp = resp.form.submit('submit') # validation
|
||||
resp = resp.form.submit('submit') # done
|
||||
assert formdef.data_class().select()[0].data == {
|
||||
'1': {
|
||||
'data': [
|
||||
{'123': '1', '123_display': 'BAR', '123_structured': {'blah': 'bar', 'id': 1, 'text': 'BAR'}}
|
||||
],
|
||||
'schema': {'123': 'item'},
|
||||
},
|
||||
'1_display': 'BAR',
|
||||
}
|
||||
|
||||
# prefill with unknown value
|
||||
pub.loggederror_class.wipe()
|
||||
formdef.fields[2].prefill['value'] = '{% block_value plop="123" %}'
|
||||
formdef.store()
|
||||
formdef.data_class().wipe()
|
||||
resp = app.get(formdef.get_url())
|
||||
resp = resp.form.submit('submit') # -> page 2
|
||||
assert not resp.form['f1$element0$f123'].value
|
||||
assert pub.loggederror_class.count() == 1
|
||||
assert (
|
||||
pub.loggederror_class.select()[0].summary
|
||||
== 'invalid value when creating block: unknown card value (\'123\')'
|
||||
)
|
||||
|
||||
|
||||
def test_block_titles_and_empty_block_on_summary_page(pub, emails):
|
||||
FormDef.wipe()
|
||||
BlockDef.wipe()
|
||||
|
|
|
@ -631,6 +631,107 @@ def test_form_page_item_with_variable_data_source_prefill(pub):
|
|||
assert not resp.pyquery('#form_error_f2').text()
|
||||
|
||||
|
||||
def test_form_page_item_with_card_with_custom_id_prefill(pub):
|
||||
create_user(pub)
|
||||
CardDef.wipe()
|
||||
|
||||
carddef = CardDef()
|
||||
carddef.name = 'Test'
|
||||
carddef.fields = [
|
||||
fields.StringField(id='0', label='blah', varname='blah'),
|
||||
]
|
||||
carddef.digest_templates = {'default': '{{ form_var_blah|upper }}'}
|
||||
carddef.id_template = '{{ form_var_blah }}'
|
||||
carddef.store()
|
||||
carddef.data_class().wipe()
|
||||
|
||||
carddata1 = carddef.data_class()()
|
||||
carddata1.data = {'0': 'bar'}
|
||||
carddata1.just_created()
|
||||
carddata1.store()
|
||||
|
||||
carddata2 = carddef.data_class()()
|
||||
carddata2.data = {'0': 'foo'}
|
||||
carddata2.just_created()
|
||||
carddata2.store()
|
||||
|
||||
formdef = create_formdef()
|
||||
formdef.data_class().wipe()
|
||||
formdef.fields = [
|
||||
fields.ItemField(
|
||||
id='2',
|
||||
label='item',
|
||||
varname='item',
|
||||
required=False,
|
||||
data_source={'type': 'carddef:test'},
|
||||
prefill={'type': 'string', 'value': 'foo'},
|
||||
),
|
||||
]
|
||||
formdef.store()
|
||||
|
||||
resp = get_app(pub).get('/test/')
|
||||
assert [x.attrib['value'] for x in resp.pyquery('#form_f2 option')] == ['bar', 'foo']
|
||||
assert resp.form['f2'].value == 'foo'
|
||||
assert not resp.pyquery('#form_error_f2').text()
|
||||
|
||||
|
||||
def test_form_page_block_with_item_with_card_with_custom_id_prefill(pub):
|
||||
create_user(pub)
|
||||
CardDef.wipe()
|
||||
|
||||
carddef = CardDef()
|
||||
carddef.name = 'Test'
|
||||
carddef.fields = [
|
||||
fields.StringField(id='0', label='blah', varname='blah'),
|
||||
]
|
||||
carddef.digest_templates = {'default': 'card {{ form_var_blah }}'}
|
||||
carddef.id_template = '{{ form_var_blah }}'
|
||||
carddef.store()
|
||||
carddef.data_class().wipe()
|
||||
|
||||
carddata1 = carddef.data_class()()
|
||||
carddata1.data = {'0': 'bar'}
|
||||
carddata1.just_created()
|
||||
carddata1.store()
|
||||
|
||||
carddata2 = carddef.data_class()()
|
||||
carddata2.data = {'0': 'foo'}
|
||||
carddata2.just_created()
|
||||
carddata2.store()
|
||||
|
||||
BlockDef.wipe()
|
||||
block = BlockDef()
|
||||
block.name = 'foobar'
|
||||
block.fields = [
|
||||
fields.ItemField(
|
||||
id='123',
|
||||
label='item',
|
||||
varname='item',
|
||||
required=False,
|
||||
data_source={'type': 'carddef:test'},
|
||||
),
|
||||
]
|
||||
block.store()
|
||||
|
||||
formdef = create_formdef()
|
||||
formdef.data_class().wipe()
|
||||
formdef.fields = [
|
||||
fields.BlockField(
|
||||
id='2',
|
||||
label='test',
|
||||
block_slug='foobar',
|
||||
varname='foobar',
|
||||
prefill={'type': 'string', 'value': '{% block_value item="foo" %}'},
|
||||
),
|
||||
]
|
||||
formdef.store()
|
||||
|
||||
resp = get_app(pub).get('/test/')
|
||||
assert [x.attrib['value'] for x in resp.pyquery('#form_f2__element0__f123 option')] == ['bar', 'foo']
|
||||
assert resp.form['f2$element0$f123'].value == 'foo'
|
||||
assert not resp.pyquery('.widget-with-error')
|
||||
|
||||
|
||||
def test_form_page_item_with_computed_field_variable_data_source_prefill(pub):
|
||||
create_user(pub)
|
||||
formdef = create_formdef()
|
||||
|
|
|
@ -1183,6 +1183,45 @@ def test_sql_criteria_fts(pub):
|
|||
assert data_class.select([st.FtsMatch(formdata1.id_display)])[0].id_display == formdata1.id_display
|
||||
|
||||
|
||||
def test_search_tokens_purge(pub):
|
||||
_, cur = sql.get_connection_and_cursor()
|
||||
|
||||
# purge garbage from other tests
|
||||
sql.purge_obsolete_search_tokens()
|
||||
|
||||
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
|
||||
start = cur.fetchone()[0]
|
||||
|
||||
# define a new table
|
||||
test_formdef = FormDef()
|
||||
test_formdef.name = 'tableSelectFTStokens'
|
||||
test_formdef.fields = [fields.StringField(id='3', label='string')]
|
||||
test_formdef.store()
|
||||
data_class = test_formdef.data_class(mode='sql')
|
||||
|
||||
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
|
||||
assert cur.fetchone()[0] == start + 1
|
||||
|
||||
t = data_class()
|
||||
t.data = {'3': 'foofortokensofcourse'}
|
||||
t.just_created()
|
||||
t.store()
|
||||
|
||||
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
|
||||
assert cur.fetchone()[0] == start + 2
|
||||
|
||||
t.data = {'3': 'chaussettefortokensofcourse'}
|
||||
t.store()
|
||||
|
||||
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
|
||||
assert cur.fetchone()[0] == start + 3
|
||||
|
||||
sql.purge_obsolete_search_tokens()
|
||||
|
||||
cur.execute('SELECT count(*) FROM wcs_search_tokens;')
|
||||
assert cur.fetchone()[0] == start + 2
|
||||
|
||||
|
||||
def table_exists(cur, table_name):
|
||||
cur.execute(
|
||||
'''SELECT COUNT(*) FROM information_schema.tables
|
||||
|
|
|
@ -36,6 +36,8 @@ def pub():
|
|||
pub.substitutions.feed(pub)
|
||||
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
|
||||
pub.set_app_dir(req)
|
||||
pub._set_request(req)
|
||||
req.session = pub.session_manager.session_class(id='1')
|
||||
pub.site_options.set('options', 'working_day_calendar', '')
|
||||
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
|
||||
pub.site_options.write(fd)
|
||||
|
@ -1636,10 +1638,25 @@ def test_json_dumps(pub):
|
|||
)
|
||||
|
||||
|
||||
def test_empty_make_public_url(pub):
|
||||
def test_make_public_url(pub):
|
||||
# empty value
|
||||
context = {'value': None}
|
||||
assert Template('{% make_public_url url=value %}').render(context) == ''
|
||||
|
||||
# lazy value
|
||||
FormDef.wipe()
|
||||
formdef = FormDef()
|
||||
formdef.name = 'lazy'
|
||||
formdef.fields = [fields.StringField(id='0', label='string', varname='foo')]
|
||||
formdef.store()
|
||||
formdata = formdef.data_class()()
|
||||
formdata.data = {'0': 'https://example.net'}
|
||||
formdata.store()
|
||||
context = CompatibilityNamesDict({'form': LazyFormData(formdata)})
|
||||
assert (
|
||||
Template('{% make_public_url url=form_var_foo %}').render(context).startswith('/api/sign-url-token/')
|
||||
)
|
||||
|
||||
|
||||
def test_with_auth(pub):
|
||||
context = {'service_url': 'https://www.example.net/api/whatever?x=y'}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import datetime
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
|
@ -6,11 +7,12 @@ from wcs import workflow_tests
|
|||
from wcs.formdef import FormDef, fields
|
||||
from wcs.qommon.http_request import HTTPRequest
|
||||
from wcs.testdef import TestDef, WebserviceResponse
|
||||
from wcs.wf.jump import JumpWorkflowStatusItem
|
||||
from wcs.wf.jump import JumpWorkflowStatusItem, _apply_timeouts
|
||||
from wcs.workflow_tests import WorkflowTestError
|
||||
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowStatusItem
|
||||
|
||||
from .utilities import create_temporary_pub
|
||||
from .backoffice_pages.test_all import create_user
|
||||
from .utilities import create_temporary_pub, get_app, login
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -19,6 +21,7 @@ def pub():
|
|||
|
||||
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
|
||||
pub.set_app_dir(req)
|
||||
pub.cfg['identification'] = {'methods': ['password']}
|
||||
pub.write_cfg()
|
||||
|
||||
pub.user_class.wipe()
|
||||
|
@ -90,6 +93,51 @@ def test_workflow_tests_no_actions(pub):
|
|||
mocked_run.assert_not_called()
|
||||
|
||||
|
||||
def test_workflow_tests_action_not_configured(pub):
|
||||
user = pub.user_class(name='test user')
|
||||
user.store()
|
||||
|
||||
workflow = Workflow(name='Workflow One')
|
||||
workflow.add_status(name='New status')
|
||||
|
||||
workflow.store()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test title'
|
||||
formdef.workflow_id = workflow.id
|
||||
formdef.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
|
||||
testdef = TestDef.create_from_formdata(formdef, formdata)
|
||||
testdef.agent_id = user.id
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.ButtonClick(),
|
||||
]
|
||||
|
||||
with mock.patch('wcs.workflow_tests.ButtonClick.perform') as mocked_perform:
|
||||
testdef.run(formdef)
|
||||
mocked_perform.assert_not_called()
|
||||
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.ButtonClick(button_name='xxx'),
|
||||
]
|
||||
|
||||
with mock.patch('wcs.workflow_tests.ButtonClick.perform') as mocked_perform:
|
||||
testdef.run(formdef)
|
||||
mocked_perform.assert_called_once()
|
||||
|
||||
|
||||
def test_workflow_tests_new_action_id(pub):
|
||||
wf_tests = workflow_tests.WorkflowTests()
|
||||
|
||||
for i in range(15):
|
||||
wf_tests.add_action(workflow_tests.ButtonClick)
|
||||
|
||||
assert [x.id for x in wf_tests.actions] == [str(i) for i in range(1, 16)]
|
||||
|
||||
|
||||
def test_workflow_tests_button_click(pub):
|
||||
role = pub.role_class(name='test role')
|
||||
role.store()
|
||||
|
@ -544,6 +592,22 @@ def test_workflow_tests_webservice(pub):
|
|||
testdef.run(formdef)
|
||||
assert str(excinfo.value) == 'Webservice response Fake response was used 0 times (instead of 1).'
|
||||
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=0),
|
||||
]
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
testdef.run(formdef)
|
||||
assert str(excinfo.value) == 'Webservice response Fake response was used 1 times (instead of 0).'
|
||||
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_id='xxx', call_count=1),
|
||||
]
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
testdef.run(formdef)
|
||||
assert str(excinfo.value) == 'Broken, missing webservice response'
|
||||
|
||||
|
||||
def test_workflow_tests_webservice_status_jump(pub):
|
||||
user = pub.user_class(name='test user')
|
||||
|
@ -593,3 +657,98 @@ def test_workflow_tests_webservice_status_jump(pub):
|
|||
workflow_tests.AssertStatus(status_name='End status'),
|
||||
workflow_tests.AssertWebserviceCall(webservice_response_id=response.id, call_count=1),
|
||||
]
|
||||
|
||||
|
||||
def test_workflow_tests_create_from_formdata(pub, http_requests, freezer):
|
||||
role = pub.role_class(name='test role')
|
||||
role.store()
|
||||
user = create_user(pub, is_admin=True)
|
||||
user.roles = [role.id]
|
||||
user.store()
|
||||
|
||||
workflow = Workflow(name='Workflow One')
|
||||
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
|
||||
workflow.backoffice_fields_formdef.fields = [fields.StringField(id='bo1', label='Text')]
|
||||
|
||||
new_status = workflow.add_status('New status', 'new-status')
|
||||
status_with_timeout_jump = workflow.add_status('Status with timeout jump', 'status-with-timeout-jump')
|
||||
status_with_button = workflow.add_status('Status with button', 'status-with-button')
|
||||
transition_status = workflow.add_status('Transition status', 'transition-status')
|
||||
end_status = workflow.add_status('End status', 'end-status')
|
||||
|
||||
jump = new_status.add_action('jump')
|
||||
jump.status = status_with_timeout_jump.id
|
||||
|
||||
jump = status_with_timeout_jump.add_action('jump')
|
||||
jump.status = status_with_button.id
|
||||
jump.timeout = '{{ 1 }} day'
|
||||
|
||||
choice = status_with_button.add_action('choice')
|
||||
choice.label = 'Accept'
|
||||
choice.status = transition_status.id
|
||||
choice.by = [role.id]
|
||||
|
||||
wscall = transition_status.add_action('webservice_call')
|
||||
wscall.url = 'http://remote.example.net/json'
|
||||
wscall.varname = 'test_webservice'
|
||||
|
||||
sendmail = transition_status.add_action('sendmail')
|
||||
sendmail.to = ['test@example.org']
|
||||
sendmail.subject = 'In new status'
|
||||
sendmail.body = 'xxx'
|
||||
|
||||
set_backoffice_fields = transition_status.add_action('set-backoffice-fields')
|
||||
set_backoffice_fields.fields = [{'field_id': 'bo1', 'value': 'xxx'}]
|
||||
|
||||
jump = transition_status.add_action('jump')
|
||||
jump.status = end_status.id
|
||||
|
||||
workflow.store()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test title'
|
||||
formdef.workflow_id = workflow.id
|
||||
formdef.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.user_id = user.id
|
||||
formdata.just_created()
|
||||
formdata.store()
|
||||
|
||||
formdata.record_workflow_event('frontoffice-created')
|
||||
formdata.perform_workflow()
|
||||
formdata.store()
|
||||
|
||||
freezer.tick(datetime.timedelta(days=2))
|
||||
_apply_timeouts(pub)
|
||||
|
||||
app = login(get_app(pub))
|
||||
resp = app.get(formdata.get_url())
|
||||
resp.form.submit('button1').follow()
|
||||
formdata.refresh_from_storage()
|
||||
assert formdata.status == 'wf-end-status'
|
||||
|
||||
testdef = TestDef.create_from_formdata(formdef, formdata, add_workflow_tests=True)
|
||||
testdef.run(formdef)
|
||||
|
||||
actions = testdef.workflow_tests.actions
|
||||
assert len(actions) == 8
|
||||
|
||||
assert actions[0].key == 'assert-status'
|
||||
assert actions[0].status_name == 'Status with timeout jump'
|
||||
|
||||
assert actions[1].key == 'skip-time'
|
||||
assert actions[1].seconds == 172800
|
||||
|
||||
assert actions[2].key == 'assert-status'
|
||||
assert actions[2].status_name == 'Status with button'
|
||||
|
||||
assert actions[3].key == 'button-click'
|
||||
assert actions[3].button_name == 'Accept'
|
||||
|
||||
assert actions[4].key == 'assert-webservice-call'
|
||||
assert actions[5].key == 'assert-email'
|
||||
assert actions[6].key == 'assert-backoffice-field'
|
||||
|
||||
assert actions[-1].key == 'assert-status'
|
||||
assert actions[-1].status_name == 'End status'
|
||||
|
|
|
@ -241,12 +241,12 @@ def test_webservice_on_error(http_requests, emails, notify_on_errors, record_on_
|
|||
'500': '500 Internal Server Error',
|
||||
'json-err0': None,
|
||||
'json-err0int': None,
|
||||
'json-err1': 'err: 1',
|
||||
'json-err1int': 'err: 1',
|
||||
'json-err1-with-desc': 'err: 1, err_desc: :(',
|
||||
'json-errstr': 'err: bug',
|
||||
'json-errheader1': 'err: 1',
|
||||
'json-errheaderstr': 'err: bug',
|
||||
'json-err1': None,
|
||||
'json-err1int': None,
|
||||
'json-err1-with-desc': None,
|
||||
'json-errstr': None,
|
||||
'json-errheader1': None,
|
||||
'json-errheaderstr': None,
|
||||
}
|
||||
wscall.request = {'url': 'http://remote.example.net/%s' % url_part}
|
||||
wscall.store()
|
||||
|
|
|
@ -1119,6 +1119,117 @@ def test_edit_carddata_targeting_itself(pub):
|
|||
assert carddata.status == 'wf-%s' % st2.id
|
||||
|
||||
|
||||
def test_edit_carddata_auto_targeting_custom_id(pub):
|
||||
CardDef.wipe()
|
||||
|
||||
carddef = CardDef()
|
||||
carddef.name = 'Foo Card'
|
||||
carddef.fields = [
|
||||
StringField(id='0', label='foo', varname='foo'),
|
||||
StringField(id='1', label='slug', varname='slug'),
|
||||
]
|
||||
carddef.id_template = 'card_{{form_var_slug}}'
|
||||
carddef.store()
|
||||
carddef.data_class().wipe()
|
||||
|
||||
carddata = carddef.data_class()()
|
||||
carddata.data = {'0': 'foo', '1': 'foo'}
|
||||
carddata.store()
|
||||
carddata.just_created()
|
||||
carddata.store()
|
||||
assert carddata.identifier == 'card_foo'
|
||||
|
||||
carddef2 = CardDef()
|
||||
carddef2.name = 'Bar Card'
|
||||
carddef2.fields = [
|
||||
ItemField(id='1', label='card', varname='card', data_source={'type': 'carddef:%s' % carddef.url_name})
|
||||
]
|
||||
carddef2.store()
|
||||
|
||||
card_wf = Workflow(name='Card workflow')
|
||||
st1 = card_wf.add_status('Status1')
|
||||
st2 = card_wf.add_status('Status2')
|
||||
|
||||
edit = st1.add_action('edit_carddata', id='_edit')
|
||||
edit.formdef_slug = carddef.url_name
|
||||
edit.target_mode = 'all'
|
||||
edit.mappings = [Mapping(field_id='0', expression='bar')]
|
||||
|
||||
jump = st1.add_action('jump', '_jump')
|
||||
jump.status = st2.id
|
||||
|
||||
card_wf.store()
|
||||
|
||||
carddef2.workflow = card_wf
|
||||
carddef2.store()
|
||||
|
||||
carddata2 = carddef2.data_class()()
|
||||
carddata2.data = {
|
||||
'1': 'card_foo',
|
||||
}
|
||||
carddata2.store()
|
||||
carddata2.just_created()
|
||||
carddata2.store()
|
||||
carddata2.perform_workflow()
|
||||
|
||||
carddata.refresh_from_storage()
|
||||
assert carddata.data['0'] == 'bar'
|
||||
|
||||
|
||||
def test_edit_carddata_manual_targeting_custom_id(pub):
|
||||
CardDef.wipe()
|
||||
|
||||
carddef = CardDef()
|
||||
carddef.name = 'Foo Card'
|
||||
carddef.fields = [
|
||||
StringField(id='0', label='foo', varname='foo'),
|
||||
StringField(id='1', label='slug', varname='slug'),
|
||||
]
|
||||
carddef.id_template = 'card_{{form_var_slug}}'
|
||||
carddef.store()
|
||||
carddef.data_class().wipe()
|
||||
|
||||
carddata = carddef.data_class()()
|
||||
carddata.data = {'0': 'foo', '1': 'foo'}
|
||||
carddata.store()
|
||||
carddata.just_created()
|
||||
carddata.store()
|
||||
assert carddata.identifier == 'card_foo'
|
||||
|
||||
carddef2 = CardDef()
|
||||
carddef2.name = 'Bar Card'
|
||||
carddef2.fields = []
|
||||
carddef2.store()
|
||||
|
||||
card_wf = Workflow(name='Card workflow')
|
||||
st1 = card_wf.add_status('Status1')
|
||||
st2 = card_wf.add_status('Status2')
|
||||
|
||||
edit = st1.add_action('edit_carddata', id='_edit')
|
||||
edit.formdef_slug = carddef.url_name
|
||||
edit.target_mode = 'manual'
|
||||
edit.target_id = 'card_foo'
|
||||
edit.mappings = [Mapping(field_id='0', expression='bar')]
|
||||
|
||||
jump = st1.add_action('jump', '_jump')
|
||||
jump.status = st2.id
|
||||
|
||||
card_wf.store()
|
||||
|
||||
carddef2.workflow = card_wf
|
||||
carddef2.store()
|
||||
|
||||
carddata2 = carddef2.data_class()()
|
||||
carddata2.data = {}
|
||||
carddata2.store()
|
||||
carddata2.just_created()
|
||||
carddata2.store()
|
||||
carddata2.perform_workflow()
|
||||
|
||||
carddata.refresh_from_storage()
|
||||
assert carddata.data['0'] == 'bar'
|
||||
|
||||
|
||||
def test_edit_carddata_from_created_object(pub):
|
||||
FormDef.wipe()
|
||||
CardDef.wipe()
|
||||
|
|
|
@ -7,7 +7,8 @@ import pytest
|
|||
import responses
|
||||
from quixote import cleanup, get_publisher
|
||||
|
||||
from wcs.fields import BoolField, FileField, ItemField, ItemsField, StringField
|
||||
from wcs.blocks import BlockDef
|
||||
from wcs.fields import BlockField, BoolField, FileField, ItemField, ItemsField, StringField
|
||||
from wcs.formdef import FormDef
|
||||
from wcs.qommon.http_request import HTTPRequest
|
||||
from wcs.wf.wscall import JournalWsCallErrorPart, WebserviceCallStatusItem
|
||||
|
@ -1065,6 +1066,12 @@ def test_webservice_with_complex_data_in_query_string(http_requests, pub):
|
|||
),
|
||||
}
|
||||
|
||||
BlockDef.wipe()
|
||||
block = BlockDef()
|
||||
block.name = 'foobar'
|
||||
block.fields = [StringField(id='1', label='String', varname='string')]
|
||||
block.store()
|
||||
|
||||
FormDef.wipe()
|
||||
formdef = FormDef()
|
||||
formdef.name = 'baz'
|
||||
|
@ -1075,6 +1082,7 @@ def test_webservice_with_complex_data_in_query_string(http_requests, pub):
|
|||
StringField(id='4', label='4th field', varname='empty_str'),
|
||||
StringField(id='5', label='5th field', varname='none'),
|
||||
BoolField(id='6', label='6th field', varname='bool'),
|
||||
BlockField(id='7', label='7th field', varname='block', block_slug=block.slug, max_items=3),
|
||||
]
|
||||
formdef.workflow_id = wf.id
|
||||
formdef.store()
|
||||
|
@ -1091,6 +1099,17 @@ def test_webservice_with_complex_data_in_query_string(http_requests, pub):
|
|||
formdata.data['4'] = 'empty_str'
|
||||
formdata.data['5'] = None
|
||||
formdata.data['6'] = False
|
||||
formdata.data['7'] = {
|
||||
'data': [
|
||||
{
|
||||
'1': 'plop',
|
||||
},
|
||||
{
|
||||
'1': 'poulpe',
|
||||
},
|
||||
],
|
||||
'schema': {},
|
||||
}
|
||||
formdata.just_created()
|
||||
formdata.store()
|
||||
|
||||
|
@ -1115,11 +1134,12 @@ def test_webservice_with_complex_data_in_query_string(http_requests, pub):
|
|||
'none': '{{ form_var_none }}',
|
||||
'bool': '{{ form_var_bool_raw }}',
|
||||
'time': '{{ "13:12"|time }}',
|
||||
'block_template': '{% for b in form_var_block %}{{ b.string }}{% endfor %}',
|
||||
}
|
||||
pub.substitutions.feed(formdata)
|
||||
with get_publisher().complex_data():
|
||||
item.perform(formdata)
|
||||
item.perform(formdata)
|
||||
assert sorted(urllib.parse.parse_qsl(urllib.parse.urlparse(http_requests.get_last('url')).query)) == [
|
||||
('block_template', 'ploppoulpe'),
|
||||
('bool', 'False'),
|
||||
('decimal', '1E+3'),
|
||||
('decimal2', '1000.1'),
|
||||
|
|
|
@ -1140,7 +1140,12 @@ class FormDefPage(Directory, TempfileDirectoryMixin):
|
|||
# there are existing formdata, status will have to be mapped
|
||||
return redirect('workflow-status-remapping?new=%s' % workflow_id)
|
||||
|
||||
job = WorkflowChangeJob(formdef=self.formdef, new_workflow_id=workflow_id, status_mapping={})
|
||||
job = WorkflowChangeJob(
|
||||
formdef=self.formdef,
|
||||
new_workflow_id=workflow_id,
|
||||
status_mapping={},
|
||||
user_id=get_session().user,
|
||||
)
|
||||
job.store()
|
||||
get_response().add_after_job(job)
|
||||
return redirect(job.get_processing_url())
|
||||
|
@ -1230,7 +1235,10 @@ class FormDefPage(Directory, TempfileDirectoryMixin):
|
|||
return self.workflow_status_remapping()
|
||||
|
||||
job = WorkflowChangeJob(
|
||||
formdef=self.formdef, new_workflow_id=new_workflow.id, status_mapping=status_mapping
|
||||
formdef=self.formdef,
|
||||
new_workflow_id=new_workflow.id,
|
||||
status_mapping=status_mapping,
|
||||
user_id=get_session().user,
|
||||
)
|
||||
job.store()
|
||||
get_response().add_after_job(job)
|
||||
|
@ -2042,19 +2050,20 @@ class FormsDirectory(AccessControlled, Directory):
|
|||
|
||||
|
||||
class WorkflowChangeJob(AfterJob):
|
||||
def __init__(self, formdef, new_workflow_id, status_mapping):
|
||||
def __init__(self, formdef, new_workflow_id, status_mapping, user_id):
|
||||
super().__init__(
|
||||
label=_('Updating data for new workflow'),
|
||||
formdef_class=formdef.__class__,
|
||||
formdef_id=formdef.id,
|
||||
new_workflow_id=new_workflow_id,
|
||||
status_mapping=status_mapping,
|
||||
user_id=user_id,
|
||||
)
|
||||
|
||||
def execute(self):
|
||||
formdef = self.kwargs['formdef_class'].get(self.kwargs['formdef_id'])
|
||||
workflow = Workflow.get(self.kwargs['new_workflow_id'])
|
||||
formdef.change_workflow(workflow, self.kwargs['status_mapping'])
|
||||
formdef.change_workflow(workflow, self.kwargs['status_mapping'], user_id=self.kwargs.get('user_id'))
|
||||
|
||||
def done_action_url(self):
|
||||
formdef = self.kwargs['formdef_class'].get(self.kwargs['formdef_id'])
|
||||
|
|
|
@ -253,14 +253,15 @@ class LoggedErrorsDirectory(AccessControlled, Directory):
|
|||
options.append(('carddef', _('Card Models'), 'carddef'))
|
||||
if backoffice_root.is_accessible('workflows'):
|
||||
options.append(('others', _('Others'), 'others'))
|
||||
form.add(
|
||||
CheckboxesWidget,
|
||||
'types',
|
||||
title=_('Error types'),
|
||||
value=[x[0] for x in options], # check all by default
|
||||
options=options,
|
||||
required=True,
|
||||
)
|
||||
if not (self.formdef_id or self.workflow_id):
|
||||
form.add(
|
||||
CheckboxesWidget,
|
||||
'types',
|
||||
title=_('Error types'),
|
||||
value=[x[0] for x in options], # check all by default
|
||||
options=options,
|
||||
required=True,
|
||||
)
|
||||
form.add(
|
||||
DateWidget,
|
||||
'latest_occurence',
|
||||
|
@ -275,17 +276,22 @@ class LoggedErrorsDirectory(AccessControlled, Directory):
|
|||
return redirect('.')
|
||||
|
||||
if form.get_submit() == 'submit' and not form.has_errors():
|
||||
type_criterias = []
|
||||
if 'formdef' in form.get_widget('types').parse():
|
||||
type_criterias.append(Equal('formdef_class', 'FormDef'))
|
||||
if 'carddef' in form.get_widget('types').parse():
|
||||
type_criterias.append(Equal('formdef_class', 'CardDef'))
|
||||
if 'others' in form.get_widget('types').parse():
|
||||
type_criterias.append(Null('formdef_class'))
|
||||
criterias = [
|
||||
Less('latest_occurence_timestamp', form.get_widget('latest_occurence').parse()),
|
||||
Or(type_criterias),
|
||||
]
|
||||
criterias = []
|
||||
|
||||
if self.formdef_id and self.formdef_class:
|
||||
criterias.append(Equal('formdef_id', self.formdef_id))
|
||||
criterias.append(Equal('formdef_class', self.formdef_class.__name__))
|
||||
elif self.workflow_id:
|
||||
criterias.append(Equal('workflow_id', self.workflow_id))
|
||||
else:
|
||||
if 'formdef' in form.get_widget('types').parse():
|
||||
criterias.append(Equal('formdef_class', 'FormDef'))
|
||||
if 'carddef' in form.get_widget('types').parse():
|
||||
criterias.append(Equal('formdef_class', 'CardDef'))
|
||||
if 'others' in form.get_widget('types').parse():
|
||||
criterias.append(Null('formdef_class'))
|
||||
criterias = [Or(criterias)]
|
||||
criterias.append(Less('latest_occurence_timestamp', form.get_widget('latest_occurence').parse()))
|
||||
get_publisher().loggederror_class.wipe(clause=criterias)
|
||||
return redirect('.')
|
||||
|
||||
|
|
|
@ -344,13 +344,18 @@ class TestsDirectory(Directory):
|
|||
]
|
||||
|
||||
if formdata_options:
|
||||
creation_options = [
|
||||
('empty', _('Fill data manually'), 'empty'),
|
||||
('formdata', _('Import data from form'), 'formdata'),
|
||||
]
|
||||
if get_publisher().has_site_option('enable-workflow-tests'):
|
||||
creation_options.append(
|
||||
('formdata-wf', _('Import data from form (and initialise workflow tests)'), 'formdata-wf')
|
||||
)
|
||||
form.add(
|
||||
RadiobuttonsWidget,
|
||||
'creation_mode',
|
||||
options=[
|
||||
('empty', _('Fill data manually'), 'empty'),
|
||||
('formdata', _('Import data from form'), 'formdata'),
|
||||
],
|
||||
options=creation_options,
|
||||
value='empty',
|
||||
attrs={'data-dynamic-display-parent': 'true'},
|
||||
)
|
||||
|
@ -362,7 +367,7 @@ class TestsDirectory(Directory):
|
|||
hint=_('Form is only used for initial data alimentation, no link is kept with created test.'),
|
||||
attrs={
|
||||
'data-dynamic-display-child-of': 'creation_mode',
|
||||
'data-dynamic-display-value-in': 'formdata',
|
||||
'data-dynamic-display-value-in': 'formdata|formdata-wf',
|
||||
},
|
||||
**{'data-autocomplete': 'true'},
|
||||
)
|
||||
|
@ -392,7 +397,11 @@ class TestsDirectory(Directory):
|
|||
formdata_id = form.get_widget('formdata').parse()
|
||||
formdata = self.objectdef.data_class().get(formdata_id)
|
||||
|
||||
testdef = TestDef.create_from_formdata(self.objectdef, formdata)
|
||||
testdef = TestDef.create_from_formdata(
|
||||
self.objectdef,
|
||||
formdata,
|
||||
add_workflow_tests=bool(creation_mode_widget.parse() == 'formdata-wf'),
|
||||
)
|
||||
testdef.name = form.get_widget('name').parse()
|
||||
testdef.agent_id = get_session().user
|
||||
testdef.store()
|
||||
|
|
|
@ -51,6 +51,17 @@ class NamedWsCallUI:
|
|||
)
|
||||
form.add(WsCallRequestWidget, 'request', value=self.wscall.request, title=_('Request'), required=True)
|
||||
|
||||
form.widgets.append(
|
||||
HtmlWidget(
|
||||
'<div class="infonotice"><p>%s</p></div>'
|
||||
% _(
|
||||
'This tab is about connection, payload, and HTTP errors. '
|
||||
'Application errors ("err" property in response different than zero) '
|
||||
'are always silent.'
|
||||
),
|
||||
tab=('error', _('Error Handling')),
|
||||
)
|
||||
)
|
||||
form.add(
|
||||
CheckboxWidget,
|
||||
'notify_on_errors',
|
||||
|
@ -63,6 +74,7 @@ class NamedWsCallUI:
|
|||
'record_on_errors',
|
||||
title=_('Record on errors'),
|
||||
value=self.wscall.record_on_errors if self.wscall.slug else True,
|
||||
default_value=True,
|
||||
tab=('error', _('Error Handling')),
|
||||
)
|
||||
if not self.wscall.is_readonly():
|
||||
|
|
|
@ -19,7 +19,7 @@ from quixote import get_publisher, get_request, get_session
|
|||
from wcs.formdata import FormData
|
||||
|
||||
from .qommon import _
|
||||
from .sql_criterias import Equal, Null, StrictNotEqual
|
||||
from .sql_criterias import Equal
|
||||
|
||||
|
||||
class CardData(FormData):
|
||||
|
@ -39,20 +39,6 @@ class CardData(FormData):
|
|||
|
||||
formdef = property(get_formdef)
|
||||
|
||||
@classmethod
|
||||
def get_by_id(cls, value):
|
||||
try:
|
||||
return cls.select(
|
||||
[
|
||||
StrictNotEqual('status', 'draft'),
|
||||
Null('anonymised'),
|
||||
cls._formdef.get_by_id_criteria(value),
|
||||
],
|
||||
limit=1,
|
||||
)[0]
|
||||
except IndexError:
|
||||
raise KeyError(value)
|
||||
|
||||
def get_data_source_structured_item(
|
||||
self, digest_key='default', group_by=None, with_related_urls=False, with_files_urls=False
|
||||
):
|
||||
|
|
|
@ -28,6 +28,7 @@ from wcs.qommon.ods import NS as OD_NS
|
|||
from wcs.qommon.ods import clean_text as od_clean_text
|
||||
|
||||
from .base import SetValueError, WidgetField
|
||||
from .item import UnknownCardValueError
|
||||
|
||||
|
||||
class MissingBlockFieldError(Exception):
|
||||
|
@ -67,7 +68,11 @@ class BlockRowValue:
|
|||
sub_field.set_value(row_data, sub_value)
|
||||
return row_data
|
||||
|
||||
row_data = make_row_data(self.attributes)
|
||||
try:
|
||||
row_data = make_row_data(self.attributes)
|
||||
except UnknownCardValueError as e:
|
||||
get_publisher().record_error(_('invalid value when creating block: %s') % str(e), exception=e)
|
||||
return None
|
||||
|
||||
current_block_value = data.get(field.id)
|
||||
if not self.check_current_value(current_block_value):
|
||||
|
|
|
@ -35,7 +35,6 @@ class NumericField(WidgetField):
|
|||
use_live_server_validation = True
|
||||
|
||||
widget_class = NumericWidget
|
||||
required = False
|
||||
validation = None
|
||||
|
||||
restrict_to_integers = True
|
||||
|
|
|
@ -32,7 +32,7 @@ from quixote.errors import RequestError
|
|||
from quixote.html import htmltext
|
||||
from quixote.http_request import Upload
|
||||
|
||||
from wcs.sql_criterias import And, Contains, Equal, Intersects
|
||||
from wcs.sql_criterias import And, Contains, Equal, Intersects, Null, StrictNotEqual
|
||||
|
||||
from .qommon import _, misc
|
||||
from .qommon.evalutils import make_datetime
|
||||
|
@ -365,7 +365,10 @@ class FormData(StorableObject):
|
|||
def get_by_id(cls, value):
|
||||
if cls._formdef.id_template:
|
||||
try:
|
||||
return cls.select([Equal('id_display', str(value))], limit=1)[0]
|
||||
return cls.select(
|
||||
[StrictNotEqual('status', 'draft'), Null('anonymised'), Equal('id_display', str(value))],
|
||||
limit=1,
|
||||
)[0]
|
||||
except IndexError:
|
||||
raise KeyError(value)
|
||||
return cls.get(value)
|
||||
|
@ -1488,6 +1491,8 @@ class FormData(StorableObject):
|
|||
for field in fields:
|
||||
if anonymise and field.anonymise == 'final':
|
||||
continue
|
||||
if field.is_no_data_field:
|
||||
continue
|
||||
if not field.varname and not include_unnamed_fields:
|
||||
continue
|
||||
if field.varname in seen:
|
||||
|
@ -1930,7 +1935,7 @@ class FormData(StorableObject):
|
|||
if object_type:
|
||||
# workflow action
|
||||
try:
|
||||
yield objectdef.data_class().get(target_id)
|
||||
yield objectdef.data_class().get_by_id(target_id)
|
||||
except KeyError:
|
||||
# linked object may be missing
|
||||
pass
|
||||
|
@ -1950,7 +1955,7 @@ class FormData(StorableObject):
|
|||
_('%s - not found') % origin,
|
||||
)
|
||||
else:
|
||||
yield (_objectdef.data_class().get(target_id), origin)
|
||||
yield (_objectdef.data_class().get_by_id(target_id), origin)
|
||||
except ValueError:
|
||||
pass
|
||||
except KeyError:
|
||||
|
|
|
@ -1967,7 +1967,7 @@ class FormDef(StorableObject):
|
|||
# chunk contains the fields.
|
||||
return pickle.dumps(object, protocol=2) + pickle.dumps(object.fields, protocol=2)
|
||||
|
||||
def change_workflow(self, new_workflow, status_mapping=None):
|
||||
def change_workflow(self, new_workflow, status_mapping=None, user_id=None):
|
||||
old_workflow = self.get_workflow()
|
||||
|
||||
formdata_count = self.data_class().count()
|
||||
|
@ -2003,7 +2003,7 @@ class FormDef(StorableObject):
|
|||
if function_key not in new_workflow.roles:
|
||||
del self.workflow_roles[function_key]
|
||||
removed_functions.add(function_key)
|
||||
self.store(comment=_('Workflow change'))
|
||||
self.store(comment=_('Workflow change'), snapshot_store_user=user_id)
|
||||
if formdata_count:
|
||||
# instruct formdef to update its security rules
|
||||
self.data_class().rebuild_security()
|
||||
|
|
|
@ -37,7 +37,7 @@ from quixote.util import randbytes
|
|||
|
||||
from wcs.carddef import CardDef
|
||||
from wcs.categories import Category
|
||||
from wcs.fields import MissingBlockFieldError, SetValueError
|
||||
from wcs.fields import MissingBlockFieldError, PageField, SetValueError
|
||||
from wcs.formdata import Evolution, FormData
|
||||
from wcs.formdef import FormDef
|
||||
from wcs.forms.common import FormStatusPage, FormTemplateMixin, TempfileDirectoryMixin
|
||||
|
@ -1006,9 +1006,12 @@ class FormPage(Directory, TempfileDirectoryMixin, FormTemplateMixin):
|
|||
|
||||
def create_form(self, *args, **kwargs):
|
||||
form = self.formdef.create_form(*args, **kwargs)
|
||||
if len(self.pages) == 1 and not self.formdef.confirmation:
|
||||
# if there's a form with a single page, no confirmation, add native quixote
|
||||
# CSRF protection.
|
||||
if (
|
||||
len([x for x in self.formdef.fields if isinstance(x, PageField)]) < 2
|
||||
and not self.formdef.confirmation
|
||||
):
|
||||
# if there's a form with a single page (at all, not as the result of conditions),
|
||||
# and no confirmation page, add native quixote CSRF protection.
|
||||
form.add(FormTokenWidget, form.TOKEN_NAME)
|
||||
form.attrs['data-live-url'] = self.formdef.get_url(language=get_publisher().current_language) + 'live'
|
||||
form.attrs['data-live-validation-url'] = (
|
||||
|
|
|
@ -4,8 +4,8 @@ msgid ""
|
|||
msgstr ""
|
||||
"Project-Id-Version: wcs 0\n"
|
||||
"Report-Msgid-Bugs-To: \n"
|
||||
"POT-Creation-Date: 2024-03-04 17:49+0100\n"
|
||||
"PO-Revision-Date: 2024-03-04 17:49+0100\n"
|
||||
"POT-Creation-Date: 2024-03-12 09:34+0100\n"
|
||||
"PO-Revision-Date: 2024-03-12 09:34+0100\n"
|
||||
"Last-Translator: Thomas Noël <tnoel@entrouvert.com>\n"
|
||||
"Language-Team: french\n"
|
||||
"Language: fr\n"
|
||||
|
@ -2568,6 +2568,12 @@ msgstr "Saisir les données manuellement"
|
|||
msgid "Import data from form"
|
||||
msgstr "Importer les données depuis une demande"
|
||||
|
||||
#: admin/tests.py
|
||||
msgid "Import data from form (and initialise workflow tests)"
|
||||
msgstr ""
|
||||
"Importer les données depuis une demande (et initialiser les tests de "
|
||||
"workflow)"
|
||||
|
||||
#: admin/tests.py
|
||||
msgid ""
|
||||
"Form is only used for initial data alimentation, no link is kept with "
|
||||
|
@ -3483,6 +3489,15 @@ msgstr "Attention, ce changement est risqué"
|
|||
msgid "Request"
|
||||
msgstr "Requête"
|
||||
|
||||
#: admin/wscalls.py
|
||||
msgid ""
|
||||
"This tab is about connection, payload, and HTTP errors. Application errors "
|
||||
"(\"err\" property in response different than zero) are always silent."
|
||||
msgstr ""
|
||||
"Cet onglet concerne les erreurs de connexion et de message à transmettre, "
|
||||
"ainsi qque les erreurs HTTP. Les erreurs applicatives (quand l’attribut "
|
||||
"« err » dans les réponses est différent de zéro) sont toujours silencieuses."
|
||||
|
||||
#: admin/wscalls.py wf/wscall.py
|
||||
msgid "Error Handling"
|
||||
msgstr "Gestion des erreurs"
|
||||
|
@ -5433,6 +5448,11 @@ msgstr "La valeur doit contenir un ou plusieurs noms valides."
|
|||
msgid "Missing block field: %s"
|
||||
msgstr "Bloc de champ manquant : %s"
|
||||
|
||||
#: fields/block.py
|
||||
#, python-format
|
||||
msgid "invalid value when creating block: %s"
|
||||
msgstr "valeur invalide pour la création du bloc : %s"
|
||||
|
||||
#: fields/block.py
|
||||
#, python-format
|
||||
msgid "Field Block (%s)"
|
||||
|
@ -11343,7 +11363,7 @@ msgstr "Valeur actuelle"
|
|||
|
||||
#: wf/export_to_model.py
|
||||
msgid "Template to obtain model file"
|
||||
msgstr "Gabarit pour obtenir le ficher servant de modèle"
|
||||
msgstr "Gabarit pour obtenir le fichier servant de modèle"
|
||||
|
||||
#: wf/export_to_model.py
|
||||
msgid "Convert generated file to PDF"
|
||||
|
@ -11884,7 +11904,7 @@ msgid ""
|
|||
"additional POST data in an additional \"extra\" key. It is often not "
|
||||
"necessary."
|
||||
msgstr ""
|
||||
"Attention : cette option envoie l’intégralité des donneés de la demande ou "
|
||||
"Attention : cette option envoie l’intégralité des données de la demande ou "
|
||||
"fiche concernée, avec les données POST supplémentaires dans une clé "
|
||||
"« extra ». Cette option ne devrait généralement pas être utilisée."
|
||||
|
||||
|
|
|
@ -455,6 +455,7 @@ class WcsPublisher(QommonPublisher):
|
|||
for _formdef in FormDef.select() + CardDef.select():
|
||||
sql.do_formdef_tables(_formdef)
|
||||
sql.migrate_global_views(conn, cur)
|
||||
sql.init_search_tokens()
|
||||
cur.close()
|
||||
|
||||
def record_deprecated_usage(self, *args, **kwargs):
|
||||
|
|
|
@ -565,6 +565,7 @@ class HtmlWidget:
|
|||
self.attrs = {}
|
||||
self.string = string
|
||||
self.title = title
|
||||
self.tab = kwargs.pop('tab', None)
|
||||
|
||||
def render(self):
|
||||
return self.render_content()
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import re
|
||||
import urllib.parse
|
||||
import xml.etree.ElementTree as ET
|
||||
import zipfile
|
||||
|
||||
|
@ -252,7 +253,7 @@ class WorkCell:
|
|||
url = '%sfiles/%s/%s' % (
|
||||
self.formdata.get_url(backoffice=True),
|
||||
self.data_field.id,
|
||||
self.native_value,
|
||||
urllib.parse.quote(self.native_value),
|
||||
)
|
||||
a = ET.SubElement(p, '{%s}a' % NS['text'])
|
||||
a.attrib['{%s}href' % NS['xlink']] = url
|
||||
|
|
|
@ -690,6 +690,11 @@ class QommonPublisher(Publisher):
|
|||
for error in self.loggederror_class.select(clause=clauses):
|
||||
self.loggederror_class.remove_object(error.id)
|
||||
|
||||
def clean_search_tokens(self, **kwargs):
|
||||
from wcs import sql
|
||||
|
||||
sql.purge_obsolete_search_tokens()
|
||||
|
||||
@classmethod
|
||||
def register_cronjobs(cls):
|
||||
cls.register_cronjob(CronJob(cls.clean_sessions, minutes=[0], name='clean_sessions'))
|
||||
|
@ -702,6 +707,9 @@ class QommonPublisher(Publisher):
|
|||
cls.register_cronjob(
|
||||
CronJob(cls.clean_loggederrors, hours=[3], minutes=[0], name='clean_loggederrors')
|
||||
)
|
||||
cls.register_cronjob(
|
||||
CronJob(cls.clean_search_tokens, weekdays=[0], hours=[1], minutes=[0], name='clean_search_tokens')
|
||||
)
|
||||
|
||||
_initialized = False
|
||||
|
||||
|
|
|
@ -677,6 +677,11 @@ div.form-preview > br {
|
|||
clear: both; /* stop grid */
|
||||
}
|
||||
|
||||
div.form-preview {
|
||||
overflow-y: auto;
|
||||
clear: both;
|
||||
}
|
||||
|
||||
div.widget.no-bottom-margin {
|
||||
margin-bottom: 0.2rem;
|
||||
}
|
||||
|
|
|
@ -11,6 +11,24 @@
|
|||
outline: 1px dashed var(--primary-color, #bbb);
|
||||
}
|
||||
}
|
||||
&.pk-vertical-items {
|
||||
.content {
|
||||
flex-direction: column;
|
||||
}
|
||||
.item-with-image {
|
||||
padding: 0;
|
||||
grid-template-areas: 'input picture label';
|
||||
justify-items: flex-start;
|
||||
align-items: center;
|
||||
grid-template-columns: auto var(--image-size) 1fr;
|
||||
grid-column-gap: 0.7em;
|
||||
grid-template-rows: auto;
|
||||
flex: 0 0 auto;
|
||||
&--input {
|
||||
margin-right: 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
.item-with-image {
|
||||
|
|
|
@ -461,7 +461,7 @@ $.WcsFileUpload = {
|
|||
image_preview: function(base_widget, img_token) {
|
||||
var file_button = base_widget.find('.file-button');
|
||||
if(file_button.hasClass("file-image")) {
|
||||
file_button[0].style.setProperty('--image-preview-url', `url(${window.location.href}tempfile?t=${img_token}&thumbnail=1)`);
|
||||
file_button[0].style.setProperty('--image-preview-url', `url(${window.location.pathname}tempfile?t=${img_token}&thumbnail=1)`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1331,6 +1331,7 @@ def json_dumps(value):
|
|||
|
||||
@register.simple_tag(takes_context=True)
|
||||
def make_public_url(context, url=None):
|
||||
url = unlazy(url)
|
||||
if not url:
|
||||
return ''
|
||||
token = get_session().create_token('sign-url-token', {'url': url})
|
||||
|
|
|
@ -150,13 +150,21 @@ class Snapshot:
|
|||
]
|
||||
|
||||
@classmethod
|
||||
def snap(cls, instance, comment=None, label=None, store_user=True, application=None):
|
||||
def snap(cls, instance, comment=None, label=None, store_user=None, application=None):
|
||||
obj = cls()
|
||||
obj.object_type = instance.xml_root_node
|
||||
obj.object_id = instance.id
|
||||
obj.timestamp = now()
|
||||
if get_session() and store_user:
|
||||
obj.user_id = get_session().user
|
||||
# store_user:
|
||||
# None/True: get user from active session
|
||||
# False: do not store user
|
||||
# any value: consider it as user id
|
||||
# (store_user is explicitely checked to be a boolean, to avoid the "1" integer being treated as True)
|
||||
if store_user is None or (isinstance(store_user, bool) and store_user is True):
|
||||
if get_session():
|
||||
obj.user_id = get_session().user
|
||||
elif store_user:
|
||||
obj.user_id = store_user
|
||||
|
||||
tree = instance.export_to_xml(include_id=True)
|
||||
# remove position for categories
|
||||
|
|
187
wcs/sql.py
187
wcs/sql.py
|
@ -96,6 +96,20 @@ SQL_TYPE_MAPPING = {
|
|||
}
|
||||
|
||||
|
||||
def _table_exists(cur, table_name):
|
||||
cur.execute('SELECT 1 FROM pg_class WHERE relname = %s;', (table_name,))
|
||||
rows = cur.fetchall()
|
||||
return len(rows) > 0
|
||||
|
||||
|
||||
def _trigger_exists(cur, table_name, trigger_name):
|
||||
cur.execute(
|
||||
'SELECT 1 FROM pg_trigger WHERE tgrelid = %s::regclass AND tgname = %s;', (table_name, trigger_name)
|
||||
)
|
||||
rows = cur.fetchall()
|
||||
return len(rows) > 0
|
||||
|
||||
|
||||
class WcsPgConnection(psycopg2.extensions.connection):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
@ -1579,6 +1593,8 @@ def do_global_views(conn, cur):
|
|||
% (name, category.id)
|
||||
)
|
||||
|
||||
init_search_tokens_triggers(cur)
|
||||
|
||||
|
||||
def clean_global_views(conn, cur):
|
||||
# Purge of any dead data
|
||||
|
@ -1671,11 +1687,154 @@ def init_global_table(conn=None, cur=None):
|
|||
endpoint_status=endpoint_status_filter,
|
||||
)
|
||||
)
|
||||
init_search_tokens_data(cur)
|
||||
|
||||
if own_conn:
|
||||
cur.close()
|
||||
|
||||
|
||||
def init_search_tokens(conn=None, cur=None):
|
||||
own_cur = False
|
||||
if cur is None:
|
||||
own_cur = True
|
||||
conn, cur = get_connection_and_cursor()
|
||||
|
||||
# Create table
|
||||
cur.execute('CREATE TABLE IF NOT EXISTS wcs_search_tokens(token TEXT PRIMARY KEY);')
|
||||
|
||||
# Create triggers
|
||||
init_search_tokens_triggers(cur)
|
||||
|
||||
# Fill table
|
||||
init_search_tokens_data(cur)
|
||||
|
||||
# Index at the end, small performance trick... not that useful, but it's free...
|
||||
cur.execute('CREATE EXTENSION IF NOT EXISTS pg_trgm;')
|
||||
cur.execute(
|
||||
'CREATE INDEX IF NOT EXISTS wcs_search_tokens_trgm ON wcs_search_tokens USING gin(token gin_trgm_ops);'
|
||||
)
|
||||
|
||||
# And last: functions to use this brand new table
|
||||
cur.execute('CREATE OR REPLACE AGGREGATE tsquery_agg_or (tsquery) (sfunc=tsquery_or, stype=tsquery);')
|
||||
cur.execute('CREATE OR REPLACE AGGREGATE tsquery_agg_and (tsquery) (sfunc=tsquery_and, stype=tsquery);')
|
||||
cur.execute(
|
||||
r"""CREATE OR REPLACE FUNCTION public.wcs_tsquery(text)
|
||||
RETURNS tsquery
|
||||
LANGUAGE sql
|
||||
STABLE
|
||||
AS $function$
|
||||
with
|
||||
tokenized as (select unnest(regexp_split_to_array($1, '\s+')) w),
|
||||
super_tokenized as (
|
||||
select w,
|
||||
coalesce((select plainto_tsquery(perfect.token) from wcs_search_tokens perfect where perfect.token = plainto_tsquery(w)::text),
|
||||
tsquery_agg_or(plainto_tsquery(partial.token) order by partial.token <-> w desc),
|
||||
plainto_tsquery(w)) tokens
|
||||
from tokenized
|
||||
left join wcs_search_tokens partial on partial.token % w and w not similar to '%[0-9]{2,}%'
|
||||
group by w)
|
||||
select tsquery_agg_and(tokens) from super_tokenized;
|
||||
$function$;"""
|
||||
)
|
||||
|
||||
if own_cur:
|
||||
cur.close()
|
||||
|
||||
|
||||
def init_search_tokens_triggers(cur):
|
||||
# We define only appending triggers, ie on INSERT and UPDATE.
|
||||
# It would be far heavier to maintain deletions here, and having extra data has
|
||||
# no or marginal side effect on search performances, and absolutely no impact
|
||||
# on search results.
|
||||
# Instead, a weekly cron job will delete obsolete entries, thus making it sure no
|
||||
# personal data is kept uselessly.
|
||||
|
||||
# First part: the appending function
|
||||
cur.execute(
|
||||
"""CREATE OR REPLACE FUNCTION wcs_search_tokens_trigger_fn ()
|
||||
RETURNS trigger
|
||||
LANGUAGE plpgsql
|
||||
AS $function$
|
||||
BEGIN
|
||||
INSERT INTO wcs_search_tokens SELECT unnest(tsvector_to_array(NEW.fts)) ON CONFLICT(token) DO NOTHING;
|
||||
RETURN NEW;
|
||||
END;
|
||||
$function$;"""
|
||||
)
|
||||
|
||||
if not (_table_exists(cur, 'wcs_search_tokens')):
|
||||
# abort trigger creation if tokens table doesn't exist yet
|
||||
return
|
||||
|
||||
if _table_exists(cur, 'wcs_all_forms') and not _trigger_exists(
|
||||
cur, 'wcs_all_forms', 'wcs_all_forms_fts_trg_upd'
|
||||
):
|
||||
# Second part: insert and update triggers for wcs_all_forms
|
||||
cur.execute(
|
||||
"""CREATE TRIGGER wcs_all_forms_fts_trg_ins
|
||||
AFTER INSERT ON wcs_all_forms
|
||||
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
|
||||
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
|
||||
)
|
||||
cur.execute(
|
||||
"""CREATE TRIGGER wcs_all_forms_fts_trg_upd
|
||||
AFTER UPDATE OF fts ON wcs_all_forms
|
||||
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
|
||||
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
|
||||
)
|
||||
|
||||
if _table_exists(cur, 'searchable_formdefs') and not _trigger_exists(
|
||||
cur, 'searchable_formdefs', 'searchable_formdefs_fts_trg_upd'
|
||||
):
|
||||
# Third part: insert and update triggers for searchable_formdefs
|
||||
cur.execute(
|
||||
"""CREATE TRIGGER searchable_formdefs_fts_trg_ins
|
||||
AFTER INSERT ON searchable_formdefs
|
||||
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
|
||||
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
|
||||
)
|
||||
cur.execute(
|
||||
"""CREATE TRIGGER searchable_formdefs_fts_trg_upd
|
||||
AFTER UPDATE OF fts ON searchable_formdefs
|
||||
FOR EACH ROW WHEN (NEW.fts IS NOT NULL)
|
||||
EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();"""
|
||||
)
|
||||
|
||||
|
||||
def init_search_tokens_data(cur):
|
||||
if not (_table_exists(cur, 'wcs_search_tokens')):
|
||||
# abort table data initialization if tokens table doesn't exist yet
|
||||
return
|
||||
|
||||
if _table_exists(cur, 'wcs_all_forms'):
|
||||
cur.execute(
|
||||
"""INSERT INTO wcs_search_tokens
|
||||
SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms
|
||||
ON CONFLICT(token) DO NOTHING;"""
|
||||
)
|
||||
if _table_exists(cur, 'searchable_formdefs'):
|
||||
cur.execute(
|
||||
"""INSERT INTO wcs_search_tokens
|
||||
SELECT unnest(tsvector_to_array(fts)) FROM searchable_formdefs
|
||||
ON CONFLICT(token) DO NOTHING;"""
|
||||
)
|
||||
|
||||
|
||||
def purge_obsolete_search_tokens(cur=None):
|
||||
own_cur = False
|
||||
if cur is None:
|
||||
own_cur = True
|
||||
_, cur = get_connection_and_cursor()
|
||||
|
||||
cur.execute(
|
||||
"""DELETE FROM wcs_search_tokens
|
||||
WHERE token NOT IN (SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms)
|
||||
AND token NOT IN (SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms);"""
|
||||
)
|
||||
if own_cur:
|
||||
cur.close()
|
||||
|
||||
|
||||
class SqlMixin:
|
||||
_table_name = None
|
||||
_numerical_id = True
|
||||
|
@ -2355,13 +2514,20 @@ class SqlDataMixin(SqlMixin):
|
|||
if period_end:
|
||||
criterias.append(Less('f.receipt_time', period_end))
|
||||
if extra_criterias:
|
||||
for criteria in extra_criterias:
|
||||
|
||||
def alter_criteria(criteria):
|
||||
# change attributes to point to the formdata table (f)
|
||||
if hasattr(criteria, 'attribute'):
|
||||
criteria.attribute = f'f.{criteria.attribute}'
|
||||
elif hasattr(criteria, 'criteria'): # Not()
|
||||
alter_criteria(criteria.criteria)
|
||||
elif hasattr(criteria, 'criterias'): # Or()
|
||||
for c in criteria.criterias:
|
||||
alter_criteria(c)
|
||||
|
||||
for criteria in extra_criterias:
|
||||
altered_criteria = copy.deepcopy(criteria)
|
||||
if isinstance(criteria, Not):
|
||||
altered_criteria.criteria.attribute = f'f.{criteria.criteria.attribute}'
|
||||
else:
|
||||
altered_criteria.attribute = f'f.{criteria.attribute}'
|
||||
alter_criteria(altered_criteria)
|
||||
criterias.append(altered_criteria)
|
||||
|
||||
where_clauses, params, dummy = parse_clause(criterias)
|
||||
|
@ -4789,7 +4955,6 @@ class SearchableFormDef(SqlMixin):
|
|||
% (cls._table_name, cls._table_name)
|
||||
)
|
||||
cls.do_indexes(cur)
|
||||
cur.close()
|
||||
|
||||
from wcs.carddef import CardDef
|
||||
from wcs.formdef import FormDef
|
||||
|
@ -4798,6 +4963,8 @@ class SearchableFormDef(SqlMixin):
|
|||
CardDef.select(ignore_errors=True), FormDef.select(ignore_errors=True)
|
||||
):
|
||||
cls.update(obj=objectdef)
|
||||
init_search_tokens(cur)
|
||||
cur.close()
|
||||
|
||||
@classmethod
|
||||
def update(cls, obj=None, removed_obj_type=None, removed_obj_id=None):
|
||||
|
@ -4835,7 +5002,7 @@ class SearchableFormDef(SqlMixin):
|
|||
def search(cls, obj_type, string):
|
||||
_, cur = get_connection_and_cursor()
|
||||
cur.execute(
|
||||
'SELECT object_id FROM searchable_formdefs WHERE fts @@ plainto_tsquery(%s)',
|
||||
'SELECT object_id FROM searchable_formdefs WHERE fts @@ wcs_tsquery(%s)',
|
||||
(FtsMatch.get_fts_value(string),),
|
||||
)
|
||||
ids = [x[0] for x in cur.fetchall()]
|
||||
|
@ -5100,7 +5267,7 @@ def get_period_total(
|
|||
# latest migration, number + description (description is not used
|
||||
# programmaticaly but will make sure git conflicts if two migrations are
|
||||
# separately added with the same number)
|
||||
SQL_LEVEL = (105, 'change test result json structure')
|
||||
SQL_LEVEL = (106, 'improved fts method')
|
||||
|
||||
|
||||
def migrate_global_views(conn, cur):
|
||||
|
@ -5433,6 +5600,10 @@ def migrate():
|
|||
for formdef in FormDef.select() + CardDef.select():
|
||||
do_formdef_tables(formdef, rebuild_views=False, rebuild_global_views=False)
|
||||
|
||||
if sql_level < 106:
|
||||
# 106: new fts mechanism with tokens table
|
||||
init_search_tokens()
|
||||
|
||||
if sql_level != SQL_LEVEL[0]:
|
||||
cur.execute(
|
||||
'''UPDATE wcs_meta SET value = %s, updated_at=NOW() WHERE key = %s''',
|
||||
|
|
|
@ -373,6 +373,11 @@ class FtsMatch(Criteria):
|
|||
return 'fts @@ plainto_tsquery(%%(c%s)s)' % id(self.value)
|
||||
|
||||
|
||||
class WcsFtsMatch(FtsMatch):
|
||||
def as_sql(self):
|
||||
return 'fts @@ wcs_tsquery(%%(c%s)s)' % id(self.value)
|
||||
|
||||
|
||||
class ElementEqual(Criteria):
|
||||
def __init__(self, attribute, key, value, **kwargs):
|
||||
super().__init__(attribute, value)
|
||||
|
|
|
@ -185,7 +185,7 @@ class TestDef(sql.TestDef):
|
|||
)
|
||||
|
||||
@classmethod
|
||||
def create_from_formdata(cls, formdef, formdata):
|
||||
def create_from_formdata(cls, formdef, formdata, add_workflow_tests=False):
|
||||
testdef = cls()
|
||||
testdef.object_type = formdef.get_table_name()
|
||||
testdef.object_id = formdef.id
|
||||
|
@ -213,6 +213,10 @@ class TestDef(sql.TestDef):
|
|||
'fields': field_data,
|
||||
'user': formdata.user.get_json_export_dict() if formdata.user else None,
|
||||
}
|
||||
|
||||
if add_workflow_tests:
|
||||
testdef.workflow_tests.add_actions_from_formdata(formdata)
|
||||
|
||||
return testdef
|
||||
|
||||
def build_formdata(self, objectdef, include_fields=False):
|
||||
|
|
|
@ -267,7 +267,7 @@ class ExternalWorkflowGlobalAction(WorkflowStatusItem):
|
|||
return
|
||||
|
||||
try:
|
||||
yield objectdef.data_class().get(target_id)
|
||||
yield objectdef.data_class().get_by_id(target_id)
|
||||
except KeyError as e:
|
||||
# use custom error message depending on target type
|
||||
get_publisher().record_error(
|
||||
|
|
|
@ -184,6 +184,7 @@ class RegisterCommenterWorkflowStatusItem(WorkflowStatusItem):
|
|||
yield from super().get_computed_strings()
|
||||
if not self.comment_template:
|
||||
yield self.comment
|
||||
yield from (self.attachments or [])
|
||||
|
||||
def migrate(self):
|
||||
changed = super().migrate()
|
||||
|
|
|
@ -238,6 +238,7 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem):
|
|||
yield self.body
|
||||
if self.to:
|
||||
yield from self.to
|
||||
yield from (self.attachments or [])
|
||||
|
||||
def perform(self, formdata, ignore_i18n=False):
|
||||
if not self.to:
|
||||
|
|
|
@ -77,6 +77,9 @@ class WorkflowTests(XmlStorableObject):
|
|||
for action in self.actions:
|
||||
status = formdata.get_status()
|
||||
|
||||
if not action.is_configured:
|
||||
continue
|
||||
|
||||
if not action.is_assertion:
|
||||
formdata.sent_emails.clear()
|
||||
formdata.used_webservice_responses.clear()
|
||||
|
@ -106,10 +109,43 @@ class WorkflowTests(XmlStorableObject):
|
|||
if not self.actions:
|
||||
return '1'
|
||||
|
||||
return str(int(max(x.id for x in self.actions)) + 1)
|
||||
return str(max(int(x.id) for x in self.actions) + 1)
|
||||
|
||||
def add_action(self, action_class):
|
||||
self.actions.append(action_class(id=self.get_new_action_id()))
|
||||
action = action_class(id=self.get_new_action_id())
|
||||
self.actions.append(action)
|
||||
return action
|
||||
|
||||
def add_actions_from_formdata(self, formdata):
|
||||
test_action_class_by_trace_id = {
|
||||
'sendmail': AssertEmail,
|
||||
'webservice_call': AssertWebserviceCall,
|
||||
'set-backoffice-fields': AssertBackofficeFieldValues,
|
||||
'button': ButtonClick,
|
||||
'timeout-jump': SkipTime,
|
||||
}
|
||||
|
||||
previous_trace = None
|
||||
workflow_traces = formdata.get_workflow_traces()
|
||||
for trace in workflow_traces:
|
||||
trace_id = trace.event or trace.action_item_key
|
||||
|
||||
if trace_id not in test_action_class_by_trace_id:
|
||||
previous_trace = trace
|
||||
continue
|
||||
|
||||
if trace.event:
|
||||
action = self.add_action(AssertStatus)
|
||||
action.set_attributes_from_trace(formdata.formdef, trace)
|
||||
|
||||
action = self.add_action(test_action_class_by_trace_id[trace_id])
|
||||
action.set_attributes_from_trace(formdata.formdef, trace, previous_trace)
|
||||
|
||||
previous_trace = trace
|
||||
|
||||
if workflow_traces:
|
||||
action = self.add_action(AssertStatus)
|
||||
action.set_attributes_from_trace(formdata.formdef, workflow_traces[-1])
|
||||
|
||||
def store(self, *args, **kwargs):
|
||||
super().store(*args, **kwargs)
|
||||
|
@ -162,10 +198,20 @@ class WorkflowTestAction(XmlStorableObject):
|
|||
def __str__(self):
|
||||
return str(self.label)
|
||||
|
||||
@property
|
||||
def is_configured(self):
|
||||
return not any(
|
||||
field
|
||||
for field, _ in self.XML_NODES
|
||||
if field != 'id' and field not in self.optional_fields and not getattr(self, field)
|
||||
)
|
||||
|
||||
def set_attributes_from_trace(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def render_as_line(self):
|
||||
for field, dummy in self.XML_NODES:
|
||||
if field not in self.optional_fields and not getattr(self, field):
|
||||
return _('not configured')
|
||||
if not self.is_configured:
|
||||
return _('not configured')
|
||||
|
||||
return self.details_label
|
||||
|
||||
|
@ -187,6 +233,16 @@ class ButtonClick(WorkflowTestAction):
|
|||
def details_label(self):
|
||||
return _('Click on "%s"') % self.button_name
|
||||
|
||||
def set_attributes_from_trace(self, formdef, trace, previous_trace=None):
|
||||
try:
|
||||
item = [
|
||||
x for x in self.get_all_choice_actions(formdef) if x.id == trace.event_args['action_item_id']
|
||||
][0]
|
||||
except IndexError:
|
||||
return
|
||||
|
||||
self.button_name = item.label
|
||||
|
||||
def perform(self, formdata, user):
|
||||
status = formdata.get_status()
|
||||
form = status.get_action_form(formdata, user)
|
||||
|
@ -198,11 +254,14 @@ class ButtonClick(WorkflowTestAction):
|
|||
form.get_submit = lambda: button_widget.name
|
||||
status.handle_form(form, formdata, user, check_replay=False)
|
||||
|
||||
def fill_admin_form(self, form, formdef):
|
||||
possible_button_names = set()
|
||||
@staticmethod
|
||||
def get_all_choice_actions(formdef):
|
||||
for item in formdef.workflow.get_all_items():
|
||||
if isinstance(item, wf.choice.ChoiceWorkflowStatusItem) and item.status:
|
||||
possible_button_names.add(item.label)
|
||||
yield item
|
||||
|
||||
def fill_admin_form(self, form, formdef):
|
||||
possible_button_names = {x.label for x in self.get_all_choice_actions(formdef)}
|
||||
|
||||
if not possible_button_names:
|
||||
return
|
||||
|
@ -238,6 +297,14 @@ class AssertStatus(WorkflowTestAction):
|
|||
def details_label(self):
|
||||
return _('Status is "%s"') % self.status_name
|
||||
|
||||
def set_attributes_from_trace(self, formdef, trace, previous_trace=None):
|
||||
try:
|
||||
status = formdef.workflow.get_status(trace.status_id)
|
||||
except KeyError:
|
||||
return
|
||||
|
||||
self.status_name = status.name
|
||||
|
||||
def perform(self, formdata, user):
|
||||
status = formdata.get_status()
|
||||
if status.name != self.status_name:
|
||||
|
@ -365,6 +432,10 @@ class SkipTime(WorkflowTestAction):
|
|||
def details_label(self):
|
||||
return seconds2humanduration(self.seconds)
|
||||
|
||||
def set_attributes_from_trace(self, formdef, trace, previous_trace=None):
|
||||
if previous_trace:
|
||||
self.seconds = (trace.timestamp - previous_trace.timestamp).total_seconds()
|
||||
|
||||
def rewind(self, formdata):
|
||||
def rewind_time(timestamp):
|
||||
return timestamp - datetime.timedelta(seconds=self.seconds)
|
||||
|
@ -492,6 +563,8 @@ class AssertWebserviceCall(WorkflowTestAction):
|
|||
webservice_response_id = None
|
||||
call_count = 1
|
||||
|
||||
optional_fields = ['call_count']
|
||||
|
||||
XML_NODES = WorkflowTestAction.XML_NODES + [
|
||||
('webservice_response_id', 'str'),
|
||||
('call_count', 'int'),
|
||||
|
@ -519,14 +592,18 @@ class AssertWebserviceCall(WorkflowTestAction):
|
|||
return r
|
||||
|
||||
def perform(self, formdata, user):
|
||||
try:
|
||||
response = WebserviceResponse.get(self.webservice_response_id)
|
||||
except KeyError:
|
||||
raise WorkflowTestError(_('Broken, missing webservice response'))
|
||||
|
||||
call_count = 0
|
||||
for response in formdata.used_webservice_responses.copy():
|
||||
if response.id == self.webservice_response_id:
|
||||
formdata.used_webservice_responses.remove(response)
|
||||
for used_response in formdata.used_webservice_responses.copy():
|
||||
if used_response.id == self.webservice_response_id:
|
||||
formdata.used_webservice_responses.remove(used_response)
|
||||
call_count += 1
|
||||
|
||||
if call_count != self.call_count:
|
||||
response = WebserviceResponse.get(self.webservice_response_id)
|
||||
raise WorkflowTestError(
|
||||
_('Webservice response %(name)s was used %(count)s times (instead of %(expected_count)s).')
|
||||
% {'name': response.name, 'count': call_count, 'expected_count': self.call_count}
|
||||
|
|
|
@ -3314,6 +3314,10 @@ class WorkflowStatusItem(XmlSerialisable):
|
|||
if expression['type'] == 'template':
|
||||
old_allow_complex_value = vars.get('allow_complex')
|
||||
vars['allow_complex'] = allow_complex
|
||||
# make sure complex data context manager is used
|
||||
assert (
|
||||
not vars['allow_complex'] or get_publisher().complex_data_cache is not None
|
||||
), 'missing complex_data context manager'
|
||||
try:
|
||||
return Template(expression['value'], raises=raises, autoescape=False).render(vars)
|
||||
except TemplateError as e:
|
||||
|
|
|
@ -114,18 +114,19 @@ def call_webservice(
|
|||
if qs_data: # merge qs_data into url
|
||||
qs = list(urllib.parse.parse_qsl(parsed.query))
|
||||
for key, value in qs_data.items():
|
||||
try:
|
||||
value = WorkflowStatusItem.compute(value, allow_complex=True, raises=True)
|
||||
except Exception as e:
|
||||
get_publisher().record_error(exception=e, notify=True)
|
||||
else:
|
||||
if value:
|
||||
value = get_publisher().get_cached_complex_data(value)
|
||||
if isinstance(value, (tuple, list, set)):
|
||||
qs.extend((key, x) for x in value)
|
||||
with get_publisher().complex_data():
|
||||
try:
|
||||
value = WorkflowStatusItem.compute(value, allow_complex=True, raises=True)
|
||||
except Exception as e:
|
||||
get_publisher().record_error(exception=e, notify=True)
|
||||
else:
|
||||
value = str(value) if value is not None else ''
|
||||
qs.append((key, value))
|
||||
if value:
|
||||
value = get_publisher().get_cached_complex_data(value)
|
||||
if isinstance(value, (tuple, list, set)):
|
||||
qs.extend((key, x) for x in value)
|
||||
else:
|
||||
value = str(value) if value is not None else ''
|
||||
qs.append((key, value))
|
||||
qs = urllib.parse.urlencode(qs)
|
||||
url = urllib.parse.urlunparse(parsed[:4] + (qs,) + parsed[5:6])
|
||||
|
||||
|
@ -205,8 +206,8 @@ def call_webservice(
|
|||
)
|
||||
return (None, None, None)
|
||||
|
||||
app_error_code = get_app_error_code(response, data, 'json')
|
||||
if (app_error_code != 0 or status >= 400) and (notify_on_errors or record_on_errors):
|
||||
if status >= 400 and (notify_on_errors or record_on_errors):
|
||||
app_error_code = get_app_error_code(response, data, 'json')
|
||||
record_wscall_error(status, data, response, app_error_code, notify_on_errors, record_on_errors)
|
||||
|
||||
return (response, status, data)
|
||||
|
|
Loading…
Reference in New Issue