general: remove is_using_postgresql conditionals (#67190)

This commit is contained in:
Frédéric Péters 2022-07-10 21:32:33 +02:00
parent b184acabd5
commit 3484ae5341
49 changed files with 920 additions and 1755 deletions

View File

@ -788,30 +788,27 @@ def test_api_list_formdata(pub, local_user):
get_app(pub).get(sign_uri('/api/forms/test/list?filter=all&offset=plop', user=local_user), status=400)
get_app(pub).get(sign_uri('/api/forms/test/list?filter=all&limit=plop', user=local_user), status=400)
if pub.is_using_postgresql():
# just check ordering
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&order_by=f0', user=local_user))
assert [d['fields']['foobar'] for d in resp.json] == ['FOO BAR %02d' % i for i in range(0, 30)]
# just check ordering
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&order_by=f0', user=local_user))
assert [d['fields']['foobar'] for d in resp.json] == ['FOO BAR %02d' % i for i in range(0, 30)]
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&order_by=-f0', user=local_user))
assert [d['fields']['foobar'] for d in resp.json] == ['FOO BAR %02d' % i for i in range(29, -1, -1)]
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&order_by=-f0', user=local_user))
assert [d['fields']['foobar'] for d in resp.json] == ['FOO BAR %02d' % i for i in range(29, -1, -1)]
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&order_by=foobar', user=local_user))
assert [d['fields']['foobar'] for d in resp.json] == ['FOO BAR %02d' % i for i in range(0, 30)]
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&order_by=foobar', user=local_user))
assert [d['fields']['foobar'] for d in resp.json] == ['FOO BAR %02d' % i for i in range(0, 30)]
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&order_by=-foobar', user=local_user))
assert [d['fields']['foobar'] for d in resp.json] == ['FOO BAR %02d' % i for i in range(29, -1, -1)]
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&order_by=-foobar', user=local_user))
assert [d['fields']['foobar'] for d in resp.json] == ['FOO BAR %02d' % i for i in range(29, -1, -1)]
# check fts
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&q=foo', user=local_user))
assert len(resp.json) == 30
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&q=baz', user=local_user))
assert len(resp.json) == 14
# check fts
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&q=foo', user=local_user))
assert len(resp.json) == 30
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&q=baz', user=local_user))
assert len(resp.json) == 14
def test_api_list_formdata_order_by_rank(pub, local_user):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
@ -938,8 +935,8 @@ def test_api_list_formdata_string_filter(pub, local_user):
params = [
('eq', 'FOO 2', 1),
('ne', 'FOO 2', 3),
('lt', 'FOO 2', 2 if pub.is_using_postgresql() else 3),
('lte', 'FOO 2', 3 if pub.is_using_postgresql() else 4),
('lt', 'FOO 2', 2),
('lte', 'FOO 2', 3),
('gt', 'FOO 2', 0),
('gt', '42', 0),
('gte', 'FOO 2', 1),
@ -957,8 +954,8 @@ def test_api_list_formdata_string_filter(pub, local_user):
('eq', '10', 1),
('eq', '010', 1),
('ne', '10', 3),
('lt', '10', 1 if pub.is_using_postgresql() else 2),
('lte', '10', 2 if pub.is_using_postgresql() else 3),
('lt', '10', 1),
('lte', '10', 2),
('gt', '10', 1),
('gt', '9', 2),
('gte', '10', 2),
@ -1021,8 +1018,8 @@ def test_api_list_formdata_item_filter(pub, local_user):
('eq', '10', 1),
('eq', '010', 1),
('ne', '10', 3),
('lt', '10', 1 if pub.is_using_postgresql() else 2),
('lte', '10', 2 if pub.is_using_postgresql() else 3),
('lt', '10', 1),
('lte', '10', 2),
('gt', '10', 1),
('gt', '9', 2),
('gte', '10', 2),
@ -1041,8 +1038,8 @@ def test_api_list_formdata_item_filter(pub, local_user):
params = [
('eq', 'foo', 1),
('ne', 'foo', 3),
('lt', 'foo', 2 if pub.is_using_postgresql() else 3),
('lte', 'foo', 3 if pub.is_using_postgresql() else 4),
('lt', 'foo', 2),
('lte', 'foo', 3),
('gt', 'foo', 0),
('gt', '42', 0),
('gte', 'foo', 1),
@ -1182,7 +1179,7 @@ def test_api_list_formdata_bool_filter(pub, local_user):
formdata.store()
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-bool=false', user=local_user))
assert len(resp.json) == 2 if pub.is_using_postgresql() else 3
assert len(resp.json) == 2
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-bool=true', user=local_user))
assert len(resp.json) == 1
params = [
@ -1209,9 +1206,6 @@ def test_api_list_formdata_bool_filter(pub, local_user):
def test_api_list_formdata_date_filter(pub, local_user):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
@ -1412,9 +1406,6 @@ def test_api_list_formdata_number_filter(pub, local_user):
def test_api_list_formdata_block_field_filter(pub, local_user):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {
@ -2142,11 +2133,6 @@ def test_api_global_geojson(pub, local_user):
formdata.jump_status('finished')
formdata.store()
if not pub.is_using_postgresql():
resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
pytest.skip('this requires SQL')
return
# check empty content if user doesn't have the appropriate role
resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user))
assert 'features' in resp.json
@ -2170,11 +2156,6 @@ def test_api_global_geojson(pub, local_user):
@pytest.mark.parametrize('user', ['query-email', 'api-access'])
@pytest.mark.parametrize('auth', ['signature', 'http-basic'])
def test_api_global_listing(pub, local_user, user, auth):
if not pub.is_using_postgresql():
resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
pytest.skip('this requires SQL')
return
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
@ -2279,10 +2260,6 @@ def test_api_global_listing(pub, local_user, user, auth):
def test_api_global_listing_categories_filter(pub, local_user):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
Category.wipe()
category1 = Category()
category1.name = 'Category 1'
@ -2391,11 +2368,6 @@ def test_api_global_listing_ignored_roles(pub, local_user):
def test_api_include_anonymised(pub, local_user):
if not pub.is_using_postgresql():
resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
pytest.skip('this requires SQL')
return
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
@ -2438,10 +2410,6 @@ def test_api_include_anonymised(pub, local_user):
def test_global_forms_api_user_uuid_filter(pub, local_user):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()

View File

@ -177,11 +177,8 @@ def test_formdef_list(pub):
formdata.store()
resp = get_app(pub).get(sign_uri('/api/formdefs/?include-count=on'))
if not pub.is_using_postgresql():
assert resp.json['data'][0]['count'] == 8
else:
# 3*4 + 2*2 + 1*1
assert resp.json['data'][0]['count'] == 17
# 3*4 + 2*2 + 1*1
assert resp.json['data'][0]['count'] == 17
def test_formdef_list_categories_filter(pub):

View File

@ -410,10 +410,6 @@ def test_user_forms(pub, local_user, access):
def test_user_forms_limit_offset(pub, local_user):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test limit offset'
@ -614,10 +610,6 @@ def test_user_forms_api_access_restrict_to_anonymised_data(pub, local_user, acce
def test_user_forms_include_accessible(pub, local_user, access):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
pub.role_class.wipe()
role = pub.role_class(name='Foo bar')
role.store()

View File

@ -269,25 +269,22 @@ def test_backoffice_forms(pub):
assert '9 open on 50' in resp.text
# anonymise some formdata, they should no longer be included
if pub.is_using_postgresql():
formdef = FormDef.get_by_urlname('form-title')
for i, formdata in enumerate(
formdef.data_class().select([st.Equal('status', 'wf-finished')], order_by='id')
):
if i >= 20:
break
formdata.anonymise()
formdef = FormDef.get_by_urlname('form-title')
for i, formdata in enumerate(
formdef.data_class().select([st.Equal('status', 'wf-finished')], order_by='id')
):
if i >= 20:
break
formdata.anonymise()
for i, formdata in enumerate(
formdef.data_class().select([st.Equal('status', 'wf-new')], order_by='id')
):
if i >= 5:
break
formdata.anonymise()
for i, formdata in enumerate(formdef.data_class().select([st.Equal('status', 'wf-new')], order_by='id')):
if i >= 5:
break
formdata.anonymise()
resp = app.get('/backoffice/management/forms')
assert 'Forms in your care' in resp.text
assert '4 open on 25' in resp.text
resp = app.get('/backoffice/management/forms')
assert 'Forms in your care' in resp.text
assert '4 open on 25' in resp.text
def test_backoffice_management_css_class(pub):
@ -340,22 +337,15 @@ def test_backoffice_listing(pub):
resp = app.get('/backoffice/management/form-title/')
resp.forms['listing-settings']['filter'] = 'all'
resp = resp.forms['listing-settings'].submit()
if pub.is_using_postgresql():
assert resp.text.count('data-link') == 20
else:
# not using sql -> no pagination
assert resp.text.count('data-link') == 50
assert resp.text.count('data-link') == 20
# check status filter <select>
resp = app.get('/backoffice/management/form-title/')
resp.forms['listing-settings']['filter'] = 'done'
resp = resp.forms['listing-settings'].submit()
if pub.is_using_postgresql():
assert resp.text.count('data-link') == 20
resp = resp.click('Next Page')
assert resp.text.count('data-link') == 13
else:
assert resp.text.count('data-link') == 33
assert resp.text.count('data-link') == 20
resp = resp.click('Next Page')
assert resp.text.count('data-link') == 13
# add an extra status to workflow and move a few formdatas to it, they
# should then be marked as open but not waiting for actions.
@ -424,9 +414,6 @@ def test_backoffice_listing(pub):
def test_backoffice_listing_pagination(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
create_superuser(pub)
create_environment(pub)
app = login(get_app(pub))
@ -469,9 +456,6 @@ def test_backoffice_listing_pagination(pub):
def test_backoffice_listing_anonymised(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
create_superuser(pub)
create_environment(pub)
app = login(get_app(pub))
@ -488,9 +472,6 @@ def test_backoffice_listing_anonymised(pub):
def test_backoffice_listing_fts(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
create_superuser(pub)
create_environment(pub)
formdef = FormDef.get_by_urlname('form-title')
@ -1366,10 +1347,6 @@ def test_backoffice_wscall_failure_display(http_requests, pub):
@pytest.mark.parametrize('notify_on_errors', [True, False])
@pytest.mark.parametrize('record_on_errors', [True, False])
def test_backoffice_wscall_on_error(http_requests, pub, emails, notify_on_errors, record_on_errors):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
pub.cfg['debug'] = {'error_email': 'errors@localhost.invalid'}
pub.cfg['emails'] = {'from': 'from@localhost.invalid'}
pub.write_cfg()
@ -1986,10 +1963,6 @@ def test_backoffice_wfedit_and_live_condition(pub):
def test_global_listing(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
create_user(pub)
create_environment(pub)
app = login(get_app(pub))
@ -2105,10 +2078,6 @@ def test_global_listing(pub):
def test_global_listing_parameters_from_query_string(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
create_user(pub)
create_environment(pub)
app = login(get_app(pub))
@ -2131,10 +2100,6 @@ def test_global_listing_parameters_from_query_string(pub):
def test_global_listing_user_label(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
create_user(pub)
FormDef.wipe()
@ -2170,10 +2135,6 @@ def test_global_listing_user_label(pub):
def test_management_views_with_no_formdefs(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
create_user(pub)
create_environment(pub)
FormDef.wipe()
@ -2193,10 +2154,6 @@ def test_management_views_with_no_formdefs(pub):
def test_category_in_global_listing(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
FormDef.wipe()
Category.wipe()
@ -2283,10 +2240,6 @@ def test_category_in_global_listing(pub):
def test_datetime_in_global_listing(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
create_user(pub)
create_environment(pub)
@ -2324,10 +2277,6 @@ def test_datetime_in_global_listing(pub):
def test_global_listing_anonymised(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
create_user(pub)
create_environment(pub)
app = login(get_app(pub))
@ -2346,10 +2295,6 @@ def test_global_listing_anonymised(pub):
def test_global_listing_geojson(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
create_user(pub)
create_environment(pub)
@ -2381,10 +2326,6 @@ def test_global_listing_geojson(pub):
def test_global_map(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
create_user(pub)
create_environment(pub)
@ -2447,38 +2388,33 @@ def test_formdata_lookup(pub):
resp = resp.form.submit()
assert resp.location == 'http://example.net/backoffice/management/form-title/%s/' % formdata.id
if pub.is_using_postgresql():
# check looking up on a custom display_id
formdata.id_display = '999999'
formdata.store()
assert formdata.get_display_id() == '999999'
resp = app.get('/backoffice/management/').follow()
resp.form['query'] = formdata.get_display_id()
resp = resp.form.submit()
assert resp.location == 'http://example.net/backoffice/management/form-title/%s/' % formdata.id
# check looking up on a custom display_id
formdata.id_display = '999999'
formdata.store()
assert formdata.get_display_id() == '999999'
resp = app.get('/backoffice/management/').follow()
resp.form['query'] = formdata.get_display_id()
resp = resp.form.submit()
assert resp.location == 'http://example.net/backoffice/management/form-title/%s/' % formdata.id
# try it from the global listing
resp = app.get('/backoffice/management/listing')
assert 'id="lookup-box"' in resp.text
resp.forms[0]['query'] = formdata.tracking_code
resp = resp.forms[0].submit()
assert resp.location == 'http://example.net/backoffice/management/form-title/%s/' % formdata.id
resp = resp.follow()
assert 'The form has been recorded' in resp.text
# try it from the global listing
resp = app.get('/backoffice/management/listing')
assert 'id="lookup-box"' in resp.text
resp.forms[0]['query'] = formdata.tracking_code
resp = resp.forms[0].submit()
assert resp.location == 'http://example.net/backoffice/management/form-title/%s/' % formdata.id
resp = resp.follow()
assert 'The form has been recorded' in resp.text
resp = app.get('/backoffice/management/listing')
resp.forms[0]['query'] = 'AAAAAAAA'
resp = resp.forms[0].submit()
assert resp.location == 'http://example.net/backoffice/management/listing'
resp = resp.follow()
assert 'No such tracking code or identifier.' in resp.text
resp = app.get('/backoffice/management/listing')
resp.forms[0]['query'] = 'AAAAAAAA'
resp = resp.forms[0].submit()
assert resp.location == 'http://example.net/backoffice/management/listing'
resp = resp.follow()
assert 'No such tracking code or identifier.' in resp.text
def test_backoffice_sidebar_user_context(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
user = create_user(pub)
create_environment(pub)
form_class = FormDef.get_by_urlname('form-title').data_class()
@ -2576,9 +2512,6 @@ def test_backoffice_sidebar_lateral_block(pub):
def test_count_open(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
create_user(pub)
FormDef.wipe()
@ -2710,9 +2643,6 @@ def test_backoffice_backoffice_submission_in_listings(pub):
def test_backoffice_backoffice_submission_in_global_listing(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
create_superuser(pub)
create_environment(pub)
@ -2758,12 +2688,11 @@ def test_backoffice_advisory_lock(pub):
resp = app.get('/backoffice/management/form-title/')
assert 'advisory-lock' not in resp.text
if pub.is_using_postgresql():
# check global view
resp = app2.get('/backoffice/management/listing?limit=100')
assert 'advisory-lock' in resp.text
resp = app.get('/backoffice/management/listing?limit=100')
assert 'advisory-lock' not in resp.text
# check global view
resp = app2.get('/backoffice/management/listing?limit=100')
assert 'advisory-lock' in resp.text
resp = app.get('/backoffice/management/listing?limit=100')
assert 'advisory-lock' not in resp.text
resp = app.get('/backoffice/management/form-title/' + first_link)
assert 'Be warned forms of this user are also being looked' not in resp.text
@ -2810,9 +2739,6 @@ def test_backoffice_advisory_lock(pub):
def test_backoffice_advisory_lock_related_formdatas(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
pub.session_manager.session_class.wipe()
user = create_superuser(pub)
create_environment(pub)
@ -3622,10 +3548,6 @@ def test_backoffice_fields(pub):
def test_backoffice_logged_errors(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
Workflow.wipe()
workflow = Workflow.get_default_workflow()
workflow.id = '12'
@ -4144,8 +4066,7 @@ def test_backoffice_create_formdata_backoffice_submission(pub, create_formdata):
assert target_data_class.count() == 0
# resubmit it through backoffice submission
resp = resp.form.submit(name='button_resubmit')
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 0
assert pub.loggederror_class.count() == 0
assert target_data_class.count() == 1
target_formdata = target_data_class.select()[0]
@ -4264,8 +4185,7 @@ def test_backoffice_create_formdata_map_fields_by_varname(pub, create_formdata):
assert target_data_class.count() == 0
# resubmit it through backoffice submission
resp = resp.form.submit(name='button_resubmit')
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 0
assert pub.loggederror_class.count() == 0
assert target_data_class.count() == 1
target_formdata = target_data_class.select()[0]

View File

@ -552,10 +552,6 @@ def test_backoffice_cards_import_data_csv_invalid_columns(pub):
def test_backoffice_cards_wscall_failure_display(http_requests, pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
user = create_user(pub)
Workflow.wipe()

View File

@ -309,10 +309,6 @@ def test_backoffice_user_columns(pub):
resp = app.get('/backoffice/management/form-title/')
assert resp.text.count('</th>') == 6 # four columns
if not pub.is_using_postgresql():
# no support for relation columns unless using SQL
assert 'user-label$3' not in resp.forms['listing-settings'].fields
return
resp.forms['listing-settings']['user-label$3'].checked = True
resp = resp.forms['listing-settings'].submit()
assert resp.text.count('</th>') == 7
@ -381,10 +377,6 @@ def test_backoffice_card_field_columns(pub):
app = login(get_app(pub))
resp = app.get('/backoffice/management/form-title/')
assert resp.text.count('</th>') == 6 # four columns
if not pub.is_using_postgresql():
# no support for relation columns unless using SQL
assert '4$1' not in resp.forms['listing-settings'].fields
return
assert '4$0' not in resp.forms['listing-settings'].fields
resp.forms['listing-settings']['4$1'].checked = True
resp.forms['listing-settings']['4$2'].checked = True
@ -415,10 +407,6 @@ def test_backoffice_card_field_columns(pub):
def test_backoffice_block_columns(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
pub.user_class.wipe()
create_superuser(pub)
pub.role_class.wipe()
@ -520,10 +508,6 @@ def test_backoffice_block_columns(pub):
def test_backoffice_block_email_column(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
pub.user_class.wipe()
create_superuser(pub)
pub.role_class.wipe()
@ -585,10 +569,6 @@ def test_backoffice_block_email_column(pub):
def test_backoffice_block_bool_column(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
pub.user_class.wipe()
create_superuser(pub)
pub.role_class.wipe()
@ -649,10 +629,6 @@ def test_backoffice_block_bool_column(pub):
def test_backoffice_block_date_column(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
pub.user_class.wipe()
create_superuser(pub)
pub.role_class.wipe()
@ -713,10 +689,6 @@ def test_backoffice_block_date_column(pub):
def test_backoffice_block_file_column(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
pub.user_class.wipe()
create_superuser(pub)
pub.role_class.wipe()
@ -780,10 +752,6 @@ def test_backoffice_block_file_column(pub):
def test_backoffice_block_column_position(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
pub.user_class.wipe()
create_superuser(pub)
pub.role_class.wipe()

View File

@ -827,10 +827,6 @@ def test_backoffice_custom_view_columns(pub):
def test_backoffice_custom_view_sort_field(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
create_superuser(pub)
FormDef.wipe()

View File

@ -589,10 +589,6 @@ def test_backoffice_csv_export_table(pub):
def test_backoffice_csv_export_ordering(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
create_superuser(pub)
FormDef.wipe()

View File

@ -349,47 +349,32 @@ def test_backoffice_item_filter(pub):
assert resp.text.count('<td>b</td>') == 0
assert resp.text.count('<td>d</td>') > 0
if not pub.is_using_postgresql():
# in pickle all options are always displayed
# in postgresql, option 'c' is never used so not even listed
with pytest.raises(ValueError):
resp.forms['listing-settings']['filter-4-value'].value = 'c'
resp.forms['listing-settings']['filter-4-operator'].value = 'eq'
# check json view used to fill select filters from javascript
resp2 = app.get(resp.request.path + 'filter-options?filter_field_id=4&' + resp.request.query_string)
assert [x['id'] for x in resp2.json['data']] == ['â', 'b', 'd']
resp2 = app.get(
resp.request.path + 'filter-options?filter_field_id=4&_search=d&' + resp.request.query_string
)
assert [x['id'] for x in resp2.json['data']] == ['d']
resp2 = app.get(
resp.request.path + 'filter-options?filter_field_id=7&' + resp.request.query_string, status=404
)
for status in ('all', 'waiting', 'pending', 'done', 'accepted'):
resp.forms['listing-settings']['filter'] = status
resp = resp.forms['listing-settings'].submit()
assert resp.text.count('<td>â</td>') == 0
assert resp.text.count('<td>b</td>') == 0
assert resp.text.count('<td>c</td>') == 0
else:
# in postgresql, option 'c' is never used so not even listed
with pytest.raises(ValueError):
resp.forms['listing-settings']['filter-4-value'].value = 'c'
# check json view used to fill select filters from javascript
resp2 = app.get(resp.request.path + 'filter-options?filter_field_id=4&' + resp.request.query_string)
assert [x['id'] for x in resp2.json['data']] == ['â', 'b', 'd']
resp2 = app.get(
resp.request.path + 'filter-options?filter_field_id=4&_search=d&' + resp.request.query_string
)
assert [x['id'] for x in resp2.json['data']] == ['d']
resp2 = app.get(
resp.request.path + 'filter-options?filter_field_id=7&' + resp.request.query_string, status=404
)
for status in ('all', 'waiting', 'pending', 'done', 'accepted'):
resp.forms['listing-settings']['filter'] = status
resp = resp.forms['listing-settings'].submit()
resp2 = app.get(
resp.request.path + 'filter-options?filter_field_id=4&' + resp.request.query_string
)
if status == 'accepted':
assert [x['id'] for x in resp2.json['data']] == []
else:
assert [x['id'] for x in resp2.json['data']] == ['â', 'b', 'd']
if status == 'accepted':
assert [x['id'] for x in resp2.json['data']] == []
else:
assert [x['id'] for x in resp2.json['data']] == ['â', 'b', 'd']
def test_backoffice_item_double_filter(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
pub.user_class.wipe()
create_superuser(pub)
pub.role_class.wipe()
@ -566,40 +551,28 @@ def test_backoffice_bofield_item_filter(pub):
assert resp.text.count('<td>b</td>') == 0
assert resp.text.count('<td>d</td>') > 0
if not pub.is_using_postgresql():
# in pickle all options are always displayed
# in postgresql, option 'c' is never used so not even listed
with pytest.raises(ValueError):
resp.forms['listing-settings']['filter-bo0-1-value'].value = 'c'
resp.forms['listing-settings']['filter-bo0-1-operator'].value = 'eq'
# check json view used to fill select filters from javascript
resp2 = app.get(resp.request.path + 'filter-options?filter_field_id=bo0-1&' + resp.request.query_string)
assert [x['id'] for x in resp2.json['data']] == ['â', 'b', 'd']
resp2 = app.get(
resp.request.path + 'filter-options?filter_field_id=bo0-1&_search=d&' + resp.request.query_string
)
assert [x['id'] for x in resp2.json['data']] == ['d']
for status in ('all', 'waiting', 'pending', 'done', 'accepted'):
resp.forms['listing-settings']['filter'] = status
resp = resp.forms['listing-settings'].submit()
assert resp.text.count('<td>â</td>') == 0
assert resp.text.count('<td>b</td>') == 0
assert resp.text.count('<td>c</td>') == 0
else:
# in postgresql, option 'c' is never used so not even listed
with pytest.raises(ValueError):
resp.forms['listing-settings']['filter-bo0-1-value'].value = 'c'
# check json view used to fill select filters from javascript
resp2 = app.get(
resp.request.path + 'filter-options?filter_field_id=bo0-1&' + resp.request.query_string
)
assert [x['id'] for x in resp2.json['data']] == ['â', 'b', 'd']
resp2 = app.get(
resp.request.path + 'filter-options?filter_field_id=bo0-1&_search=d&' + resp.request.query_string
)
assert [x['id'] for x in resp2.json['data']] == ['d']
for status in ('all', 'waiting', 'pending', 'done', 'accepted'):
resp.forms['listing-settings']['filter'] = status
resp = resp.forms['listing-settings'].submit()
resp2 = app.get(
resp.request.path + 'filter-options?filter_field_id=bo0-1&' + resp.request.query_string
)
if status == 'accepted':
assert [x['id'] for x in resp2.json['data']] == []
else:
assert [x['id'] for x in resp2.json['data']] == ['â', 'b', 'd']
if status == 'accepted':
assert [x['id'] for x in resp2.json['data']] == []
else:
assert [x['id'] for x in resp2.json['data']] == ['â', 'b', 'd']
def test_backoffice_items_filter(pub):
@ -681,18 +654,9 @@ def test_backoffice_items_filter(pub):
assert resp.text.count('<td>â</td>') > 0
assert resp.text.count('<td>b, d</td>') == 0
if pub.is_using_postgresql():
# option 'c' is never used so not even listed
with pytest.raises(ValueError):
resp.forms['listing-settings']['filter-4-value'].value = 'c'
else:
# option 'c' is never used so not even listed
with pytest.raises(ValueError):
resp.forms['listing-settings']['filter-4-value'].value = 'c'
resp.forms['listing-settings']['filter-4-operator'].value = 'eq'
resp = resp.forms['listing-settings'].submit()
assert resp.text.count('<td>â, b</td>') == 0
assert resp.text.count('<td>â</td>') == 0
assert resp.text.count('<td>b, d</td>') == 0
assert resp.text.count('data-link') == 0 # no rows
def test_backoffice_items_cards_filter(pub):
@ -766,22 +730,13 @@ def test_backoffice_items_cards_filter(pub):
resp.forms['listing-settings']['filter-1'].checked = True
resp = resp.forms['listing-settings'].submit()
if pub.is_using_postgresql():
# option 'bar' is never used so not even listed
assert [x[2] for x in resp.forms['listing-settings']['filter-1-value'].options] == [
'',
'card baz',
'card foo',
'card foo, bar',
]
else:
assert [x[2] for x in resp.forms['listing-settings']['filter-1-value'].options] == [
'',
'card bar',
'card baz',
'card foo',
'card foo, bar',
]
# option 'bar' is never used so not even listed
assert [x[2] for x in resp.forms['listing-settings']['filter-1-value'].options] == [
'',
'card baz',
'card foo',
'card foo, bar',
]
resp.forms['listing-settings']['filter-1-value'].value = card_ids['foo']
resp = resp.forms['listing-settings'].submit()
@ -957,9 +912,6 @@ def test_backoffice_email_filter(pub):
def test_backoffice_date_filter(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
pub.user_class.wipe()
create_superuser(pub)
pub.role_class.wipe()
@ -1363,9 +1315,6 @@ def test_backoffice_table_varname_filter(pub):
def test_backoffice_block_field_filter(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
pub.user_class.wipe()
create_superuser(pub)
pub.role_class.wipe()

View File

@ -349,10 +349,6 @@ def test_inspect_page(pub, local_user):
def test_inspect_page_with_related_objects(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
user = create_user(pub, is_admin=True)
FormDef.wipe()
@ -746,10 +742,6 @@ def test_inspect_page_map_field(pub, local_user):
def test_inspect_page_lazy_list(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
FormDef.wipe()
formdef = FormDef()

View File

@ -38,13 +38,12 @@ def test_backoffice_statistics_with_no_formdefs(pub):
create_user(pub)
create_environment(pub)
FormDef.wipe()
if pub.is_using_postgresql():
from wcs.sql import drop_global_views, get_connection_and_cursor
from wcs.sql import drop_global_views, get_connection_and_cursor
conn, cur = get_connection_and_cursor()
drop_global_views(conn, cur)
conn.commit()
cur.close()
conn, cur = get_connection_and_cursor()
drop_global_views(conn, cur)
conn.commit()
cur.close()
app = login(get_app(pub))
resp = app.get('/backoffice/management/statistics')
@ -72,14 +71,11 @@ def test_backoffice_statistics_status_filter(pub):
assert 'Total number of records: 17' in resp.text
assert '<h2>Filters</h2>' in resp.text
assert 'Status: Open' in resp.text
if pub.is_using_postgresql():
resp.forms['listing-settings']['filter'].value = 'waiting'
resp = resp.forms['listing-settings'].submit()
assert 'Total number of records: 17' in resp.text
assert '<h2>Filters</h2>' in resp.text
assert 'Status: Waiting for an action' in resp.text
else:
assert 'waiting' not in [x[0] for x in resp.forms['listing-settings']['filter'].options]
resp.forms['listing-settings']['filter'].value = 'waiting'
resp = resp.forms['listing-settings'].submit()
assert 'Total number of records: 17' in resp.text
assert '<h2>Filters</h2>' in resp.text
assert 'Status: Waiting for an action' in resp.text
resp.forms['listing-settings']['filter'].value = 'done'
resp = resp.forms['listing-settings'].submit()

View File

@ -1521,12 +1521,7 @@ def test_form_submit_with_just_disabled_user(pub, emails):
user.store()
resp = resp.form.submit('submit')
resp = resp.follow()
if pub.is_using_postgresql():
assert 'Sorry, your session have been lost.' in resp
else:
assert 'The form has been recorded' in resp
assert formdef.data_class().count() == 1
assert formdef.data_class().select()[0].user_id is None
assert 'Sorry, your session have been lost.' in resp
def test_form_titles(pub):
@ -2836,9 +2831,8 @@ def test_form_multi_page_post_edit(pub):
assert formdef.data_class().get(data_id).evolution[-1].status == 'wf-%s' % st2.id
# jump to a nonexistent status == do not jump, but add a LoggedError
if pub.is_using_postgresql():
pub.loggederror_class.wipe()
assert pub.loggederror_class.count() == 0
pub.loggederror_class.wipe()
assert pub.loggederror_class.count() == 0
editable.status = 'deleted_status_id'
workflow.store()
# go back to st1
@ -2854,15 +2848,14 @@ def test_form_multi_page_post_edit(pub):
resp = resp.forms[0].submit('submit')
resp = resp.follow()
assert formdef.data_class().get(data_id).status == 'wf-%s' % st1.id # stay on st1
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.formdef_id == formdef.id
assert logged_error.workflow_id == workflow.id
assert logged_error.status_id == st1.id
assert logged_error.status_item_id == editable.id
assert logged_error.occurences_count == 1
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.formdef_id == formdef.id
assert logged_error.workflow_id == workflow.id
assert logged_error.status_id == st1.id
assert logged_error.status_item_id == editable.id
assert logged_error.occurences_count == 1
# do it again: increment logged_error.occurences_count
page = login(get_app(pub), username='foo', password='foo').get('/test/%s/' % data_id)
@ -2873,10 +2866,9 @@ def test_form_multi_page_post_edit(pub):
resp = resp.forms[0].submit('submit')
resp = resp.follow()
assert formdef.data_class().get(data_id).status == 'wf-%s' % st1.id # stay on st1
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.occurences_count == 2
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.occurences_count == 2
def test_form_edit_autocomplete_list(pub):
@ -3765,11 +3757,10 @@ def test_form_page_template_prefill_items_field(pub):
assert not resp.form['f0$elementbar'].checked
assert not resp.form['f0$elementbaz'].checked
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.summary == 'Invalid value for items prefill on field "items"'
pub.loggederror_class.wipe()
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.summary == 'Invalid value for items prefill on field "items"'
pub.loggederror_class.wipe()
def test_form_page_changing_prefill(pub):
@ -5006,13 +4997,8 @@ def test_form_autosave_never_overwrite(mocker, pub, settings):
autosave_data['_ajax_form_token'] = resp.form['_form_id'].value
resp_autosave = app.post('/test/autosave', params=autosave_data)
formdata.refresh_from_storage()
if pub.is_using_postgresql():
assert resp_autosave.json != {'result': 'success'}
assert formdata.data['3'] == '1'
else:
# with pickle invalid data has been saved
assert resp_autosave.json == {'result': 'success'}
assert formdata.data['3'] == 'tmp'
assert resp_autosave.json != {'result': 'success'}
assert formdata.data['3'] == '1'
# validate
resp = resp.form.submit('submit') # -> submit
@ -5546,10 +5532,9 @@ def test_workflow_message_with_template_error(pub):
resp = resp.follow()
assert 'Error rendering message.' in resp.text
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.summary == "Error in template of workflow message ('int' object is not iterable)"
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.summary == "Error in template of workflow message ('int' object is not iterable)"
def test_session_cookie_flags(pub):
@ -6030,10 +6015,7 @@ def test_item_field_from_custom_view_on_cards(pub):
app = login(get_app(pub), username='foo', password='foo')
resp = app.get('/backoffice/data/items/')
if pub.is_using_postgresql():
assert resp.text.count('<tr') == 21 # thead + 20 items (max per page)
else:
assert resp.text.count('<tr') == 31 # thead + all items
assert resp.text.count('<tr') == 21 # thead + 20 items (max per page)
resp.forms['listing-settings']['filter-0'].checked = True
resp = resp.forms['listing-settings'].submit()
@ -6524,11 +6506,10 @@ def test_item_field_autocomplete_json_source(http_requests, pub, error_email, em
resp2 = app.get(select2_url + '?q=hell')
assert emails.count() == 1
assert emails.get_latest('subject') == '[ERROR] [DATASOURCE] Error loading JSON data source (...)'
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.workflow_id is None
assert logged_error.summary == '[DATASOURCE] Error loading JSON data source (...)'
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.workflow_id is None
assert logged_error.summary == '[DATASOURCE] Error loading JSON data source (...)'
data_source.notify_on_errors = False
data_source.store()
@ -6891,10 +6872,6 @@ def test_form_data_keywords(pub):
def test_logged_errors(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
Workflow.wipe()
workflow = Workflow.get_default_workflow()
workflow.id = '12'
@ -8896,11 +8873,9 @@ def test_create_formdata_empty_item_ds_with_id_parameter(pub, create_formdata):
resp = resp.form.submit('submit') # -> submission
resp = resp.follow()
assert create_formdata['target_formdef'].data_class().count() == 0
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 0
assert pub.loggederror_class.count() == 0
resp = resp.form.submit('button_resubmit')
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 0
assert pub.loggederror_class.count() == 0
def test_create_formdata_locked_prefill_parent(create_formdata):

View File

@ -1808,14 +1808,13 @@ def test_removed_block_in_form_page(pub):
resp = get_app(pub).get(formdef.get_url(), status=500)
assert 'A fatal error happened.' in resp.text
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert '(id:%s)' % logged_error.id in resp.text
resp = get_app(pub).get(formdef.get_url(), status=500)
assert '(id:%s)' % logged_error.id in resp.text
logged_error = pub.loggederror_class.select()[0]
assert logged_error.occurences_count == 2
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert '(id:%s)' % logged_error.id in resp.text
resp = get_app(pub).get(formdef.get_url(), status=500)
assert '(id:%s)' % logged_error.id in resp.text
logged_error = pub.loggederror_class.select()[0]
assert logged_error.occurences_count == 2
def test_block_with_static_condition(pub):

View File

@ -631,8 +631,7 @@ def test_computed_field_with_data_source(pub):
def test_computed_field_with_bad_objects_filter_in_prefill(pub):
if pub.is_using_postgresql():
pub.loggederror_class.wipe()
pub.loggederror_class.wipe()
CardDef.wipe()
FormDef.wipe()
@ -676,22 +675,20 @@ def test_computed_field_with_bad_objects_filter_in_prefill(pub):
formdef.store()
resp = get_app(pub).get('/test/')
assert 'XY' in resp.text
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[0]
assert logged_error.summary == 'wcs.carddef.CardDefDoesNotExist: unknown'
assert logged_error.formdef_id == formdef.id
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert (
logged_error.summary
== 'Invalid value "{{ cards|objects:"unknown"|first|get:"form_number_raw"|default:"" }}" for field "computed"'
)
assert logged_error.formdef_id == formdef.id
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[0]
assert logged_error.summary == 'wcs.carddef.CardDefDoesNotExist: unknown'
assert logged_error.formdef_id == formdef.id
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert (
logged_error.summary
== 'Invalid value "{{ cards|objects:"unknown"|first|get:"form_number_raw"|default:"" }}" for field "computed"'
)
assert logged_error.formdef_id == formdef.id
def test_computed_field_with_bad_value_type_in_prefill(pub):
if pub.is_using_postgresql():
pub.loggederror_class.wipe()
pub.loggederror_class.wipe()
CardDef.wipe()
FormDef.wipe()
@ -740,11 +737,10 @@ def test_computed_field_with_bad_value_type_in_prefill(pub):
formdef.store()
resp = get_app(pub).get('/test/')
assert 'XY' in resp.text
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.summary == 'Invalid value "foo" for field "computed"'
assert logged_error.formdef_id == formdef.id
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.summary == 'Invalid value "foo" for field "computed"'
assert logged_error.formdef_id == formdef.id
formdef.fields[0].value_template = (
'{{ cards|objects:"%s"|first|get:"form_var_bool"|default:"" }}' % carddef.url_name
@ -752,11 +748,10 @@ def test_computed_field_with_bad_value_type_in_prefill(pub):
formdef.store()
resp = get_app(pub).get('/test/')
assert 'XY' in resp.text
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.summary == 'Invalid value "True" for field "computed"'
assert logged_error.formdef_id == formdef.id
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.summary == 'Invalid value "True" for field "computed"'
assert logged_error.formdef_id == formdef.id
for value_template in [
'{{ cards|objects:"%s"|get:42|get:"form_number_raw" }}' % carddef.url_name,
@ -766,5 +761,4 @@ def test_computed_field_with_bad_value_type_in_prefill(pub):
formdef.store()
resp = get_app(pub).get('/test/')
assert 'XY' in resp.text
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 2
assert pub.loggederror_class.count() == 2

View File

@ -392,10 +392,9 @@ def test_form_file_field_with_wrong_value(pub):
formdef.store()
get_app(pub).get('/test/')
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.formdef_id == formdef.id
assert logged_error.summary == 'Failed to set value on field "file"'
assert logged_error.exception_class == 'AttributeError'
assert logged_error.exception_message == "'str' object has no attribute 'time'"
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.formdef_id == formdef.id
assert logged_error.summary == 'Failed to set value on field "file"'
assert logged_error.exception_class == 'AttributeError'
assert logged_error.exception_message == "'str' object has no attribute 'time'"

View File

@ -828,11 +828,10 @@ def test_formdata_generated_document_odt_to_pdf_download_push_to_portfolio(
http_post_request.return_value = None, 400, '{"code": "document-exists"}', None # fail
resp = resp.form.submit('button_export_to')
assert http_post_request.call_count == 1
if pub.is_using_postgresql():
error = pub.loggederror_class.select()[0]
assert error.summary.startswith("file 'template.pdf' failed to be pushed to portfolio of 'Foo")
assert 'status: 400' in error.summary
assert "payload: {'code': 'document-exists'}" in error.summary
error = pub.loggederror_class.select()[0]
assert error.summary.startswith("file 'template.pdf' failed to be pushed to portfolio of 'Foo")
assert 'status: 400' in error.summary
assert "payload: {'code': 'document-exists'}" in error.summary
# failed to push to portfolio, but document is here
resp = resp.follow() # $form/$id/create_doc

View File

@ -1320,10 +1320,6 @@ def test_field_live_locked_error_prefilled_field(pub, http_requests):
@pytest.mark.parametrize('field_type', ['item', 'string', 'email'])
def test_dynamic_item_field_from_custom_view_on_cards(pub, field_type):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
pub.role_class.wipe()
pub.custom_view_class.wipe()
@ -1461,23 +1457,17 @@ def test_dynamic_item_field_from_custom_view_on_cards(pub, field_type):
assert formdef.data_class().select()[0].data['1_structured']['item'] == 'baz'
# delete custom view
if pub.is_using_postgresql():
pub.loggederror_class.wipe()
pub.loggederror_class.wipe()
custom_view.remove_self()
resp = get_app(pub).get('/test/')
assert resp.form['f1'].options == []
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.formdef_id == formdef.id
assert logged_error.summary == '[DATASOURCE] Unknown custom view "as-data-source" for CardDef "items"'
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.formdef_id == formdef.id
assert logged_error.summary == '[DATASOURCE] Unknown custom view "as-data-source" for CardDef "items"'
def test_dynamic_items_field_from_custom_view_on_cards(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
pub.role_class.wipe()
pub.custom_view_class.wipe()
@ -1606,23 +1596,17 @@ def test_dynamic_items_field_from_custom_view_on_cards(pub):
assert formdef.data_class().select()[0].data['1_structured']['text'] == 'attr1 - foo,bar'
# delete custom view
if pub.is_using_postgresql():
pub.loggederror_class.wipe()
pub.loggederror_class.wipe()
custom_view.remove_self()
resp = get_app(pub).get('/test/')
assert resp.form['f1'].options == []
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.formdef_id == formdef.id
assert logged_error.summary == '[DATASOURCE] Unknown custom view "as-data-source" for CardDef "items"'
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.formdef_id == formdef.id
assert logged_error.summary == '[DATASOURCE] Unknown custom view "as-data-source" for CardDef "items"'
def test_dynamic_internal_id_from_custom_view_on_cards(pub):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
pub.role_class.wipe()
pub.custom_view_class.wipe()

View File

@ -453,14 +453,13 @@ def test_data_source_custom_view_unknown_filter(pub):
assert [i['text'] for i in CardDef.get_data_source_items('carddef:foo')] == ['Hello']
assert [i['text'] for i in CardDef.get_data_source_items('carddef:foo:view')] == []
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[0]
assert logged_error.summary == 'Invalid filter "42"'
assert logged_error.formdef_id == str(carddef.id)
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.summary == 'Invalid filter "foobar"'
assert logged_error.formdef_id == str(carddef.id)
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[0]
assert logged_error.summary == 'Invalid filter "42"'
assert logged_error.formdef_id == str(carddef.id)
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.summary == 'Invalid filter "foobar"'
assert logged_error.formdef_id == str(carddef.id)
def test_data_source_anonymised_cards(pub):
@ -600,14 +599,13 @@ def test_data_source_custom_view_digest(pub):
carddef.store()
assert [i['text'] for i in CardDef.get_data_source_items('carddef:foo')] == ['', '']
assert [i['text'] for i in CardDef.get_data_source_items('carddef:foo:view')] == ['', '']
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[0]
assert logged_error.summary == 'Digest (default) not defined'
assert logged_error.formdata_id in (str(carddata.id), str(carddata2.id))
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.summary == 'Digest (custom view "view") not defined'
assert logged_error.formdata_id in (str(carddata.id), str(carddata2.id))
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[0]
assert logged_error.summary == 'Digest (default) not defined'
assert logged_error.formdata_id in (str(carddata.id), str(carddata2.id))
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.summary == 'Digest (custom view "view") not defined'
assert logged_error.formdata_id in (str(carddata.id), str(carddata2.id))
assert CardDef.get_data_source_items('carddef:foo', get_by_text='') == []
assert CardDef.get_data_source_items('carddef:foo:view', get_by_text='') == []

View File

@ -139,21 +139,19 @@ def test_python_datasource_errors(pub, error_email, http_requests, emails, caplo
datasource = {'type': 'formula', 'value': 'foobar', 'notify_on_errors': True, 'record_on_errors': True}
assert data_sources.get_items(datasource) == []
assert 'Failed to eval() Python data source' in emails.get_latest('subject')
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.workflow_id is None
assert logged_error.summary == "[DATASOURCE] Failed to eval() Python data source ('foobar')"
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.workflow_id is None
assert logged_error.summary == "[DATASOURCE] Failed to eval() Python data source ('foobar')"
# expression not iterable
datasource = {'type': 'formula', 'value': '2', 'notify_on_errors': True, 'record_on_errors': True}
assert data_sources.get_items(datasource) == []
assert 'gave a non-iterable result' in emails.get_latest('subject')
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.workflow_id is None
assert logged_error.summary == "[DATASOURCE] Python data source ('2') gave a non-iterable result"
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.workflow_id is None
assert logged_error.summary == "[DATASOURCE] Python data source ('2') gave a non-iterable result"
datasource = {
'type': 'formula',
@ -161,13 +159,12 @@ def test_python_datasource_errors(pub, error_email, http_requests, emails, caplo
'record_on_errors': True,
}
assert data_sources.get_items(datasource) == []
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 3
logged_error = pub.loggederror_class.select(order_by='id')[2]
assert logged_error.workflow_id is None
assert logged_error.summary == (
'[DATASOURCE] Python data source (\'[{"mairie-a-rdv", "Mairie A"}, {"mairie-b-rdv", "Mairie B"}]\') gave a non usable result'
)
assert pub.loggederror_class.count() == 3
logged_error = pub.loggederror_class.select(order_by='id')[2]
assert logged_error.workflow_id is None
assert logged_error.summary == (
'[DATASOURCE] Python data source (\'[{"mairie-a-rdv", "Mairie A"}, {"mairie-b-rdv", "Mairie B"}]\') gave a non usable result'
)
# list of dictionaries but some are missing a text key
datasource = {
@ -176,13 +173,12 @@ def test_python_datasource_errors(pub, error_email, http_requests, emails, caplo
'record_on_errors': True,
}
assert data_sources.get_items(datasource) == []
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 4
logged_error = pub.loggederror_class.select(order_by='id')[2]
assert logged_error.workflow_id is None
assert logged_error.summary == (
'[DATASOURCE] Python data source (\'[{"mairie-a-rdv", "Mairie A"}, {"mairie-b-rdv", "Mairie B"}]\') gave a non usable result'
)
assert pub.loggederror_class.count() == 4
logged_error = pub.loggederror_class.select(order_by='id')[2]
assert logged_error.workflow_id is None
assert logged_error.summary == (
'[DATASOURCE] Python data source (\'[{"mairie-a-rdv", "Mairie A"}, {"mairie-b-rdv", "Mairie B"}]\') gave a non usable result'
)
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
@ -191,8 +187,7 @@ def test_python_datasource_errors(pub, error_email, http_requests, emails, caplo
pub.site_options.write(fd)
# running with disabled python expressions
if pub.is_using_postgresql():
pub.loggederror_class.wipe()
pub.loggederror_class.wipe()
datasource = {
'type': 'formula',
'value': repr(['foo', 'bar']),
@ -203,11 +198,10 @@ def test_python_datasource_errors(pub, error_email, http_requests, emails, caplo
assert data_sources.get_items(datasource) == []
assert emails.count() == 1 # notified even with notify_on_errors set to False
assert 'Unauthorized Python Usage' in emails.get_latest('subject')
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select(order_by='latest_occurence_timestamp')[-1]
assert logged_error.workflow_id is None
assert logged_error.summary == 'Unauthorized Python Usage'
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select(order_by='latest_occurence_timestamp')[-1]
assert logged_error.workflow_id is None
assert logged_error.summary == 'Unauthorized Python Usage'
def test_python_datasource_with_evalutils(pub):
@ -450,14 +444,13 @@ def test_json_datasource_bad_url(pub, error_email, http_requests, emails):
assert 'error in HTTP request to http://remote.example.net/404 (status: 404)' in emails.get_latest(
'subject'
)
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.workflow_id is None
assert (
logged_error.summary
== "[DATASOURCE] Error loading JSON data source (error in HTTP request to http://remote.example.net/404 (status: 404))"
)
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.workflow_id is None
assert (
logged_error.summary
== "[DATASOURCE] Error loading JSON data source (error in HTTP request to http://remote.example.net/404 (status: 404))"
)
datasource = {
'type': 'json',
@ -468,14 +461,13 @@ def test_json_datasource_bad_url(pub, error_email, http_requests, emails):
assert data_sources.get_items(datasource) == []
assert emails.count() == 2
assert 'Error reading JSON data source' in emails.get_latest('subject')
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.workflow_id is None
assert (
logged_error.summary
== "[DATASOURCE] Error reading JSON data source output (Expecting value: line 1 column 1 (char 0))"
)
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.workflow_id is None
assert (
logged_error.summary
== "[DATASOURCE] Error reading JSON data source output (Expecting value: line 1 column 1 (char 0))"
)
datasource = {
'type': 'json',
@ -485,11 +477,10 @@ def test_json_datasource_bad_url(pub, error_email, http_requests, emails):
}
assert data_sources.get_items(datasource) == []
assert 'Error loading JSON data source' in emails.get_latest('subject')
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 3
logged_error = pub.loggederror_class.select(order_by='id')[2]
assert logged_error.workflow_id is None
assert logged_error.summary == "[DATASOURCE] Error loading JSON data source (error)"
assert pub.loggederror_class.count() == 3
logged_error = pub.loggederror_class.select(order_by='id')[2]
assert logged_error.workflow_id is None
assert logged_error.summary == "[DATASOURCE] Error loading JSON data source (error)"
datasource = {
'type': 'json',
@ -499,45 +490,40 @@ def test_json_datasource_bad_url(pub, error_email, http_requests, emails):
}
assert data_sources.get_items(datasource) == []
assert 'Error reading JSON data source output (err 1)' in emails.get_latest('subject')
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 4
logged_error = pub.loggederror_class.select(order_by='id')[3]
assert logged_error.workflow_id is None
assert logged_error.summary == "[DATASOURCE] Error reading JSON data source output (err 1)"
assert pub.loggederror_class.count() == 4
logged_error = pub.loggederror_class.select(order_by='id')[3]
assert logged_error.workflow_id is None
assert logged_error.summary == "[DATASOURCE] Error reading JSON data source output (err 1)"
def test_json_datasource_bad_url_scheme(pub, error_email, emails):
datasource = {'type': 'json', 'value': '', 'notify_on_errors': True, 'record_on_errors': True}
assert data_sources.get_items(datasource) == []
assert emails.count() == 0
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 0
assert pub.loggederror_class.count() == 0
datasource = {'type': 'json', 'value': 'foo://bar', 'notify_on_errors': True, 'record_on_errors': True}
assert data_sources.get_items(datasource) == []
assert 'Error loading JSON data source' in emails.get_latest('subject')
assert 'invalid scheme in URL' in emails.get_latest('subject')
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.workflow_id is None
assert (
logged_error.summary
== "[DATASOURCE] Error loading JSON data source (invalid scheme in URL foo://bar)"
)
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.workflow_id is None
assert (
logged_error.summary
== "[DATASOURCE] Error loading JSON data source (invalid scheme in URL foo://bar)"
)
datasource = {'type': 'json', 'value': '/bla/blo', 'notify_on_errors': True, 'record_on_errors': True}
assert data_sources.get_items(datasource) == []
assert 'Error loading JSON data source' in emails.get_latest('subject')
assert 'invalid scheme in URL' in emails.get_latest('subject')
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.workflow_id is None
assert (
logged_error.summary
== "[DATASOURCE] Error loading JSON data source (invalid scheme in URL /bla/blo)"
)
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.workflow_id is None
assert (
logged_error.summary == "[DATASOURCE] Error loading JSON data source (invalid scheme in URL /bla/blo)"
)
@pytest.mark.parametrize('notify', [True, False])
@ -561,13 +547,12 @@ def test_json_datasource_bad_qs_data(pub, error_email, emails, notify, record):
assert message in emails.get_latest('subject')
else:
assert emails.count() == 0
if pub.is_using_postgresql():
if record:
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select(order_by='id')[0]
assert logged_error.summary == message
else:
assert pub.loggederror_class.count() == 0
if record:
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select(order_by='id')[0]
assert logged_error.summary == message
else:
assert pub.loggederror_class.count() == 0
def test_geojson_datasource(pub, requests_pub, http_requests):
@ -896,14 +881,13 @@ def test_geojson_datasource_bad_url(pub, http_requests, error_email, emails):
assert data_sources.get_items(datasource) == []
assert 'Error loading JSON data source' in emails.get_latest('subject')
assert 'status: 404' in emails.get_latest('subject')
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.workflow_id is None
assert (
logged_error.summary
== "[DATASOURCE] Error loading JSON data source (error in HTTP request to http://remote.example.net/404 (status: 404))"
)
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.workflow_id is None
assert (
logged_error.summary
== "[DATASOURCE] Error loading JSON data source (error in HTTP request to http://remote.example.net/404 (status: 404))"
)
datasource = {
'type': 'geojson',
@ -914,14 +898,13 @@ def test_geojson_datasource_bad_url(pub, http_requests, error_email, emails):
assert data_sources.get_items(datasource) == []
assert 'Error reading JSON data source output' in emails.get_latest('subject')
assert 'Expecting value:' in emails.get_latest('subject')
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.workflow_id is None
assert (
logged_error.summary
== "[DATASOURCE] Error reading JSON data source output (Expecting value: line 1 column 1 (char 0))"
)
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.workflow_id is None
assert (
logged_error.summary
== "[DATASOURCE] Error reading JSON data source output (Expecting value: line 1 column 1 (char 0))"
)
datasource = {
'type': 'geojson',
@ -932,11 +915,10 @@ def test_geojson_datasource_bad_url(pub, http_requests, error_email, emails):
assert data_sources.get_items(datasource) == []
assert 'Error loading JSON data source' in emails.get_latest('subject')
assert 'error' in emails.get_latest('subject')
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 3
logged_error = pub.loggederror_class.select(order_by='id')[2]
assert logged_error.workflow_id is None
assert logged_error.summary == "[DATASOURCE] Error loading JSON data source (error)"
assert pub.loggederror_class.count() == 3
logged_error = pub.loggederror_class.select(order_by='id')[2]
assert logged_error.workflow_id is None
assert logged_error.summary == "[DATASOURCE] Error loading JSON data source (error)"
datasource = {
'type': 'geojson',
@ -946,11 +928,10 @@ def test_geojson_datasource_bad_url(pub, http_requests, error_email, emails):
}
assert data_sources.get_items(datasource) == []
assert 'Error reading JSON data source output (err 1)' in emails.get_latest('subject')
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 4
logged_error = pub.loggederror_class.select(order_by='id')[3]
assert logged_error.workflow_id is None
assert logged_error.summary == "[DATASOURCE] Error reading JSON data source output (err 1)"
assert pub.loggederror_class.count() == 4
logged_error = pub.loggederror_class.select(order_by='id')[3]
assert logged_error.workflow_id is None
assert logged_error.summary == "[DATASOURCE] Error reading JSON data source output (err 1)"
def test_geojson_datasource_bad_url_scheme(pub, error_email, emails):
@ -962,27 +943,24 @@ def test_geojson_datasource_bad_url_scheme(pub, error_email, emails):
assert data_sources.get_items(datasource) == []
assert 'Error loading JSON data source' in emails.get_latest('subject')
assert 'invalid scheme in URL' in emails.get_latest('subject')
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.workflow_id is None
assert (
logged_error.summary
== "[DATASOURCE] Error loading JSON data source (invalid scheme in URL foo://bar)"
)
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.workflow_id is None
assert (
logged_error.summary
== "[DATASOURCE] Error loading JSON data source (invalid scheme in URL foo://bar)"
)
datasource = {'type': 'geojson', 'value': '/bla/blo', 'notify_on_errors': True, 'record_on_errors': True}
assert data_sources.get_items(datasource) == []
assert 'Error loading JSON data source' in emails.get_latest('subject')
assert 'invalid scheme in URL' in emails.get_latest('subject')
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.workflow_id is None
assert (
logged_error.summary
== "[DATASOURCE] Error loading JSON data source (invalid scheme in URL /bla/blo)"
)
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.workflow_id is None
assert (
logged_error.summary == "[DATASOURCE] Error loading JSON data source (invalid scheme in URL /bla/blo)"
)
def test_item_field_named_python_datasource(requests_pub):

View File

@ -193,8 +193,7 @@ def test_fc_login_page(caplog):
}
)
)
if pub.is_using_postgresql():
assert 'user did not authorize login' in pub.loggederror_class.select(order_by='id')[-1].summary
assert 'user did not authorize login' in pub.loggederror_class.select(order_by='id')[-1].summary
resp = app.get(
'/ident/fc/callback?%s'
% urllib.parse.urlencode(
@ -204,8 +203,7 @@ def test_fc_login_page(caplog):
}
)
)
if pub.is_using_postgresql():
assert 'whatever' in pub.loggederror_class.select(order_by='id')[-1].summary
assert 'whatever' in pub.loggederror_class.select(order_by='id')[-1].summary
# Login existing user
def logme(login_url):

View File

@ -1250,14 +1250,12 @@ def test_lazy_formdata_queryset_filter(pub, variable_test_data):
assert queryset.count == 4
queryset = lazy_formdata.objects.filter_by('foo_foo').apply_filter_value('X')
assert queryset.count == 0
if pub.is_using_postgresql():
pub.loggederror_class.wipe()
pub.loggederror_class.wipe()
queryset = lazy_formdata.objects.filter_by('unknown').apply_filter_value('X')
assert queryset.count == 0
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.summary == 'Invalid filter "unknown"'
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.summary == 'Invalid filter "unknown"'
queryset = lazy_formdata.objects.filter_by('datefield').apply_filter_value(
datetime.date(2018, 7, 31).timetuple()
@ -1273,33 +1271,29 @@ def test_lazy_formdata_queryset_filter(pub, variable_test_data):
assert queryset.count == 5
queryset = lazy_formdata.objects.filter_by('datefield').apply_filter_value('not a date')
assert queryset.count == 0
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.summary == 'Invalid value "not a date" for filter "datefield"'
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.summary == 'Invalid value "not a date" for filter "datefield"'
if pub.is_using_postgresql():
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value(
datetime.date(2018, 7, 31).timetuple()
)
assert queryset.count == 6 # 1 + 5 null
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value(
datetime.date(2018, 7, 31)
)
assert queryset.count == 6
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value(
datetime.datetime(2018, 7, 31)
)
assert queryset.count == 6
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value('2018-07-31')
assert queryset.count == 6
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value(None)
assert queryset.count == 6
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value('still not a date')
assert queryset.count == 0
assert pub.loggederror_class.count() == 3
logged_error = pub.loggederror_class.select(order_by='id')[2]
assert logged_error.summary == 'Invalid value "still not a date" for filter "datefield"'
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value(
datetime.date(2018, 7, 31).timetuple()
)
assert queryset.count == 6 # 1 + 5 null
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value(datetime.date(2018, 7, 31))
assert queryset.count == 6
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value(
datetime.datetime(2018, 7, 31)
)
assert queryset.count == 6
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value('2018-07-31')
assert queryset.count == 6
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value(None)
assert queryset.count == 6
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value('still not a date')
assert queryset.count == 0
assert pub.loggederror_class.count() == 3
logged_error = pub.loggederror_class.select(order_by='id')[2]
assert logged_error.summary == 'Invalid value "still not a date" for filter "datefield"'
queryset = lazy_formdata.objects.filter_by('boolfield').apply_filter_value(True)
assert queryset.count == 6
@ -1311,9 +1305,8 @@ def test_lazy_formdata_queryset_filter(pub, variable_test_data):
assert queryset.count == 5
queryset = lazy_formdata.objects.filter_by('boolfield').apply_filter_value(0)
assert queryset.count == 5
if pub.is_using_postgresql():
queryset = lazy_formdata.objects.filter_by('boolfield').apply_exclude_value(0)
assert queryset.count == 6
queryset = lazy_formdata.objects.filter_by('boolfield').apply_exclude_value(0)
assert queryset.count == 6
queryset = lazy_formdata.objects.filter_by('term1').apply_filter_value('3')
assert queryset.count == 7
@ -1321,11 +1314,10 @@ def test_lazy_formdata_queryset_filter(pub, variable_test_data):
assert queryset.count == 7
queryset = lazy_formdata.objects.filter_by('term1').apply_filter_value('foobar')
assert queryset.count == 0
if pub.is_using_postgresql():
queryset = lazy_formdata.objects.filter_by('term1').apply_exclude_value('3')
assert queryset.count == 4
queryset = lazy_formdata.objects.filter_by('term1').apply_exclude_value('foobar')
assert queryset.count == 11
queryset = lazy_formdata.objects.filter_by('term1').apply_exclude_value('3')
assert queryset.count == 4
queryset = lazy_formdata.objects.filter_by('term1').apply_exclude_value('foobar')
assert queryset.count == 11
queryset = lazy_formdata.objects.filter_by('email').apply_filter_value('bar')
assert queryset.count == 0
@ -1382,11 +1374,10 @@ def test_lazy_formdata_queryset_filter(pub, variable_test_data):
assert tmpl.render(context) == '7'
tmpl = Template('{{form_objects|filter_by:"foo_foo"|filter_value:"bar"|length}}')
assert tmpl.render(context) == '7'
if pub.is_using_postgresql():
tmpl = Template('{{form_objects|filter_by:"foo_foo"|exclude_value:"bar"|count}}')
assert tmpl.render(context) == '4'
tmpl = Template('{{form_objects|filter_by:"foo_foo"|exclude_value:"bar"|length}}')
assert tmpl.render(context) == '4'
tmpl = Template('{{form_objects|filter_by:"foo_foo"|exclude_value:"bar"|count}}')
assert tmpl.render(context) == '4'
tmpl = Template('{{form_objects|filter_by:"foo_foo"|exclude_value:"bar"|length}}')
assert tmpl.render(context) == '4'
pub.substitutions.feed(formdata)
tmpl = Template('{{form_objects|filter_by:"foo_foo"|filter_value:form_var_foo_foo|count}}')
@ -1397,15 +1388,14 @@ def test_lazy_formdata_queryset_filter(pub, variable_test_data):
assert tmpl.render(context) == '5'
tmpl = Template('{{form_objects|filter_by:"term1"|filter_value:form_var_term1|count}}')
assert tmpl.render(context) == '7'
if pub.is_using_postgresql():
tmpl = Template('{{form_objects|filter_by:"foo_foo"|exclude_value:form_var_foo_foo|count}}')
assert tmpl.render(context) == '4'
tmpl = Template('{{form_objects|filter_by:"datefield"|exclude_value:form_var_datefield|count}}')
assert tmpl.render(context) == '6'
tmpl = Template('{{form_objects|filter_by:"boolfield"|exclude_value:form_var_boolfield|count}}')
assert tmpl.render(context) == '6'
tmpl = Template('{{form_objects|filter_by:"term1"|exclude_value:form_var_term1|count}}')
assert tmpl.render(context) == '4'
tmpl = Template('{{form_objects|filter_by:"foo_foo"|exclude_value:form_var_foo_foo|count}}')
assert tmpl.render(context) == '4'
tmpl = Template('{{form_objects|filter_by:"datefield"|exclude_value:form_var_datefield|count}}')
assert tmpl.render(context) == '6'
tmpl = Template('{{form_objects|filter_by:"boolfield"|exclude_value:form_var_boolfield|count}}')
assert tmpl.render(context) == '6'
tmpl = Template('{{form_objects|filter_by:"term1"|exclude_value:form_var_term1|count}}')
assert tmpl.render(context) == '4'
tmpl = Template('{{form.objects|exclude_self|filter_by:"foo_foo"|filter_value:form_var_foo_foo|count}}')
assert tmpl.render(context) == '6'
@ -1494,10 +1484,9 @@ def test_lazy_formdata_queryset_filter(pub, variable_test_data):
assert tmpl.render(context) == '0'
tmpl = Template('{{form_objects|filter_by_internal_id:"%s"|count}}' % 'invalid value')
assert tmpl.render(context) == '0'
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 4
logged_error = pub.loggederror_class.select(order_by='id')[3]
assert logged_error.summary == 'Invalid value "invalid value" for "filter_by_internal_id"'
assert pub.loggederror_class.count() == 4
logged_error = pub.loggederror_class.select(order_by='id')[3]
assert logged_error.summary == 'Invalid value "invalid value" for "filter_by_internal_id"'
# test |filter_by_number
context = pub.substitutions.get_context_variables(mode='lazy')
@ -1675,9 +1664,6 @@ def test_lazy_formdata_queryset_get_from_first(pub, variable_test_data):
def test_lazy_formdata_queryset_order_by(pub, variable_test_data):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
lazy_formdata = variable_test_data
data_class = lazy_formdata._formdef.data_class()
for i in range(6):
@ -1828,14 +1814,13 @@ def test_lazy_global_forms(pub):
custom_view4.store()
tmpl = Template('{{forms|objects:"foobarlazy"|with_custom_view:"unknown-filter"|count}}')
assert tmpl.render(context) == '0'
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[0]
assert logged_error.summary == 'Invalid filter "42"'
assert logged_error.formdef_id == str(formdef.id)
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.summary == 'Invalid filter "foobar"'
assert logged_error.formdef_id == str(formdef.id)
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[0]
assert logged_error.summary == 'Invalid filter "42"'
assert logged_error.formdef_id == str(formdef.id)
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.summary == 'Invalid filter "foobar"'
assert logged_error.formdef_id == str(formdef.id)
def test_lazy_variables(pub, variable_test_data):
@ -2674,11 +2659,10 @@ def test_form_digest_error(pub):
formdata.store()
assert formdef.data_class().get(formdata.id).digests['default'] == 'ERROR'
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.summary == 'Could not render digest (default)'
assert logged_error.formdata_id == str(formdata.id)
assert pub.loggederror_class.count() == 1
logged_error = pub.loggederror_class.select()[0]
assert logged_error.summary == 'Could not render digest (default)'
assert logged_error.formdata_id == str(formdata.id)
formdef.digest_templates = {
'default': 'plop plop',
@ -2690,11 +2674,10 @@ def test_form_digest_error(pub):
formdata.store()
assert formdef.data_class().get(formdata.id).digests['custom-view:foobar'] == 'ERROR'
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.summary == 'Could not render digest (custom view "foobar")'
assert logged_error.formdata_id == str(formdata.id)
assert pub.loggederror_class.count() == 2
logged_error = pub.loggederror_class.select(order_by='id')[1]
assert logged_error.summary == 'Could not render digest (custom view "foobar")'
assert logged_error.formdata_id == str(formdata.id)
def test_lazy_formdata_decimal_filter(pub):

View File

@ -121,13 +121,12 @@ def test_title_change(pub):
assert FormDef.get(formdef.id).url_name == 'foo'
assert FormDef.get(formdef.id).internal_identifier == 'bar'
if not pub.is_using_postgresql():
# makes sure the internal_name doesn't change if there are submitted forms
formdef.data_class()().store()
formdef.name = 'baz'
formdef.store()
assert FormDef.get(formdef.id).name == 'baz'
assert FormDef.get(formdef.id).internal_identifier == 'bar' # didn't change
# makes sure the internal_name doesn't change if there are submitted forms
formdef.data_class()().store()
formdef.name = 'baz'
formdef.store()
assert FormDef.get(formdef.id).name == 'baz'
assert FormDef.get(formdef.id).internal_identifier == 'bar' # didn't change
def test_overlong_slug(pub):

View File

@ -224,10 +224,6 @@ def test_inactive_user(pub, user, app):
def test_transient_data_removal(pub, app):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
pub.session_manager.session_class.wipe()
sql.TransientData.wipe()
resp = app.get('/')
@ -251,10 +247,6 @@ def test_transient_data_removal(pub, app):
def test_magictoken_migration(pub, app):
if not pub.is_using_postgresql():
pytest.skip('this requires SQL')
return
pub.session_manager.session_class.wipe()
sql.TransientData.wipe()
resp = app.get('/')

View File

@ -85,7 +85,6 @@ def create_temporary_pub(sql_mode=True, legacy_theme_mode=False, lazy_mode=False
pub.custom_view_class = sql.CustomView
pub.snapshot_class = sql.Snapshot
pub.loggederror_class = sql.LoggedError
pub.is_using_postgresql = lambda: True
else:
pub.user_class = User
pub.role_class = Role
@ -93,7 +92,6 @@ def create_temporary_pub(sql_mode=True, legacy_theme_mode=False, lazy_mode=False
pub.tracking_code_class = TrackingCode
pub.session_class = sessions.BasicSession
pub.custom_view_class = custom_views.CustomView
pub.is_using_postgresql = lambda: False
pub.session_manager_class = sessions.StorageSessionManager
pub.session_manager = pub.session_manager_class(session_class=pub.session_class)

View File

@ -373,10 +373,6 @@ def test_jump_count_condition(pub):
def test_jump_bad_python_condition(two_pubs):
if not two_pubs.is_using_postgresql():
pytest.skip('this requires SQL')
return
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
@ -433,19 +429,17 @@ def test_jump_django_conditions(two_pubs):
item.condition = {'type': 'django', 'value': 'form_var_foo|first|upper == "X"'}
assert item.check_condition(formdata) is False
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 0
assert two_pubs.loggederror_class.count() == 0
item.condition = {'type': 'django', 'value': '~ invalid ~'}
assert item.check_condition(formdata) is False
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary == 'Failed to evaluate condition'
assert logged_error.exception_class == 'TemplateSyntaxError'
assert logged_error.exception_message == "Could not parse the remainder: '~' from '~'"
assert logged_error.expression == '~ invalid ~'
assert logged_error.expression_type == 'django'
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary == 'Failed to evaluate condition'
assert logged_error.exception_class == 'TemplateSyntaxError'
assert logged_error.exception_message == "Could not parse the remainder: '~' from '~'"
assert logged_error.expression == '~ invalid ~'
assert logged_error.expression_type == 'django'
def test_check_auth(pub):
@ -580,10 +574,6 @@ def test_dispatch_multi(pub):
def test_dispatch_auto(two_pubs):
if not two_pubs.is_using_postgresql():
pytest.skip('this requires sql')
return
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
@ -671,10 +661,6 @@ def test_dispatch_auto(two_pubs):
def test_dispatch_computed(two_pubs):
if not two_pubs.is_using_postgresql():
pytest.skip('this requires sql')
return
formdef = FormDef()
formdef.name = 'baz'
formdef.store()
@ -1275,21 +1261,19 @@ def test_register_comment_attachment(two_pubs):
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
url1 = formdata.evolution[-1].parts[-1].content
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
error = two_pubs.loggederror_class.select()[0]
assert error.kind == 'deprecated_usage'
assert error.occurences_count == 1
assert two_pubs.loggederror_class.count() == 1
error = two_pubs.loggederror_class.select()[0]
assert error.kind == 'deprecated_usage'
assert error.occurences_count == 1
two_pubs.substitutions.feed(formdata)
item.comment = '{{ form_attachments.testfile.url }}'
item.perform(formdata)
url2 = formdata.evolution[-1].parts[-1].content
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
error = two_pubs.loggederror_class.select()[0]
assert error.kind == 'deprecated_usage'
assert error.occurences_count == 1
assert two_pubs.loggederror_class.count() == 1
error = two_pubs.loggederror_class.select()[0]
assert error.kind == 'deprecated_usage'
assert error.occurences_count == 1
assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments'))) == 1
for subdir in os.listdir(os.path.join(get_publisher().app_dir, 'attachments')):
@ -1316,21 +1300,19 @@ def test_register_comment_attachment(two_pubs):
item.comment = '[attachments.testfile.url]'
item.perform(formdata)
url3 = formdata.evolution[-1].parts[-1].content
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
error = two_pubs.loggederror_class.select()[0]
assert error.kind == 'deprecated_usage'
assert error.occurences_count == 2
assert two_pubs.loggederror_class.count() == 1
error = two_pubs.loggederror_class.select()[0]
assert error.kind == 'deprecated_usage'
assert error.occurences_count == 2
two_pubs.substitutions.feed(formdata)
item.comment = '[form_attachments.testfile.url]'
item.perform(formdata)
url4 = formdata.evolution[-1].parts[-1].content
assert url3 == url4
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
error = two_pubs.loggederror_class.select()[0]
assert error.kind == 'deprecated_usage'
assert error.occurences_count == 2
assert two_pubs.loggederror_class.count() == 1
error = two_pubs.loggederror_class.select()[0]
assert error.kind == 'deprecated_usage'
assert error.occurences_count == 2
def test_register_comment_with_attachment_file(pub):
@ -1808,19 +1790,17 @@ def test_webservice_call(http_requests, two_pubs):
item.perform(formdata)
assert formdata.status == 'wf-sterr'
if pub.is_using_postgresql():
pub.loggederror_class.wipe()
pub.loggederror_class.wipe()
item.action_on_5xx = 'stdeleted' # removed status
formdata.status = 'wf-st1'
formdata.store()
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.status == 'wf-st1' # unknown status acts like :stop
if pub.is_using_postgresql():
assert pub.loggederror_class.count() == 1
error = pub.loggederror_class.select()[0]
assert 'reference-to-invalid-status-stdeleted-in-workflow' in error.tech_id
assert error.occurences_count == 1
assert pub.loggederror_class.count() == 1
error = pub.loggederror_class.select()[0]
assert 'reference-to-invalid-status-stdeleted-in-workflow' in error.tech_id
assert error.occurences_count == 1
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml'
@ -2451,35 +2431,34 @@ def test_timeout(two_pubs):
assert formdef.data_class().get(formdata_id).status == 'wf-st2'
if two_pubs.is_using_postgresql():
formdata = formdef.data_class()()
formdata.just_created()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
formdata_id = formdata.id
with mock.patch('wcs.wf.jump.JumpWorkflowStatusItem.check_condition') as must_jump:
must_jump.return_value = False
_apply_timeouts(two_pubs)
assert must_jump.call_count == 0 # not enough time has passed
# check a lower than minimal delay is not considered
jump.timeout = 5 * 50 # 5 minutes
workflow.store()
rewind(formdata, seconds=10 * 60)
formdata.store()
formdata_id = formdata.id
with mock.patch('wcs.wf.jump.JumpWorkflowStatusItem.check_condition') as must_jump:
must_jump.return_value = False
_apply_timeouts(two_pubs)
assert must_jump.call_count == 0 # not enough time has passed
_apply_timeouts(two_pubs)
assert must_jump.call_count == 0
# check a lower than minimal delay is not considered
jump.timeout = 5 * 50 # 5 minutes
workflow.store()
rewind(formdata, seconds=10 * 60)
formdata.store()
_apply_timeouts(two_pubs)
assert must_jump.call_count == 0
# but is executed once delay is reached
rewind(formdata, seconds=10 * 60)
formdata.store()
_apply_timeouts(two_pubs)
assert must_jump.call_count == 1
# but is executed once delay is reached
rewind(formdata, seconds=10 * 60)
formdata.store()
_apply_timeouts(two_pubs)
assert must_jump.call_count == 1
# check a templated timeout is considered as minimal delay for explicit evaluation
jump.timeout = '{{ "0" }}'
workflow.store()
_apply_timeouts(two_pubs)
assert must_jump.call_count == 2
# check a templated timeout is considered as minimal delay for explicit evaluation
jump.timeout = '{{ "0" }}'
workflow.store()
_apply_timeouts(two_pubs)
assert must_jump.call_count == 2
# check there's no crash on workflow without jumps
formdef = FormDef()
@ -2530,18 +2509,16 @@ def test_timeout_with_humantime_template(two_pubs):
formdata.store()
formdata_id = formdata.id
if two_pubs.is_using_postgresql():
two_pubs.loggederror_class.wipe()
two_pubs.loggederror_class.wipe()
rewind(formdata, seconds=40 * 60)
formdata.store()
_apply_timeouts(two_pubs)
assert formdef.data_class().get(formdata_id).status == 'wf-st1' # no change
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary == "Error in timeout value '30 plop' (computed from '{{ 30 }} plop')"
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary == "Error in timeout value '30 plop' (computed from '{{ 30 }} plop')"
# template timeout value returning nothing
jump.timeout = '{% if 1 %}{% endif %}'
@ -2553,18 +2530,16 @@ def test_timeout_with_humantime_template(two_pubs):
formdata.store()
formdata_id = formdata.id
if two_pubs.is_using_postgresql():
two_pubs.loggederror_class.wipe()
two_pubs.loggederror_class.wipe()
rewind(formdata, seconds=40 * 60)
formdata.store()
_apply_timeouts(two_pubs)
assert formdef.data_class().get(formdata_id).status == 'wf-st1' # no change
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary == "Error in timeout value '' (computed from '{% if 1 %}{% endif %}')"
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary == "Error in timeout value '' (computed from '{% if 1 %}{% endif %}')"
def test_legacy_timeout(pub):
@ -2663,10 +2638,6 @@ def test_timeout_with_mark(two_pubs):
def test_jump_missing_previous_mark(two_pubs):
if not two_pubs.is_using_postgresql():
pytest.skip('this requires SQL')
return
FormDef.wipe()
Workflow.wipe()
@ -2847,15 +2818,14 @@ def test_sms_with_passerelle(two_pubs):
assert json_payload['to'] == ['1234']
assert json_payload['from'] == 'Passerelle'
if pub.is_using_postgresql():
pub.loggederror_class.wipe()
with mock.patch('wcs.wscalls.get_secret_and_orig') as mocked_secret_and_orig:
mocked_secret_and_orig.return_value = ('secret', 'localhost')
with mock.patch('wcs.qommon.misc._http_request') as mocked_http_post:
mocked_http_post.return_value = (mock.Mock(headers={}), 400, '{"err": 1}', 'headers')
item.perform(formdata)
assert pub.loggederror_class.count() == 1
assert pub.loggederror_class.select()[0].summary == 'Could not send SMS'
pub.loggederror_class.wipe()
with mock.patch('wcs.wscalls.get_secret_and_orig') as mocked_secret_and_orig:
mocked_secret_and_orig.return_value = ('secret', 'localhost')
with mock.patch('wcs.qommon.misc._http_request') as mocked_http_post:
mocked_http_post.return_value = (mock.Mock(headers={}), 400, '{"err": 1}', 'headers')
item.perform(formdata)
assert pub.loggederror_class.count() == 1
assert pub.loggederror_class.select()[0].summary == 'Could not send SMS'
def test_display_form(two_pubs):
@ -3340,11 +3310,10 @@ def test_geolocate_action_enable_geolocation(two_pubs):
formdef.change_workflow(workflow)
assert formdef.geolocations
if two_pubs.is_using_postgresql():
conn, cur = sql.get_connection_and_cursor()
assert column_exists_in_table(cur, formdef.table_name, 'geoloc_base')
conn.commit()
cur.close()
conn, cur = sql.get_connection_and_cursor()
assert column_exists_in_table(cur, formdef.table_name, 'geoloc_base')
conn.commit()
cur.close()
# change to current workflow
workflow = Workflow(name='wf2')
@ -3367,11 +3336,10 @@ def test_geolocate_action_enable_geolocation(two_pubs):
formdef.refresh_from_storage()
assert formdef.geolocations
if two_pubs.is_using_postgresql():
conn, cur = sql.get_connection_and_cursor()
assert column_exists_in_table(cur, formdef.table_name, 'geoloc_base')
conn.commit()
cur.close()
conn, cur = sql.get_connection_and_cursor()
assert column_exists_in_table(cur, formdef.table_name, 'geoloc_base')
conn.commit()
cur.close()
def test_geolocate_address(two_pubs):
@ -3450,20 +3418,19 @@ def test_geolocate_address(two_pubs):
formdata.geolocations = None
item.perform(formdata)
assert formdata.geolocations == {}
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert (
logged_error.summary
== 'error in template for address string [syntax error in ezt template: unclosed block at line 1 and column 24]'
)
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.exception_class == 'TemplateError'
assert (
logged_error.exception_message
== 'syntax error in ezt template: unclosed block at line 1 and column 24'
)
two_pubs.loggederror_class.wipe()
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert (
logged_error.summary
== 'error in template for address string [syntax error in ezt template: unclosed block at line 1 and column 24]'
)
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.exception_class == 'TemplateError'
assert (
logged_error.exception_message
== 'syntax error in ezt template: unclosed block at line 1 and column 24'
)
two_pubs.loggederror_class.wipe()
# check for None
item.address_string = '=None'
@ -3492,13 +3459,12 @@ def test_geolocate_address(two_pubs):
http_get_page.side_effect = ConnectionError('some error')
item.perform(formdata)
assert formdata.geolocations == {}
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary == 'error calling geocoding service [some error]'
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.exception_class == 'ConnectionError'
assert logged_error.exception_message == 'some error'
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary == 'error calling geocoding service [some error]'
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.exception_class == 'ConnectionError'
assert logged_error.exception_message == 'some error'
def test_geolocate_image(pub):
@ -3583,13 +3549,12 @@ def test_geolocate_map(two_pubs):
formdata.geolocations = None
formdata.data = {'2': '48.8337085'}
item.map_variable = '=form_var_map'
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary == 'error geolocating from map variable'
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.exception_class == 'ValueError'
assert logged_error.exception_message == 'not enough values to unpack (expected 2, got 1)'
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary == 'error geolocating from map variable'
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.exception_class == 'ValueError'
assert logged_error.exception_message == 'not enough values to unpack (expected 2, got 1)'
def test_geolocate_overwrite(pub):
@ -4581,36 +4546,32 @@ def test_set_backoffice_field(http_requests, two_pubs):
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == ''
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 0
assert two_pubs.loggederror_class.count() == 0
item.fields = [{'field_id': 'bo1', 'value': '= ~ invalid python ~'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary == 'Failed to compute Python expression'
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.expression == ' ~ invalid python ~'
assert logged_error.expression_type == 'python'
assert logged_error.exception_class == 'SyntaxError'
assert logged_error.exception_message == 'invalid syntax (<string>, line 1)'
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary == 'Failed to compute Python expression'
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.expression == ' ~ invalid python ~'
assert logged_error.expression_type == 'python'
assert logged_error.exception_class == 'SyntaxError'
assert logged_error.exception_message == 'invalid syntax (<string>, line 1)'
if two_pubs.is_using_postgresql():
two_pubs.loggederror_class.wipe()
two_pubs.loggederror_class.wipe()
item.fields = [{'field_id': 'bo1', 'value': '{% if bad django %}'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary == 'Failed to compute template'
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.expression == '{% if bad django %}'
assert logged_error.expression_type == 'template'
assert logged_error.exception_class == 'TemplateError'
assert logged_error.exception_message.startswith('syntax error in Django template')
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary == 'Failed to compute template'
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.expression == '{% if bad django %}'
assert logged_error.expression_type == 'template'
assert logged_error.exception_class == 'TemplateError'
assert logged_error.exception_message.startswith('syntax error in Django template')
def test_set_backoffice_field_map(http_requests, two_pubs):
@ -4652,35 +4613,32 @@ def test_set_backoffice_field_map(http_requests, two_pubs):
formdata = formdef.data_class().get(formdata.id)
assert formdata.data.get('bo1') is None
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 0
assert two_pubs.loggederror_class.count() == 0
item.fields = [{'field_id': 'bo1', 'value': 'invalid value'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert (
logged_error.summary
== "Failed to set Map field (bo1), error: invalid coordinates 'invalid value' (missing ;)"
)
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.exception_class == 'SetValueError'
assert logged_error.exception_message == "invalid coordinates 'invalid value' (missing ;)"
two_pubs.loggederror_class.wipe()
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert (
logged_error.summary
== "Failed to set Map field (bo1), error: invalid coordinates 'invalid value' (missing ;)"
)
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.exception_class == 'SetValueError'
assert logged_error.exception_message == "invalid coordinates 'invalid value' (missing ;)"
two_pubs.loggederror_class.wipe()
item.fields = [{'field_id': 'bo1', 'value': 'XXX;YYY'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary == "Failed to set Map field (bo1), error: invalid coordinates 'XXX;YYY'"
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.exception_class == 'SetValueError'
assert logged_error.exception_message == "invalid coordinates 'XXX;YYY'"
two_pubs.loggederror_class.wipe()
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary == "Failed to set Map field (bo1), error: invalid coordinates 'XXX;YYY'"
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.exception_class == 'SetValueError'
assert logged_error.exception_message == "invalid coordinates 'XXX;YYY'"
two_pubs.loggederror_class.wipe()
def test_set_backoffice_field_decimal(http_requests, two_pubs):
@ -4977,19 +4935,17 @@ def test_set_backoffice_field_file(http_requests, two_pubs):
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': value}]
if two_pubs.is_using_postgresql():
two_pubs.loggederror_class.wipe()
two_pubs.loggederror_class.wipe()
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'].base_filename == 'hello.txt'
assert formdata.data['bo1'].get_content() == b'HELLO WORLD'
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary.startswith('Failed to convert')
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.exception_class == 'ValueError'
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary.startswith('Failed to convert')
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.exception_class == 'ValueError'
# check wrong field
item = SetBackofficeFieldsWorkflowStatusItem()
@ -5207,8 +5163,7 @@ def test_set_backoffice_field_card_item(two_pubs):
assert formdata.data['bo1_structured']['attr'] == 'attr1'
# reset, with unknown value
if two_pubs.is_using_postgresql():
two_pubs.loggederror_class.wipe()
two_pubs.loggederror_class.wipe()
formdata.data = {}
formdata.store()
item.fields = [{'field_id': 'bo1', 'value': 'xxx'}]
@ -5217,10 +5172,9 @@ def test_set_backoffice_field_card_item(two_pubs):
assert formdata.data.get('bo1') is None # invalid value is not stored
assert formdata.data.get('bo1_display') is None
assert formdata.data.get('bo1_structured') is None
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary.startswith('Failed to convert')
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.summary.startswith('Failed to convert')
# reset, and get empty value
formdata.data = {}
@ -5398,17 +5352,16 @@ def test_set_backoffice_field_items(two_pubs):
assert len(formdata.data['bo1_structured']) == 1
# using an invalid value
if two_pubs.is_using_postgresql():
formdata.data = {}
formdata.store()
two_pubs.loggederror_class.wipe()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': "=Ellipsis"}]
item.perform(formdata)
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert 'Failed to convert' in logged_error.summary
formdata.data = {}
formdata.store()
two_pubs.loggederror_class.wipe()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': "=Ellipsis"}]
item.perform(formdata)
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert 'Failed to convert' in logged_error.summary
# using a string with multiple values
item = SetBackofficeFieldsWorkflowStatusItem()
@ -5473,21 +5426,18 @@ def test_set_backoffice_field_date(two_pubs):
assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date(2017, 3, 23)
# invalid values => do nothing
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 0
assert two_pubs.loggederror_class.count() == 0
for value in ('plop', '={}', '=[]'):
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': value}]
if two_pubs.is_using_postgresql():
two_pubs.loggederror_class.wipe()
two_pubs.loggederror_class.wipe()
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date(2017, 3, 23)
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
assert two_pubs.loggederror_class.select()[0].summary.startswith('Failed to convert')
assert two_pubs.loggederror_class.count() == 1
assert two_pubs.loggederror_class.select()[0].summary.startswith('Failed to convert')
# None : empty date
item = SetBackofficeFieldsWorkflowStatusItem()
@ -5986,20 +5936,18 @@ def test_workflow_action_condition(two_pubs):
assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 2
# bad condition
if two_pubs.is_using_postgresql():
two_pubs.loggederror_class.wipe()
two_pubs.loggederror_class.wipe()
choice.condition = {'type': 'python', 'value': 'foobar == barfoo'}
workflow.store()
assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 0
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.occurences_count > 1 # should be 2... == 12 with pickle, 4 with sql
assert logged_error.summary == 'Failed to evaluate condition'
assert logged_error.exception_class == 'NameError'
assert logged_error.exception_message == "name 'foobar' is not defined"
assert logged_error.expression == 'foobar == barfoo'
assert logged_error.expression_type == 'python'
assert two_pubs.loggederror_class.count() == 1
logged_error = two_pubs.loggederror_class.select()[0]
assert logged_error.occurences_count > 1 # should be 2... == 12 with pickle, 4 with sql
assert logged_error.summary == 'Failed to evaluate condition'
assert logged_error.exception_class == 'NameError'
assert logged_error.exception_message == "name 'foobar' is not defined"
assert logged_error.expression == 'foobar == barfoo'
assert logged_error.expression_type == 'python'
def test_workflow_field_migration(pub):
@ -6517,10 +6465,6 @@ def test_call_external_workflow_use_caller_variable(pub):
def test_call_external_workflow_manual_targeting(two_pubs):
if not two_pubs.is_using_postgresql():
pytest.skip('this requires SQL')
return
FormDef.wipe()
CardDef.wipe()
@ -6701,10 +6645,6 @@ def test_form_update_after_backoffice_fields(sql_pub):
def test_call_external_workflow_manual_queryset_targeting(two_pubs):
if not two_pubs.is_using_postgresql():
pytest.skip('this requires SQL')
return
FormDef.wipe()
CardDef.wipe()

View File

@ -104,11 +104,10 @@ def test_create_carddata(two_pubs):
assert carddef.data_class().count() == 1
if two_pubs.is_using_postgresql():
errors = two_pubs.loggederror_class.select()
assert len(errors) == 2
assert any('form_var_undefined' in (error.exception_message or '') for error in errors)
assert any('invalid date value' in (error.exception_message or '') for error in errors)
errors = two_pubs.loggederror_class.select()
assert len(errors) == 2
assert any('form_var_undefined' in (error.exception_message or '') for error in errors)
assert any('invalid date value' in (error.exception_message or '') for error in errors)
formdata = formdef.data_class()()
today = datetime.date.today()
@ -384,10 +383,9 @@ def test_create_carddata_with_map_field(two_pubs):
assert carddef.data_class().count() == 1
assert not carddef.data_class().select()[0].data.get('1')
if two_pubs.is_using_postgresql():
errors = two_pubs.loggederror_class.select()
assert len(errors) == 1
assert any('invalid coordinates' in (error.exception_message or '') for error in errors)
errors = two_pubs.loggederror_class.select()
assert len(errors) == 1
assert any('invalid coordinates' in (error.exception_message or '') for error in errors)
# value from formdata
create.mappings[0].expression = '{{ form_var_map }}'
@ -742,10 +740,6 @@ def test_edit_carddata_with_linked_object(pub):
def test_edit_carddata_manual_targeting(two_pubs):
if not two_pubs.is_using_postgresql():
pytest.skip('this requires SQL')
return
FormDef.wipe()
CardDef.wipe()
@ -1142,10 +1136,6 @@ def test_assign_carddata_with_linked_object(pub):
def test_assign_carddata_manual_targeting(two_pubs):
if not two_pubs.is_using_postgresql():
pytest.skip('this requires SQL')
return
FormDef.wipe()
CardDef.wipe()
two_pubs.user_class.wipe()

View File

@ -82,8 +82,7 @@ def test_create_formdata(two_pubs):
formdata.just_created()
assert target_formdef.data_class().count() == 0
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 0
assert two_pubs.loggederror_class.count() == 0
# check unconfigure action do nothing
formdata.perform_workflow()
assert target_formdef.data_class().count() == 0
@ -94,28 +93,26 @@ def test_create_formdata(two_pubs):
formdata.perform_workflow()
assert target_formdef.data_class().count() == 1
if two_pubs.is_using_postgresql():
errors = two_pubs.loggederror_class.select()
assert len(errors) == 2
assert 'form_var_toto_string' in errors[0].exception_message
assert errors[1].summary == 'Missing field: unknown (1), unknown (2)'
assert errors[0].formdata_id == str(target_formdef.data_class().select()[0].id)
assert errors[1].formdata_id == str(target_formdef.data_class().select()[0].id)
errors = two_pubs.loggederror_class.select()
assert len(errors) == 2
assert 'form_var_toto_string' in errors[0].exception_message
assert errors[1].summary == 'Missing field: unknown (1), unknown (2)'
assert errors[0].formdata_id == str(target_formdef.data_class().select()[0].id)
assert errors[1].formdata_id == str(target_formdef.data_class().select()[0].id)
if two_pubs.is_using_postgresql():
# add field labels cache
two_pubs.loggederror_class.wipe()
target_formdef.data_class().wipe()
create.formdef_slug = target_formdef.url_name
create.cached_field_labels = {'0': 'field0', '1': 'field1', '2': 'field2'}
wf.store()
del source_formdef._workflow
formdata.perform_workflow()
assert target_formdef.data_class().count() == 1
# add field labels cache
two_pubs.loggederror_class.wipe()
target_formdef.data_class().wipe()
create.formdef_slug = target_formdef.url_name
create.cached_field_labels = {'0': 'field0', '1': 'field1', '2': 'field2'}
wf.store()
del source_formdef._workflow
formdata.perform_workflow()
assert target_formdef.data_class().count() == 1
errors = two_pubs.loggederror_class.select()
assert len(errors) == 2
assert errors[1].summary == 'Missing field: field1, field2'
errors = two_pubs.loggederror_class.select()
assert len(errors) == 2
assert errors[1].summary == 'Missing field: field1, field2'
# no tracking code has been created
created_formdata = target_formdef.data_class().select()[0]
@ -274,8 +271,7 @@ def test_create_formdata_attach_to_history(two_pubs):
def test_create_formdata_card_item_mapping(two_pubs):
if two_pubs.is_using_postgresql():
two_pubs.loggederror_class.wipe()
two_pubs.loggederror_class.wipe()
FormDef.wipe()
CardDef.wipe()
@ -367,8 +363,7 @@ def test_create_formdata_card_item_mapping(two_pubs):
target_formdata = target_formdef.data_class().select()[0]
assert target_formdata.data.get('0') is None
assert target_formdata.data.get('0_display') is None
if two_pubs.is_using_postgresql():
assert two_pubs.loggederror_class.count() == 1
assert two_pubs.loggederror_class.count() == 1
error = two_pubs.loggederror_class.select()[0]
assert error.exception_message == "unknown card value ('XXX')"
assert two_pubs.loggederror_class.count() == 1
assert two_pubs.loggederror_class.count() == 1
error = two_pubs.loggederror_class.select()[0]
assert error.exception_message == "unknown card value ('XXX')"

View File

@ -539,7 +539,7 @@ class NamedDataSourcesDirectory(Directory):
'categories': categories,
'user_data_sources': user_data_sources,
'has_chrono': has_chrono(get_publisher()),
'has_users': get_publisher().is_using_postgresql(),
'has_users': True,
'agenda_data_sources': agenda_data_sources,
'generated_data_sources': generated_data_sources,
},

View File

@ -16,8 +16,6 @@
import difflib
import io
import tarfile
import time
import xml.etree.ElementTree as ET
from collections import defaultdict
@ -604,7 +602,6 @@ class FormDefPage(Directory):
'duplicate',
'export',
'anonymise',
'archive',
'enable',
'workflow',
'role',
@ -932,8 +929,6 @@ class FormDefPage(Directory):
r += htmltext('<li><a rel="popup" href="history/save">%s</a></li>') % _('Save snapshot')
r += htmltext('<li><a href="history/">%s</a></li>') % _('History')
r += htmltext('<li><a href="inspect">%s</a></li>') % _('Inspector')
if not get_publisher().is_using_postgresql() and self.formdef_class == FormDef:
r += htmltext('<li><a href="archive">%s</a></li>') % _('Archive')
r += htmltext('</ul>')
r += htmltext('<ul>')
@ -1276,8 +1271,6 @@ class FormDefPage(Directory):
return redirect('../%s/' % self.formdefui.formdef.id)
def get_check_deletion_message(self):
if not get_publisher().is_using_postgresql():
return None
from wcs import sql
criterias = [
@ -1570,105 +1563,6 @@ class FormDefPage(Directory):
content_type='application/x-wcs-form',
)
def archive(self):
if get_publisher().is_using_postgresql():
raise TraversalError()
if get_request().form.get('download'):
return self.archive_download()
form = Form(enctype='multipart/form-data')
form.add(DateWidget, 'date', title=_('Archive forms handled before'))
form.add(CheckboxWidget, 'not-done', title=_('Include forms that have not been handled'), value=False)
form.add(CheckboxWidget, 'keep', title=_('Do not remove forms'), value=False)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('.')
if not form.is_submitted() or form.has_errors():
get_response().breadcrumb.append(('archive', _('Archive')))
self.html_top(title=_('Archive Forms'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Archive Forms')
r += form.render()
return r.getvalue()
else:
return self.archive_submit(form)
def archive_submit(self, form):
class Archiver:
def __init__(self, formdef):
self.formdef = formdef
def archive(self, job=None):
all_forms = self.formdef.data_class().select()
if form.get_widget('not-done').parse() is False:
not_endpoint_status = self.formdef.workflow.get_not_endpoint_status()
not_endpoint_status_ids = ['wf-%s' % x.id for x in not_endpoint_status]
all_forms = [x for x in all_forms if x.status not in not_endpoint_status_ids]
if form.get_widget('date').parse():
date = form.get_widget('date').parse()
date = time.strptime(date, misc.date_format())
all_forms = [x for x in all_forms if x.last_update_time < date]
self.fd = io.BytesIO()
with tarfile.open('wcs.tar.gz', 'w:gz', fileobj=self.fd) as t:
t.add(self.formdef.get_object_filename(), 'formdef')
for formdata in all_forms:
t.add(
formdata.get_object_filename(),
'%s/%s' % (self.formdef.url_name, str(formdata.id)),
)
if form.get_widget('keep').parse() is False:
for f in all_forms:
f.remove_self()
if job:
job.file_content = self.fd.getvalue()
job.store()
count = self.formdef.data_class().count()
archiver = Archiver(self.formdef)
if count > 100: # Arbitrary threshold
job = get_response().add_after_job(
_('Archiving forms'),
archiver.archive,
done_action_url=self.formdef.get_admin_url() + 'archive?job=%(job_id)s',
done_action_label=_('Download Archive'),
)
job.store()
return redirect(job.get_processing_url())
else:
archiver.archive()
response = get_response()
response.set_content_type('application/x-wcs-archive')
response.set_header(
'content-disposition', 'attachment; filename=%s-archive.wcs' % self.formdef.url_name
)
return archiver.fd.getvalue()
def archive_download(self):
try:
job = AfterJob.get(get_request().form.get('download'))
except KeyError:
return redirect('.')
if not job.status == 'completed':
raise TraversalError()
response = get_response()
response.set_content_type('application/x-wcs-archive')
response.set_header(
'content-disposition', 'attachment; filename=%s-archive.wcs' % self.formdef.url_name
)
return job.file_content
def enable(self):
self.formdef.disabled = False
self.formdef.store(comment=_('Enable'))

View File

@ -221,8 +221,7 @@ class LoggedErrorsDirectory(Directory):
workflow_id=self.workflow_id,
)
links = ''
if get_publisher().is_using_postgresql():
links = pagination_links(offset, limit, total_count, load_js=False)
links = pagination_links(offset, limit, total_count, load_js=False)
return template.QommonTemplateResponse(
templates=['wcs/backoffice/logged-errors.html'],
context={

View File

@ -300,10 +300,9 @@ class UserFieldsFormDef(FormDef):
self.publisher.cfg['users'] = users_cfg
self.publisher.write_cfg()
self.publisher._cached_user_fields_formdef = None
if self.publisher.is_using_postgresql():
from wcs import sql
from wcs import sql
sql.do_user_table()
sql.do_user_table()
class UsersDirectory(Directory):

View File

@ -422,9 +422,6 @@ class ApiFormsDirectory(Directory):
raise AccessForbiddenError('user not allowed to ignore roles')
def _q_index(self):
if not get_publisher().is_using_postgresql():
raise TraversalError()
self.check_access()
get_request()._user = get_user_from_api_query_string() or get_request().user
@ -491,8 +488,6 @@ class ApiFormsDirectory(Directory):
return json.dumps({'data': output}, cls=misc.JSONEncoder)
def geojson(self):
if not get_publisher().is_using_postgresql():
raise TraversalError()
self.check_access()
get_request()._user = get_user_from_api_query_string() or get_request().user
return ManagementDirectory().geojson()
@ -743,37 +738,33 @@ class ApiFormdefsDirectory(Directory):
if include_count:
# we include the count of submitted forms so it's possible to sort
# them by "popularity"
if get_publisher().is_using_postgresql():
from wcs import sql
from wcs import sql
# 4 * number of submitted forms of last 2 days
# + 2 * number of submitted forms of last 8 days
# + 1 * number of submitted forms of last 30 days
# exclude drafts
criterias = [Equal('formdef_id', formdef.id), StrictNotEqual('status', 'draft')]
d_now = datetime.datetime.now()
count = 4 * sql.get_period_total(
period_start=d_now - datetime.timedelta(days=2),
include_start=True,
criterias=criterias,
)
count += 2 * sql.get_period_total(
period_start=d_now - datetime.timedelta(days=8),
include_start=True,
period_end=d_now - datetime.timedelta(days=2),
include_end=False,
criterias=criterias,
)
count += sql.get_period_total(
period_start=d_now - datetime.timedelta(days=30),
include_start=True,
period_end=d_now - datetime.timedelta(days=8),
include_end=False,
criterias=criterias,
)
else:
# naive count
count = formdef.data_class().count()
# 4 * number of submitted forms of last 2 days
# + 2 * number of submitted forms of last 8 days
# + 1 * number of submitted forms of last 30 days
# exclude drafts
criterias = [Equal('formdef_id', formdef.id), StrictNotEqual('status', 'draft')]
d_now = datetime.datetime.now()
count = 4 * sql.get_period_total(
period_start=d_now - datetime.timedelta(days=2),
include_start=True,
criterias=criterias,
)
count += 2 * sql.get_period_total(
period_start=d_now - datetime.timedelta(days=8),
include_start=True,
period_end=d_now - datetime.timedelta(days=2),
include_end=False,
criterias=criterias,
)
count += sql.get_period_total(
period_start=d_now - datetime.timedelta(days=30),
include_start=True,
period_end=d_now - datetime.timedelta(days=8),
include_end=False,
criterias=criterias,
)
formdict['count'] = count
formdict['functions'] = {}
@ -923,104 +914,79 @@ class ApiUserDirectory(Directory):
if category_slugs:
categories = Category.select([Contains('url_name', category_slugs)])
if get_publisher().is_using_postgresql():
from wcs import sql
from wcs import sql
order_by = 'receipt_time'
if get_request().form.get('sort') == 'desc':
order_by = '-receipt_time'
if get_query_flag('include-accessible'):
user_roles = user.get_roles()
criterias = [
order_by = 'receipt_time'
if get_request().form.get('sort') == 'desc':
order_by = '-receipt_time'
if get_query_flag('include-accessible'):
user_roles = user.get_roles()
criterias = [
Or(
[
Intersects('concerned_roles_array', user_roles),
Equal('user_id', str(user.id)),
]
)
]
else:
criterias = [Equal('user_id', str(user.id))]
if category_slugs:
criterias.append(Contains('category_id', [c.id for c in categories]))
status_criteria = get_request().form.get('status') or 'all'
if status_criteria == 'open':
criterias.append(Equal('is_at_endpoint', False))
elif status_criteria == 'done':
criterias.append(Equal('is_at_endpoint', True))
elif status_criteria == 'all':
pass
else:
return HttpResponseBadRequest('invalid status parameter value')
if include_drafts:
disabled_formdef_ids = [formdef.id for formdef in FormDef.select() if formdef.is_disabled()]
if disabled_formdef_ids:
criterias.append(
Or(
[
Intersects('concerned_roles_array', user_roles),
Equal('user_id', str(user.id)),
StrictNotEqual('status', 'draft'),
NotContains('formdef_id', disabled_formdef_ids),
]
)
]
else:
criterias = [Equal('user_id', str(user.id))]
if category_slugs:
criterias.append(Contains('category_id', [c.id for c in categories]))
status_criteria = get_request().form.get('status') or 'all'
if status_criteria == 'open':
criterias.append(Equal('is_at_endpoint', False))
elif status_criteria == 'done':
criterias.append(Equal('is_at_endpoint', True))
elif status_criteria == 'all':
pass
else:
return HttpResponseBadRequest('invalid status parameter value')
if include_drafts:
disabled_formdef_ids = [formdef.id for formdef in FormDef.select() if formdef.is_disabled()]
if disabled_formdef_ids:
criterias.append(
Or(
[
StrictNotEqual('status', 'draft'),
NotContains('formdef_id', disabled_formdef_ids),
]
)
)
else:
criterias.append(StrictNotEqual('status', 'draft'))
if not include_non_drafts:
criterias.append(Equal('status', 'draft'))
user_forms = sql.AnyFormData.select(
criterias,
limit=misc.get_int_or_400(get_request().form.get('limit')),
offset=misc.get_int_or_400(get_request().form.get('offset')),
order_by=order_by,
)
if get_request().form.get('full') == 'on':
# load full objects
formdefs = {x.formdef_id: x.formdef for x in user_forms}
formdef_user_forms = {}
for formdef_id, formdef in formdefs.items():
formdef_user_forms.update(
{
(formdef_id, x.id): x
for x in formdef.data_class().select(
[Contains('id', [x.id for x in user_forms if x.formdef_id == formdef_id])]
)
}
)
# and put them back in order
sorted_user_forms_tuples = [(x.formdef_id, x.id) for x in user_forms]
user_forms = [formdef_user_forms.get(x) for x in sorted_user_forms_tuples]
else:
# prefetch evolutions to avoid individual loads when computing
# formdata.get_visible_status().
sql.AnyFormData.load_all_evolutions(user_forms)
)
else:
if get_query_flag('include-accessible'):
return HttpResponseBadRequest('not supported')
formdefs = FormDef.select()
user_forms = []
for formdef in formdefs:
user_forms.extend(formdef.data_class().get_with_indexed_value('user_id', user.id))
if category_slugs:
user_forms = [f for f in user_forms if f.formdef.category_id in [c.id for c in categories]]
criterias.append(StrictNotEqual('status', 'draft'))
status_criteria = get_request().form.get('status') or 'all'
if status_criteria == 'open':
user_forms = [x for x in user_forms if not x.is_at_endpoint_status()]
elif status_criteria == 'done':
user_forms = [x for x in user_forms if x.is_at_endpoint_status()]
elif status_criteria == 'all':
pass
else:
return HttpResponseBadRequest('invalid status parameter value')
if not include_non_drafts:
criterias.append(Equal('status', 'draft'))
typed_none = time.gmtime(-(10**10))
user_forms.sort(key=lambda x: x.receipt_time or typed_none)
if get_request().form.get('sort') == 'desc':
user_forms.reverse()
user_forms = sql.AnyFormData.select(
criterias,
limit=misc.get_int_or_400(get_request().form.get('limit')),
offset=misc.get_int_or_400(get_request().form.get('offset')),
order_by=order_by,
)
if get_request().form.get('full') == 'on':
# load full objects
formdefs = {x.formdef_id: x.formdef for x in user_forms}
formdef_user_forms = {}
for formdef_id, formdef in formdefs.items():
formdef_user_forms.update(
{
(formdef_id, x.id): x
for x in formdef.data_class().select(
[Contains('id', [x.id for x in user_forms if x.formdef_id == formdef_id])]
)
}
)
# and put them back in order
sorted_user_forms_tuples = [(x.formdef_id, x.id) for x in user_forms]
user_forms = [formdef_user_forms.get(x) for x in sorted_user_forms_tuples]
else:
# prefetch evolutions to avoid individual loads when computing
# formdata.get_visible_status().
sql.AnyFormData.load_all_evolutions(user_forms)
return user_forms
def drafts(self):
@ -1056,16 +1022,10 @@ class ApiUserDirectory(Directory):
raise AccessForbiddenError('user not allowed to query data from others')
# mark forms that are readable by querying user
user_roles = set(query_user.get_roles())
if get_publisher().is_using_postgresql():
# use concerned_roles_array attribute that was saved in the
# table.
for form in forms:
form.readable = bool(set(form.concerned_roles_array).intersection(user_roles))
else:
# recomputed concerned roles.
for form in forms:
concerned_roles_array = [str(x) for x in form.concerned_roles if x]
form.readable = bool(set(concerned_roles_array).intersection(user_roles))
# use concerned_roles_array attribute that was saved in the
# table.
for form in forms:
form.readable = bool(set(form.concerned_roles_array).intersection(user_roles))
# ignore confidential forms
forms = [x for x in forms if x.readable or not x.formdef.skip_from_360_view]
@ -1132,8 +1092,7 @@ class ApiUsersDirectory(Directory):
for field in formdef.fields:
if field.type in ('string', 'text', 'email'):
criteria_fields.append(st.ILike('f%s' % field.id, query))
if get_publisher().is_using_postgresql():
criteria_fields.append(st.FtsMatch(query))
criteria_fields.append(st.FtsMatch(query))
criterias.append(st.Or(criteria_fields))
def as_dict(user):

View File

@ -213,9 +213,6 @@ class CardDefPage(FormDefPage):
if self.formdef.is_used():
return _('Deletion is not possible as it is still used as datasource.')
if not get_publisher().is_using_postgresql():
return None
criterias = [
StrictNotEqual('status', 'draft'),
Null('anonymised'),

View File

@ -182,23 +182,14 @@ class ManagementDirectory(Directory):
forms_without_pending_stuff = []
forms_with_pending_stuff = []
using_postgresql = get_publisher().is_using_postgresql()
if using_postgresql:
from wcs import sql
from wcs import sql
actionable_counts = sql.get_actionable_counts(user_roles)
total_counts = sql.get_total_counts(user_roles)
actionable_counts = sql.get_actionable_counts(user_roles)
total_counts = sql.get_total_counts(user_roles)
def append_form_entry(formdef):
if using_postgresql:
count_forms = total_counts.get(formdef.id) or 0
waiting_forms_count = actionable_counts.get(formdef.id) or 0
else:
formdef_data_class = formdef.data_class()
count_forms = formdef_data_class.count() - len(
formdef_data_class.get_ids_with_indexed_value('status', 'draft')
)
waiting_forms_count = formdef_data_class.get_actionable_count(user_roles)
count_forms = total_counts.get(formdef.id) or 0
waiting_forms_count = actionable_counts.get(formdef.id) or 0
if waiting_forms_count == 0:
forms_without_pending_stuff.append((formdef, waiting_forms_count, count_forms))
else:
@ -211,11 +202,7 @@ class ManagementDirectory(Directory):
def top_action_links(r):
r += htmltext('<span class="actions">')
if (
get_publisher().is_using_postgresql()
and get_publisher().get_site_option('postgresql_views') != 'false'
):
r += htmltext('<a href="listing">%s</a>') % _('Global View')
r += htmltext('<a href="listing">%s</a>') % _('Global View')
for formdef in formdefs:
if formdef.geolocations:
r += htmltext(' <a href="map">%s</a>') % _('Map View')
@ -261,12 +248,12 @@ class ManagementDirectory(Directory):
def lookup(self):
query = get_request().form.get('query', '').strip()
if get_publisher().is_using_postgresql():
from wcs import sql
from wcs import sql
formdatas = sql.AnyFormData.select([Equal('id_display', query)])
if formdatas:
return redirect(formdatas[0].get_url(backoffice=True))
formdatas = sql.AnyFormData.select([Equal('id_display', query)])
if formdatas:
return redirect(formdatas[0].get_url(backoffice=True))
if any(x for x in FormDef.select(lightweight=True) if x.enable_tracking_codes):
try:
tracking_code = get_publisher().tracking_code_class.get(query)
@ -433,27 +420,14 @@ class ManagementDirectory(Directory):
period_start = parsed_values.get('period_start')
period_end = parsed_values.get('period_end')
if get_publisher().is_using_postgresql():
from wcs import sql
from wcs import sql
formdef_totals = sql.get_formdef_totals(period_start, period_end, criterias)
counts = {str(x): y for x, y in formdef_totals}
else:
for formdef in formdefs:
values = formdef.data_class().select(criterias)
counts[formdef.id] = len(values)
do_graphs = False
if (
get_publisher().is_using_postgresql()
and get_publisher().get_site_option('postgresql_views') != 'false'
):
do_graphs = True
formdef_totals = sql.get_formdef_totals(period_start, period_end, criterias)
counts = {str(x): y for x, y in formdef_totals}
r += htmltext('<p>%s %s</p>') % (_('Total count:'), sum(counts.values()))
if do_graphs:
r += htmltext('<div class="splitcontent-left">')
r += htmltext('<div class="splitcontent-left">')
cats = Category.select()
for cat in cats:
category_formdefs = [x for x in formdefs if x.category_id == cat.id]
@ -462,11 +436,10 @@ class ManagementDirectory(Directory):
category_formdefs = [x for x in formdefs if x.category_id is None]
r += self.category_global_stats(_('Misc'), category_formdefs, counts)
if do_graphs:
r += htmltext('</div>')
r += htmltext('<div class="splitcontent-right">')
r += do_graphs_section(period_start, period_end, criterias=[StrictNotEqual('status', 'draft')])
r += htmltext('</div>')
r += htmltext('</div>')
r += htmltext('<div class="splitcontent-right">')
r += do_graphs_section(period_start, period_end, criterias=[StrictNotEqual('status', 'draft')])
r += htmltext('</div>')
return r.getvalue()
@ -597,9 +570,6 @@ class ManagementDirectory(Directory):
return r.getvalue()
def listing(self):
if not get_publisher().is_using_postgresql():
raise errors.TraversalError()
get_response().add_javascript(['wcs.listing.js'])
from wcs import sql
@ -731,8 +701,6 @@ class ManagementDirectory(Directory):
return rt.getvalue()
def count(self):
if not get_publisher().is_using_postgresql():
raise errors.TraversalError()
if not (FormDef.exists()):
return misc.json_response({'count': 0})
from wcs import sql
@ -754,8 +722,6 @@ class ManagementDirectory(Directory):
return json.dumps(geojson_formdatas(formdatas, fields=fields), cls=misc.JSONEncoder)
def map(self):
if not get_publisher().is_using_postgresql():
raise errors.TraversalError()
get_response().add_javascript(['wcs.listing.js', 'qommon.map.js'])
html_top('management', _('Global Map'))
r = TemplateIO(html=True)
@ -1051,9 +1017,8 @@ class FormPage(Directory):
'user-id',
'user-function',
'submission-agent-id',
'date',
]
if get_publisher().is_using_postgresql():
types.append('date')
return types
def get_filter_sidebar(
@ -1200,8 +1165,6 @@ class FormPage(Directory):
('pending', C_('formdata|Open'), None),
('done', _('Done'), None),
]
if mode == 'stats' and not get_publisher().is_using_postgresql():
filters = [x for x in filters if x[0] != 'waiting']
for status in waitpoint_status:
filters.append((status.id, status.name, status.colour))
for filter_id, filter_label, filter_colour in filters:
@ -1282,73 +1245,47 @@ class FormPage(Directory):
elif filter_field.type in ('item', 'items'):
filter_field.required = False
if get_publisher().is_using_postgresql():
# Get options from existing formdatas.
# This allows for options that don't appear anymore in the
# data source to be listed (for example because the field
# is using a parametrized URL depending on unavailable
# variables, or simply returning different results now).
display_mode = 'select'
if filter_field.type == 'item' and filter_field.get_display_mode() == 'autocomplete':
display_mode = 'select2'
# Get options from existing formdatas.
# This allows for options that don't appear anymore in the
# data source to be listed (for example because the field
# is using a parametrized URL depending on unavailable
# variables, or simply returning different results now).
display_mode = 'select'
if filter_field.type == 'item' and filter_field.get_display_mode() == 'autocomplete':
display_mode = 'select2'
if display_mode == 'select':
options = self.get_item_filter_options(
filter_field,
selected_filter,
selected_filter_operator,
criterias,
)
options = [(x[0], x[1], x[0]) for x in options]
options.insert(0, (None, '', ''))
attrs = {'data-refresh-options': str(filter_field.contextual_id)}
else:
options = [(filter_field_value, filter_field_value or '', filter_field_value or '')]
attrs = {'data-remote-options': str(filter_field.contextual_id)}
get_response().add_javascript(
['jquery.js', '../../i18n.js', 'qommon.forms.js', 'select2.js']
)
get_response().add_css_include('select2.css')
if self.view and self.view.visibility == 'datasource':
options.append(('{}', _('custom value'), '{}'))
if filter_field_value and filter_field_value not in [x[0] for x in options]:
options.append((filter_field_value, filter_field_value, filter_field_value))
attrs['data-allow-template'] = 'true'
widget = SingleSelectWidget(
filter_field_key,
title=filter_field.label,
options=options,
value=filter_field_value,
render_br=False,
attrs=attrs,
if display_mode == 'select':
options = self.get_item_filter_options(
filter_field,
selected_filter,
selected_filter_operator,
criterias,
)
r += render_widget(widget, operators)
options = [(x[0], x[1], x[0]) for x in options]
options.insert(0, (None, '', ''))
attrs = {'data-refresh-options': str(filter_field.contextual_id)}
else:
# In pickle environments, get options from data source
options = filter_field.get_options()
if options:
if len(options[0]) == 2:
options = [(x[0], x[1], x[0]) for x in options]
options.insert(0, (None, '', ''))
widget = SingleSelectWidget(
filter_field_key,
title=filter_field.label,
options=options,
value=filter_field_value,
render_br=False,
)
r += render_widget(widget, operators)
else:
# and fall back on a string widget if there are none.
widget = StringWidget(
filter_field_key,
title=filter_field.label,
value=filter_field_value,
render_br=False,
)
r += render_widget(widget, operators)
options = [(filter_field_value, filter_field_value or '', filter_field_value or '')]
attrs = {'data-remote-options': str(filter_field.contextual_id)}
get_response().add_javascript(
['jquery.js', '../../i18n.js', 'qommon.forms.js', 'select2.js']
)
get_response().add_css_include('select2.css')
if self.view and self.view.visibility == 'datasource':
options.append(('{}', _('custom value'), '{}'))
if filter_field_value and filter_field_value not in [x[0] for x in options]:
options.append((filter_field_value, filter_field_value, filter_field_value))
attrs['data-allow-template'] = 'true'
widget = SingleSelectWidget(
filter_field_key,
title=filter_field.label,
options=options,
value=filter_field_value,
render_br=False,
attrs=attrs,
)
r += render_widget(widget, operators)
elif filter_field.type == 'bool':
options = [(None, '', ''), (True, _('Yes'), 'true'), (False, _('No'), 'false')]
@ -1421,19 +1358,17 @@ class FormPage(Directory):
if limit:
r += htmltext('<input type="hidden" name="limit" value="%s"/>') % limit
if get_publisher().is_using_postgresql():
if order_by is None:
order_by = get_publisher().get_site_option('default-sort-order') or '-receipt_time'
r += htmltext('<input type="hidden" name="order_by" value="%s"/>') % order_by
if order_by is None:
order_by = get_publisher().get_site_option('default-sort-order') or '-receipt_time'
r += htmltext('<input type="hidden" name="order_by" value="%s"/>') % order_by
if get_publisher().is_using_postgresql():
r += htmltext('<h3>%s</h3>') % self.search_label
if get_request().form.get('q'):
q = force_text(get_request().form.get('q'))
r += htmltext('<input class="inline-input" name="q" value="%s">') % force_str(q)
else:
r += htmltext('<input class="inline-input" name="q">')
r += htmltext('<button class="side-button">%s</button>') % _('Search')
r += htmltext('<h3>%s</h3>') % self.search_label
if get_request().form.get('q'):
q = force_text(get_request().form.get('q'))
r += htmltext('<input class="inline-input" name="q" value="%s">') % force_str(q)
else:
r += htmltext('<input class="inline-input" name="q">')
r += htmltext('<button class="side-button">%s</button>') % _('Search')
r += self.get_filter_sidebar(
selected_filter=selected_filter,
@ -1664,18 +1599,14 @@ class FormPage(Directory):
yield FakeField('last_update_time', 'last_update_time', _('Last Modified'))
# user fields
if not get_publisher().is_using_postgresql():
# full name of user, this will load individual user objects to get it
yield FakeField('user-label', 'user-label', _('User Label'))
else:
# user-label field but as a custom field, to get full name of user
# using a sql join clause.
yield UserLabelRelatedField()
for field in get_publisher().user_class.get_fields():
if not hasattr(field, 'get_view_value'):
continue
field.has_relations = True
yield UserRelatedField(field)
# user-label field but as a custom field, to get full name of user
# using a sql join clause.
yield UserLabelRelatedField()
for field in get_publisher().user_class.get_fields():
if not hasattr(field, 'get_view_value'):
continue
field.has_relations = True
yield UserRelatedField(field)
for field in self.formdef.iter_fields(include_block_fields=True):
if getattr(field, 'block_field', None):
@ -1683,8 +1614,6 @@ class FormPage(Directory):
# not yet
continue
yield field
if not get_publisher().is_using_postgresql():
continue
if field.key == 'block':
continue
if getattr(field, 'block_field', None):
@ -2145,14 +2074,9 @@ class FormPage(Directory):
selected_filter_operator = self.get_filter_operator_from_query()
criterias = self.get_criterias_from_query()
if get_publisher().is_using_postgresql():
# only enable pagination in SQL mode, as we do not have sorting in
# the other case.
limit = misc.get_int_or_400(
get_request().form.get('limit', get_publisher().get_site_option('default-page-size') or 20)
)
else:
limit = misc.get_int_or_400(get_request().form.get('limit', 0))
limit = misc.get_int_or_400(
get_request().form.get('limit', get_publisher().get_site_option('default-page-size') or 20)
)
offset = misc.get_int_or_400(get_request().form.get('offset', 0))
order_by = misc.get_order_by_or_400(get_request().form.get('order_by'))
if self.view and not order_by:
@ -2251,8 +2175,7 @@ class FormPage(Directory):
def submit_multi(self, action, selected_filter, selected_filter_operator, query, criterias):
item_ids = get_request().form['select[]']
if '_all' in item_ids:
if get_publisher().is_using_postgresql():
criterias.append(Null('anonymised'))
criterias.append(Null('anonymised'))
item_ids = FormDefUI(self.formdef).get_listing_item_ids(
selected_filter,
selected_filter_operator,
@ -2444,8 +2367,7 @@ class FormPage(Directory):
offset=offset,
limit=limit,
)[0]
if get_publisher().is_using_postgresql():
self.formdef.data_class().load_all_evolutions(items)
self.formdef.data_class().load_all_evolutions(items)
digest_key = 'default'
if self.view and isinstance(self.formdef, CardDef):
view_digest_key = 'custom-view:%s' % self.view.get_url_slug()
@ -2455,26 +2377,25 @@ class FormPage(Directory):
output = []
prefetched_users = None
prefetched_roles = None
if get_publisher().is_using_postgresql:
prefetched_users = {
str(x.id): x
for x in get_publisher().user_class.get_ids(
[x.user_id for x in items if x.user_id], ignore_errors=True
)
if x is not None
}
role_ids = set((self.formdef.workflow_roles or {}).values())
for filled in items:
if filled.workflow_roles:
for value in filled.workflow_roles.values():
if not isinstance(value, list):
value = [value]
role_ids |= set(value)
prefetched_roles = {
str(x.id): x
for x in get_publisher().role_class.get_ids(role_ids, ignore_errors=True)
if x is not None
}
prefetched_users = {
str(x.id): x
for x in get_publisher().user_class.get_ids(
[x.user_id for x in items if x.user_id], ignore_errors=True
)
if x is not None
}
role_ids = set((self.formdef.workflow_roles or {}).values())
for filled in items:
if filled.workflow_roles:
for value in filled.workflow_roles.values():
if not isinstance(value, list):
value = [value]
role_ids |= set(value)
prefetched_roles = {
str(x.id): x
for x in get_publisher().role_class.get_ids(role_ids, ignore_errors=True)
if x is not None
}
for filled in items:
data = filled.get_json_export_dict(
include_files=False,
@ -2717,7 +2638,6 @@ class FormPage(Directory):
get_response().filter['sidebar'] = self.get_formdata_sidebar() + self.get_stats_sidebar(
selected_filter
)
do_graphs = get_publisher().is_using_postgresql()
displayed_criterias = None
if selected_filter and selected_filter != 'all':
@ -2749,10 +2669,9 @@ class FormPage(Directory):
criterias = [StrictNotEqual('status', 'draft')] + displayed_criterias
values = self.formdef.data_class().select(criterias)
if get_publisher().is_using_postgresql():
# load all evolutions in a single batch, to avoid as many query as
# there are formdata when computing resolution times statistics.
self.formdef.data_class().load_all_evolutions(values)
# load all evolutions in a single batch, to avoid as many query as
# there are formdata when computing resolution times statistics.
self.formdef.data_class().load_all_evolutions(values)
r += htmltext('<div id="statistics">')
if displayed_criterias:
@ -2764,8 +2683,7 @@ class FormPage(Directory):
if criteria_label:
r += htmltext('<li>%s</li>') % criteria_label
r += htmltext('</ul></div>')
if do_graphs:
r += htmltext('<div class="splitcontent-left">')
r += htmltext('<div class="splitcontent-left">')
no_forms = len(values)
r += htmltext('<div class="bo-block">')
@ -2799,12 +2717,11 @@ class FormPage(Directory):
r += stats_times
r += htmltext('</div>')
if do_graphs:
r += htmltext('</div>')
r += htmltext('<div class="splitcontent-right">')
criterias.append(Equal('formdef_id', int(self.formdef.id)))
r += do_graphs_section(criterias=criterias)
r += htmltext('</div>')
r += htmltext('</div>')
r += htmltext('<div class="splitcontent-right">')
criterias.append(Equal('formdef_id', int(self.formdef.id)))
r += do_graphs_section(criterias=criterias)
r += htmltext('</div>')
r += htmltext('</div>') # id="statistics"
@ -3165,11 +3082,7 @@ class FormBackOfficeStatusPage(FormStatusPage):
if formdata.formdef.lateral_template:
r += htmltext('<div data-async-url="%slateral-block"></div>' % formdata.get_url(backoffice=True))
if (
not isinstance(formdata.formdef, CardDef)
and formdata.user_id
and get_publisher().is_using_postgresql()
):
if not isinstance(formdata.formdef, CardDef) and formdata.user_id:
r += htmltext(
'<div data-async-url="%suser-pending-forms"></div>' % formdata.get_url(backoffice=True)
)

View File

@ -202,15 +202,9 @@ class FormFillPage(PublicFormFillPage):
return
data_class = self.formdef.data_class()
if get_publisher().is_using_postgresql():
has = bool(
data_class.count([StrictNotEqual('status', 'draft'), Equal('user_id', formdata.user_id)])
)
else:
has = any(
x for x in data_class.get_with_indexed_value('user_id', formdata.user_id) if not x.is_draft()
)
context['user_has_already_one_such_form'] = has
context['user_has_already_one_such_form'] = bool(
data_class.count([StrictNotEqual('status', 'draft'), Equal('user_id', formdata.user_id)])
)
def get_sidebar(self, data):
r = TemplateIO(html=True)

View File

@ -60,7 +60,7 @@ class CardDef(FormDef):
# carddef
if data_class._formdef is self:
return data_class
if (get_publisher().is_using_postgresql() and not mode == 'files') or mode == 'sql':
if (not mode == 'files') or mode == 'sql':
from . import sql
table_name = sql.get_formdef_table_name(self)

View File

@ -51,70 +51,66 @@ class CmdDeleteTenant(Command):
deletion_date = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
os.rename(pub.app_dir, pub.app_dir + '_removed_%s.invalid' % deletion_date)
# do this only if the wcs has a postgresql configuration
if pub.is_using_postgresql():
postgresql_cfg = {}
for k, v in pub.cfg['postgresql'].items():
if v and isinstance(v, str):
postgresql_cfg[k] = v
postgresql_cfg = {}
for k, v in pub.cfg['postgresql'].items():
if v and isinstance(v, str):
postgresql_cfg[k] = v
# if there's a createdb-connection-params, we can do a DROP DATABASE with
# the option --force-drop, rename it if not
createdb_cfg = pub.cfg['postgresql'].get('createdb-connection-params', {})
createdb = True
if not createdb_cfg:
createdb_cfg = postgresql_cfg
createdb = False
if 'database' in createdb_cfg:
createdb_cfg['dbname'] = createdb_cfg.pop('database')
try:
pgconn = psycopg2.connect(**createdb_cfg)
except psycopg2.Error as e:
print(
'failed to connect to postgresql (%s)' % psycopg2.errorcodes.lookup(e.pgcode),
file=sys.stderr,
)
return
# if there's a createdb-connection-params, we can do a DROP DATABASE with
# the option --force-drop, rename it if not
createdb_cfg = pub.cfg['postgresql'].get('createdb-connection-params', {})
createdb = True
if not createdb_cfg:
createdb_cfg = postgresql_cfg
createdb = False
if 'database' in createdb_cfg:
createdb_cfg['dbname'] = createdb_cfg.pop('database')
try:
pgconn = psycopg2.connect(**createdb_cfg)
except psycopg2.Error as e:
print(
'failed to connect to postgresql (%s)' % psycopg2.errorcodes.lookup(e.pgcode),
file=sys.stderr,
)
return
pgconn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = pgconn.cursor()
dbname = postgresql_cfg.get('dbname') or postgresql_cfg.get('database')
try:
if createdb:
if options.force_drop:
cur.execute('DROP DATABASE %s' % dbname)
else:
cur.execute(
'ALTER DATABASE %s RENAME TO removed_%s_%s' % (dbname, deletion_date, dbname)
)
pgconn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = pgconn.cursor()
dbname = postgresql_cfg.get('dbname') or postgresql_cfg.get('database')
try:
if createdb:
if options.force_drop:
cur.execute('DROP DATABASE %s' % dbname)
else:
cur.execute(
"""SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'"""
)
tables_names = [x[0] for x in cur.fetchall()]
if options.force_drop:
for table_name in tables_names:
cur.execute('DROP TABLE %s CASCADE' % table_name)
else:
schema_name = 'removed_%s_%s' % (deletion_date, dbname)
cur.execute("CREATE SCHEMA %s" % schema_name[:63])
for table_name in tables_names:
cur.execute('ALTER TABLE %s SET SCHEMA %s' % (table_name, schema_name[:63]))
except psycopg2.Error as e:
print(
'failed to alter database %s: (%s)' % (dbname, psycopg2.errorcodes.lookup(e.pgcode)),
file=sys.stderr,
cur.execute('ALTER DATABASE %s RENAME TO removed_%s_%s' % (dbname, deletion_date, dbname))
else:
cur.execute(
"""SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'"""
)
return
cur.close()
tables_names = [x[0] for x in cur.fetchall()]
if options.force_drop:
for table_name in tables_names:
cur.execute('DROP TABLE %s CASCADE' % table_name)
else:
schema_name = 'removed_%s_%s' % (deletion_date, dbname)
cur.execute("CREATE SCHEMA %s" % schema_name[:63])
for table_name in tables_names:
cur.execute('ALTER TABLE %s SET SCHEMA %s' % (table_name, schema_name[:63]))
except psycopg2.Error as e:
print(
'failed to alter database %s: (%s)' % (dbname, psycopg2.errorcodes.lookup(e.pgcode)),
file=sys.stderr,
)
return
cur.close()
CmdDeleteTenant.register()

View File

@ -28,18 +28,6 @@ def rebuild_vhost_indexes(pub, destroy=False):
if destroy:
Role.destroy_indexes()
Role.rebuild_indexes()
if pub.is_using_postgresql():
return
from wcs.users import User
if destroy:
User.destroy_indexes()
User.rebuild_indexes()
for formdef in FormDef.select():
formdata = formdef.data_class()
if destroy:
formdata.destroy_indexes()
formdata.rebuild_indexes()
class CmdRebuildIndexes(Command):

View File

@ -424,16 +424,13 @@ class FormData(StorableObject):
@classmethod
def get_actionable_count(cls, user_roles):
if get_publisher().is_using_postgresql():
statuses = ['wf-%s' % x.id for x in cls._formdef.workflow.get_not_endpoint_status()]
criterias = [
Intersects('actions_roles_array', user_roles),
Contains('status', statuses),
Null('anonymised'),
]
return cls.count(criterias)
else:
return len(cls.get_actionable_ids(user_roles))
statuses = ['wf-%s' % x.id for x in cls._formdef.workflow.get_not_endpoint_status()]
criterias = [
Intersects('actions_roles_array', user_roles),
Contains('status', statuses),
Null('anonymised'),
]
return cls.count(criterias)
@classmethod
def get_actionable_ids_criteria(cls, user_roles):
@ -442,17 +439,7 @@ class FormData(StorableObject):
@classmethod
def get_actionable_ids(cls, user_roles):
if get_publisher().is_using_postgresql():
return cls.keys([cls.get_actionable_ids_criteria(user_roles)])
else:
statuses = ['wf-%s' % x.id for x in cls._formdef.workflow.get_not_endpoint_status()]
actions_ids = set()
for role in user_roles:
actions_ids |= set(cls.get_ids_with_indexed_value('actions_roles', str(role)))
open_ids = []
for status_id in statuses:
open_ids.extend(cls.get_ids_with_indexed_value('status', status_id))
return list(actions_ids.intersection(open_ids))
return cls.keys([cls.get_actionable_ids_criteria(user_roles)])
@classmethod
def get_submission_channels(cls):

View File

@ -44,7 +44,7 @@ from .qommon.cron import CronJob
from .qommon.form import Form, HtmlWidget, UploadedFile
from .qommon.misc import JSONEncoder, get_as_datetime, is_attachment, is_upload, simplify, xml_node_text
from .qommon.publisher import get_publisher_class
from .qommon.storage import Equal, StorableObject, StrictNotEqual, fix_key
from .qommon.storage import Equal, StorableObject, fix_key
from .qommon.substitution import Substitutions
from .qommon.template import Template
from .roles import logged_users_role
@ -329,7 +329,7 @@ class FormDef(StorableObject):
@classmethod
def remove_object(cls, id):
super().remove_object(id)
if get_publisher().is_using_postgresql() and cls == FormDef:
if cls == FormDef:
# recreate global views so they don't reference formdata from
# deleted formefs
from . import sql
@ -352,7 +352,7 @@ class FormDef(StorableObject):
# formdef
if data_class._formdef is self:
return data_class
if (get_publisher().is_using_postgresql() and not mode == 'files') or mode == 'sql':
if (not mode == 'files') or mode == 'sql':
from . import sql
table_name = sql.get_formdef_table_name(self)
@ -411,8 +411,7 @@ class FormDef(StorableObject):
@classmethod
def get_new_id(cls, create=False):
id = super().get_new_id(create=False)
if get_publisher().is_using_postgresql():
id = cls.get_sql_new_id(id_start=int(id))
id = cls.get_sql_new_id(id_start=int(id))
if create:
objects_dir = cls.get_objects_dir()
object_filename = os.path.join(objects_dir, fix_key(id))
@ -443,8 +442,7 @@ class FormDef(StorableObject):
@classmethod
def wipe(cls):
super().wipe()
if get_publisher().is_using_postgresql():
cls.sql_wipe()
cls.sql_wipe()
@classmethod
def sql_wipe(cls):
@ -457,6 +455,9 @@ class FormDef(StorableObject):
if self.url_name is None:
# set url name if it's not yet there
self.url_name = self.get_new_url_name()
object_only = kwargs.pop('object_only', False)
new_internal_identifier = self.get_new_internal_identifier()
if not self.internal_identifier:
self.internal_identifier = new_internal_identifier
@ -466,9 +467,8 @@ class FormDef(StorableObject):
# or if there are not yet any submitted forms (or if site
# is using the SQL storage as internal identifier is not used
# in that mode.
if self.id is None or get_publisher().is_using_postgresql() or not (self.data_class().exists()):
if self.id is None or not self.data_class().exists():
self.internal_identifier = new_internal_identifier
object_only = kwargs.pop('object_only', False)
if not object_only:
self.update_relations()
@ -487,9 +487,6 @@ class FormDef(StorableObject):
self.store_related_custom_views()
def update_storage(self):
if not get_publisher().is_using_postgresql():
return
from . import sql
actions = sql.do_formdef_tables(self, rebuild_views=True, rebuild_global_views=True)
@ -1919,7 +1916,6 @@ class FormDef(StorableObject):
status.id in status_mapping for status in old_workflow.possible_status
), 'a status was not mapped'
unmapped_status_suffix = '-invalid-%s' % str(self.workflow_id or 'default')
mapping = {}
for old_status, new_status in status_mapping.items():
mapping['wf-%s' % old_status] = 'wf-%s' % new_status
@ -1927,28 +1923,9 @@ class FormDef(StorableObject):
if any(x[0] != x[1] for x in mapping.items()):
# if there are status changes, update all formdatas (except drafts)
if get_publisher().is_using_postgresql():
from . import sql
from . import sql
sql.formdef_remap_statuses(self, mapping)
else:
def map_status(status):
if status is None:
return None
elif status in mapping:
return mapping[status]
elif '-invalid-' in status:
return status
else:
return '%s%s' % (status, unmapped_status_suffix)
for formdata in self.data_class().select([StrictNotEqual('status', 'draft')]):
formdata.status = map_status(formdata.status)
if formdata.evolution:
for evo in formdata.evolution:
evo.status = map_status(evo.status)
formdata.store()
sql.formdef_remap_statuses(self, mapping)
self.workflow = new_workflow
if new_workflow.has_action('geolocate') and not self.geolocations:

View File

@ -46,15 +46,12 @@ class FormDefUI:
):
# noqa pylint: disable=too-many-arguments
using_postgresql = get_publisher().is_using_postgresql()
if not items:
if offset and not limit:
limit = int(get_publisher().get_site_option('default-page-size') or 20)
if not criterias:
criterias = []
if using_postgresql:
criterias.append(Null('anonymised'))
criterias.append(Null('anonymised'))
items, total_count = self.get_listing_items(
fields,
selected_filter,
@ -101,7 +98,7 @@ class FormDefUI:
r += htmltext('</colgroup>')
r += htmltext('<thead><tr>')
if self.formdef.workflow.criticality_levels and using_postgresql:
if self.formdef.workflow.criticality_levels:
r += htmltext(
'<th class="criticality-level-cell" data-field-sort-key="criticality_level"><span></span></th>'
)
@ -127,7 +124,7 @@ class FormDefUI:
else:
field_sort_key = 'f%s' % f.contextual_id
if field_sort_key and using_postgresql:
if field_sort_key:
r += htmltext('<th data-field-sort-key="%s">') % field_sort_key
else:
r += htmltext('<th>')
@ -144,9 +141,8 @@ class FormDefUI:
r += htmltext('</tbody>')
r += htmltext('</table>')
if get_publisher().is_using_postgresql():
# add links to paginate
r += pagination_links(offset, limit, total_count)
# add links to paginate
r += pagination_links(offset, limit, total_count)
return r.getvalue()
@ -159,96 +155,6 @@ class FormDefUI:
user=None,
criterias=None,
anonymise=False,
):
if get_publisher().is_using_postgresql():
return self.get_listing_item_ids_sql(
selected_filter, selected_filter_operator, query, order_by, user, criterias, anonymise
)
def get_all_except_drafts():
item_ids = formdata_class.keys()
drafts = formdata_class.get_ids_with_indexed_value('status', 'draft')
return [x for x in item_ids if x not in drafts]
formdata_class = self.formdef.data_class()
# used for postgresql mode only
if selected_filter == 'all':
if selected_filter_operator == 'ne':
# nothing
item_ids = []
else:
# all except drafts
item_ids = get_all_except_drafts()
elif selected_filter == 'waiting':
user_roles = [logged_users_role().id] + user.get_roles()
waiting_item_ids = formdata_class.get_actionable_ids(user_roles)
if selected_filter_operator == 'ne':
# select all ids except drafts
item_ids = get_all_except_drafts()
# exclude waiting ids
item_ids = [x for x in item_ids if x not in waiting_item_ids]
else:
# only waiting ids
item_ids = waiting_item_ids
else:
# build selected status list
applied_filters = []
if selected_filter == 'pending':
applied_filters = ['wf-%s' % x.id for x in self.formdef.workflow.get_not_endpoint_status()]
elif selected_filter == 'done':
applied_filters = ['wf-%s' % x.id for x in self.formdef.workflow.get_endpoint_status()]
else:
applied_filters = ['wf-%s' % selected_filter]
filtered_item_ids = []
for status_id in applied_filters:
filtered_item_ids.extend(
formdata_class.get_ids_with_indexed_value(
'status',
status_id,
)
)
if selected_filter_operator == 'ne':
# select all ids except drafts
item_ids = get_all_except_drafts()
# exclude selected status list
item_ids = [x for x in item_ids if x not in filtered_item_ids]
else:
# only selected status list
item_ids = filtered_item_ids
if query:
query_ids = formdata_class.get_ids_from_query(query)
item_ids = list(set(item_ids).intersection(query_ids))
if criterias:
select_ids = formdata_class.keys(clause=criterias)
item_ids = list(set(item_ids).intersection(select_ids))
if item_ids and not anonymise:
# as we are in the backoffice, we don't have to care about the
# situation where the user is the submitter, and we limit ourselves
# to consider treating roles.
if not user.is_admin:
user_roles = set(user.get_roles())
concerned_ids = set()
for role in user_roles:
concerned_ids |= set(
formdata_class.get_ids_with_indexed_value('concerned_roles', str(role))
)
item_ids = list(set(item_ids).intersection(concerned_ids))
return item_ids
def get_listing_item_ids_sql(
self,
selected_filter,
selected_filter_operator,
query,
order_by,
user,
criterias,
anonymise,
):
formdata_class = self.formdef.data_class()
criterias = [] or criterias[:]
@ -354,9 +260,6 @@ class FormDefUI:
criterias=criterias,
anonymise=anonymise,
)
if not get_publisher().is_using_postgresql():
item_ids.sort(key=int)
item_ids.reverse()
total_count = len(item_ids)
@ -364,8 +267,7 @@ class FormDefUI:
offset = 0
kwargs = {}
if get_publisher().is_using_postgresql():
kwargs['fields'] = fields
kwargs['fields'] = fields
if limit:
items = formdata_class.get_ids(item_ids[offset : offset + limit], keep_order=True, **kwargs)

View File

@ -132,9 +132,6 @@ class WcsPublisher(QommonPublisher):
DeprecationsScanAfterJob().execute()
def is_using_postgresql(self):
return True
def has_postgresql_config(self):
return bool(self.cfg.get('postgresql', {}))
@ -467,10 +464,9 @@ class WcsPublisher(QommonPublisher):
def cleanup(self):
self._cached_user_fields_formdef = None
if self.is_using_postgresql():
from . import sql
from . import sql
sql.cleanup_connection()
sql.cleanup_connection()
@contextmanager
def complex_data(self):

View File

@ -84,8 +84,7 @@ def cron_worker(publisher, now, job_name=None, delayed_jobs=None):
jobs = delayed_jobs
else:
# reindex user and formdata if needed (should only be run once)
if publisher.is_using_postgresql():
publisher.reindex_sql()
publisher.reindex_sql()
jobs = []

View File

@ -19,7 +19,6 @@ import random
import string
from django.utils.timezone import make_aware, now
from quixote import get_publisher
from .storage import Equal, Less, StorableObject
@ -75,16 +74,5 @@ class Token(StorableObject):
@classmethod
def clean(cls):
if get_publisher().is_using_postgresql():
# noqa pylint: disable=unexpected-keyword-arg
cls.wipe(clause=[Less('expiration', now())])
else:
for token_id in cls.keys():
try:
cls.get(token_id) # will run migrate, will check expiration
except KeyError:
pass
except AttributeError:
# old python2 tokens:
# AttributeError: module 'builtins' has no attribute 'unicode'
cls.remove_object(token_id)
# noqa pylint: disable=unexpected-keyword-arg
cls.wipe(clause=[Less('expiration', now())])

View File

@ -1581,9 +1581,6 @@ def do_meta_table(conn=None, cur=None, insert_current_sql_level=True):
@guard_postgres
def redo_views(conn, cur, formdef, rebuild_global_views=False):
if get_publisher().get_site_option('postgresql_views') == 'false':
return
if formdef.id is None:
return
@ -1961,9 +1958,13 @@ class SqlMixin:
sql_statement += ' WHERE ' + ' AND '.join(where_clauses)
sql_statement += ' LIMIT 1'
conn, cur = get_connection_and_cursor()
cur.execute(sql_statement, parameters)
check = cur.fetchone()
result = check is not None
try:
cur.execute(sql_statement, parameters)
except psycopg2.errors.UndefinedTable:
result = False
else:
check = cur.fetchone()
result = check is not None
conn.commit()
cur.close()
return result

View File

@ -19,7 +19,6 @@ import collections
from django.http import HttpResponseBadRequest, HttpResponseForbidden, JsonResponse
from django.urls import reverse
from django.views.generic import View
from quixote import get_publisher
from wcs import sql
from wcs.api_utils import is_url_signed
@ -42,9 +41,6 @@ class RestrictedView(View):
class IndexView(RestrictedView):
def get(self, request, *args, **kwargs):
if not get_publisher().is_using_postgresql():
return JsonResponse({'data': [], 'err': 0})
categories = Category.select()
categories.sort(key=lambda x: misc.simplify(x.name))
category_options = [{'id': '_all', 'label': C_('categories|All')}] + [
@ -306,8 +302,6 @@ class FormsCountView(RestrictedView):
for status in waitpoint_status:
options.append((status.id, status.name))
elif field.type in ('item', 'items'):
if not get_publisher().is_using_postgresql():
continue
options = form_page.get_item_filter_options(field, selected_filter='all', anonymised=True)
if not options:
continue

View File

@ -188,9 +188,6 @@ class User(StorableObject):
@classmethod
def get_users_with_roles(cls, included_roles=None, excluded_roles=None, order_by=None):
if not get_publisher().is_using_postgresql():
return []
from wcs import sql
criterias = [sql.Null('deleted_timestamp')]

View File

@ -69,10 +69,7 @@ class LazyFormDefObjectsManager:
)
def order_by(self, attribute):
if get_publisher().is_using_postgresql():
field = self.get_field(attribute)
else:
field = None # no support for field search
field = self.get_field(attribute)
return self._clone(self._criterias, order_by=field or attribute)
def limit(self, limit):

View File

@ -340,28 +340,25 @@ def _apply_timeouts(publisher, **kwargs):
) if job else contextlib.ExitStack():
formdata_class = formdef.data_class()
for status_id in status_ids:
if publisher.is_using_postgresql():
# get minimum delay for jumps in this status
delay = math.inf
for jump_action in wfs_status[str(formdef.workflow_id)][status_id]:
if Template.is_template_string(jump_action.timeout):
delay = 0
break
delay = min(delay, int(jump_action.timeout))
# limit delay to minimal delay
if delay < JUMP_TIMEOUT_INTERVAL * 60:
delay = JUMP_TIMEOUT_INTERVAL * 60
# get minimum delay for jumps in this status
delay = math.inf
for jump_action in wfs_status[str(formdef.workflow_id)][status_id]:
if Template.is_template_string(jump_action.timeout):
delay = 0
break
delay = min(delay, int(jump_action.timeout))
# limit delay to minimal delay
if delay < JUMP_TIMEOUT_INTERVAL * 60:
delay = JUMP_TIMEOUT_INTERVAL * 60
criterias = [
Equal('status', status_id),
LessOrEqual(
'last_update_time',
(datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(),
),
]
formdatas = formdata_class.select_iterator(criterias, ignore_errors=True, itersize=200)
else:
formdatas = formdata_class.get_with_indexed_value('status', status_id, ignore_errors=True)
criterias = [
Equal('status', status_id),
LessOrEqual(
'last_update_time',
(datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(),
),
]
formdatas = formdata_class.select_iterator(criterias, ignore_errors=True, itersize=200)
for formdata in formdatas:
for jump_action in wfs_status[str(formdef.workflow_id)][formdata.status]: