general: remove is_using_postgresql conditionals (#67190)
This commit is contained in:
parent
b184acabd5
commit
3484ae5341
|
@ -788,30 +788,27 @@ def test_api_list_formdata(pub, local_user):
|
|||
get_app(pub).get(sign_uri('/api/forms/test/list?filter=all&offset=plop', user=local_user), status=400)
|
||||
get_app(pub).get(sign_uri('/api/forms/test/list?filter=all&limit=plop', user=local_user), status=400)
|
||||
|
||||
if pub.is_using_postgresql():
|
||||
# just check ordering
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&order_by=f0', user=local_user))
|
||||
assert [d['fields']['foobar'] for d in resp.json] == ['FOO BAR %02d' % i for i in range(0, 30)]
|
||||
# just check ordering
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&order_by=f0', user=local_user))
|
||||
assert [d['fields']['foobar'] for d in resp.json] == ['FOO BAR %02d' % i for i in range(0, 30)]
|
||||
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&order_by=-f0', user=local_user))
|
||||
assert [d['fields']['foobar'] for d in resp.json] == ['FOO BAR %02d' % i for i in range(29, -1, -1)]
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&order_by=-f0', user=local_user))
|
||||
assert [d['fields']['foobar'] for d in resp.json] == ['FOO BAR %02d' % i for i in range(29, -1, -1)]
|
||||
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&order_by=foobar', user=local_user))
|
||||
assert [d['fields']['foobar'] for d in resp.json] == ['FOO BAR %02d' % i for i in range(0, 30)]
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&order_by=foobar', user=local_user))
|
||||
assert [d['fields']['foobar'] for d in resp.json] == ['FOO BAR %02d' % i for i in range(0, 30)]
|
||||
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&order_by=-foobar', user=local_user))
|
||||
assert [d['fields']['foobar'] for d in resp.json] == ['FOO BAR %02d' % i for i in range(29, -1, -1)]
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&order_by=-foobar', user=local_user))
|
||||
assert [d['fields']['foobar'] for d in resp.json] == ['FOO BAR %02d' % i for i in range(29, -1, -1)]
|
||||
|
||||
# check fts
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&q=foo', user=local_user))
|
||||
assert len(resp.json) == 30
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&q=baz', user=local_user))
|
||||
assert len(resp.json) == 14
|
||||
# check fts
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&q=foo', user=local_user))
|
||||
assert len(resp.json) == 30
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on&q=baz', user=local_user))
|
||||
assert len(resp.json) == 14
|
||||
|
||||
|
||||
def test_api_list_formdata_order_by_rank(pub, local_user):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
pub.role_class.wipe()
|
||||
role = pub.role_class(name='test')
|
||||
role.store()
|
||||
|
@ -938,8 +935,8 @@ def test_api_list_formdata_string_filter(pub, local_user):
|
|||
params = [
|
||||
('eq', 'FOO 2', 1),
|
||||
('ne', 'FOO 2', 3),
|
||||
('lt', 'FOO 2', 2 if pub.is_using_postgresql() else 3),
|
||||
('lte', 'FOO 2', 3 if pub.is_using_postgresql() else 4),
|
||||
('lt', 'FOO 2', 2),
|
||||
('lte', 'FOO 2', 3),
|
||||
('gt', 'FOO 2', 0),
|
||||
('gt', '42', 0),
|
||||
('gte', 'FOO 2', 1),
|
||||
|
@ -957,8 +954,8 @@ def test_api_list_formdata_string_filter(pub, local_user):
|
|||
('eq', '10', 1),
|
||||
('eq', '010', 1),
|
||||
('ne', '10', 3),
|
||||
('lt', '10', 1 if pub.is_using_postgresql() else 2),
|
||||
('lte', '10', 2 if pub.is_using_postgresql() else 3),
|
||||
('lt', '10', 1),
|
||||
('lte', '10', 2),
|
||||
('gt', '10', 1),
|
||||
('gt', '9', 2),
|
||||
('gte', '10', 2),
|
||||
|
@ -1021,8 +1018,8 @@ def test_api_list_formdata_item_filter(pub, local_user):
|
|||
('eq', '10', 1),
|
||||
('eq', '010', 1),
|
||||
('ne', '10', 3),
|
||||
('lt', '10', 1 if pub.is_using_postgresql() else 2),
|
||||
('lte', '10', 2 if pub.is_using_postgresql() else 3),
|
||||
('lt', '10', 1),
|
||||
('lte', '10', 2),
|
||||
('gt', '10', 1),
|
||||
('gt', '9', 2),
|
||||
('gte', '10', 2),
|
||||
|
@ -1041,8 +1038,8 @@ def test_api_list_formdata_item_filter(pub, local_user):
|
|||
params = [
|
||||
('eq', 'foo', 1),
|
||||
('ne', 'foo', 3),
|
||||
('lt', 'foo', 2 if pub.is_using_postgresql() else 3),
|
||||
('lte', 'foo', 3 if pub.is_using_postgresql() else 4),
|
||||
('lt', 'foo', 2),
|
||||
('lte', 'foo', 3),
|
||||
('gt', 'foo', 0),
|
||||
('gt', '42', 0),
|
||||
('gte', 'foo', 1),
|
||||
|
@ -1182,7 +1179,7 @@ def test_api_list_formdata_bool_filter(pub, local_user):
|
|||
formdata.store()
|
||||
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-bool=false', user=local_user))
|
||||
assert len(resp.json) == 2 if pub.is_using_postgresql() else 3
|
||||
assert len(resp.json) == 2
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-bool=true', user=local_user))
|
||||
assert len(resp.json) == 1
|
||||
params = [
|
||||
|
@ -1209,9 +1206,6 @@ def test_api_list_formdata_bool_filter(pub, local_user):
|
|||
|
||||
|
||||
def test_api_list_formdata_date_filter(pub, local_user):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
|
||||
pub.role_class.wipe()
|
||||
role = pub.role_class(name='test')
|
||||
role.store()
|
||||
|
@ -1412,9 +1406,6 @@ def test_api_list_formdata_number_filter(pub, local_user):
|
|||
|
||||
|
||||
def test_api_list_formdata_block_field_filter(pub, local_user):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
|
||||
NamedDataSource.wipe()
|
||||
data_source = NamedDataSource(name='foobar')
|
||||
data_source.data_source = {
|
||||
|
@ -2142,11 +2133,6 @@ def test_api_global_geojson(pub, local_user):
|
|||
formdata.jump_status('finished')
|
||||
formdata.store()
|
||||
|
||||
if not pub.is_using_postgresql():
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
# check empty content if user doesn't have the appropriate role
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user))
|
||||
assert 'features' in resp.json
|
||||
|
@ -2170,11 +2156,6 @@ def test_api_global_geojson(pub, local_user):
|
|||
@pytest.mark.parametrize('user', ['query-email', 'api-access'])
|
||||
@pytest.mark.parametrize('auth', ['signature', 'http-basic'])
|
||||
def test_api_global_listing(pub, local_user, user, auth):
|
||||
if not pub.is_using_postgresql():
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
pub.role_class.wipe()
|
||||
role = pub.role_class(name='test')
|
||||
role.store()
|
||||
|
@ -2279,10 +2260,6 @@ def test_api_global_listing(pub, local_user, user, auth):
|
|||
|
||||
|
||||
def test_api_global_listing_categories_filter(pub, local_user):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
Category.wipe()
|
||||
category1 = Category()
|
||||
category1.name = 'Category 1'
|
||||
|
@ -2391,11 +2368,6 @@ def test_api_global_listing_ignored_roles(pub, local_user):
|
|||
|
||||
|
||||
def test_api_include_anonymised(pub, local_user):
|
||||
if not pub.is_using_postgresql():
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404)
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
pub.role_class.wipe()
|
||||
role = pub.role_class(name='test')
|
||||
role.store()
|
||||
|
@ -2438,10 +2410,6 @@ def test_api_include_anonymised(pub, local_user):
|
|||
|
||||
|
||||
def test_global_forms_api_user_uuid_filter(pub, local_user):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
pub.role_class.wipe()
|
||||
role = pub.role_class(name='test')
|
||||
role.store()
|
||||
|
|
|
@ -177,11 +177,8 @@ def test_formdef_list(pub):
|
|||
formdata.store()
|
||||
|
||||
resp = get_app(pub).get(sign_uri('/api/formdefs/?include-count=on'))
|
||||
if not pub.is_using_postgresql():
|
||||
assert resp.json['data'][0]['count'] == 8
|
||||
else:
|
||||
# 3*4 + 2*2 + 1*1
|
||||
assert resp.json['data'][0]['count'] == 17
|
||||
# 3*4 + 2*2 + 1*1
|
||||
assert resp.json['data'][0]['count'] == 17
|
||||
|
||||
|
||||
def test_formdef_list_categories_filter(pub):
|
||||
|
|
|
@ -410,10 +410,6 @@ def test_user_forms(pub, local_user, access):
|
|||
|
||||
|
||||
def test_user_forms_limit_offset(pub, local_user):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
FormDef.wipe()
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test limit offset'
|
||||
|
@ -614,10 +610,6 @@ def test_user_forms_api_access_restrict_to_anonymised_data(pub, local_user, acce
|
|||
|
||||
|
||||
def test_user_forms_include_accessible(pub, local_user, access):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
pub.role_class.wipe()
|
||||
role = pub.role_class(name='Foo bar')
|
||||
role.store()
|
||||
|
|
|
@ -269,25 +269,22 @@ def test_backoffice_forms(pub):
|
|||
assert '9 open on 50' in resp.text
|
||||
|
||||
# anonymise some formdata, they should no longer be included
|
||||
if pub.is_using_postgresql():
|
||||
formdef = FormDef.get_by_urlname('form-title')
|
||||
for i, formdata in enumerate(
|
||||
formdef.data_class().select([st.Equal('status', 'wf-finished')], order_by='id')
|
||||
):
|
||||
if i >= 20:
|
||||
break
|
||||
formdata.anonymise()
|
||||
formdef = FormDef.get_by_urlname('form-title')
|
||||
for i, formdata in enumerate(
|
||||
formdef.data_class().select([st.Equal('status', 'wf-finished')], order_by='id')
|
||||
):
|
||||
if i >= 20:
|
||||
break
|
||||
formdata.anonymise()
|
||||
|
||||
for i, formdata in enumerate(
|
||||
formdef.data_class().select([st.Equal('status', 'wf-new')], order_by='id')
|
||||
):
|
||||
if i >= 5:
|
||||
break
|
||||
formdata.anonymise()
|
||||
for i, formdata in enumerate(formdef.data_class().select([st.Equal('status', 'wf-new')], order_by='id')):
|
||||
if i >= 5:
|
||||
break
|
||||
formdata.anonymise()
|
||||
|
||||
resp = app.get('/backoffice/management/forms')
|
||||
assert 'Forms in your care' in resp.text
|
||||
assert '4 open on 25' in resp.text
|
||||
resp = app.get('/backoffice/management/forms')
|
||||
assert 'Forms in your care' in resp.text
|
||||
assert '4 open on 25' in resp.text
|
||||
|
||||
|
||||
def test_backoffice_management_css_class(pub):
|
||||
|
@ -340,22 +337,15 @@ def test_backoffice_listing(pub):
|
|||
resp = app.get('/backoffice/management/form-title/')
|
||||
resp.forms['listing-settings']['filter'] = 'all'
|
||||
resp = resp.forms['listing-settings'].submit()
|
||||
if pub.is_using_postgresql():
|
||||
assert resp.text.count('data-link') == 20
|
||||
else:
|
||||
# not using sql -> no pagination
|
||||
assert resp.text.count('data-link') == 50
|
||||
assert resp.text.count('data-link') == 20
|
||||
|
||||
# check status filter <select>
|
||||
resp = app.get('/backoffice/management/form-title/')
|
||||
resp.forms['listing-settings']['filter'] = 'done'
|
||||
resp = resp.forms['listing-settings'].submit()
|
||||
if pub.is_using_postgresql():
|
||||
assert resp.text.count('data-link') == 20
|
||||
resp = resp.click('Next Page')
|
||||
assert resp.text.count('data-link') == 13
|
||||
else:
|
||||
assert resp.text.count('data-link') == 33
|
||||
assert resp.text.count('data-link') == 20
|
||||
resp = resp.click('Next Page')
|
||||
assert resp.text.count('data-link') == 13
|
||||
|
||||
# add an extra status to workflow and move a few formdatas to it, they
|
||||
# should then be marked as open but not waiting for actions.
|
||||
|
@ -424,9 +414,6 @@ def test_backoffice_listing(pub):
|
|||
|
||||
|
||||
def test_backoffice_listing_pagination(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
create_superuser(pub)
|
||||
create_environment(pub)
|
||||
app = login(get_app(pub))
|
||||
|
@ -469,9 +456,6 @@ def test_backoffice_listing_pagination(pub):
|
|||
|
||||
|
||||
def test_backoffice_listing_anonymised(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
create_superuser(pub)
|
||||
create_environment(pub)
|
||||
app = login(get_app(pub))
|
||||
|
@ -488,9 +472,6 @@ def test_backoffice_listing_anonymised(pub):
|
|||
|
||||
|
||||
def test_backoffice_listing_fts(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
create_superuser(pub)
|
||||
create_environment(pub)
|
||||
formdef = FormDef.get_by_urlname('form-title')
|
||||
|
@ -1366,10 +1347,6 @@ def test_backoffice_wscall_failure_display(http_requests, pub):
|
|||
@pytest.mark.parametrize('notify_on_errors', [True, False])
|
||||
@pytest.mark.parametrize('record_on_errors', [True, False])
|
||||
def test_backoffice_wscall_on_error(http_requests, pub, emails, notify_on_errors, record_on_errors):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
pub.cfg['debug'] = {'error_email': 'errors@localhost.invalid'}
|
||||
pub.cfg['emails'] = {'from': 'from@localhost.invalid'}
|
||||
pub.write_cfg()
|
||||
|
@ -1986,10 +1963,6 @@ def test_backoffice_wfedit_and_live_condition(pub):
|
|||
|
||||
|
||||
def test_global_listing(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
create_user(pub)
|
||||
create_environment(pub)
|
||||
app = login(get_app(pub))
|
||||
|
@ -2105,10 +2078,6 @@ def test_global_listing(pub):
|
|||
|
||||
|
||||
def test_global_listing_parameters_from_query_string(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
create_user(pub)
|
||||
create_environment(pub)
|
||||
app = login(get_app(pub))
|
||||
|
@ -2131,10 +2100,6 @@ def test_global_listing_parameters_from_query_string(pub):
|
|||
|
||||
|
||||
def test_global_listing_user_label(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
create_user(pub)
|
||||
FormDef.wipe()
|
||||
|
||||
|
@ -2170,10 +2135,6 @@ def test_global_listing_user_label(pub):
|
|||
|
||||
|
||||
def test_management_views_with_no_formdefs(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
create_user(pub)
|
||||
create_environment(pub)
|
||||
FormDef.wipe()
|
||||
|
@ -2193,10 +2154,6 @@ def test_management_views_with_no_formdefs(pub):
|
|||
|
||||
|
||||
def test_category_in_global_listing(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
FormDef.wipe()
|
||||
Category.wipe()
|
||||
|
||||
|
@ -2283,10 +2240,6 @@ def test_category_in_global_listing(pub):
|
|||
|
||||
|
||||
def test_datetime_in_global_listing(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
create_user(pub)
|
||||
create_environment(pub)
|
||||
|
||||
|
@ -2324,10 +2277,6 @@ def test_datetime_in_global_listing(pub):
|
|||
|
||||
|
||||
def test_global_listing_anonymised(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
create_user(pub)
|
||||
create_environment(pub)
|
||||
app = login(get_app(pub))
|
||||
|
@ -2346,10 +2295,6 @@ def test_global_listing_anonymised(pub):
|
|||
|
||||
|
||||
def test_global_listing_geojson(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
create_user(pub)
|
||||
create_environment(pub)
|
||||
|
||||
|
@ -2381,10 +2326,6 @@ def test_global_listing_geojson(pub):
|
|||
|
||||
|
||||
def test_global_map(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
create_user(pub)
|
||||
create_environment(pub)
|
||||
|
||||
|
@ -2447,38 +2388,33 @@ def test_formdata_lookup(pub):
|
|||
resp = resp.form.submit()
|
||||
assert resp.location == 'http://example.net/backoffice/management/form-title/%s/' % formdata.id
|
||||
|
||||
if pub.is_using_postgresql():
|
||||
# check looking up on a custom display_id
|
||||
formdata.id_display = '999999'
|
||||
formdata.store()
|
||||
assert formdata.get_display_id() == '999999'
|
||||
resp = app.get('/backoffice/management/').follow()
|
||||
resp.form['query'] = formdata.get_display_id()
|
||||
resp = resp.form.submit()
|
||||
assert resp.location == 'http://example.net/backoffice/management/form-title/%s/' % formdata.id
|
||||
# check looking up on a custom display_id
|
||||
formdata.id_display = '999999'
|
||||
formdata.store()
|
||||
assert formdata.get_display_id() == '999999'
|
||||
resp = app.get('/backoffice/management/').follow()
|
||||
resp.form['query'] = formdata.get_display_id()
|
||||
resp = resp.form.submit()
|
||||
assert resp.location == 'http://example.net/backoffice/management/form-title/%s/' % formdata.id
|
||||
|
||||
# try it from the global listing
|
||||
resp = app.get('/backoffice/management/listing')
|
||||
assert 'id="lookup-box"' in resp.text
|
||||
resp.forms[0]['query'] = formdata.tracking_code
|
||||
resp = resp.forms[0].submit()
|
||||
assert resp.location == 'http://example.net/backoffice/management/form-title/%s/' % formdata.id
|
||||
resp = resp.follow()
|
||||
assert 'The form has been recorded' in resp.text
|
||||
# try it from the global listing
|
||||
resp = app.get('/backoffice/management/listing')
|
||||
assert 'id="lookup-box"' in resp.text
|
||||
resp.forms[0]['query'] = formdata.tracking_code
|
||||
resp = resp.forms[0].submit()
|
||||
assert resp.location == 'http://example.net/backoffice/management/form-title/%s/' % formdata.id
|
||||
resp = resp.follow()
|
||||
assert 'The form has been recorded' in resp.text
|
||||
|
||||
resp = app.get('/backoffice/management/listing')
|
||||
resp.forms[0]['query'] = 'AAAAAAAA'
|
||||
resp = resp.forms[0].submit()
|
||||
assert resp.location == 'http://example.net/backoffice/management/listing'
|
||||
resp = resp.follow()
|
||||
assert 'No such tracking code or identifier.' in resp.text
|
||||
resp = app.get('/backoffice/management/listing')
|
||||
resp.forms[0]['query'] = 'AAAAAAAA'
|
||||
resp = resp.forms[0].submit()
|
||||
assert resp.location == 'http://example.net/backoffice/management/listing'
|
||||
resp = resp.follow()
|
||||
assert 'No such tracking code or identifier.' in resp.text
|
||||
|
||||
|
||||
def test_backoffice_sidebar_user_context(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
user = create_user(pub)
|
||||
create_environment(pub)
|
||||
form_class = FormDef.get_by_urlname('form-title').data_class()
|
||||
|
@ -2576,9 +2512,6 @@ def test_backoffice_sidebar_lateral_block(pub):
|
|||
|
||||
|
||||
def test_count_open(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
create_user(pub)
|
||||
|
||||
FormDef.wipe()
|
||||
|
@ -2710,9 +2643,6 @@ def test_backoffice_backoffice_submission_in_listings(pub):
|
|||
|
||||
|
||||
def test_backoffice_backoffice_submission_in_global_listing(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
create_superuser(pub)
|
||||
create_environment(pub)
|
||||
|
||||
|
@ -2758,12 +2688,11 @@ def test_backoffice_advisory_lock(pub):
|
|||
resp = app.get('/backoffice/management/form-title/')
|
||||
assert 'advisory-lock' not in resp.text
|
||||
|
||||
if pub.is_using_postgresql():
|
||||
# check global view
|
||||
resp = app2.get('/backoffice/management/listing?limit=100')
|
||||
assert 'advisory-lock' in resp.text
|
||||
resp = app.get('/backoffice/management/listing?limit=100')
|
||||
assert 'advisory-lock' not in resp.text
|
||||
# check global view
|
||||
resp = app2.get('/backoffice/management/listing?limit=100')
|
||||
assert 'advisory-lock' in resp.text
|
||||
resp = app.get('/backoffice/management/listing?limit=100')
|
||||
assert 'advisory-lock' not in resp.text
|
||||
|
||||
resp = app.get('/backoffice/management/form-title/' + first_link)
|
||||
assert 'Be warned forms of this user are also being looked' not in resp.text
|
||||
|
@ -2810,9 +2739,6 @@ def test_backoffice_advisory_lock(pub):
|
|||
|
||||
|
||||
def test_backoffice_advisory_lock_related_formdatas(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
pub.session_manager.session_class.wipe()
|
||||
user = create_superuser(pub)
|
||||
create_environment(pub)
|
||||
|
@ -3622,10 +3548,6 @@ def test_backoffice_fields(pub):
|
|||
|
||||
|
||||
def test_backoffice_logged_errors(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
Workflow.wipe()
|
||||
workflow = Workflow.get_default_workflow()
|
||||
workflow.id = '12'
|
||||
|
@ -4144,8 +4066,7 @@ def test_backoffice_create_formdata_backoffice_submission(pub, create_formdata):
|
|||
assert target_data_class.count() == 0
|
||||
# resubmit it through backoffice submission
|
||||
resp = resp.form.submit(name='button_resubmit')
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 0
|
||||
assert pub.loggederror_class.count() == 0
|
||||
assert target_data_class.count() == 1
|
||||
target_formdata = target_data_class.select()[0]
|
||||
|
||||
|
@ -4264,8 +4185,7 @@ def test_backoffice_create_formdata_map_fields_by_varname(pub, create_formdata):
|
|||
assert target_data_class.count() == 0
|
||||
# resubmit it through backoffice submission
|
||||
resp = resp.form.submit(name='button_resubmit')
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 0
|
||||
assert pub.loggederror_class.count() == 0
|
||||
assert target_data_class.count() == 1
|
||||
target_formdata = target_data_class.select()[0]
|
||||
|
||||
|
|
|
@ -552,10 +552,6 @@ def test_backoffice_cards_import_data_csv_invalid_columns(pub):
|
|||
|
||||
|
||||
def test_backoffice_cards_wscall_failure_display(http_requests, pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
user = create_user(pub)
|
||||
|
||||
Workflow.wipe()
|
||||
|
|
|
@ -309,10 +309,6 @@ def test_backoffice_user_columns(pub):
|
|||
resp = app.get('/backoffice/management/form-title/')
|
||||
assert resp.text.count('</th>') == 6 # four columns
|
||||
|
||||
if not pub.is_using_postgresql():
|
||||
# no support for relation columns unless using SQL
|
||||
assert 'user-label$3' not in resp.forms['listing-settings'].fields
|
||||
return
|
||||
resp.forms['listing-settings']['user-label$3'].checked = True
|
||||
resp = resp.forms['listing-settings'].submit()
|
||||
assert resp.text.count('</th>') == 7
|
||||
|
@ -381,10 +377,6 @@ def test_backoffice_card_field_columns(pub):
|
|||
app = login(get_app(pub))
|
||||
resp = app.get('/backoffice/management/form-title/')
|
||||
assert resp.text.count('</th>') == 6 # four columns
|
||||
if not pub.is_using_postgresql():
|
||||
# no support for relation columns unless using SQL
|
||||
assert '4$1' not in resp.forms['listing-settings'].fields
|
||||
return
|
||||
assert '4$0' not in resp.forms['listing-settings'].fields
|
||||
resp.forms['listing-settings']['4$1'].checked = True
|
||||
resp.forms['listing-settings']['4$2'].checked = True
|
||||
|
@ -415,10 +407,6 @@ def test_backoffice_card_field_columns(pub):
|
|||
|
||||
|
||||
def test_backoffice_block_columns(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
pub.user_class.wipe()
|
||||
create_superuser(pub)
|
||||
pub.role_class.wipe()
|
||||
|
@ -520,10 +508,6 @@ def test_backoffice_block_columns(pub):
|
|||
|
||||
|
||||
def test_backoffice_block_email_column(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
pub.user_class.wipe()
|
||||
create_superuser(pub)
|
||||
pub.role_class.wipe()
|
||||
|
@ -585,10 +569,6 @@ def test_backoffice_block_email_column(pub):
|
|||
|
||||
|
||||
def test_backoffice_block_bool_column(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
pub.user_class.wipe()
|
||||
create_superuser(pub)
|
||||
pub.role_class.wipe()
|
||||
|
@ -649,10 +629,6 @@ def test_backoffice_block_bool_column(pub):
|
|||
|
||||
|
||||
def test_backoffice_block_date_column(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
pub.user_class.wipe()
|
||||
create_superuser(pub)
|
||||
pub.role_class.wipe()
|
||||
|
@ -713,10 +689,6 @@ def test_backoffice_block_date_column(pub):
|
|||
|
||||
|
||||
def test_backoffice_block_file_column(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
pub.user_class.wipe()
|
||||
create_superuser(pub)
|
||||
pub.role_class.wipe()
|
||||
|
@ -780,10 +752,6 @@ def test_backoffice_block_file_column(pub):
|
|||
|
||||
|
||||
def test_backoffice_block_column_position(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
pub.user_class.wipe()
|
||||
create_superuser(pub)
|
||||
pub.role_class.wipe()
|
||||
|
|
|
@ -827,10 +827,6 @@ def test_backoffice_custom_view_columns(pub):
|
|||
|
||||
|
||||
def test_backoffice_custom_view_sort_field(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
create_superuser(pub)
|
||||
|
||||
FormDef.wipe()
|
||||
|
|
|
@ -589,10 +589,6 @@ def test_backoffice_csv_export_table(pub):
|
|||
|
||||
|
||||
def test_backoffice_csv_export_ordering(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
create_superuser(pub)
|
||||
|
||||
FormDef.wipe()
|
||||
|
|
|
@ -349,47 +349,32 @@ def test_backoffice_item_filter(pub):
|
|||
assert resp.text.count('<td>b</td>') == 0
|
||||
assert resp.text.count('<td>d</td>') > 0
|
||||
|
||||
if not pub.is_using_postgresql():
|
||||
# in pickle all options are always displayed
|
||||
# in postgresql, option 'c' is never used so not even listed
|
||||
with pytest.raises(ValueError):
|
||||
resp.forms['listing-settings']['filter-4-value'].value = 'c'
|
||||
resp.forms['listing-settings']['filter-4-operator'].value = 'eq'
|
||||
|
||||
# check json view used to fill select filters from javascript
|
||||
resp2 = app.get(resp.request.path + 'filter-options?filter_field_id=4&' + resp.request.query_string)
|
||||
assert [x['id'] for x in resp2.json['data']] == ['â', 'b', 'd']
|
||||
resp2 = app.get(
|
||||
resp.request.path + 'filter-options?filter_field_id=4&_search=d&' + resp.request.query_string
|
||||
)
|
||||
assert [x['id'] for x in resp2.json['data']] == ['d']
|
||||
resp2 = app.get(
|
||||
resp.request.path + 'filter-options?filter_field_id=7&' + resp.request.query_string, status=404
|
||||
)
|
||||
|
||||
for status in ('all', 'waiting', 'pending', 'done', 'accepted'):
|
||||
resp.forms['listing-settings']['filter'] = status
|
||||
resp = resp.forms['listing-settings'].submit()
|
||||
assert resp.text.count('<td>â</td>') == 0
|
||||
assert resp.text.count('<td>b</td>') == 0
|
||||
assert resp.text.count('<td>c</td>') == 0
|
||||
|
||||
else:
|
||||
# in postgresql, option 'c' is never used so not even listed
|
||||
with pytest.raises(ValueError):
|
||||
resp.forms['listing-settings']['filter-4-value'].value = 'c'
|
||||
|
||||
# check json view used to fill select filters from javascript
|
||||
resp2 = app.get(resp.request.path + 'filter-options?filter_field_id=4&' + resp.request.query_string)
|
||||
assert [x['id'] for x in resp2.json['data']] == ['â', 'b', 'd']
|
||||
resp2 = app.get(
|
||||
resp.request.path + 'filter-options?filter_field_id=4&_search=d&' + resp.request.query_string
|
||||
)
|
||||
assert [x['id'] for x in resp2.json['data']] == ['d']
|
||||
resp2 = app.get(
|
||||
resp.request.path + 'filter-options?filter_field_id=7&' + resp.request.query_string, status=404
|
||||
)
|
||||
|
||||
for status in ('all', 'waiting', 'pending', 'done', 'accepted'):
|
||||
resp.forms['listing-settings']['filter'] = status
|
||||
resp = resp.forms['listing-settings'].submit()
|
||||
resp2 = app.get(
|
||||
resp.request.path + 'filter-options?filter_field_id=4&' + resp.request.query_string
|
||||
)
|
||||
if status == 'accepted':
|
||||
assert [x['id'] for x in resp2.json['data']] == []
|
||||
else:
|
||||
assert [x['id'] for x in resp2.json['data']] == ['â', 'b', 'd']
|
||||
if status == 'accepted':
|
||||
assert [x['id'] for x in resp2.json['data']] == []
|
||||
else:
|
||||
assert [x['id'] for x in resp2.json['data']] == ['â', 'b', 'd']
|
||||
|
||||
|
||||
def test_backoffice_item_double_filter(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
pub.user_class.wipe()
|
||||
create_superuser(pub)
|
||||
pub.role_class.wipe()
|
||||
|
@ -566,40 +551,28 @@ def test_backoffice_bofield_item_filter(pub):
|
|||
assert resp.text.count('<td>b</td>') == 0
|
||||
assert resp.text.count('<td>d</td>') > 0
|
||||
|
||||
if not pub.is_using_postgresql():
|
||||
# in pickle all options are always displayed
|
||||
# in postgresql, option 'c' is never used so not even listed
|
||||
with pytest.raises(ValueError):
|
||||
resp.forms['listing-settings']['filter-bo0-1-value'].value = 'c'
|
||||
resp.forms['listing-settings']['filter-bo0-1-operator'].value = 'eq'
|
||||
|
||||
# check json view used to fill select filters from javascript
|
||||
resp2 = app.get(resp.request.path + 'filter-options?filter_field_id=bo0-1&' + resp.request.query_string)
|
||||
assert [x['id'] for x in resp2.json['data']] == ['â', 'b', 'd']
|
||||
resp2 = app.get(
|
||||
resp.request.path + 'filter-options?filter_field_id=bo0-1&_search=d&' + resp.request.query_string
|
||||
)
|
||||
assert [x['id'] for x in resp2.json['data']] == ['d']
|
||||
|
||||
for status in ('all', 'waiting', 'pending', 'done', 'accepted'):
|
||||
resp.forms['listing-settings']['filter'] = status
|
||||
resp = resp.forms['listing-settings'].submit()
|
||||
assert resp.text.count('<td>â</td>') == 0
|
||||
assert resp.text.count('<td>b</td>') == 0
|
||||
assert resp.text.count('<td>c</td>') == 0
|
||||
|
||||
else:
|
||||
# in postgresql, option 'c' is never used so not even listed
|
||||
with pytest.raises(ValueError):
|
||||
resp.forms['listing-settings']['filter-bo0-1-value'].value = 'c'
|
||||
|
||||
# check json view used to fill select filters from javascript
|
||||
resp2 = app.get(
|
||||
resp.request.path + 'filter-options?filter_field_id=bo0-1&' + resp.request.query_string
|
||||
)
|
||||
assert [x['id'] for x in resp2.json['data']] == ['â', 'b', 'd']
|
||||
resp2 = app.get(
|
||||
resp.request.path + 'filter-options?filter_field_id=bo0-1&_search=d&' + resp.request.query_string
|
||||
)
|
||||
assert [x['id'] for x in resp2.json['data']] == ['d']
|
||||
|
||||
for status in ('all', 'waiting', 'pending', 'done', 'accepted'):
|
||||
resp.forms['listing-settings']['filter'] = status
|
||||
resp = resp.forms['listing-settings'].submit()
|
||||
resp2 = app.get(
|
||||
resp.request.path + 'filter-options?filter_field_id=bo0-1&' + resp.request.query_string
|
||||
)
|
||||
if status == 'accepted':
|
||||
assert [x['id'] for x in resp2.json['data']] == []
|
||||
else:
|
||||
assert [x['id'] for x in resp2.json['data']] == ['â', 'b', 'd']
|
||||
if status == 'accepted':
|
||||
assert [x['id'] for x in resp2.json['data']] == []
|
||||
else:
|
||||
assert [x['id'] for x in resp2.json['data']] == ['â', 'b', 'd']
|
||||
|
||||
|
||||
def test_backoffice_items_filter(pub):
|
||||
|
@ -681,18 +654,9 @@ def test_backoffice_items_filter(pub):
|
|||
assert resp.text.count('<td>â</td>') > 0
|
||||
assert resp.text.count('<td>b, d</td>') == 0
|
||||
|
||||
if pub.is_using_postgresql():
|
||||
# option 'c' is never used so not even listed
|
||||
with pytest.raises(ValueError):
|
||||
resp.forms['listing-settings']['filter-4-value'].value = 'c'
|
||||
else:
|
||||
# option 'c' is never used so not even listed
|
||||
with pytest.raises(ValueError):
|
||||
resp.forms['listing-settings']['filter-4-value'].value = 'c'
|
||||
resp.forms['listing-settings']['filter-4-operator'].value = 'eq'
|
||||
resp = resp.forms['listing-settings'].submit()
|
||||
assert resp.text.count('<td>â, b</td>') == 0
|
||||
assert resp.text.count('<td>â</td>') == 0
|
||||
assert resp.text.count('<td>b, d</td>') == 0
|
||||
assert resp.text.count('data-link') == 0 # no rows
|
||||
|
||||
|
||||
def test_backoffice_items_cards_filter(pub):
|
||||
|
@ -766,22 +730,13 @@ def test_backoffice_items_cards_filter(pub):
|
|||
resp.forms['listing-settings']['filter-1'].checked = True
|
||||
resp = resp.forms['listing-settings'].submit()
|
||||
|
||||
if pub.is_using_postgresql():
|
||||
# option 'bar' is never used so not even listed
|
||||
assert [x[2] for x in resp.forms['listing-settings']['filter-1-value'].options] == [
|
||||
'',
|
||||
'card baz',
|
||||
'card foo',
|
||||
'card foo, bar',
|
||||
]
|
||||
else:
|
||||
assert [x[2] for x in resp.forms['listing-settings']['filter-1-value'].options] == [
|
||||
'',
|
||||
'card bar',
|
||||
'card baz',
|
||||
'card foo',
|
||||
'card foo, bar',
|
||||
]
|
||||
# option 'bar' is never used so not even listed
|
||||
assert [x[2] for x in resp.forms['listing-settings']['filter-1-value'].options] == [
|
||||
'',
|
||||
'card baz',
|
||||
'card foo',
|
||||
'card foo, bar',
|
||||
]
|
||||
|
||||
resp.forms['listing-settings']['filter-1-value'].value = card_ids['foo']
|
||||
resp = resp.forms['listing-settings'].submit()
|
||||
|
@ -957,9 +912,6 @@ def test_backoffice_email_filter(pub):
|
|||
|
||||
|
||||
def test_backoffice_date_filter(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
pub.user_class.wipe()
|
||||
create_superuser(pub)
|
||||
pub.role_class.wipe()
|
||||
|
@ -1363,9 +1315,6 @@ def test_backoffice_table_varname_filter(pub):
|
|||
|
||||
|
||||
def test_backoffice_block_field_filter(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
|
||||
pub.user_class.wipe()
|
||||
create_superuser(pub)
|
||||
pub.role_class.wipe()
|
||||
|
|
|
@ -349,10 +349,6 @@ def test_inspect_page(pub, local_user):
|
|||
|
||||
|
||||
def test_inspect_page_with_related_objects(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
user = create_user(pub, is_admin=True)
|
||||
|
||||
FormDef.wipe()
|
||||
|
@ -746,10 +742,6 @@ def test_inspect_page_map_field(pub, local_user):
|
|||
|
||||
|
||||
def test_inspect_page_lazy_list(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
FormDef.wipe()
|
||||
|
||||
formdef = FormDef()
|
||||
|
|
|
@ -38,13 +38,12 @@ def test_backoffice_statistics_with_no_formdefs(pub):
|
|||
create_user(pub)
|
||||
create_environment(pub)
|
||||
FormDef.wipe()
|
||||
if pub.is_using_postgresql():
|
||||
from wcs.sql import drop_global_views, get_connection_and_cursor
|
||||
from wcs.sql import drop_global_views, get_connection_and_cursor
|
||||
|
||||
conn, cur = get_connection_and_cursor()
|
||||
drop_global_views(conn, cur)
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn, cur = get_connection_and_cursor()
|
||||
drop_global_views(conn, cur)
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
app = login(get_app(pub))
|
||||
resp = app.get('/backoffice/management/statistics')
|
||||
|
@ -72,14 +71,11 @@ def test_backoffice_statistics_status_filter(pub):
|
|||
assert 'Total number of records: 17' in resp.text
|
||||
assert '<h2>Filters</h2>' in resp.text
|
||||
assert 'Status: Open' in resp.text
|
||||
if pub.is_using_postgresql():
|
||||
resp.forms['listing-settings']['filter'].value = 'waiting'
|
||||
resp = resp.forms['listing-settings'].submit()
|
||||
assert 'Total number of records: 17' in resp.text
|
||||
assert '<h2>Filters</h2>' in resp.text
|
||||
assert 'Status: Waiting for an action' in resp.text
|
||||
else:
|
||||
assert 'waiting' not in [x[0] for x in resp.forms['listing-settings']['filter'].options]
|
||||
resp.forms['listing-settings']['filter'].value = 'waiting'
|
||||
resp = resp.forms['listing-settings'].submit()
|
||||
assert 'Total number of records: 17' in resp.text
|
||||
assert '<h2>Filters</h2>' in resp.text
|
||||
assert 'Status: Waiting for an action' in resp.text
|
||||
|
||||
resp.forms['listing-settings']['filter'].value = 'done'
|
||||
resp = resp.forms['listing-settings'].submit()
|
||||
|
|
|
@ -1521,12 +1521,7 @@ def test_form_submit_with_just_disabled_user(pub, emails):
|
|||
user.store()
|
||||
resp = resp.form.submit('submit')
|
||||
resp = resp.follow()
|
||||
if pub.is_using_postgresql():
|
||||
assert 'Sorry, your session have been lost.' in resp
|
||||
else:
|
||||
assert 'The form has been recorded' in resp
|
||||
assert formdef.data_class().count() == 1
|
||||
assert formdef.data_class().select()[0].user_id is None
|
||||
assert 'Sorry, your session have been lost.' in resp
|
||||
|
||||
|
||||
def test_form_titles(pub):
|
||||
|
@ -2836,9 +2831,8 @@ def test_form_multi_page_post_edit(pub):
|
|||
assert formdef.data_class().get(data_id).evolution[-1].status == 'wf-%s' % st2.id
|
||||
|
||||
# jump to a nonexistent status == do not jump, but add a LoggedError
|
||||
if pub.is_using_postgresql():
|
||||
pub.loggederror_class.wipe()
|
||||
assert pub.loggederror_class.count() == 0
|
||||
pub.loggederror_class.wipe()
|
||||
assert pub.loggederror_class.count() == 0
|
||||
editable.status = 'deleted_status_id'
|
||||
workflow.store()
|
||||
# go back to st1
|
||||
|
@ -2854,15 +2848,14 @@ def test_form_multi_page_post_edit(pub):
|
|||
resp = resp.forms[0].submit('submit')
|
||||
resp = resp.follow()
|
||||
assert formdef.data_class().get(data_id).status == 'wf-%s' % st1.id # stay on st1
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert logged_error.formdef_id == formdef.id
|
||||
assert logged_error.workflow_id == workflow.id
|
||||
assert logged_error.status_id == st1.id
|
||||
assert logged_error.status_item_id == editable.id
|
||||
assert logged_error.occurences_count == 1
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert logged_error.formdef_id == formdef.id
|
||||
assert logged_error.workflow_id == workflow.id
|
||||
assert logged_error.status_id == st1.id
|
||||
assert logged_error.status_item_id == editable.id
|
||||
assert logged_error.occurences_count == 1
|
||||
|
||||
# do it again: increment logged_error.occurences_count
|
||||
page = login(get_app(pub), username='foo', password='foo').get('/test/%s/' % data_id)
|
||||
|
@ -2873,10 +2866,9 @@ def test_form_multi_page_post_edit(pub):
|
|||
resp = resp.forms[0].submit('submit')
|
||||
resp = resp.follow()
|
||||
assert formdef.data_class().get(data_id).status == 'wf-%s' % st1.id # stay on st1
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.occurences_count == 2
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.occurences_count == 2
|
||||
|
||||
|
||||
def test_form_edit_autocomplete_list(pub):
|
||||
|
@ -3765,11 +3757,10 @@ def test_form_page_template_prefill_items_field(pub):
|
|||
assert not resp.form['f0$elementbar'].checked
|
||||
assert not resp.form['f0$elementbaz'].checked
|
||||
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'Invalid value for items prefill on field "items"'
|
||||
pub.loggederror_class.wipe()
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'Invalid value for items prefill on field "items"'
|
||||
pub.loggederror_class.wipe()
|
||||
|
||||
|
||||
def test_form_page_changing_prefill(pub):
|
||||
|
@ -5006,13 +4997,8 @@ def test_form_autosave_never_overwrite(mocker, pub, settings):
|
|||
autosave_data['_ajax_form_token'] = resp.form['_form_id'].value
|
||||
resp_autosave = app.post('/test/autosave', params=autosave_data)
|
||||
formdata.refresh_from_storage()
|
||||
if pub.is_using_postgresql():
|
||||
assert resp_autosave.json != {'result': 'success'}
|
||||
assert formdata.data['3'] == '1'
|
||||
else:
|
||||
# with pickle invalid data has been saved
|
||||
assert resp_autosave.json == {'result': 'success'}
|
||||
assert formdata.data['3'] == 'tmp'
|
||||
assert resp_autosave.json != {'result': 'success'}
|
||||
assert formdata.data['3'] == '1'
|
||||
# validate
|
||||
resp = resp.form.submit('submit') # -> submit
|
||||
|
||||
|
@ -5546,10 +5532,9 @@ def test_workflow_message_with_template_error(pub):
|
|||
resp = resp.follow()
|
||||
assert 'Error rendering message.' in resp.text
|
||||
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.summary == "Error in template of workflow message ('int' object is not iterable)"
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.summary == "Error in template of workflow message ('int' object is not iterable)"
|
||||
|
||||
|
||||
def test_session_cookie_flags(pub):
|
||||
|
@ -6030,10 +6015,7 @@ def test_item_field_from_custom_view_on_cards(pub):
|
|||
app = login(get_app(pub), username='foo', password='foo')
|
||||
|
||||
resp = app.get('/backoffice/data/items/')
|
||||
if pub.is_using_postgresql():
|
||||
assert resp.text.count('<tr') == 21 # thead + 20 items (max per page)
|
||||
else:
|
||||
assert resp.text.count('<tr') == 31 # thead + all items
|
||||
assert resp.text.count('<tr') == 21 # thead + 20 items (max per page)
|
||||
resp.forms['listing-settings']['filter-0'].checked = True
|
||||
resp = resp.forms['listing-settings'].submit()
|
||||
|
||||
|
@ -6524,11 +6506,10 @@ def test_item_field_autocomplete_json_source(http_requests, pub, error_email, em
|
|||
resp2 = app.get(select2_url + '?q=hell')
|
||||
assert emails.count() == 1
|
||||
assert emails.get_latest('subject') == '[ERROR] [DATASOURCE] Error loading JSON data source (...)'
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == '[DATASOURCE] Error loading JSON data source (...)'
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == '[DATASOURCE] Error loading JSON data source (...)'
|
||||
|
||||
data_source.notify_on_errors = False
|
||||
data_source.store()
|
||||
|
@ -6891,10 +6872,6 @@ def test_form_data_keywords(pub):
|
|||
|
||||
|
||||
def test_logged_errors(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
Workflow.wipe()
|
||||
workflow = Workflow.get_default_workflow()
|
||||
workflow.id = '12'
|
||||
|
@ -8896,11 +8873,9 @@ def test_create_formdata_empty_item_ds_with_id_parameter(pub, create_formdata):
|
|||
resp = resp.form.submit('submit') # -> submission
|
||||
resp = resp.follow()
|
||||
assert create_formdata['target_formdef'].data_class().count() == 0
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 0
|
||||
assert pub.loggederror_class.count() == 0
|
||||
resp = resp.form.submit('button_resubmit')
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 0
|
||||
assert pub.loggederror_class.count() == 0
|
||||
|
||||
|
||||
def test_create_formdata_locked_prefill_parent(create_formdata):
|
||||
|
|
|
@ -1808,14 +1808,13 @@ def test_removed_block_in_form_page(pub):
|
|||
resp = get_app(pub).get(formdef.get_url(), status=500)
|
||||
assert 'A fatal error happened.' in resp.text
|
||||
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert '(id:%s)' % logged_error.id in resp.text
|
||||
resp = get_app(pub).get(formdef.get_url(), status=500)
|
||||
assert '(id:%s)' % logged_error.id in resp.text
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.occurences_count == 2
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert '(id:%s)' % logged_error.id in resp.text
|
||||
resp = get_app(pub).get(formdef.get_url(), status=500)
|
||||
assert '(id:%s)' % logged_error.id in resp.text
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.occurences_count == 2
|
||||
|
||||
|
||||
def test_block_with_static_condition(pub):
|
||||
|
|
|
@ -631,8 +631,7 @@ def test_computed_field_with_data_source(pub):
|
|||
|
||||
|
||||
def test_computed_field_with_bad_objects_filter_in_prefill(pub):
|
||||
if pub.is_using_postgresql():
|
||||
pub.loggederror_class.wipe()
|
||||
pub.loggederror_class.wipe()
|
||||
CardDef.wipe()
|
||||
FormDef.wipe()
|
||||
|
||||
|
@ -676,22 +675,20 @@ def test_computed_field_with_bad_objects_filter_in_prefill(pub):
|
|||
formdef.store()
|
||||
resp = get_app(pub).get('/test/')
|
||||
assert 'XY' in resp.text
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[0]
|
||||
assert logged_error.summary == 'wcs.carddef.CardDefDoesNotExist: unknown'
|
||||
assert logged_error.formdef_id == formdef.id
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert (
|
||||
logged_error.summary
|
||||
== 'Invalid value "{{ cards|objects:"unknown"|first|get:"form_number_raw"|default:"" }}" for field "computed"'
|
||||
)
|
||||
assert logged_error.formdef_id == formdef.id
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[0]
|
||||
assert logged_error.summary == 'wcs.carddef.CardDefDoesNotExist: unknown'
|
||||
assert logged_error.formdef_id == formdef.id
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert (
|
||||
logged_error.summary
|
||||
== 'Invalid value "{{ cards|objects:"unknown"|first|get:"form_number_raw"|default:"" }}" for field "computed"'
|
||||
)
|
||||
assert logged_error.formdef_id == formdef.id
|
||||
|
||||
|
||||
def test_computed_field_with_bad_value_type_in_prefill(pub):
|
||||
if pub.is_using_postgresql():
|
||||
pub.loggederror_class.wipe()
|
||||
pub.loggederror_class.wipe()
|
||||
CardDef.wipe()
|
||||
FormDef.wipe()
|
||||
|
||||
|
@ -740,11 +737,10 @@ def test_computed_field_with_bad_value_type_in_prefill(pub):
|
|||
formdef.store()
|
||||
resp = get_app(pub).get('/test/')
|
||||
assert 'XY' in resp.text
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'Invalid value "foo" for field "computed"'
|
||||
assert logged_error.formdef_id == formdef.id
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'Invalid value "foo" for field "computed"'
|
||||
assert logged_error.formdef_id == formdef.id
|
||||
|
||||
formdef.fields[0].value_template = (
|
||||
'{{ cards|objects:"%s"|first|get:"form_var_bool"|default:"" }}' % carddef.url_name
|
||||
|
@ -752,11 +748,10 @@ def test_computed_field_with_bad_value_type_in_prefill(pub):
|
|||
formdef.store()
|
||||
resp = get_app(pub).get('/test/')
|
||||
assert 'XY' in resp.text
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.summary == 'Invalid value "True" for field "computed"'
|
||||
assert logged_error.formdef_id == formdef.id
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.summary == 'Invalid value "True" for field "computed"'
|
||||
assert logged_error.formdef_id == formdef.id
|
||||
|
||||
for value_template in [
|
||||
'{{ cards|objects:"%s"|get:42|get:"form_number_raw" }}' % carddef.url_name,
|
||||
|
@ -766,5 +761,4 @@ def test_computed_field_with_bad_value_type_in_prefill(pub):
|
|||
formdef.store()
|
||||
resp = get_app(pub).get('/test/')
|
||||
assert 'XY' in resp.text
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 2
|
||||
assert pub.loggederror_class.count() == 2
|
||||
|
|
|
@ -392,10 +392,9 @@ def test_form_file_field_with_wrong_value(pub):
|
|||
formdef.store()
|
||||
|
||||
get_app(pub).get('/test/')
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.formdef_id == formdef.id
|
||||
assert logged_error.summary == 'Failed to set value on field "file"'
|
||||
assert logged_error.exception_class == 'AttributeError'
|
||||
assert logged_error.exception_message == "'str' object has no attribute 'time'"
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.formdef_id == formdef.id
|
||||
assert logged_error.summary == 'Failed to set value on field "file"'
|
||||
assert logged_error.exception_class == 'AttributeError'
|
||||
assert logged_error.exception_message == "'str' object has no attribute 'time'"
|
||||
|
|
|
@ -828,11 +828,10 @@ def test_formdata_generated_document_odt_to_pdf_download_push_to_portfolio(
|
|||
http_post_request.return_value = None, 400, '{"code": "document-exists"}', None # fail
|
||||
resp = resp.form.submit('button_export_to')
|
||||
assert http_post_request.call_count == 1
|
||||
if pub.is_using_postgresql():
|
||||
error = pub.loggederror_class.select()[0]
|
||||
assert error.summary.startswith("file 'template.pdf' failed to be pushed to portfolio of 'Foo")
|
||||
assert 'status: 400' in error.summary
|
||||
assert "payload: {'code': 'document-exists'}" in error.summary
|
||||
error = pub.loggederror_class.select()[0]
|
||||
assert error.summary.startswith("file 'template.pdf' failed to be pushed to portfolio of 'Foo")
|
||||
assert 'status: 400' in error.summary
|
||||
assert "payload: {'code': 'document-exists'}" in error.summary
|
||||
|
||||
# failed to push to portfolio, but document is here
|
||||
resp = resp.follow() # $form/$id/create_doc
|
||||
|
|
|
@ -1320,10 +1320,6 @@ def test_field_live_locked_error_prefilled_field(pub, http_requests):
|
|||
|
||||
@pytest.mark.parametrize('field_type', ['item', 'string', 'email'])
|
||||
def test_dynamic_item_field_from_custom_view_on_cards(pub, field_type):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
pub.role_class.wipe()
|
||||
pub.custom_view_class.wipe()
|
||||
|
||||
|
@ -1461,23 +1457,17 @@ def test_dynamic_item_field_from_custom_view_on_cards(pub, field_type):
|
|||
assert formdef.data_class().select()[0].data['1_structured']['item'] == 'baz'
|
||||
|
||||
# delete custom view
|
||||
if pub.is_using_postgresql():
|
||||
pub.loggederror_class.wipe()
|
||||
pub.loggederror_class.wipe()
|
||||
custom_view.remove_self()
|
||||
resp = get_app(pub).get('/test/')
|
||||
assert resp.form['f1'].options == []
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.formdef_id == formdef.id
|
||||
assert logged_error.summary == '[DATASOURCE] Unknown custom view "as-data-source" for CardDef "items"'
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.formdef_id == formdef.id
|
||||
assert logged_error.summary == '[DATASOURCE] Unknown custom view "as-data-source" for CardDef "items"'
|
||||
|
||||
|
||||
def test_dynamic_items_field_from_custom_view_on_cards(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
pub.role_class.wipe()
|
||||
pub.custom_view_class.wipe()
|
||||
|
||||
|
@ -1606,23 +1596,17 @@ def test_dynamic_items_field_from_custom_view_on_cards(pub):
|
|||
assert formdef.data_class().select()[0].data['1_structured']['text'] == 'attr1 - foo,bar'
|
||||
|
||||
# delete custom view
|
||||
if pub.is_using_postgresql():
|
||||
pub.loggederror_class.wipe()
|
||||
pub.loggederror_class.wipe()
|
||||
custom_view.remove_self()
|
||||
resp = get_app(pub).get('/test/')
|
||||
assert resp.form['f1'].options == []
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.formdef_id == formdef.id
|
||||
assert logged_error.summary == '[DATASOURCE] Unknown custom view "as-data-source" for CardDef "items"'
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.formdef_id == formdef.id
|
||||
assert logged_error.summary == '[DATASOURCE] Unknown custom view "as-data-source" for CardDef "items"'
|
||||
|
||||
|
||||
def test_dynamic_internal_id_from_custom_view_on_cards(pub):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
pub.role_class.wipe()
|
||||
pub.custom_view_class.wipe()
|
||||
|
||||
|
|
|
@ -453,14 +453,13 @@ def test_data_source_custom_view_unknown_filter(pub):
|
|||
|
||||
assert [i['text'] for i in CardDef.get_data_source_items('carddef:foo')] == ['Hello']
|
||||
assert [i['text'] for i in CardDef.get_data_source_items('carddef:foo:view')] == []
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[0]
|
||||
assert logged_error.summary == 'Invalid filter "42"'
|
||||
assert logged_error.formdef_id == str(carddef.id)
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.summary == 'Invalid filter "foobar"'
|
||||
assert logged_error.formdef_id == str(carddef.id)
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[0]
|
||||
assert logged_error.summary == 'Invalid filter "42"'
|
||||
assert logged_error.formdef_id == str(carddef.id)
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.summary == 'Invalid filter "foobar"'
|
||||
assert logged_error.formdef_id == str(carddef.id)
|
||||
|
||||
|
||||
def test_data_source_anonymised_cards(pub):
|
||||
|
@ -600,14 +599,13 @@ def test_data_source_custom_view_digest(pub):
|
|||
carddef.store()
|
||||
assert [i['text'] for i in CardDef.get_data_source_items('carddef:foo')] == ['', '']
|
||||
assert [i['text'] for i in CardDef.get_data_source_items('carddef:foo:view')] == ['', '']
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[0]
|
||||
assert logged_error.summary == 'Digest (default) not defined'
|
||||
assert logged_error.formdata_id in (str(carddata.id), str(carddata2.id))
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.summary == 'Digest (custom view "view") not defined'
|
||||
assert logged_error.formdata_id in (str(carddata.id), str(carddata2.id))
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[0]
|
||||
assert logged_error.summary == 'Digest (default) not defined'
|
||||
assert logged_error.formdata_id in (str(carddata.id), str(carddata2.id))
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.summary == 'Digest (custom view "view") not defined'
|
||||
assert logged_error.formdata_id in (str(carddata.id), str(carddata2.id))
|
||||
assert CardDef.get_data_source_items('carddef:foo', get_by_text='') == []
|
||||
assert CardDef.get_data_source_items('carddef:foo:view', get_by_text='') == []
|
||||
|
||||
|
|
|
@ -139,21 +139,19 @@ def test_python_datasource_errors(pub, error_email, http_requests, emails, caplo
|
|||
datasource = {'type': 'formula', 'value': 'foobar', 'notify_on_errors': True, 'record_on_errors': True}
|
||||
assert data_sources.get_items(datasource) == []
|
||||
assert 'Failed to eval() Python data source' in emails.get_latest('subject')
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == "[DATASOURCE] Failed to eval() Python data source ('foobar')"
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == "[DATASOURCE] Failed to eval() Python data source ('foobar')"
|
||||
|
||||
# expression not iterable
|
||||
datasource = {'type': 'formula', 'value': '2', 'notify_on_errors': True, 'record_on_errors': True}
|
||||
assert data_sources.get_items(datasource) == []
|
||||
assert 'gave a non-iterable result' in emails.get_latest('subject')
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == "[DATASOURCE] Python data source ('2') gave a non-iterable result"
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == "[DATASOURCE] Python data source ('2') gave a non-iterable result"
|
||||
|
||||
datasource = {
|
||||
'type': 'formula',
|
||||
|
@ -161,13 +159,12 @@ def test_python_datasource_errors(pub, error_email, http_requests, emails, caplo
|
|||
'record_on_errors': True,
|
||||
}
|
||||
assert data_sources.get_items(datasource) == []
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 3
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[2]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == (
|
||||
'[DATASOURCE] Python data source (\'[{"mairie-a-rdv", "Mairie A"}, {"mairie-b-rdv", "Mairie B"}]\') gave a non usable result'
|
||||
)
|
||||
assert pub.loggederror_class.count() == 3
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[2]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == (
|
||||
'[DATASOURCE] Python data source (\'[{"mairie-a-rdv", "Mairie A"}, {"mairie-b-rdv", "Mairie B"}]\') gave a non usable result'
|
||||
)
|
||||
|
||||
# list of dictionaries but some are missing a text key
|
||||
datasource = {
|
||||
|
@ -176,13 +173,12 @@ def test_python_datasource_errors(pub, error_email, http_requests, emails, caplo
|
|||
'record_on_errors': True,
|
||||
}
|
||||
assert data_sources.get_items(datasource) == []
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 4
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[2]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == (
|
||||
'[DATASOURCE] Python data source (\'[{"mairie-a-rdv", "Mairie A"}, {"mairie-b-rdv", "Mairie B"}]\') gave a non usable result'
|
||||
)
|
||||
assert pub.loggederror_class.count() == 4
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[2]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == (
|
||||
'[DATASOURCE] Python data source (\'[{"mairie-a-rdv", "Mairie A"}, {"mairie-b-rdv", "Mairie B"}]\') gave a non usable result'
|
||||
)
|
||||
|
||||
if not pub.site_options.has_section('options'):
|
||||
pub.site_options.add_section('options')
|
||||
|
@ -191,8 +187,7 @@ def test_python_datasource_errors(pub, error_email, http_requests, emails, caplo
|
|||
pub.site_options.write(fd)
|
||||
|
||||
# running with disabled python expressions
|
||||
if pub.is_using_postgresql():
|
||||
pub.loggederror_class.wipe()
|
||||
pub.loggederror_class.wipe()
|
||||
datasource = {
|
||||
'type': 'formula',
|
||||
'value': repr(['foo', 'bar']),
|
||||
|
@ -203,11 +198,10 @@ def test_python_datasource_errors(pub, error_email, http_requests, emails, caplo
|
|||
assert data_sources.get_items(datasource) == []
|
||||
assert emails.count() == 1 # notified even with notify_on_errors set to False
|
||||
assert 'Unauthorized Python Usage' in emails.get_latest('subject')
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select(order_by='latest_occurence_timestamp')[-1]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == 'Unauthorized Python Usage'
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select(order_by='latest_occurence_timestamp')[-1]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == 'Unauthorized Python Usage'
|
||||
|
||||
|
||||
def test_python_datasource_with_evalutils(pub):
|
||||
|
@ -450,14 +444,13 @@ def test_json_datasource_bad_url(pub, error_email, http_requests, emails):
|
|||
assert 'error in HTTP request to http://remote.example.net/404 (status: 404)' in emails.get_latest(
|
||||
'subject'
|
||||
)
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.workflow_id is None
|
||||
assert (
|
||||
logged_error.summary
|
||||
== "[DATASOURCE] Error loading JSON data source (error in HTTP request to http://remote.example.net/404 (status: 404))"
|
||||
)
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.workflow_id is None
|
||||
assert (
|
||||
logged_error.summary
|
||||
== "[DATASOURCE] Error loading JSON data source (error in HTTP request to http://remote.example.net/404 (status: 404))"
|
||||
)
|
||||
|
||||
datasource = {
|
||||
'type': 'json',
|
||||
|
@ -468,14 +461,13 @@ def test_json_datasource_bad_url(pub, error_email, http_requests, emails):
|
|||
assert data_sources.get_items(datasource) == []
|
||||
assert emails.count() == 2
|
||||
assert 'Error reading JSON data source' in emails.get_latest('subject')
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.workflow_id is None
|
||||
assert (
|
||||
logged_error.summary
|
||||
== "[DATASOURCE] Error reading JSON data source output (Expecting value: line 1 column 1 (char 0))"
|
||||
)
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.workflow_id is None
|
||||
assert (
|
||||
logged_error.summary
|
||||
== "[DATASOURCE] Error reading JSON data source output (Expecting value: line 1 column 1 (char 0))"
|
||||
)
|
||||
|
||||
datasource = {
|
||||
'type': 'json',
|
||||
|
@ -485,11 +477,10 @@ def test_json_datasource_bad_url(pub, error_email, http_requests, emails):
|
|||
}
|
||||
assert data_sources.get_items(datasource) == []
|
||||
assert 'Error loading JSON data source' in emails.get_latest('subject')
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 3
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[2]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == "[DATASOURCE] Error loading JSON data source (error)"
|
||||
assert pub.loggederror_class.count() == 3
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[2]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == "[DATASOURCE] Error loading JSON data source (error)"
|
||||
|
||||
datasource = {
|
||||
'type': 'json',
|
||||
|
@ -499,45 +490,40 @@ def test_json_datasource_bad_url(pub, error_email, http_requests, emails):
|
|||
}
|
||||
assert data_sources.get_items(datasource) == []
|
||||
assert 'Error reading JSON data source output (err 1)' in emails.get_latest('subject')
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 4
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[3]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == "[DATASOURCE] Error reading JSON data source output (err 1)"
|
||||
assert pub.loggederror_class.count() == 4
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[3]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == "[DATASOURCE] Error reading JSON data source output (err 1)"
|
||||
|
||||
|
||||
def test_json_datasource_bad_url_scheme(pub, error_email, emails):
|
||||
datasource = {'type': 'json', 'value': '', 'notify_on_errors': True, 'record_on_errors': True}
|
||||
assert data_sources.get_items(datasource) == []
|
||||
assert emails.count() == 0
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 0
|
||||
assert pub.loggederror_class.count() == 0
|
||||
|
||||
datasource = {'type': 'json', 'value': 'foo://bar', 'notify_on_errors': True, 'record_on_errors': True}
|
||||
assert data_sources.get_items(datasource) == []
|
||||
assert 'Error loading JSON data source' in emails.get_latest('subject')
|
||||
assert 'invalid scheme in URL' in emails.get_latest('subject')
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.workflow_id is None
|
||||
assert (
|
||||
logged_error.summary
|
||||
== "[DATASOURCE] Error loading JSON data source (invalid scheme in URL foo://bar)"
|
||||
)
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.workflow_id is None
|
||||
assert (
|
||||
logged_error.summary
|
||||
== "[DATASOURCE] Error loading JSON data source (invalid scheme in URL foo://bar)"
|
||||
)
|
||||
|
||||
datasource = {'type': 'json', 'value': '/bla/blo', 'notify_on_errors': True, 'record_on_errors': True}
|
||||
assert data_sources.get_items(datasource) == []
|
||||
assert 'Error loading JSON data source' in emails.get_latest('subject')
|
||||
assert 'invalid scheme in URL' in emails.get_latest('subject')
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.workflow_id is None
|
||||
assert (
|
||||
logged_error.summary
|
||||
== "[DATASOURCE] Error loading JSON data source (invalid scheme in URL /bla/blo)"
|
||||
)
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.workflow_id is None
|
||||
assert (
|
||||
logged_error.summary == "[DATASOURCE] Error loading JSON data source (invalid scheme in URL /bla/blo)"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('notify', [True, False])
|
||||
|
@ -561,13 +547,12 @@ def test_json_datasource_bad_qs_data(pub, error_email, emails, notify, record):
|
|||
assert message in emails.get_latest('subject')
|
||||
else:
|
||||
assert emails.count() == 0
|
||||
if pub.is_using_postgresql():
|
||||
if record:
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[0]
|
||||
assert logged_error.summary == message
|
||||
else:
|
||||
assert pub.loggederror_class.count() == 0
|
||||
if record:
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[0]
|
||||
assert logged_error.summary == message
|
||||
else:
|
||||
assert pub.loggederror_class.count() == 0
|
||||
|
||||
|
||||
def test_geojson_datasource(pub, requests_pub, http_requests):
|
||||
|
@ -896,14 +881,13 @@ def test_geojson_datasource_bad_url(pub, http_requests, error_email, emails):
|
|||
assert data_sources.get_items(datasource) == []
|
||||
assert 'Error loading JSON data source' in emails.get_latest('subject')
|
||||
assert 'status: 404' in emails.get_latest('subject')
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.workflow_id is None
|
||||
assert (
|
||||
logged_error.summary
|
||||
== "[DATASOURCE] Error loading JSON data source (error in HTTP request to http://remote.example.net/404 (status: 404))"
|
||||
)
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.workflow_id is None
|
||||
assert (
|
||||
logged_error.summary
|
||||
== "[DATASOURCE] Error loading JSON data source (error in HTTP request to http://remote.example.net/404 (status: 404))"
|
||||
)
|
||||
|
||||
datasource = {
|
||||
'type': 'geojson',
|
||||
|
@ -914,14 +898,13 @@ def test_geojson_datasource_bad_url(pub, http_requests, error_email, emails):
|
|||
assert data_sources.get_items(datasource) == []
|
||||
assert 'Error reading JSON data source output' in emails.get_latest('subject')
|
||||
assert 'Expecting value:' in emails.get_latest('subject')
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.workflow_id is None
|
||||
assert (
|
||||
logged_error.summary
|
||||
== "[DATASOURCE] Error reading JSON data source output (Expecting value: line 1 column 1 (char 0))"
|
||||
)
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.workflow_id is None
|
||||
assert (
|
||||
logged_error.summary
|
||||
== "[DATASOURCE] Error reading JSON data source output (Expecting value: line 1 column 1 (char 0))"
|
||||
)
|
||||
|
||||
datasource = {
|
||||
'type': 'geojson',
|
||||
|
@ -932,11 +915,10 @@ def test_geojson_datasource_bad_url(pub, http_requests, error_email, emails):
|
|||
assert data_sources.get_items(datasource) == []
|
||||
assert 'Error loading JSON data source' in emails.get_latest('subject')
|
||||
assert 'error' in emails.get_latest('subject')
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 3
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[2]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == "[DATASOURCE] Error loading JSON data source (error)"
|
||||
assert pub.loggederror_class.count() == 3
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[2]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == "[DATASOURCE] Error loading JSON data source (error)"
|
||||
|
||||
datasource = {
|
||||
'type': 'geojson',
|
||||
|
@ -946,11 +928,10 @@ def test_geojson_datasource_bad_url(pub, http_requests, error_email, emails):
|
|||
}
|
||||
assert data_sources.get_items(datasource) == []
|
||||
assert 'Error reading JSON data source output (err 1)' in emails.get_latest('subject')
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 4
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[3]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == "[DATASOURCE] Error reading JSON data source output (err 1)"
|
||||
assert pub.loggederror_class.count() == 4
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[3]
|
||||
assert logged_error.workflow_id is None
|
||||
assert logged_error.summary == "[DATASOURCE] Error reading JSON data source output (err 1)"
|
||||
|
||||
|
||||
def test_geojson_datasource_bad_url_scheme(pub, error_email, emails):
|
||||
|
@ -962,27 +943,24 @@ def test_geojson_datasource_bad_url_scheme(pub, error_email, emails):
|
|||
assert data_sources.get_items(datasource) == []
|
||||
assert 'Error loading JSON data source' in emails.get_latest('subject')
|
||||
assert 'invalid scheme in URL' in emails.get_latest('subject')
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.workflow_id is None
|
||||
assert (
|
||||
logged_error.summary
|
||||
== "[DATASOURCE] Error loading JSON data source (invalid scheme in URL foo://bar)"
|
||||
)
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.workflow_id is None
|
||||
assert (
|
||||
logged_error.summary
|
||||
== "[DATASOURCE] Error loading JSON data source (invalid scheme in URL foo://bar)"
|
||||
)
|
||||
|
||||
datasource = {'type': 'geojson', 'value': '/bla/blo', 'notify_on_errors': True, 'record_on_errors': True}
|
||||
assert data_sources.get_items(datasource) == []
|
||||
assert 'Error loading JSON data source' in emails.get_latest('subject')
|
||||
assert 'invalid scheme in URL' in emails.get_latest('subject')
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.workflow_id is None
|
||||
assert (
|
||||
logged_error.summary
|
||||
== "[DATASOURCE] Error loading JSON data source (invalid scheme in URL /bla/blo)"
|
||||
)
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.workflow_id is None
|
||||
assert (
|
||||
logged_error.summary == "[DATASOURCE] Error loading JSON data source (invalid scheme in URL /bla/blo)"
|
||||
)
|
||||
|
||||
|
||||
def test_item_field_named_python_datasource(requests_pub):
|
||||
|
|
|
@ -193,8 +193,7 @@ def test_fc_login_page(caplog):
|
|||
}
|
||||
)
|
||||
)
|
||||
if pub.is_using_postgresql():
|
||||
assert 'user did not authorize login' in pub.loggederror_class.select(order_by='id')[-1].summary
|
||||
assert 'user did not authorize login' in pub.loggederror_class.select(order_by='id')[-1].summary
|
||||
resp = app.get(
|
||||
'/ident/fc/callback?%s'
|
||||
% urllib.parse.urlencode(
|
||||
|
@ -204,8 +203,7 @@ def test_fc_login_page(caplog):
|
|||
}
|
||||
)
|
||||
)
|
||||
if pub.is_using_postgresql():
|
||||
assert 'whatever' in pub.loggederror_class.select(order_by='id')[-1].summary
|
||||
assert 'whatever' in pub.loggederror_class.select(order_by='id')[-1].summary
|
||||
|
||||
# Login existing user
|
||||
def logme(login_url):
|
||||
|
|
|
@ -1250,14 +1250,12 @@ def test_lazy_formdata_queryset_filter(pub, variable_test_data):
|
|||
assert queryset.count == 4
|
||||
queryset = lazy_formdata.objects.filter_by('foo_foo').apply_filter_value('X')
|
||||
assert queryset.count == 0
|
||||
if pub.is_using_postgresql():
|
||||
pub.loggederror_class.wipe()
|
||||
pub.loggederror_class.wipe()
|
||||
queryset = lazy_formdata.objects.filter_by('unknown').apply_filter_value('X')
|
||||
assert queryset.count == 0
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'Invalid filter "unknown"'
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'Invalid filter "unknown"'
|
||||
|
||||
queryset = lazy_formdata.objects.filter_by('datefield').apply_filter_value(
|
||||
datetime.date(2018, 7, 31).timetuple()
|
||||
|
@ -1273,33 +1271,29 @@ def test_lazy_formdata_queryset_filter(pub, variable_test_data):
|
|||
assert queryset.count == 5
|
||||
queryset = lazy_formdata.objects.filter_by('datefield').apply_filter_value('not a date')
|
||||
assert queryset.count == 0
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.summary == 'Invalid value "not a date" for filter "datefield"'
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.summary == 'Invalid value "not a date" for filter "datefield"'
|
||||
|
||||
if pub.is_using_postgresql():
|
||||
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value(
|
||||
datetime.date(2018, 7, 31).timetuple()
|
||||
)
|
||||
assert queryset.count == 6 # 1 + 5 null
|
||||
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value(
|
||||
datetime.date(2018, 7, 31)
|
||||
)
|
||||
assert queryset.count == 6
|
||||
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value(
|
||||
datetime.datetime(2018, 7, 31)
|
||||
)
|
||||
assert queryset.count == 6
|
||||
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value('2018-07-31')
|
||||
assert queryset.count == 6
|
||||
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value(None)
|
||||
assert queryset.count == 6
|
||||
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value('still not a date')
|
||||
assert queryset.count == 0
|
||||
assert pub.loggederror_class.count() == 3
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[2]
|
||||
assert logged_error.summary == 'Invalid value "still not a date" for filter "datefield"'
|
||||
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value(
|
||||
datetime.date(2018, 7, 31).timetuple()
|
||||
)
|
||||
assert queryset.count == 6 # 1 + 5 null
|
||||
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value(datetime.date(2018, 7, 31))
|
||||
assert queryset.count == 6
|
||||
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value(
|
||||
datetime.datetime(2018, 7, 31)
|
||||
)
|
||||
assert queryset.count == 6
|
||||
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value('2018-07-31')
|
||||
assert queryset.count == 6
|
||||
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value(None)
|
||||
assert queryset.count == 6
|
||||
queryset = lazy_formdata.objects.filter_by('datefield').apply_exclude_value('still not a date')
|
||||
assert queryset.count == 0
|
||||
assert pub.loggederror_class.count() == 3
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[2]
|
||||
assert logged_error.summary == 'Invalid value "still not a date" for filter "datefield"'
|
||||
|
||||
queryset = lazy_formdata.objects.filter_by('boolfield').apply_filter_value(True)
|
||||
assert queryset.count == 6
|
||||
|
@ -1311,9 +1305,8 @@ def test_lazy_formdata_queryset_filter(pub, variable_test_data):
|
|||
assert queryset.count == 5
|
||||
queryset = lazy_formdata.objects.filter_by('boolfield').apply_filter_value(0)
|
||||
assert queryset.count == 5
|
||||
if pub.is_using_postgresql():
|
||||
queryset = lazy_formdata.objects.filter_by('boolfield').apply_exclude_value(0)
|
||||
assert queryset.count == 6
|
||||
queryset = lazy_formdata.objects.filter_by('boolfield').apply_exclude_value(0)
|
||||
assert queryset.count == 6
|
||||
|
||||
queryset = lazy_formdata.objects.filter_by('term1').apply_filter_value('3')
|
||||
assert queryset.count == 7
|
||||
|
@ -1321,11 +1314,10 @@ def test_lazy_formdata_queryset_filter(pub, variable_test_data):
|
|||
assert queryset.count == 7
|
||||
queryset = lazy_formdata.objects.filter_by('term1').apply_filter_value('foobar')
|
||||
assert queryset.count == 0
|
||||
if pub.is_using_postgresql():
|
||||
queryset = lazy_formdata.objects.filter_by('term1').apply_exclude_value('3')
|
||||
assert queryset.count == 4
|
||||
queryset = lazy_formdata.objects.filter_by('term1').apply_exclude_value('foobar')
|
||||
assert queryset.count == 11
|
||||
queryset = lazy_formdata.objects.filter_by('term1').apply_exclude_value('3')
|
||||
assert queryset.count == 4
|
||||
queryset = lazy_formdata.objects.filter_by('term1').apply_exclude_value('foobar')
|
||||
assert queryset.count == 11
|
||||
|
||||
queryset = lazy_formdata.objects.filter_by('email').apply_filter_value('bar')
|
||||
assert queryset.count == 0
|
||||
|
@ -1382,11 +1374,10 @@ def test_lazy_formdata_queryset_filter(pub, variable_test_data):
|
|||
assert tmpl.render(context) == '7'
|
||||
tmpl = Template('{{form_objects|filter_by:"foo_foo"|filter_value:"bar"|length}}')
|
||||
assert tmpl.render(context) == '7'
|
||||
if pub.is_using_postgresql():
|
||||
tmpl = Template('{{form_objects|filter_by:"foo_foo"|exclude_value:"bar"|count}}')
|
||||
assert tmpl.render(context) == '4'
|
||||
tmpl = Template('{{form_objects|filter_by:"foo_foo"|exclude_value:"bar"|length}}')
|
||||
assert tmpl.render(context) == '4'
|
||||
tmpl = Template('{{form_objects|filter_by:"foo_foo"|exclude_value:"bar"|count}}')
|
||||
assert tmpl.render(context) == '4'
|
||||
tmpl = Template('{{form_objects|filter_by:"foo_foo"|exclude_value:"bar"|length}}')
|
||||
assert tmpl.render(context) == '4'
|
||||
|
||||
pub.substitutions.feed(formdata)
|
||||
tmpl = Template('{{form_objects|filter_by:"foo_foo"|filter_value:form_var_foo_foo|count}}')
|
||||
|
@ -1397,15 +1388,14 @@ def test_lazy_formdata_queryset_filter(pub, variable_test_data):
|
|||
assert tmpl.render(context) == '5'
|
||||
tmpl = Template('{{form_objects|filter_by:"term1"|filter_value:form_var_term1|count}}')
|
||||
assert tmpl.render(context) == '7'
|
||||
if pub.is_using_postgresql():
|
||||
tmpl = Template('{{form_objects|filter_by:"foo_foo"|exclude_value:form_var_foo_foo|count}}')
|
||||
assert tmpl.render(context) == '4'
|
||||
tmpl = Template('{{form_objects|filter_by:"datefield"|exclude_value:form_var_datefield|count}}')
|
||||
assert tmpl.render(context) == '6'
|
||||
tmpl = Template('{{form_objects|filter_by:"boolfield"|exclude_value:form_var_boolfield|count}}')
|
||||
assert tmpl.render(context) == '6'
|
||||
tmpl = Template('{{form_objects|filter_by:"term1"|exclude_value:form_var_term1|count}}')
|
||||
assert tmpl.render(context) == '4'
|
||||
tmpl = Template('{{form_objects|filter_by:"foo_foo"|exclude_value:form_var_foo_foo|count}}')
|
||||
assert tmpl.render(context) == '4'
|
||||
tmpl = Template('{{form_objects|filter_by:"datefield"|exclude_value:form_var_datefield|count}}')
|
||||
assert tmpl.render(context) == '6'
|
||||
tmpl = Template('{{form_objects|filter_by:"boolfield"|exclude_value:form_var_boolfield|count}}')
|
||||
assert tmpl.render(context) == '6'
|
||||
tmpl = Template('{{form_objects|filter_by:"term1"|exclude_value:form_var_term1|count}}')
|
||||
assert tmpl.render(context) == '4'
|
||||
|
||||
tmpl = Template('{{form.objects|exclude_self|filter_by:"foo_foo"|filter_value:form_var_foo_foo|count}}')
|
||||
assert tmpl.render(context) == '6'
|
||||
|
@ -1494,10 +1484,9 @@ def test_lazy_formdata_queryset_filter(pub, variable_test_data):
|
|||
assert tmpl.render(context) == '0'
|
||||
tmpl = Template('{{form_objects|filter_by_internal_id:"%s"|count}}' % 'invalid value')
|
||||
assert tmpl.render(context) == '0'
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 4
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[3]
|
||||
assert logged_error.summary == 'Invalid value "invalid value" for "filter_by_internal_id"'
|
||||
assert pub.loggederror_class.count() == 4
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[3]
|
||||
assert logged_error.summary == 'Invalid value "invalid value" for "filter_by_internal_id"'
|
||||
|
||||
# test |filter_by_number
|
||||
context = pub.substitutions.get_context_variables(mode='lazy')
|
||||
|
@ -1675,9 +1664,6 @@ def test_lazy_formdata_queryset_get_from_first(pub, variable_test_data):
|
|||
|
||||
|
||||
def test_lazy_formdata_queryset_order_by(pub, variable_test_data):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
|
||||
lazy_formdata = variable_test_data
|
||||
data_class = lazy_formdata._formdef.data_class()
|
||||
for i in range(6):
|
||||
|
@ -1828,14 +1814,13 @@ def test_lazy_global_forms(pub):
|
|||
custom_view4.store()
|
||||
tmpl = Template('{{forms|objects:"foobarlazy"|with_custom_view:"unknown-filter"|count}}')
|
||||
assert tmpl.render(context) == '0'
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[0]
|
||||
assert logged_error.summary == 'Invalid filter "42"'
|
||||
assert logged_error.formdef_id == str(formdef.id)
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.summary == 'Invalid filter "foobar"'
|
||||
assert logged_error.formdef_id == str(formdef.id)
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[0]
|
||||
assert logged_error.summary == 'Invalid filter "42"'
|
||||
assert logged_error.formdef_id == str(formdef.id)
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.summary == 'Invalid filter "foobar"'
|
||||
assert logged_error.formdef_id == str(formdef.id)
|
||||
|
||||
|
||||
def test_lazy_variables(pub, variable_test_data):
|
||||
|
@ -2674,11 +2659,10 @@ def test_form_digest_error(pub):
|
|||
formdata.store()
|
||||
assert formdef.data_class().get(formdata.id).digests['default'] == 'ERROR'
|
||||
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'Could not render digest (default)'
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert pub.loggederror_class.count() == 1
|
||||
logged_error = pub.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'Could not render digest (default)'
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
|
||||
formdef.digest_templates = {
|
||||
'default': 'plop plop',
|
||||
|
@ -2690,11 +2674,10 @@ def test_form_digest_error(pub):
|
|||
formdata.store()
|
||||
assert formdef.data_class().get(formdata.id).digests['custom-view:foobar'] == 'ERROR'
|
||||
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.summary == 'Could not render digest (custom view "foobar")'
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert pub.loggederror_class.count() == 2
|
||||
logged_error = pub.loggederror_class.select(order_by='id')[1]
|
||||
assert logged_error.summary == 'Could not render digest (custom view "foobar")'
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
|
||||
|
||||
def test_lazy_formdata_decimal_filter(pub):
|
||||
|
|
|
@ -121,13 +121,12 @@ def test_title_change(pub):
|
|||
assert FormDef.get(formdef.id).url_name == 'foo'
|
||||
assert FormDef.get(formdef.id).internal_identifier == 'bar'
|
||||
|
||||
if not pub.is_using_postgresql():
|
||||
# makes sure the internal_name doesn't change if there are submitted forms
|
||||
formdef.data_class()().store()
|
||||
formdef.name = 'baz'
|
||||
formdef.store()
|
||||
assert FormDef.get(formdef.id).name == 'baz'
|
||||
assert FormDef.get(formdef.id).internal_identifier == 'bar' # didn't change
|
||||
# makes sure the internal_name doesn't change if there are submitted forms
|
||||
formdef.data_class()().store()
|
||||
formdef.name = 'baz'
|
||||
formdef.store()
|
||||
assert FormDef.get(formdef.id).name == 'baz'
|
||||
assert FormDef.get(formdef.id).internal_identifier == 'bar' # didn't change
|
||||
|
||||
|
||||
def test_overlong_slug(pub):
|
||||
|
|
|
@ -224,10 +224,6 @@ def test_inactive_user(pub, user, app):
|
|||
|
||||
|
||||
def test_transient_data_removal(pub, app):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
pub.session_manager.session_class.wipe()
|
||||
sql.TransientData.wipe()
|
||||
resp = app.get('/')
|
||||
|
@ -251,10 +247,6 @@ def test_transient_data_removal(pub, app):
|
|||
|
||||
|
||||
def test_magictoken_migration(pub, app):
|
||||
if not pub.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
pub.session_manager.session_class.wipe()
|
||||
sql.TransientData.wipe()
|
||||
resp = app.get('/')
|
||||
|
|
|
@ -85,7 +85,6 @@ def create_temporary_pub(sql_mode=True, legacy_theme_mode=False, lazy_mode=False
|
|||
pub.custom_view_class = sql.CustomView
|
||||
pub.snapshot_class = sql.Snapshot
|
||||
pub.loggederror_class = sql.LoggedError
|
||||
pub.is_using_postgresql = lambda: True
|
||||
else:
|
||||
pub.user_class = User
|
||||
pub.role_class = Role
|
||||
|
@ -93,7 +92,6 @@ def create_temporary_pub(sql_mode=True, legacy_theme_mode=False, lazy_mode=False
|
|||
pub.tracking_code_class = TrackingCode
|
||||
pub.session_class = sessions.BasicSession
|
||||
pub.custom_view_class = custom_views.CustomView
|
||||
pub.is_using_postgresql = lambda: False
|
||||
|
||||
pub.session_manager_class = sessions.StorageSessionManager
|
||||
pub.session_manager = pub.session_manager_class(session_class=pub.session_class)
|
||||
|
|
|
@ -373,10 +373,6 @@ def test_jump_count_condition(pub):
|
|||
|
||||
|
||||
def test_jump_bad_python_condition(two_pubs):
|
||||
if not two_pubs.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
FormDef.wipe()
|
||||
formdef = FormDef()
|
||||
formdef.name = 'foobar'
|
||||
|
@ -433,19 +429,17 @@ def test_jump_django_conditions(two_pubs):
|
|||
item.condition = {'type': 'django', 'value': 'form_var_foo|first|upper == "X"'}
|
||||
assert item.check_condition(formdata) is False
|
||||
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 0
|
||||
assert two_pubs.loggederror_class.count() == 0
|
||||
|
||||
item.condition = {'type': 'django', 'value': '~ invalid ~'}
|
||||
assert item.check_condition(formdata) is False
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'Failed to evaluate condition'
|
||||
assert logged_error.exception_class == 'TemplateSyntaxError'
|
||||
assert logged_error.exception_message == "Could not parse the remainder: '~' from '~'"
|
||||
assert logged_error.expression == '~ invalid ~'
|
||||
assert logged_error.expression_type == 'django'
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'Failed to evaluate condition'
|
||||
assert logged_error.exception_class == 'TemplateSyntaxError'
|
||||
assert logged_error.exception_message == "Could not parse the remainder: '~' from '~'"
|
||||
assert logged_error.expression == '~ invalid ~'
|
||||
assert logged_error.expression_type == 'django'
|
||||
|
||||
|
||||
def test_check_auth(pub):
|
||||
|
@ -580,10 +574,6 @@ def test_dispatch_multi(pub):
|
|||
|
||||
|
||||
def test_dispatch_auto(two_pubs):
|
||||
if not two_pubs.is_using_postgresql():
|
||||
pytest.skip('this requires sql')
|
||||
return
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'baz'
|
||||
formdef.fields = [
|
||||
|
@ -671,10 +661,6 @@ def test_dispatch_auto(two_pubs):
|
|||
|
||||
|
||||
def test_dispatch_computed(two_pubs):
|
||||
if not two_pubs.is_using_postgresql():
|
||||
pytest.skip('this requires sql')
|
||||
return
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'baz'
|
||||
formdef.store()
|
||||
|
@ -1275,21 +1261,19 @@ def test_register_comment_attachment(two_pubs):
|
|||
two_pubs.substitutions.feed(formdata)
|
||||
item.perform(formdata)
|
||||
url1 = formdata.evolution[-1].parts[-1].content
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
error = two_pubs.loggederror_class.select()[0]
|
||||
assert error.kind == 'deprecated_usage'
|
||||
assert error.occurences_count == 1
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
error = two_pubs.loggederror_class.select()[0]
|
||||
assert error.kind == 'deprecated_usage'
|
||||
assert error.occurences_count == 1
|
||||
|
||||
two_pubs.substitutions.feed(formdata)
|
||||
item.comment = '{{ form_attachments.testfile.url }}'
|
||||
item.perform(formdata)
|
||||
url2 = formdata.evolution[-1].parts[-1].content
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
error = two_pubs.loggederror_class.select()[0]
|
||||
assert error.kind == 'deprecated_usage'
|
||||
assert error.occurences_count == 1
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
error = two_pubs.loggederror_class.select()[0]
|
||||
assert error.kind == 'deprecated_usage'
|
||||
assert error.occurences_count == 1
|
||||
|
||||
assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments'))) == 1
|
||||
for subdir in os.listdir(os.path.join(get_publisher().app_dir, 'attachments')):
|
||||
|
@ -1316,21 +1300,19 @@ def test_register_comment_attachment(two_pubs):
|
|||
item.comment = '[attachments.testfile.url]'
|
||||
item.perform(formdata)
|
||||
url3 = formdata.evolution[-1].parts[-1].content
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
error = two_pubs.loggederror_class.select()[0]
|
||||
assert error.kind == 'deprecated_usage'
|
||||
assert error.occurences_count == 2
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
error = two_pubs.loggederror_class.select()[0]
|
||||
assert error.kind == 'deprecated_usage'
|
||||
assert error.occurences_count == 2
|
||||
two_pubs.substitutions.feed(formdata)
|
||||
item.comment = '[form_attachments.testfile.url]'
|
||||
item.perform(formdata)
|
||||
url4 = formdata.evolution[-1].parts[-1].content
|
||||
assert url3 == url4
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
error = two_pubs.loggederror_class.select()[0]
|
||||
assert error.kind == 'deprecated_usage'
|
||||
assert error.occurences_count == 2
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
error = two_pubs.loggederror_class.select()[0]
|
||||
assert error.kind == 'deprecated_usage'
|
||||
assert error.occurences_count == 2
|
||||
|
||||
|
||||
def test_register_comment_with_attachment_file(pub):
|
||||
|
@ -1808,19 +1790,17 @@ def test_webservice_call(http_requests, two_pubs):
|
|||
item.perform(formdata)
|
||||
assert formdata.status == 'wf-sterr'
|
||||
|
||||
if pub.is_using_postgresql():
|
||||
pub.loggederror_class.wipe()
|
||||
pub.loggederror_class.wipe()
|
||||
item.action_on_5xx = 'stdeleted' # removed status
|
||||
formdata.status = 'wf-st1'
|
||||
formdata.store()
|
||||
with pytest.raises(AbortActionException):
|
||||
item.perform(formdata)
|
||||
assert formdata.status == 'wf-st1' # unknown status acts like :stop
|
||||
if pub.is_using_postgresql():
|
||||
assert pub.loggederror_class.count() == 1
|
||||
error = pub.loggederror_class.select()[0]
|
||||
assert 'reference-to-invalid-status-stdeleted-in-workflow' in error.tech_id
|
||||
assert error.occurences_count == 1
|
||||
assert pub.loggederror_class.count() == 1
|
||||
error = pub.loggederror_class.select()[0]
|
||||
assert 'reference-to-invalid-status-stdeleted-in-workflow' in error.tech_id
|
||||
assert error.occurences_count == 1
|
||||
|
||||
item = WebserviceCallStatusItem()
|
||||
item.url = 'http://remote.example.net/xml'
|
||||
|
@ -2451,35 +2431,34 @@ def test_timeout(two_pubs):
|
|||
|
||||
assert formdef.data_class().get(formdata_id).status == 'wf-st2'
|
||||
|
||||
if two_pubs.is_using_postgresql():
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
formdata.store()
|
||||
formdata_id = formdata.id
|
||||
with mock.patch('wcs.wf.jump.JumpWorkflowStatusItem.check_condition') as must_jump:
|
||||
must_jump.return_value = False
|
||||
_apply_timeouts(two_pubs)
|
||||
assert must_jump.call_count == 0 # not enough time has passed
|
||||
|
||||
# check a lower than minimal delay is not considered
|
||||
jump.timeout = 5 * 50 # 5 minutes
|
||||
workflow.store()
|
||||
rewind(formdata, seconds=10 * 60)
|
||||
formdata.store()
|
||||
formdata_id = formdata.id
|
||||
with mock.patch('wcs.wf.jump.JumpWorkflowStatusItem.check_condition') as must_jump:
|
||||
must_jump.return_value = False
|
||||
_apply_timeouts(two_pubs)
|
||||
assert must_jump.call_count == 0 # not enough time has passed
|
||||
_apply_timeouts(two_pubs)
|
||||
assert must_jump.call_count == 0
|
||||
|
||||
# check a lower than minimal delay is not considered
|
||||
jump.timeout = 5 * 50 # 5 minutes
|
||||
workflow.store()
|
||||
rewind(formdata, seconds=10 * 60)
|
||||
formdata.store()
|
||||
_apply_timeouts(two_pubs)
|
||||
assert must_jump.call_count == 0
|
||||
# but is executed once delay is reached
|
||||
rewind(formdata, seconds=10 * 60)
|
||||
formdata.store()
|
||||
_apply_timeouts(two_pubs)
|
||||
assert must_jump.call_count == 1
|
||||
|
||||
# but is executed once delay is reached
|
||||
rewind(formdata, seconds=10 * 60)
|
||||
formdata.store()
|
||||
_apply_timeouts(two_pubs)
|
||||
assert must_jump.call_count == 1
|
||||
|
||||
# check a templated timeout is considered as minimal delay for explicit evaluation
|
||||
jump.timeout = '{{ "0" }}'
|
||||
workflow.store()
|
||||
_apply_timeouts(two_pubs)
|
||||
assert must_jump.call_count == 2
|
||||
# check a templated timeout is considered as minimal delay for explicit evaluation
|
||||
jump.timeout = '{{ "0" }}'
|
||||
workflow.store()
|
||||
_apply_timeouts(two_pubs)
|
||||
assert must_jump.call_count == 2
|
||||
|
||||
# check there's no crash on workflow without jumps
|
||||
formdef = FormDef()
|
||||
|
@ -2530,18 +2509,16 @@ def test_timeout_with_humantime_template(two_pubs):
|
|||
formdata.store()
|
||||
formdata_id = formdata.id
|
||||
|
||||
if two_pubs.is_using_postgresql():
|
||||
two_pubs.loggederror_class.wipe()
|
||||
two_pubs.loggederror_class.wipe()
|
||||
|
||||
rewind(formdata, seconds=40 * 60)
|
||||
formdata.store()
|
||||
_apply_timeouts(two_pubs)
|
||||
assert formdef.data_class().get(formdata_id).status == 'wf-st1' # no change
|
||||
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary == "Error in timeout value '30 plop' (computed from '{{ 30 }} plop')"
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary == "Error in timeout value '30 plop' (computed from '{{ 30 }} plop')"
|
||||
|
||||
# template timeout value returning nothing
|
||||
jump.timeout = '{% if 1 %}{% endif %}'
|
||||
|
@ -2553,18 +2530,16 @@ def test_timeout_with_humantime_template(two_pubs):
|
|||
formdata.store()
|
||||
formdata_id = formdata.id
|
||||
|
||||
if two_pubs.is_using_postgresql():
|
||||
two_pubs.loggederror_class.wipe()
|
||||
two_pubs.loggederror_class.wipe()
|
||||
|
||||
rewind(formdata, seconds=40 * 60)
|
||||
formdata.store()
|
||||
_apply_timeouts(two_pubs)
|
||||
assert formdef.data_class().get(formdata_id).status == 'wf-st1' # no change
|
||||
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary == "Error in timeout value '' (computed from '{% if 1 %}{% endif %}')"
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary == "Error in timeout value '' (computed from '{% if 1 %}{% endif %}')"
|
||||
|
||||
|
||||
def test_legacy_timeout(pub):
|
||||
|
@ -2663,10 +2638,6 @@ def test_timeout_with_mark(two_pubs):
|
|||
|
||||
|
||||
def test_jump_missing_previous_mark(two_pubs):
|
||||
if not two_pubs.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
FormDef.wipe()
|
||||
Workflow.wipe()
|
||||
|
||||
|
@ -2847,15 +2818,14 @@ def test_sms_with_passerelle(two_pubs):
|
|||
assert json_payload['to'] == ['1234']
|
||||
assert json_payload['from'] == 'Passerelle'
|
||||
|
||||
if pub.is_using_postgresql():
|
||||
pub.loggederror_class.wipe()
|
||||
with mock.patch('wcs.wscalls.get_secret_and_orig') as mocked_secret_and_orig:
|
||||
mocked_secret_and_orig.return_value = ('secret', 'localhost')
|
||||
with mock.patch('wcs.qommon.misc._http_request') as mocked_http_post:
|
||||
mocked_http_post.return_value = (mock.Mock(headers={}), 400, '{"err": 1}', 'headers')
|
||||
item.perform(formdata)
|
||||
assert pub.loggederror_class.count() == 1
|
||||
assert pub.loggederror_class.select()[0].summary == 'Could not send SMS'
|
||||
pub.loggederror_class.wipe()
|
||||
with mock.patch('wcs.wscalls.get_secret_and_orig') as mocked_secret_and_orig:
|
||||
mocked_secret_and_orig.return_value = ('secret', 'localhost')
|
||||
with mock.patch('wcs.qommon.misc._http_request') as mocked_http_post:
|
||||
mocked_http_post.return_value = (mock.Mock(headers={}), 400, '{"err": 1}', 'headers')
|
||||
item.perform(formdata)
|
||||
assert pub.loggederror_class.count() == 1
|
||||
assert pub.loggederror_class.select()[0].summary == 'Could not send SMS'
|
||||
|
||||
|
||||
def test_display_form(two_pubs):
|
||||
|
@ -3340,11 +3310,10 @@ def test_geolocate_action_enable_geolocation(two_pubs):
|
|||
formdef.change_workflow(workflow)
|
||||
assert formdef.geolocations
|
||||
|
||||
if two_pubs.is_using_postgresql():
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
assert column_exists_in_table(cur, formdef.table_name, 'geoloc_base')
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
assert column_exists_in_table(cur, formdef.table_name, 'geoloc_base')
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
# change to current workflow
|
||||
workflow = Workflow(name='wf2')
|
||||
|
@ -3367,11 +3336,10 @@ def test_geolocate_action_enable_geolocation(two_pubs):
|
|||
formdef.refresh_from_storage()
|
||||
assert formdef.geolocations
|
||||
|
||||
if two_pubs.is_using_postgresql():
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
assert column_exists_in_table(cur, formdef.table_name, 'geoloc_base')
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
assert column_exists_in_table(cur, formdef.table_name, 'geoloc_base')
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
|
||||
def test_geolocate_address(two_pubs):
|
||||
|
@ -3450,20 +3418,19 @@ def test_geolocate_address(two_pubs):
|
|||
formdata.geolocations = None
|
||||
item.perform(formdata)
|
||||
assert formdata.geolocations == {}
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert (
|
||||
logged_error.summary
|
||||
== 'error in template for address string [syntax error in ezt template: unclosed block at line 1 and column 24]'
|
||||
)
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert logged_error.exception_class == 'TemplateError'
|
||||
assert (
|
||||
logged_error.exception_message
|
||||
== 'syntax error in ezt template: unclosed block at line 1 and column 24'
|
||||
)
|
||||
two_pubs.loggederror_class.wipe()
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert (
|
||||
logged_error.summary
|
||||
== 'error in template for address string [syntax error in ezt template: unclosed block at line 1 and column 24]'
|
||||
)
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert logged_error.exception_class == 'TemplateError'
|
||||
assert (
|
||||
logged_error.exception_message
|
||||
== 'syntax error in ezt template: unclosed block at line 1 and column 24'
|
||||
)
|
||||
two_pubs.loggederror_class.wipe()
|
||||
|
||||
# check for None
|
||||
item.address_string = '=None'
|
||||
|
@ -3492,13 +3459,12 @@ def test_geolocate_address(two_pubs):
|
|||
http_get_page.side_effect = ConnectionError('some error')
|
||||
item.perform(formdata)
|
||||
assert formdata.geolocations == {}
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'error calling geocoding service [some error]'
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert logged_error.exception_class == 'ConnectionError'
|
||||
assert logged_error.exception_message == 'some error'
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'error calling geocoding service [some error]'
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert logged_error.exception_class == 'ConnectionError'
|
||||
assert logged_error.exception_message == 'some error'
|
||||
|
||||
|
||||
def test_geolocate_image(pub):
|
||||
|
@ -3583,13 +3549,12 @@ def test_geolocate_map(two_pubs):
|
|||
formdata.geolocations = None
|
||||
formdata.data = {'2': '48.8337085'}
|
||||
item.map_variable = '=form_var_map'
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'error geolocating from map variable'
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert logged_error.exception_class == 'ValueError'
|
||||
assert logged_error.exception_message == 'not enough values to unpack (expected 2, got 1)'
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'error geolocating from map variable'
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert logged_error.exception_class == 'ValueError'
|
||||
assert logged_error.exception_message == 'not enough values to unpack (expected 2, got 1)'
|
||||
|
||||
|
||||
def test_geolocate_overwrite(pub):
|
||||
|
@ -4581,36 +4546,32 @@ def test_set_backoffice_field(http_requests, two_pubs):
|
|||
formdata = formdef.data_class().get(formdata.id)
|
||||
assert formdata.data['bo1'] == ''
|
||||
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 0
|
||||
assert two_pubs.loggederror_class.count() == 0
|
||||
|
||||
item.fields = [{'field_id': 'bo1', 'value': '= ~ invalid python ~'}]
|
||||
item.perform(formdata)
|
||||
formdata = formdef.data_class().get(formdata.id)
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'Failed to compute Python expression'
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert logged_error.expression == ' ~ invalid python ~'
|
||||
assert logged_error.expression_type == 'python'
|
||||
assert logged_error.exception_class == 'SyntaxError'
|
||||
assert logged_error.exception_message == 'invalid syntax (<string>, line 1)'
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'Failed to compute Python expression'
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert logged_error.expression == ' ~ invalid python ~'
|
||||
assert logged_error.expression_type == 'python'
|
||||
assert logged_error.exception_class == 'SyntaxError'
|
||||
assert logged_error.exception_message == 'invalid syntax (<string>, line 1)'
|
||||
|
||||
if two_pubs.is_using_postgresql():
|
||||
two_pubs.loggederror_class.wipe()
|
||||
two_pubs.loggederror_class.wipe()
|
||||
item.fields = [{'field_id': 'bo1', 'value': '{% if bad django %}'}]
|
||||
item.perform(formdata)
|
||||
formdata = formdef.data_class().get(formdata.id)
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'Failed to compute template'
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert logged_error.expression == '{% if bad django %}'
|
||||
assert logged_error.expression_type == 'template'
|
||||
assert logged_error.exception_class == 'TemplateError'
|
||||
assert logged_error.exception_message.startswith('syntax error in Django template')
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary == 'Failed to compute template'
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert logged_error.expression == '{% if bad django %}'
|
||||
assert logged_error.expression_type == 'template'
|
||||
assert logged_error.exception_class == 'TemplateError'
|
||||
assert logged_error.exception_message.startswith('syntax error in Django template')
|
||||
|
||||
|
||||
def test_set_backoffice_field_map(http_requests, two_pubs):
|
||||
|
@ -4652,35 +4613,32 @@ def test_set_backoffice_field_map(http_requests, two_pubs):
|
|||
formdata = formdef.data_class().get(formdata.id)
|
||||
assert formdata.data.get('bo1') is None
|
||||
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 0
|
||||
assert two_pubs.loggederror_class.count() == 0
|
||||
|
||||
item.fields = [{'field_id': 'bo1', 'value': 'invalid value'}]
|
||||
item.perform(formdata)
|
||||
formdata = formdef.data_class().get(formdata.id)
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert (
|
||||
logged_error.summary
|
||||
== "Failed to set Map field (bo1), error: invalid coordinates 'invalid value' (missing ;)"
|
||||
)
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert logged_error.exception_class == 'SetValueError'
|
||||
assert logged_error.exception_message == "invalid coordinates 'invalid value' (missing ;)"
|
||||
two_pubs.loggederror_class.wipe()
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert (
|
||||
logged_error.summary
|
||||
== "Failed to set Map field (bo1), error: invalid coordinates 'invalid value' (missing ;)"
|
||||
)
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert logged_error.exception_class == 'SetValueError'
|
||||
assert logged_error.exception_message == "invalid coordinates 'invalid value' (missing ;)"
|
||||
two_pubs.loggederror_class.wipe()
|
||||
|
||||
item.fields = [{'field_id': 'bo1', 'value': 'XXX;YYY'}]
|
||||
item.perform(formdata)
|
||||
formdata = formdef.data_class().get(formdata.id)
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary == "Failed to set Map field (bo1), error: invalid coordinates 'XXX;YYY'"
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert logged_error.exception_class == 'SetValueError'
|
||||
assert logged_error.exception_message == "invalid coordinates 'XXX;YYY'"
|
||||
two_pubs.loggederror_class.wipe()
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary == "Failed to set Map field (bo1), error: invalid coordinates 'XXX;YYY'"
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert logged_error.exception_class == 'SetValueError'
|
||||
assert logged_error.exception_message == "invalid coordinates 'XXX;YYY'"
|
||||
two_pubs.loggederror_class.wipe()
|
||||
|
||||
|
||||
def test_set_backoffice_field_decimal(http_requests, two_pubs):
|
||||
|
@ -4977,19 +4935,17 @@ def test_set_backoffice_field_file(http_requests, two_pubs):
|
|||
item.parent = st1
|
||||
item.fields = [{'field_id': 'bo1', 'value': value}]
|
||||
|
||||
if two_pubs.is_using_postgresql():
|
||||
two_pubs.loggederror_class.wipe()
|
||||
two_pubs.loggederror_class.wipe()
|
||||
item.perform(formdata)
|
||||
|
||||
formdata = formdef.data_class().get(formdata.id)
|
||||
assert formdata.data['bo1'].base_filename == 'hello.txt'
|
||||
assert formdata.data['bo1'].get_content() == b'HELLO WORLD'
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary.startswith('Failed to convert')
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert logged_error.exception_class == 'ValueError'
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary.startswith('Failed to convert')
|
||||
assert logged_error.formdata_id == str(formdata.id)
|
||||
assert logged_error.exception_class == 'ValueError'
|
||||
|
||||
# check wrong field
|
||||
item = SetBackofficeFieldsWorkflowStatusItem()
|
||||
|
@ -5207,8 +5163,7 @@ def test_set_backoffice_field_card_item(two_pubs):
|
|||
assert formdata.data['bo1_structured']['attr'] == 'attr1'
|
||||
|
||||
# reset, with unknown value
|
||||
if two_pubs.is_using_postgresql():
|
||||
two_pubs.loggederror_class.wipe()
|
||||
two_pubs.loggederror_class.wipe()
|
||||
formdata.data = {}
|
||||
formdata.store()
|
||||
item.fields = [{'field_id': 'bo1', 'value': 'xxx'}]
|
||||
|
@ -5217,10 +5172,9 @@ def test_set_backoffice_field_card_item(two_pubs):
|
|||
assert formdata.data.get('bo1') is None # invalid value is not stored
|
||||
assert formdata.data.get('bo1_display') is None
|
||||
assert formdata.data.get('bo1_structured') is None
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary.startswith('Failed to convert')
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.summary.startswith('Failed to convert')
|
||||
|
||||
# reset, and get empty value
|
||||
formdata.data = {}
|
||||
|
@ -5398,17 +5352,16 @@ def test_set_backoffice_field_items(two_pubs):
|
|||
assert len(formdata.data['bo1_structured']) == 1
|
||||
|
||||
# using an invalid value
|
||||
if two_pubs.is_using_postgresql():
|
||||
formdata.data = {}
|
||||
formdata.store()
|
||||
two_pubs.loggederror_class.wipe()
|
||||
item = SetBackofficeFieldsWorkflowStatusItem()
|
||||
item.parent = st1
|
||||
item.fields = [{'field_id': 'bo1', 'value': "=Ellipsis"}]
|
||||
item.perform(formdata)
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert 'Failed to convert' in logged_error.summary
|
||||
formdata.data = {}
|
||||
formdata.store()
|
||||
two_pubs.loggederror_class.wipe()
|
||||
item = SetBackofficeFieldsWorkflowStatusItem()
|
||||
item.parent = st1
|
||||
item.fields = [{'field_id': 'bo1', 'value': "=Ellipsis"}]
|
||||
item.perform(formdata)
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert 'Failed to convert' in logged_error.summary
|
||||
|
||||
# using a string with multiple values
|
||||
item = SetBackofficeFieldsWorkflowStatusItem()
|
||||
|
@ -5473,21 +5426,18 @@ def test_set_backoffice_field_date(two_pubs):
|
|||
assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date(2017, 3, 23)
|
||||
|
||||
# invalid values => do nothing
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 0
|
||||
assert two_pubs.loggederror_class.count() == 0
|
||||
for value in ('plop', '={}', '=[]'):
|
||||
item = SetBackofficeFieldsWorkflowStatusItem()
|
||||
item.parent = st1
|
||||
item.fields = [{'field_id': 'bo1', 'value': value}]
|
||||
|
||||
if two_pubs.is_using_postgresql():
|
||||
two_pubs.loggederror_class.wipe()
|
||||
two_pubs.loggederror_class.wipe()
|
||||
item.perform(formdata)
|
||||
formdata = formdef.data_class().get(formdata.id)
|
||||
assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date(2017, 3, 23)
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
assert two_pubs.loggederror_class.select()[0].summary.startswith('Failed to convert')
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
assert two_pubs.loggederror_class.select()[0].summary.startswith('Failed to convert')
|
||||
|
||||
# None : empty date
|
||||
item = SetBackofficeFieldsWorkflowStatusItem()
|
||||
|
@ -5986,20 +5936,18 @@ def test_workflow_action_condition(two_pubs):
|
|||
assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 2
|
||||
|
||||
# bad condition
|
||||
if two_pubs.is_using_postgresql():
|
||||
two_pubs.loggederror_class.wipe()
|
||||
two_pubs.loggederror_class.wipe()
|
||||
choice.condition = {'type': 'python', 'value': 'foobar == barfoo'}
|
||||
workflow.store()
|
||||
assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 0
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.occurences_count > 1 # should be 2... == 12 with pickle, 4 with sql
|
||||
assert logged_error.summary == 'Failed to evaluate condition'
|
||||
assert logged_error.exception_class == 'NameError'
|
||||
assert logged_error.exception_message == "name 'foobar' is not defined"
|
||||
assert logged_error.expression == 'foobar == barfoo'
|
||||
assert logged_error.expression_type == 'python'
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
logged_error = two_pubs.loggederror_class.select()[0]
|
||||
assert logged_error.occurences_count > 1 # should be 2... == 12 with pickle, 4 with sql
|
||||
assert logged_error.summary == 'Failed to evaluate condition'
|
||||
assert logged_error.exception_class == 'NameError'
|
||||
assert logged_error.exception_message == "name 'foobar' is not defined"
|
||||
assert logged_error.expression == 'foobar == barfoo'
|
||||
assert logged_error.expression_type == 'python'
|
||||
|
||||
|
||||
def test_workflow_field_migration(pub):
|
||||
|
@ -6517,10 +6465,6 @@ def test_call_external_workflow_use_caller_variable(pub):
|
|||
|
||||
|
||||
def test_call_external_workflow_manual_targeting(two_pubs):
|
||||
if not two_pubs.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
FormDef.wipe()
|
||||
CardDef.wipe()
|
||||
|
||||
|
@ -6701,10 +6645,6 @@ def test_form_update_after_backoffice_fields(sql_pub):
|
|||
|
||||
|
||||
def test_call_external_workflow_manual_queryset_targeting(two_pubs):
|
||||
if not two_pubs.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
FormDef.wipe()
|
||||
CardDef.wipe()
|
||||
|
||||
|
|
|
@ -104,11 +104,10 @@ def test_create_carddata(two_pubs):
|
|||
|
||||
assert carddef.data_class().count() == 1
|
||||
|
||||
if two_pubs.is_using_postgresql():
|
||||
errors = two_pubs.loggederror_class.select()
|
||||
assert len(errors) == 2
|
||||
assert any('form_var_undefined' in (error.exception_message or '') for error in errors)
|
||||
assert any('invalid date value' in (error.exception_message or '') for error in errors)
|
||||
errors = two_pubs.loggederror_class.select()
|
||||
assert len(errors) == 2
|
||||
assert any('form_var_undefined' in (error.exception_message or '') for error in errors)
|
||||
assert any('invalid date value' in (error.exception_message or '') for error in errors)
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
today = datetime.date.today()
|
||||
|
@ -384,10 +383,9 @@ def test_create_carddata_with_map_field(two_pubs):
|
|||
assert carddef.data_class().count() == 1
|
||||
assert not carddef.data_class().select()[0].data.get('1')
|
||||
|
||||
if two_pubs.is_using_postgresql():
|
||||
errors = two_pubs.loggederror_class.select()
|
||||
assert len(errors) == 1
|
||||
assert any('invalid coordinates' in (error.exception_message or '') for error in errors)
|
||||
errors = two_pubs.loggederror_class.select()
|
||||
assert len(errors) == 1
|
||||
assert any('invalid coordinates' in (error.exception_message or '') for error in errors)
|
||||
|
||||
# value from formdata
|
||||
create.mappings[0].expression = '{{ form_var_map }}'
|
||||
|
@ -742,10 +740,6 @@ def test_edit_carddata_with_linked_object(pub):
|
|||
|
||||
|
||||
def test_edit_carddata_manual_targeting(two_pubs):
|
||||
if not two_pubs.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
FormDef.wipe()
|
||||
CardDef.wipe()
|
||||
|
||||
|
@ -1142,10 +1136,6 @@ def test_assign_carddata_with_linked_object(pub):
|
|||
|
||||
|
||||
def test_assign_carddata_manual_targeting(two_pubs):
|
||||
if not two_pubs.is_using_postgresql():
|
||||
pytest.skip('this requires SQL')
|
||||
return
|
||||
|
||||
FormDef.wipe()
|
||||
CardDef.wipe()
|
||||
two_pubs.user_class.wipe()
|
||||
|
|
|
@ -82,8 +82,7 @@ def test_create_formdata(two_pubs):
|
|||
formdata.just_created()
|
||||
|
||||
assert target_formdef.data_class().count() == 0
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 0
|
||||
assert two_pubs.loggederror_class.count() == 0
|
||||
# check unconfigure action do nothing
|
||||
formdata.perform_workflow()
|
||||
assert target_formdef.data_class().count() == 0
|
||||
|
@ -94,28 +93,26 @@ def test_create_formdata(two_pubs):
|
|||
formdata.perform_workflow()
|
||||
assert target_formdef.data_class().count() == 1
|
||||
|
||||
if two_pubs.is_using_postgresql():
|
||||
errors = two_pubs.loggederror_class.select()
|
||||
assert len(errors) == 2
|
||||
assert 'form_var_toto_string' in errors[0].exception_message
|
||||
assert errors[1].summary == 'Missing field: unknown (1), unknown (2)'
|
||||
assert errors[0].formdata_id == str(target_formdef.data_class().select()[0].id)
|
||||
assert errors[1].formdata_id == str(target_formdef.data_class().select()[0].id)
|
||||
errors = two_pubs.loggederror_class.select()
|
||||
assert len(errors) == 2
|
||||
assert 'form_var_toto_string' in errors[0].exception_message
|
||||
assert errors[1].summary == 'Missing field: unknown (1), unknown (2)'
|
||||
assert errors[0].formdata_id == str(target_formdef.data_class().select()[0].id)
|
||||
assert errors[1].formdata_id == str(target_formdef.data_class().select()[0].id)
|
||||
|
||||
if two_pubs.is_using_postgresql():
|
||||
# add field labels cache
|
||||
two_pubs.loggederror_class.wipe()
|
||||
target_formdef.data_class().wipe()
|
||||
create.formdef_slug = target_formdef.url_name
|
||||
create.cached_field_labels = {'0': 'field0', '1': 'field1', '2': 'field2'}
|
||||
wf.store()
|
||||
del source_formdef._workflow
|
||||
formdata.perform_workflow()
|
||||
assert target_formdef.data_class().count() == 1
|
||||
# add field labels cache
|
||||
two_pubs.loggederror_class.wipe()
|
||||
target_formdef.data_class().wipe()
|
||||
create.formdef_slug = target_formdef.url_name
|
||||
create.cached_field_labels = {'0': 'field0', '1': 'field1', '2': 'field2'}
|
||||
wf.store()
|
||||
del source_formdef._workflow
|
||||
formdata.perform_workflow()
|
||||
assert target_formdef.data_class().count() == 1
|
||||
|
||||
errors = two_pubs.loggederror_class.select()
|
||||
assert len(errors) == 2
|
||||
assert errors[1].summary == 'Missing field: field1, field2'
|
||||
errors = two_pubs.loggederror_class.select()
|
||||
assert len(errors) == 2
|
||||
assert errors[1].summary == 'Missing field: field1, field2'
|
||||
|
||||
# no tracking code has been created
|
||||
created_formdata = target_formdef.data_class().select()[0]
|
||||
|
@ -274,8 +271,7 @@ def test_create_formdata_attach_to_history(two_pubs):
|
|||
|
||||
|
||||
def test_create_formdata_card_item_mapping(two_pubs):
|
||||
if two_pubs.is_using_postgresql():
|
||||
two_pubs.loggederror_class.wipe()
|
||||
two_pubs.loggederror_class.wipe()
|
||||
FormDef.wipe()
|
||||
CardDef.wipe()
|
||||
|
||||
|
@ -367,8 +363,7 @@ def test_create_formdata_card_item_mapping(two_pubs):
|
|||
target_formdata = target_formdef.data_class().select()[0]
|
||||
assert target_formdata.data.get('0') is None
|
||||
assert target_formdata.data.get('0_display') is None
|
||||
if two_pubs.is_using_postgresql():
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
error = two_pubs.loggederror_class.select()[0]
|
||||
assert error.exception_message == "unknown card value ('XXX')"
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
assert two_pubs.loggederror_class.count() == 1
|
||||
error = two_pubs.loggederror_class.select()[0]
|
||||
assert error.exception_message == "unknown card value ('XXX')"
|
||||
|
|
|
@ -539,7 +539,7 @@ class NamedDataSourcesDirectory(Directory):
|
|||
'categories': categories,
|
||||
'user_data_sources': user_data_sources,
|
||||
'has_chrono': has_chrono(get_publisher()),
|
||||
'has_users': get_publisher().is_using_postgresql(),
|
||||
'has_users': True,
|
||||
'agenda_data_sources': agenda_data_sources,
|
||||
'generated_data_sources': generated_data_sources,
|
||||
},
|
||||
|
|
|
@ -16,8 +16,6 @@
|
|||
|
||||
import difflib
|
||||
import io
|
||||
import tarfile
|
||||
import time
|
||||
import xml.etree.ElementTree as ET
|
||||
from collections import defaultdict
|
||||
|
||||
|
@ -604,7 +602,6 @@ class FormDefPage(Directory):
|
|||
'duplicate',
|
||||
'export',
|
||||
'anonymise',
|
||||
'archive',
|
||||
'enable',
|
||||
'workflow',
|
||||
'role',
|
||||
|
@ -932,8 +929,6 @@ class FormDefPage(Directory):
|
|||
r += htmltext('<li><a rel="popup" href="history/save">%s</a></li>') % _('Save snapshot')
|
||||
r += htmltext('<li><a href="history/">%s</a></li>') % _('History')
|
||||
r += htmltext('<li><a href="inspect">%s</a></li>') % _('Inspector')
|
||||
if not get_publisher().is_using_postgresql() and self.formdef_class == FormDef:
|
||||
r += htmltext('<li><a href="archive">%s</a></li>') % _('Archive')
|
||||
r += htmltext('</ul>')
|
||||
|
||||
r += htmltext('<ul>')
|
||||
|
@ -1276,8 +1271,6 @@ class FormDefPage(Directory):
|
|||
return redirect('../%s/' % self.formdefui.formdef.id)
|
||||
|
||||
def get_check_deletion_message(self):
|
||||
if not get_publisher().is_using_postgresql():
|
||||
return None
|
||||
from wcs import sql
|
||||
|
||||
criterias = [
|
||||
|
@ -1570,105 +1563,6 @@ class FormDefPage(Directory):
|
|||
content_type='application/x-wcs-form',
|
||||
)
|
||||
|
||||
def archive(self):
|
||||
if get_publisher().is_using_postgresql():
|
||||
raise TraversalError()
|
||||
|
||||
if get_request().form.get('download'):
|
||||
return self.archive_download()
|
||||
|
||||
form = Form(enctype='multipart/form-data')
|
||||
|
||||
form.add(DateWidget, 'date', title=_('Archive forms handled before'))
|
||||
form.add(CheckboxWidget, 'not-done', title=_('Include forms that have not been handled'), value=False)
|
||||
form.add(CheckboxWidget, 'keep', title=_('Do not remove forms'), value=False)
|
||||
form.add_submit('submit', _('Submit'))
|
||||
form.add_submit('cancel', _('Cancel'))
|
||||
|
||||
if form.get_widget('cancel').parse():
|
||||
return redirect('.')
|
||||
|
||||
if not form.is_submitted() or form.has_errors():
|
||||
get_response().breadcrumb.append(('archive', _('Archive')))
|
||||
self.html_top(title=_('Archive Forms'))
|
||||
r = TemplateIO(html=True)
|
||||
r += htmltext('<h2>%s</h2>') % _('Archive Forms')
|
||||
r += form.render()
|
||||
return r.getvalue()
|
||||
else:
|
||||
return self.archive_submit(form)
|
||||
|
||||
def archive_submit(self, form):
|
||||
class Archiver:
|
||||
def __init__(self, formdef):
|
||||
self.formdef = formdef
|
||||
|
||||
def archive(self, job=None):
|
||||
all_forms = self.formdef.data_class().select()
|
||||
|
||||
if form.get_widget('not-done').parse() is False:
|
||||
not_endpoint_status = self.formdef.workflow.get_not_endpoint_status()
|
||||
not_endpoint_status_ids = ['wf-%s' % x.id for x in not_endpoint_status]
|
||||
all_forms = [x for x in all_forms if x.status not in not_endpoint_status_ids]
|
||||
|
||||
if form.get_widget('date').parse():
|
||||
date = form.get_widget('date').parse()
|
||||
date = time.strptime(date, misc.date_format())
|
||||
all_forms = [x for x in all_forms if x.last_update_time < date]
|
||||
|
||||
self.fd = io.BytesIO()
|
||||
with tarfile.open('wcs.tar.gz', 'w:gz', fileobj=self.fd) as t:
|
||||
t.add(self.formdef.get_object_filename(), 'formdef')
|
||||
for formdata in all_forms:
|
||||
t.add(
|
||||
formdata.get_object_filename(),
|
||||
'%s/%s' % (self.formdef.url_name, str(formdata.id)),
|
||||
)
|
||||
|
||||
if form.get_widget('keep').parse() is False:
|
||||
for f in all_forms:
|
||||
f.remove_self()
|
||||
|
||||
if job:
|
||||
job.file_content = self.fd.getvalue()
|
||||
job.store()
|
||||
|
||||
count = self.formdef.data_class().count()
|
||||
archiver = Archiver(self.formdef)
|
||||
if count > 100: # Arbitrary threshold
|
||||
job = get_response().add_after_job(
|
||||
_('Archiving forms'),
|
||||
archiver.archive,
|
||||
done_action_url=self.formdef.get_admin_url() + 'archive?job=%(job_id)s',
|
||||
done_action_label=_('Download Archive'),
|
||||
)
|
||||
job.store()
|
||||
return redirect(job.get_processing_url())
|
||||
else:
|
||||
archiver.archive()
|
||||
|
||||
response = get_response()
|
||||
response.set_content_type('application/x-wcs-archive')
|
||||
response.set_header(
|
||||
'content-disposition', 'attachment; filename=%s-archive.wcs' % self.formdef.url_name
|
||||
)
|
||||
return archiver.fd.getvalue()
|
||||
|
||||
def archive_download(self):
|
||||
try:
|
||||
job = AfterJob.get(get_request().form.get('download'))
|
||||
except KeyError:
|
||||
return redirect('.')
|
||||
|
||||
if not job.status == 'completed':
|
||||
raise TraversalError()
|
||||
response = get_response()
|
||||
response.set_content_type('application/x-wcs-archive')
|
||||
response.set_header(
|
||||
'content-disposition', 'attachment; filename=%s-archive.wcs' % self.formdef.url_name
|
||||
)
|
||||
return job.file_content
|
||||
|
||||
def enable(self):
|
||||
self.formdef.disabled = False
|
||||
self.formdef.store(comment=_('Enable'))
|
||||
|
|
|
@ -221,8 +221,7 @@ class LoggedErrorsDirectory(Directory):
|
|||
workflow_id=self.workflow_id,
|
||||
)
|
||||
links = ''
|
||||
if get_publisher().is_using_postgresql():
|
||||
links = pagination_links(offset, limit, total_count, load_js=False)
|
||||
links = pagination_links(offset, limit, total_count, load_js=False)
|
||||
return template.QommonTemplateResponse(
|
||||
templates=['wcs/backoffice/logged-errors.html'],
|
||||
context={
|
||||
|
|
|
@ -300,10 +300,9 @@ class UserFieldsFormDef(FormDef):
|
|||
self.publisher.cfg['users'] = users_cfg
|
||||
self.publisher.write_cfg()
|
||||
self.publisher._cached_user_fields_formdef = None
|
||||
if self.publisher.is_using_postgresql():
|
||||
from wcs import sql
|
||||
from wcs import sql
|
||||
|
||||
sql.do_user_table()
|
||||
sql.do_user_table()
|
||||
|
||||
|
||||
class UsersDirectory(Directory):
|
||||
|
|
233
wcs/api.py
233
wcs/api.py
|
@ -422,9 +422,6 @@ class ApiFormsDirectory(Directory):
|
|||
raise AccessForbiddenError('user not allowed to ignore roles')
|
||||
|
||||
def _q_index(self):
|
||||
if not get_publisher().is_using_postgresql():
|
||||
raise TraversalError()
|
||||
|
||||
self.check_access()
|
||||
get_request()._user = get_user_from_api_query_string() or get_request().user
|
||||
|
||||
|
@ -491,8 +488,6 @@ class ApiFormsDirectory(Directory):
|
|||
return json.dumps({'data': output}, cls=misc.JSONEncoder)
|
||||
|
||||
def geojson(self):
|
||||
if not get_publisher().is_using_postgresql():
|
||||
raise TraversalError()
|
||||
self.check_access()
|
||||
get_request()._user = get_user_from_api_query_string() or get_request().user
|
||||
return ManagementDirectory().geojson()
|
||||
|
@ -743,37 +738,33 @@ class ApiFormdefsDirectory(Directory):
|
|||
if include_count:
|
||||
# we include the count of submitted forms so it's possible to sort
|
||||
# them by "popularity"
|
||||
if get_publisher().is_using_postgresql():
|
||||
from wcs import sql
|
||||
from wcs import sql
|
||||
|
||||
# 4 * number of submitted forms of last 2 days
|
||||
# + 2 * number of submitted forms of last 8 days
|
||||
# + 1 * number of submitted forms of last 30 days
|
||||
# exclude drafts
|
||||
criterias = [Equal('formdef_id', formdef.id), StrictNotEqual('status', 'draft')]
|
||||
d_now = datetime.datetime.now()
|
||||
count = 4 * sql.get_period_total(
|
||||
period_start=d_now - datetime.timedelta(days=2),
|
||||
include_start=True,
|
||||
criterias=criterias,
|
||||
)
|
||||
count += 2 * sql.get_period_total(
|
||||
period_start=d_now - datetime.timedelta(days=8),
|
||||
include_start=True,
|
||||
period_end=d_now - datetime.timedelta(days=2),
|
||||
include_end=False,
|
||||
criterias=criterias,
|
||||
)
|
||||
count += sql.get_period_total(
|
||||
period_start=d_now - datetime.timedelta(days=30),
|
||||
include_start=True,
|
||||
period_end=d_now - datetime.timedelta(days=8),
|
||||
include_end=False,
|
||||
criterias=criterias,
|
||||
)
|
||||
else:
|
||||
# naive count
|
||||
count = formdef.data_class().count()
|
||||
# 4 * number of submitted forms of last 2 days
|
||||
# + 2 * number of submitted forms of last 8 days
|
||||
# + 1 * number of submitted forms of last 30 days
|
||||
# exclude drafts
|
||||
criterias = [Equal('formdef_id', formdef.id), StrictNotEqual('status', 'draft')]
|
||||
d_now = datetime.datetime.now()
|
||||
count = 4 * sql.get_period_total(
|
||||
period_start=d_now - datetime.timedelta(days=2),
|
||||
include_start=True,
|
||||
criterias=criterias,
|
||||
)
|
||||
count += 2 * sql.get_period_total(
|
||||
period_start=d_now - datetime.timedelta(days=8),
|
||||
include_start=True,
|
||||
period_end=d_now - datetime.timedelta(days=2),
|
||||
include_end=False,
|
||||
criterias=criterias,
|
||||
)
|
||||
count += sql.get_period_total(
|
||||
period_start=d_now - datetime.timedelta(days=30),
|
||||
include_start=True,
|
||||
period_end=d_now - datetime.timedelta(days=8),
|
||||
include_end=False,
|
||||
criterias=criterias,
|
||||
)
|
||||
formdict['count'] = count
|
||||
|
||||
formdict['functions'] = {}
|
||||
|
@ -923,104 +914,79 @@ class ApiUserDirectory(Directory):
|
|||
if category_slugs:
|
||||
categories = Category.select([Contains('url_name', category_slugs)])
|
||||
|
||||
if get_publisher().is_using_postgresql():
|
||||
from wcs import sql
|
||||
from wcs import sql
|
||||
|
||||
order_by = 'receipt_time'
|
||||
if get_request().form.get('sort') == 'desc':
|
||||
order_by = '-receipt_time'
|
||||
if get_query_flag('include-accessible'):
|
||||
user_roles = user.get_roles()
|
||||
criterias = [
|
||||
order_by = 'receipt_time'
|
||||
if get_request().form.get('sort') == 'desc':
|
||||
order_by = '-receipt_time'
|
||||
if get_query_flag('include-accessible'):
|
||||
user_roles = user.get_roles()
|
||||
criterias = [
|
||||
Or(
|
||||
[
|
||||
Intersects('concerned_roles_array', user_roles),
|
||||
Equal('user_id', str(user.id)),
|
||||
]
|
||||
)
|
||||
]
|
||||
else:
|
||||
criterias = [Equal('user_id', str(user.id))]
|
||||
if category_slugs:
|
||||
criterias.append(Contains('category_id', [c.id for c in categories]))
|
||||
|
||||
status_criteria = get_request().form.get('status') or 'all'
|
||||
if status_criteria == 'open':
|
||||
criterias.append(Equal('is_at_endpoint', False))
|
||||
elif status_criteria == 'done':
|
||||
criterias.append(Equal('is_at_endpoint', True))
|
||||
elif status_criteria == 'all':
|
||||
pass
|
||||
else:
|
||||
return HttpResponseBadRequest('invalid status parameter value')
|
||||
|
||||
if include_drafts:
|
||||
disabled_formdef_ids = [formdef.id for formdef in FormDef.select() if formdef.is_disabled()]
|
||||
if disabled_formdef_ids:
|
||||
criterias.append(
|
||||
Or(
|
||||
[
|
||||
Intersects('concerned_roles_array', user_roles),
|
||||
Equal('user_id', str(user.id)),
|
||||
StrictNotEqual('status', 'draft'),
|
||||
NotContains('formdef_id', disabled_formdef_ids),
|
||||
]
|
||||
)
|
||||
]
|
||||
else:
|
||||
criterias = [Equal('user_id', str(user.id))]
|
||||
if category_slugs:
|
||||
criterias.append(Contains('category_id', [c.id for c in categories]))
|
||||
|
||||
status_criteria = get_request().form.get('status') or 'all'
|
||||
if status_criteria == 'open':
|
||||
criterias.append(Equal('is_at_endpoint', False))
|
||||
elif status_criteria == 'done':
|
||||
criterias.append(Equal('is_at_endpoint', True))
|
||||
elif status_criteria == 'all':
|
||||
pass
|
||||
else:
|
||||
return HttpResponseBadRequest('invalid status parameter value')
|
||||
|
||||
if include_drafts:
|
||||
disabled_formdef_ids = [formdef.id for formdef in FormDef.select() if formdef.is_disabled()]
|
||||
if disabled_formdef_ids:
|
||||
criterias.append(
|
||||
Or(
|
||||
[
|
||||
StrictNotEqual('status', 'draft'),
|
||||
NotContains('formdef_id', disabled_formdef_ids),
|
||||
]
|
||||
)
|
||||
)
|
||||
else:
|
||||
criterias.append(StrictNotEqual('status', 'draft'))
|
||||
|
||||
if not include_non_drafts:
|
||||
criterias.append(Equal('status', 'draft'))
|
||||
|
||||
user_forms = sql.AnyFormData.select(
|
||||
criterias,
|
||||
limit=misc.get_int_or_400(get_request().form.get('limit')),
|
||||
offset=misc.get_int_or_400(get_request().form.get('offset')),
|
||||
order_by=order_by,
|
||||
)
|
||||
if get_request().form.get('full') == 'on':
|
||||
# load full objects
|
||||
formdefs = {x.formdef_id: x.formdef for x in user_forms}
|
||||
formdef_user_forms = {}
|
||||
for formdef_id, formdef in formdefs.items():
|
||||
formdef_user_forms.update(
|
||||
{
|
||||
(formdef_id, x.id): x
|
||||
for x in formdef.data_class().select(
|
||||
[Contains('id', [x.id for x in user_forms if x.formdef_id == formdef_id])]
|
||||
)
|
||||
}
|
||||
)
|
||||
# and put them back in order
|
||||
sorted_user_forms_tuples = [(x.formdef_id, x.id) for x in user_forms]
|
||||
user_forms = [formdef_user_forms.get(x) for x in sorted_user_forms_tuples]
|
||||
else:
|
||||
# prefetch evolutions to avoid individual loads when computing
|
||||
# formdata.get_visible_status().
|
||||
sql.AnyFormData.load_all_evolutions(user_forms)
|
||||
)
|
||||
else:
|
||||
if get_query_flag('include-accessible'):
|
||||
return HttpResponseBadRequest('not supported')
|
||||
formdefs = FormDef.select()
|
||||
user_forms = []
|
||||
for formdef in formdefs:
|
||||
user_forms.extend(formdef.data_class().get_with_indexed_value('user_id', user.id))
|
||||
if category_slugs:
|
||||
user_forms = [f for f in user_forms if f.formdef.category_id in [c.id for c in categories]]
|
||||
criterias.append(StrictNotEqual('status', 'draft'))
|
||||
|
||||
status_criteria = get_request().form.get('status') or 'all'
|
||||
if status_criteria == 'open':
|
||||
user_forms = [x for x in user_forms if not x.is_at_endpoint_status()]
|
||||
elif status_criteria == 'done':
|
||||
user_forms = [x for x in user_forms if x.is_at_endpoint_status()]
|
||||
elif status_criteria == 'all':
|
||||
pass
|
||||
else:
|
||||
return HttpResponseBadRequest('invalid status parameter value')
|
||||
if not include_non_drafts:
|
||||
criterias.append(Equal('status', 'draft'))
|
||||
|
||||
typed_none = time.gmtime(-(10**10))
|
||||
user_forms.sort(key=lambda x: x.receipt_time or typed_none)
|
||||
if get_request().form.get('sort') == 'desc':
|
||||
user_forms.reverse()
|
||||
user_forms = sql.AnyFormData.select(
|
||||
criterias,
|
||||
limit=misc.get_int_or_400(get_request().form.get('limit')),
|
||||
offset=misc.get_int_or_400(get_request().form.get('offset')),
|
||||
order_by=order_by,
|
||||
)
|
||||
if get_request().form.get('full') == 'on':
|
||||
# load full objects
|
||||
formdefs = {x.formdef_id: x.formdef for x in user_forms}
|
||||
formdef_user_forms = {}
|
||||
for formdef_id, formdef in formdefs.items():
|
||||
formdef_user_forms.update(
|
||||
{
|
||||
(formdef_id, x.id): x
|
||||
for x in formdef.data_class().select(
|
||||
[Contains('id', [x.id for x in user_forms if x.formdef_id == formdef_id])]
|
||||
)
|
||||
}
|
||||
)
|
||||
# and put them back in order
|
||||
sorted_user_forms_tuples = [(x.formdef_id, x.id) for x in user_forms]
|
||||
user_forms = [formdef_user_forms.get(x) for x in sorted_user_forms_tuples]
|
||||
else:
|
||||
# prefetch evolutions to avoid individual loads when computing
|
||||
# formdata.get_visible_status().
|
||||
sql.AnyFormData.load_all_evolutions(user_forms)
|
||||
return user_forms
|
||||
|
||||
def drafts(self):
|
||||
|
@ -1056,16 +1022,10 @@ class ApiUserDirectory(Directory):
|
|||
raise AccessForbiddenError('user not allowed to query data from others')
|
||||
# mark forms that are readable by querying user
|
||||
user_roles = set(query_user.get_roles())
|
||||
if get_publisher().is_using_postgresql():
|
||||
# use concerned_roles_array attribute that was saved in the
|
||||
# table.
|
||||
for form in forms:
|
||||
form.readable = bool(set(form.concerned_roles_array).intersection(user_roles))
|
||||
else:
|
||||
# recomputed concerned roles.
|
||||
for form in forms:
|
||||
concerned_roles_array = [str(x) for x in form.concerned_roles if x]
|
||||
form.readable = bool(set(concerned_roles_array).intersection(user_roles))
|
||||
# use concerned_roles_array attribute that was saved in the
|
||||
# table.
|
||||
for form in forms:
|
||||
form.readable = bool(set(form.concerned_roles_array).intersection(user_roles))
|
||||
# ignore confidential forms
|
||||
forms = [x for x in forms if x.readable or not x.formdef.skip_from_360_view]
|
||||
|
||||
|
@ -1132,8 +1092,7 @@ class ApiUsersDirectory(Directory):
|
|||
for field in formdef.fields:
|
||||
if field.type in ('string', 'text', 'email'):
|
||||
criteria_fields.append(st.ILike('f%s' % field.id, query))
|
||||
if get_publisher().is_using_postgresql():
|
||||
criteria_fields.append(st.FtsMatch(query))
|
||||
criteria_fields.append(st.FtsMatch(query))
|
||||
criterias.append(st.Or(criteria_fields))
|
||||
|
||||
def as_dict(user):
|
||||
|
|
|
@ -213,9 +213,6 @@ class CardDefPage(FormDefPage):
|
|||
if self.formdef.is_used():
|
||||
return _('Deletion is not possible as it is still used as datasource.')
|
||||
|
||||
if not get_publisher().is_using_postgresql():
|
||||
return None
|
||||
|
||||
criterias = [
|
||||
StrictNotEqual('status', 'draft'),
|
||||
Null('anonymised'),
|
||||
|
|
|
@ -182,23 +182,14 @@ class ManagementDirectory(Directory):
|
|||
forms_without_pending_stuff = []
|
||||
forms_with_pending_stuff = []
|
||||
|
||||
using_postgresql = get_publisher().is_using_postgresql()
|
||||
if using_postgresql:
|
||||
from wcs import sql
|
||||
from wcs import sql
|
||||
|
||||
actionable_counts = sql.get_actionable_counts(user_roles)
|
||||
total_counts = sql.get_total_counts(user_roles)
|
||||
actionable_counts = sql.get_actionable_counts(user_roles)
|
||||
total_counts = sql.get_total_counts(user_roles)
|
||||
|
||||
def append_form_entry(formdef):
|
||||
if using_postgresql:
|
||||
count_forms = total_counts.get(formdef.id) or 0
|
||||
waiting_forms_count = actionable_counts.get(formdef.id) or 0
|
||||
else:
|
||||
formdef_data_class = formdef.data_class()
|
||||
count_forms = formdef_data_class.count() - len(
|
||||
formdef_data_class.get_ids_with_indexed_value('status', 'draft')
|
||||
)
|
||||
waiting_forms_count = formdef_data_class.get_actionable_count(user_roles)
|
||||
count_forms = total_counts.get(formdef.id) or 0
|
||||
waiting_forms_count = actionable_counts.get(formdef.id) or 0
|
||||
if waiting_forms_count == 0:
|
||||
forms_without_pending_stuff.append((formdef, waiting_forms_count, count_forms))
|
||||
else:
|
||||
|
@ -211,11 +202,7 @@ class ManagementDirectory(Directory):
|
|||
|
||||
def top_action_links(r):
|
||||
r += htmltext('<span class="actions">')
|
||||
if (
|
||||
get_publisher().is_using_postgresql()
|
||||
and get_publisher().get_site_option('postgresql_views') != 'false'
|
||||
):
|
||||
r += htmltext('<a href="listing">%s</a>') % _('Global View')
|
||||
r += htmltext('<a href="listing">%s</a>') % _('Global View')
|
||||
for formdef in formdefs:
|
||||
if formdef.geolocations:
|
||||
r += htmltext(' <a href="map">%s</a>') % _('Map View')
|
||||
|
@ -261,12 +248,12 @@ class ManagementDirectory(Directory):
|
|||
|
||||
def lookup(self):
|
||||
query = get_request().form.get('query', '').strip()
|
||||
if get_publisher().is_using_postgresql():
|
||||
from wcs import sql
|
||||
from wcs import sql
|
||||
|
||||
formdatas = sql.AnyFormData.select([Equal('id_display', query)])
|
||||
if formdatas:
|
||||
return redirect(formdatas[0].get_url(backoffice=True))
|
||||
|
||||
formdatas = sql.AnyFormData.select([Equal('id_display', query)])
|
||||
if formdatas:
|
||||
return redirect(formdatas[0].get_url(backoffice=True))
|
||||
if any(x for x in FormDef.select(lightweight=True) if x.enable_tracking_codes):
|
||||
try:
|
||||
tracking_code = get_publisher().tracking_code_class.get(query)
|
||||
|
@ -433,27 +420,14 @@ class ManagementDirectory(Directory):
|
|||
period_start = parsed_values.get('period_start')
|
||||
period_end = parsed_values.get('period_end')
|
||||
|
||||
if get_publisher().is_using_postgresql():
|
||||
from wcs import sql
|
||||
from wcs import sql
|
||||
|
||||
formdef_totals = sql.get_formdef_totals(period_start, period_end, criterias)
|
||||
counts = {str(x): y for x, y in formdef_totals}
|
||||
else:
|
||||
for formdef in formdefs:
|
||||
values = formdef.data_class().select(criterias)
|
||||
counts[formdef.id] = len(values)
|
||||
|
||||
do_graphs = False
|
||||
if (
|
||||
get_publisher().is_using_postgresql()
|
||||
and get_publisher().get_site_option('postgresql_views') != 'false'
|
||||
):
|
||||
do_graphs = True
|
||||
formdef_totals = sql.get_formdef_totals(period_start, period_end, criterias)
|
||||
counts = {str(x): y for x, y in formdef_totals}
|
||||
|
||||
r += htmltext('<p>%s %s</p>') % (_('Total count:'), sum(counts.values()))
|
||||
|
||||
if do_graphs:
|
||||
r += htmltext('<div class="splitcontent-left">')
|
||||
r += htmltext('<div class="splitcontent-left">')
|
||||
cats = Category.select()
|
||||
for cat in cats:
|
||||
category_formdefs = [x for x in formdefs if x.category_id == cat.id]
|
||||
|
@ -462,11 +436,10 @@ class ManagementDirectory(Directory):
|
|||
category_formdefs = [x for x in formdefs if x.category_id is None]
|
||||
r += self.category_global_stats(_('Misc'), category_formdefs, counts)
|
||||
|
||||
if do_graphs:
|
||||
r += htmltext('</div>')
|
||||
r += htmltext('<div class="splitcontent-right">')
|
||||
r += do_graphs_section(period_start, period_end, criterias=[StrictNotEqual('status', 'draft')])
|
||||
r += htmltext('</div>')
|
||||
r += htmltext('</div>')
|
||||
r += htmltext('<div class="splitcontent-right">')
|
||||
r += do_graphs_section(period_start, period_end, criterias=[StrictNotEqual('status', 'draft')])
|
||||
r += htmltext('</div>')
|
||||
|
||||
return r.getvalue()
|
||||
|
||||
|
@ -597,9 +570,6 @@ class ManagementDirectory(Directory):
|
|||
return r.getvalue()
|
||||
|
||||
def listing(self):
|
||||
if not get_publisher().is_using_postgresql():
|
||||
raise errors.TraversalError()
|
||||
|
||||
get_response().add_javascript(['wcs.listing.js'])
|
||||
from wcs import sql
|
||||
|
||||
|
@ -731,8 +701,6 @@ class ManagementDirectory(Directory):
|
|||
return rt.getvalue()
|
||||
|
||||
def count(self):
|
||||
if not get_publisher().is_using_postgresql():
|
||||
raise errors.TraversalError()
|
||||
if not (FormDef.exists()):
|
||||
return misc.json_response({'count': 0})
|
||||
from wcs import sql
|
||||
|
@ -754,8 +722,6 @@ class ManagementDirectory(Directory):
|
|||
return json.dumps(geojson_formdatas(formdatas, fields=fields), cls=misc.JSONEncoder)
|
||||
|
||||
def map(self):
|
||||
if not get_publisher().is_using_postgresql():
|
||||
raise errors.TraversalError()
|
||||
get_response().add_javascript(['wcs.listing.js', 'qommon.map.js'])
|
||||
html_top('management', _('Global Map'))
|
||||
r = TemplateIO(html=True)
|
||||
|
@ -1051,9 +1017,8 @@ class FormPage(Directory):
|
|||
'user-id',
|
||||
'user-function',
|
||||
'submission-agent-id',
|
||||
'date',
|
||||
]
|
||||
if get_publisher().is_using_postgresql():
|
||||
types.append('date')
|
||||
return types
|
||||
|
||||
def get_filter_sidebar(
|
||||
|
@ -1200,8 +1165,6 @@ class FormPage(Directory):
|
|||
('pending', C_('formdata|Open'), None),
|
||||
('done', _('Done'), None),
|
||||
]
|
||||
if mode == 'stats' and not get_publisher().is_using_postgresql():
|
||||
filters = [x for x in filters if x[0] != 'waiting']
|
||||
for status in waitpoint_status:
|
||||
filters.append((status.id, status.name, status.colour))
|
||||
for filter_id, filter_label, filter_colour in filters:
|
||||
|
@ -1282,73 +1245,47 @@ class FormPage(Directory):
|
|||
elif filter_field.type in ('item', 'items'):
|
||||
filter_field.required = False
|
||||
|
||||
if get_publisher().is_using_postgresql():
|
||||
# Get options from existing formdatas.
|
||||
# This allows for options that don't appear anymore in the
|
||||
# data source to be listed (for example because the field
|
||||
# is using a parametrized URL depending on unavailable
|
||||
# variables, or simply returning different results now).
|
||||
display_mode = 'select'
|
||||
if filter_field.type == 'item' and filter_field.get_display_mode() == 'autocomplete':
|
||||
display_mode = 'select2'
|
||||
# Get options from existing formdatas.
|
||||
# This allows for options that don't appear anymore in the
|
||||
# data source to be listed (for example because the field
|
||||
# is using a parametrized URL depending on unavailable
|
||||
# variables, or simply returning different results now).
|
||||
display_mode = 'select'
|
||||
if filter_field.type == 'item' and filter_field.get_display_mode() == 'autocomplete':
|
||||
display_mode = 'select2'
|
||||
|
||||
if display_mode == 'select':
|
||||
options = self.get_item_filter_options(
|
||||
filter_field,
|
||||
selected_filter,
|
||||
selected_filter_operator,
|
||||
criterias,
|
||||
)
|
||||
options = [(x[0], x[1], x[0]) for x in options]
|
||||
options.insert(0, (None, '', ''))
|
||||
attrs = {'data-refresh-options': str(filter_field.contextual_id)}
|
||||
else:
|
||||
options = [(filter_field_value, filter_field_value or '', filter_field_value or '')]
|
||||
attrs = {'data-remote-options': str(filter_field.contextual_id)}
|
||||
get_response().add_javascript(
|
||||
['jquery.js', '../../i18n.js', 'qommon.forms.js', 'select2.js']
|
||||
)
|
||||
get_response().add_css_include('select2.css')
|
||||
if self.view and self.view.visibility == 'datasource':
|
||||
options.append(('{}', _('custom value'), '{}'))
|
||||
if filter_field_value and filter_field_value not in [x[0] for x in options]:
|
||||
options.append((filter_field_value, filter_field_value, filter_field_value))
|
||||
attrs['data-allow-template'] = 'true'
|
||||
|
||||
widget = SingleSelectWidget(
|
||||
filter_field_key,
|
||||
title=filter_field.label,
|
||||
options=options,
|
||||
value=filter_field_value,
|
||||
render_br=False,
|
||||
attrs=attrs,
|
||||
if display_mode == 'select':
|
||||
options = self.get_item_filter_options(
|
||||
filter_field,
|
||||
selected_filter,
|
||||
selected_filter_operator,
|
||||
criterias,
|
||||
)
|
||||
r += render_widget(widget, operators)
|
||||
|
||||
options = [(x[0], x[1], x[0]) for x in options]
|
||||
options.insert(0, (None, '', ''))
|
||||
attrs = {'data-refresh-options': str(filter_field.contextual_id)}
|
||||
else:
|
||||
# In pickle environments, get options from data source
|
||||
options = filter_field.get_options()
|
||||
if options:
|
||||
if len(options[0]) == 2:
|
||||
options = [(x[0], x[1], x[0]) for x in options]
|
||||
options.insert(0, (None, '', ''))
|
||||
widget = SingleSelectWidget(
|
||||
filter_field_key,
|
||||
title=filter_field.label,
|
||||
options=options,
|
||||
value=filter_field_value,
|
||||
render_br=False,
|
||||
)
|
||||
r += render_widget(widget, operators)
|
||||
else:
|
||||
# and fall back on a string widget if there are none.
|
||||
widget = StringWidget(
|
||||
filter_field_key,
|
||||
title=filter_field.label,
|
||||
value=filter_field_value,
|
||||
render_br=False,
|
||||
)
|
||||
r += render_widget(widget, operators)
|
||||
options = [(filter_field_value, filter_field_value or '', filter_field_value or '')]
|
||||
attrs = {'data-remote-options': str(filter_field.contextual_id)}
|
||||
get_response().add_javascript(
|
||||
['jquery.js', '../../i18n.js', 'qommon.forms.js', 'select2.js']
|
||||
)
|
||||
get_response().add_css_include('select2.css')
|
||||
if self.view and self.view.visibility == 'datasource':
|
||||
options.append(('{}', _('custom value'), '{}'))
|
||||
if filter_field_value and filter_field_value not in [x[0] for x in options]:
|
||||
options.append((filter_field_value, filter_field_value, filter_field_value))
|
||||
attrs['data-allow-template'] = 'true'
|
||||
|
||||
widget = SingleSelectWidget(
|
||||
filter_field_key,
|
||||
title=filter_field.label,
|
||||
options=options,
|
||||
value=filter_field_value,
|
||||
render_br=False,
|
||||
attrs=attrs,
|
||||
)
|
||||
r += render_widget(widget, operators)
|
||||
|
||||
elif filter_field.type == 'bool':
|
||||
options = [(None, '', ''), (True, _('Yes'), 'true'), (False, _('No'), 'false')]
|
||||
|
@ -1421,19 +1358,17 @@ class FormPage(Directory):
|
|||
if limit:
|
||||
r += htmltext('<input type="hidden" name="limit" value="%s"/>') % limit
|
||||
|
||||
if get_publisher().is_using_postgresql():
|
||||
if order_by is None:
|
||||
order_by = get_publisher().get_site_option('default-sort-order') or '-receipt_time'
|
||||
r += htmltext('<input type="hidden" name="order_by" value="%s"/>') % order_by
|
||||
if order_by is None:
|
||||
order_by = get_publisher().get_site_option('default-sort-order') or '-receipt_time'
|
||||
r += htmltext('<input type="hidden" name="order_by" value="%s"/>') % order_by
|
||||
|
||||
if get_publisher().is_using_postgresql():
|
||||
r += htmltext('<h3>%s</h3>') % self.search_label
|
||||
if get_request().form.get('q'):
|
||||
q = force_text(get_request().form.get('q'))
|
||||
r += htmltext('<input class="inline-input" name="q" value="%s">') % force_str(q)
|
||||
else:
|
||||
r += htmltext('<input class="inline-input" name="q">')
|
||||
r += htmltext('<button class="side-button">%s</button>') % _('Search')
|
||||
r += htmltext('<h3>%s</h3>') % self.search_label
|
||||
if get_request().form.get('q'):
|
||||
q = force_text(get_request().form.get('q'))
|
||||
r += htmltext('<input class="inline-input" name="q" value="%s">') % force_str(q)
|
||||
else:
|
||||
r += htmltext('<input class="inline-input" name="q">')
|
||||
r += htmltext('<button class="side-button">%s</button>') % _('Search')
|
||||
|
||||
r += self.get_filter_sidebar(
|
||||
selected_filter=selected_filter,
|
||||
|
@ -1664,18 +1599,14 @@ class FormPage(Directory):
|
|||
yield FakeField('last_update_time', 'last_update_time', _('Last Modified'))
|
||||
|
||||
# user fields
|
||||
if not get_publisher().is_using_postgresql():
|
||||
# full name of user, this will load individual user objects to get it
|
||||
yield FakeField('user-label', 'user-label', _('User Label'))
|
||||
else:
|
||||
# user-label field but as a custom field, to get full name of user
|
||||
# using a sql join clause.
|
||||
yield UserLabelRelatedField()
|
||||
for field in get_publisher().user_class.get_fields():
|
||||
if not hasattr(field, 'get_view_value'):
|
||||
continue
|
||||
field.has_relations = True
|
||||
yield UserRelatedField(field)
|
||||
# user-label field but as a custom field, to get full name of user
|
||||
# using a sql join clause.
|
||||
yield UserLabelRelatedField()
|
||||
for field in get_publisher().user_class.get_fields():
|
||||
if not hasattr(field, 'get_view_value'):
|
||||
continue
|
||||
field.has_relations = True
|
||||
yield UserRelatedField(field)
|
||||
|
||||
for field in self.formdef.iter_fields(include_block_fields=True):
|
||||
if getattr(field, 'block_field', None):
|
||||
|
@ -1683,8 +1614,6 @@ class FormPage(Directory):
|
|||
# not yet
|
||||
continue
|
||||
yield field
|
||||
if not get_publisher().is_using_postgresql():
|
||||
continue
|
||||
if field.key == 'block':
|
||||
continue
|
||||
if getattr(field, 'block_field', None):
|
||||
|
@ -2145,14 +2074,9 @@ class FormPage(Directory):
|
|||
selected_filter_operator = self.get_filter_operator_from_query()
|
||||
criterias = self.get_criterias_from_query()
|
||||
|
||||
if get_publisher().is_using_postgresql():
|
||||
# only enable pagination in SQL mode, as we do not have sorting in
|
||||
# the other case.
|
||||
limit = misc.get_int_or_400(
|
||||
get_request().form.get('limit', get_publisher().get_site_option('default-page-size') or 20)
|
||||
)
|
||||
else:
|
||||
limit = misc.get_int_or_400(get_request().form.get('limit', 0))
|
||||
limit = misc.get_int_or_400(
|
||||
get_request().form.get('limit', get_publisher().get_site_option('default-page-size') or 20)
|
||||
)
|
||||
offset = misc.get_int_or_400(get_request().form.get('offset', 0))
|
||||
order_by = misc.get_order_by_or_400(get_request().form.get('order_by'))
|
||||
if self.view and not order_by:
|
||||
|
@ -2251,8 +2175,7 @@ class FormPage(Directory):
|
|||
def submit_multi(self, action, selected_filter, selected_filter_operator, query, criterias):
|
||||
item_ids = get_request().form['select[]']
|
||||
if '_all' in item_ids:
|
||||
if get_publisher().is_using_postgresql():
|
||||
criterias.append(Null('anonymised'))
|
||||
criterias.append(Null('anonymised'))
|
||||
item_ids = FormDefUI(self.formdef).get_listing_item_ids(
|
||||
selected_filter,
|
||||
selected_filter_operator,
|
||||
|
@ -2444,8 +2367,7 @@ class FormPage(Directory):
|
|||
offset=offset,
|
||||
limit=limit,
|
||||
)[0]
|
||||
if get_publisher().is_using_postgresql():
|
||||
self.formdef.data_class().load_all_evolutions(items)
|
||||
self.formdef.data_class().load_all_evolutions(items)
|
||||
digest_key = 'default'
|
||||
if self.view and isinstance(self.formdef, CardDef):
|
||||
view_digest_key = 'custom-view:%s' % self.view.get_url_slug()
|
||||
|
@ -2455,26 +2377,25 @@ class FormPage(Directory):
|
|||
output = []
|
||||
prefetched_users = None
|
||||
prefetched_roles = None
|
||||
if get_publisher().is_using_postgresql:
|
||||
prefetched_users = {
|
||||
str(x.id): x
|
||||
for x in get_publisher().user_class.get_ids(
|
||||
[x.user_id for x in items if x.user_id], ignore_errors=True
|
||||
)
|
||||
if x is not None
|
||||
}
|
||||
role_ids = set((self.formdef.workflow_roles or {}).values())
|
||||
for filled in items:
|
||||
if filled.workflow_roles:
|
||||
for value in filled.workflow_roles.values():
|
||||
if not isinstance(value, list):
|
||||
value = [value]
|
||||
role_ids |= set(value)
|
||||
prefetched_roles = {
|
||||
str(x.id): x
|
||||
for x in get_publisher().role_class.get_ids(role_ids, ignore_errors=True)
|
||||
if x is not None
|
||||
}
|
||||
prefetched_users = {
|
||||
str(x.id): x
|
||||
for x in get_publisher().user_class.get_ids(
|
||||
[x.user_id for x in items if x.user_id], ignore_errors=True
|
||||
)
|
||||
if x is not None
|
||||
}
|
||||
role_ids = set((self.formdef.workflow_roles or {}).values())
|
||||
for filled in items:
|
||||
if filled.workflow_roles:
|
||||
for value in filled.workflow_roles.values():
|
||||
if not isinstance(value, list):
|
||||
value = [value]
|
||||
role_ids |= set(value)
|
||||
prefetched_roles = {
|
||||
str(x.id): x
|
||||
for x in get_publisher().role_class.get_ids(role_ids, ignore_errors=True)
|
||||
if x is not None
|
||||
}
|
||||
for filled in items:
|
||||
data = filled.get_json_export_dict(
|
||||
include_files=False,
|
||||
|
@ -2717,7 +2638,6 @@ class FormPage(Directory):
|
|||
get_response().filter['sidebar'] = self.get_formdata_sidebar() + self.get_stats_sidebar(
|
||||
selected_filter
|
||||
)
|
||||
do_graphs = get_publisher().is_using_postgresql()
|
||||
|
||||
displayed_criterias = None
|
||||
if selected_filter and selected_filter != 'all':
|
||||
|
@ -2749,10 +2669,9 @@ class FormPage(Directory):
|
|||
criterias = [StrictNotEqual('status', 'draft')] + displayed_criterias
|
||||
|
||||
values = self.formdef.data_class().select(criterias)
|
||||
if get_publisher().is_using_postgresql():
|
||||
# load all evolutions in a single batch, to avoid as many query as
|
||||
# there are formdata when computing resolution times statistics.
|
||||
self.formdef.data_class().load_all_evolutions(values)
|
||||
# load all evolutions in a single batch, to avoid as many query as
|
||||
# there are formdata when computing resolution times statistics.
|
||||
self.formdef.data_class().load_all_evolutions(values)
|
||||
|
||||
r += htmltext('<div id="statistics">')
|
||||
if displayed_criterias:
|
||||
|
@ -2764,8 +2683,7 @@ class FormPage(Directory):
|
|||
if criteria_label:
|
||||
r += htmltext('<li>%s</li>') % criteria_label
|
||||
r += htmltext('</ul></div>')
|
||||
if do_graphs:
|
||||
r += htmltext('<div class="splitcontent-left">')
|
||||
r += htmltext('<div class="splitcontent-left">')
|
||||
|
||||
no_forms = len(values)
|
||||
r += htmltext('<div class="bo-block">')
|
||||
|
@ -2799,12 +2717,11 @@ class FormPage(Directory):
|
|||
r += stats_times
|
||||
r += htmltext('</div>')
|
||||
|
||||
if do_graphs:
|
||||
r += htmltext('</div>')
|
||||
r += htmltext('<div class="splitcontent-right">')
|
||||
criterias.append(Equal('formdef_id', int(self.formdef.id)))
|
||||
r += do_graphs_section(criterias=criterias)
|
||||
r += htmltext('</div>')
|
||||
r += htmltext('</div>')
|
||||
r += htmltext('<div class="splitcontent-right">')
|
||||
criterias.append(Equal('formdef_id', int(self.formdef.id)))
|
||||
r += do_graphs_section(criterias=criterias)
|
||||
r += htmltext('</div>')
|
||||
|
||||
r += htmltext('</div>') # id="statistics"
|
||||
|
||||
|
@ -3165,11 +3082,7 @@ class FormBackOfficeStatusPage(FormStatusPage):
|
|||
if formdata.formdef.lateral_template:
|
||||
r += htmltext('<div data-async-url="%slateral-block"></div>' % formdata.get_url(backoffice=True))
|
||||
|
||||
if (
|
||||
not isinstance(formdata.formdef, CardDef)
|
||||
and formdata.user_id
|
||||
and get_publisher().is_using_postgresql()
|
||||
):
|
||||
if not isinstance(formdata.formdef, CardDef) and formdata.user_id:
|
||||
r += htmltext(
|
||||
'<div data-async-url="%suser-pending-forms"></div>' % formdata.get_url(backoffice=True)
|
||||
)
|
||||
|
|
|
@ -202,15 +202,9 @@ class FormFillPage(PublicFormFillPage):
|
|||
return
|
||||
|
||||
data_class = self.formdef.data_class()
|
||||
if get_publisher().is_using_postgresql():
|
||||
has = bool(
|
||||
data_class.count([StrictNotEqual('status', 'draft'), Equal('user_id', formdata.user_id)])
|
||||
)
|
||||
else:
|
||||
has = any(
|
||||
x for x in data_class.get_with_indexed_value('user_id', formdata.user_id) if not x.is_draft()
|
||||
)
|
||||
context['user_has_already_one_such_form'] = has
|
||||
context['user_has_already_one_such_form'] = bool(
|
||||
data_class.count([StrictNotEqual('status', 'draft'), Equal('user_id', formdata.user_id)])
|
||||
)
|
||||
|
||||
def get_sidebar(self, data):
|
||||
r = TemplateIO(html=True)
|
||||
|
|
|
@ -60,7 +60,7 @@ class CardDef(FormDef):
|
|||
# carddef
|
||||
if data_class._formdef is self:
|
||||
return data_class
|
||||
if (get_publisher().is_using_postgresql() and not mode == 'files') or mode == 'sql':
|
||||
if (not mode == 'files') or mode == 'sql':
|
||||
from . import sql
|
||||
|
||||
table_name = sql.get_formdef_table_name(self)
|
||||
|
|
|
@ -51,70 +51,66 @@ class CmdDeleteTenant(Command):
|
|||
deletion_date = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
|
||||
os.rename(pub.app_dir, pub.app_dir + '_removed_%s.invalid' % deletion_date)
|
||||
|
||||
# do this only if the wcs has a postgresql configuration
|
||||
if pub.is_using_postgresql():
|
||||
postgresql_cfg = {}
|
||||
for k, v in pub.cfg['postgresql'].items():
|
||||
if v and isinstance(v, str):
|
||||
postgresql_cfg[k] = v
|
||||
postgresql_cfg = {}
|
||||
for k, v in pub.cfg['postgresql'].items():
|
||||
if v and isinstance(v, str):
|
||||
postgresql_cfg[k] = v
|
||||
|
||||
# if there's a createdb-connection-params, we can do a DROP DATABASE with
|
||||
# the option --force-drop, rename it if not
|
||||
createdb_cfg = pub.cfg['postgresql'].get('createdb-connection-params', {})
|
||||
createdb = True
|
||||
if not createdb_cfg:
|
||||
createdb_cfg = postgresql_cfg
|
||||
createdb = False
|
||||
if 'database' in createdb_cfg:
|
||||
createdb_cfg['dbname'] = createdb_cfg.pop('database')
|
||||
try:
|
||||
pgconn = psycopg2.connect(**createdb_cfg)
|
||||
except psycopg2.Error as e:
|
||||
print(
|
||||
'failed to connect to postgresql (%s)' % psycopg2.errorcodes.lookup(e.pgcode),
|
||||
file=sys.stderr,
|
||||
)
|
||||
return
|
||||
# if there's a createdb-connection-params, we can do a DROP DATABASE with
|
||||
# the option --force-drop, rename it if not
|
||||
createdb_cfg = pub.cfg['postgresql'].get('createdb-connection-params', {})
|
||||
createdb = True
|
||||
if not createdb_cfg:
|
||||
createdb_cfg = postgresql_cfg
|
||||
createdb = False
|
||||
if 'database' in createdb_cfg:
|
||||
createdb_cfg['dbname'] = createdb_cfg.pop('database')
|
||||
try:
|
||||
pgconn = psycopg2.connect(**createdb_cfg)
|
||||
except psycopg2.Error as e:
|
||||
print(
|
||||
'failed to connect to postgresql (%s)' % psycopg2.errorcodes.lookup(e.pgcode),
|
||||
file=sys.stderr,
|
||||
)
|
||||
return
|
||||
|
||||
pgconn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur = pgconn.cursor()
|
||||
dbname = postgresql_cfg.get('dbname') or postgresql_cfg.get('database')
|
||||
try:
|
||||
if createdb:
|
||||
if options.force_drop:
|
||||
cur.execute('DROP DATABASE %s' % dbname)
|
||||
else:
|
||||
cur.execute(
|
||||
'ALTER DATABASE %s RENAME TO removed_%s_%s' % (dbname, deletion_date, dbname)
|
||||
)
|
||||
pgconn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur = pgconn.cursor()
|
||||
dbname = postgresql_cfg.get('dbname') or postgresql_cfg.get('database')
|
||||
try:
|
||||
if createdb:
|
||||
if options.force_drop:
|
||||
cur.execute('DROP DATABASE %s' % dbname)
|
||||
else:
|
||||
cur.execute(
|
||||
"""SELECT table_name
|
||||
FROM information_schema.tables
|
||||
WHERE table_schema = 'public'
|
||||
AND table_type = 'BASE TABLE'"""
|
||||
)
|
||||
|
||||
tables_names = [x[0] for x in cur.fetchall()]
|
||||
|
||||
if options.force_drop:
|
||||
for table_name in tables_names:
|
||||
cur.execute('DROP TABLE %s CASCADE' % table_name)
|
||||
|
||||
else:
|
||||
schema_name = 'removed_%s_%s' % (deletion_date, dbname)
|
||||
cur.execute("CREATE SCHEMA %s" % schema_name[:63])
|
||||
for table_name in tables_names:
|
||||
cur.execute('ALTER TABLE %s SET SCHEMA %s' % (table_name, schema_name[:63]))
|
||||
|
||||
except psycopg2.Error as e:
|
||||
print(
|
||||
'failed to alter database %s: (%s)' % (dbname, psycopg2.errorcodes.lookup(e.pgcode)),
|
||||
file=sys.stderr,
|
||||
cur.execute('ALTER DATABASE %s RENAME TO removed_%s_%s' % (dbname, deletion_date, dbname))
|
||||
else:
|
||||
cur.execute(
|
||||
"""SELECT table_name
|
||||
FROM information_schema.tables
|
||||
WHERE table_schema = 'public'
|
||||
AND table_type = 'BASE TABLE'"""
|
||||
)
|
||||
return
|
||||
|
||||
cur.close()
|
||||
tables_names = [x[0] for x in cur.fetchall()]
|
||||
|
||||
if options.force_drop:
|
||||
for table_name in tables_names:
|
||||
cur.execute('DROP TABLE %s CASCADE' % table_name)
|
||||
|
||||
else:
|
||||
schema_name = 'removed_%s_%s' % (deletion_date, dbname)
|
||||
cur.execute("CREATE SCHEMA %s" % schema_name[:63])
|
||||
for table_name in tables_names:
|
||||
cur.execute('ALTER TABLE %s SET SCHEMA %s' % (table_name, schema_name[:63]))
|
||||
|
||||
except psycopg2.Error as e:
|
||||
print(
|
||||
'failed to alter database %s: (%s)' % (dbname, psycopg2.errorcodes.lookup(e.pgcode)),
|
||||
file=sys.stderr,
|
||||
)
|
||||
return
|
||||
|
||||
cur.close()
|
||||
|
||||
|
||||
CmdDeleteTenant.register()
|
||||
|
|
|
@ -28,18 +28,6 @@ def rebuild_vhost_indexes(pub, destroy=False):
|
|||
if destroy:
|
||||
Role.destroy_indexes()
|
||||
Role.rebuild_indexes()
|
||||
if pub.is_using_postgresql():
|
||||
return
|
||||
from wcs.users import User
|
||||
|
||||
if destroy:
|
||||
User.destroy_indexes()
|
||||
User.rebuild_indexes()
|
||||
for formdef in FormDef.select():
|
||||
formdata = formdef.data_class()
|
||||
if destroy:
|
||||
formdata.destroy_indexes()
|
||||
formdata.rebuild_indexes()
|
||||
|
||||
|
||||
class CmdRebuildIndexes(Command):
|
||||
|
|
|
@ -424,16 +424,13 @@ class FormData(StorableObject):
|
|||
|
||||
@classmethod
|
||||
def get_actionable_count(cls, user_roles):
|
||||
if get_publisher().is_using_postgresql():
|
||||
statuses = ['wf-%s' % x.id for x in cls._formdef.workflow.get_not_endpoint_status()]
|
||||
criterias = [
|
||||
Intersects('actions_roles_array', user_roles),
|
||||
Contains('status', statuses),
|
||||
Null('anonymised'),
|
||||
]
|
||||
return cls.count(criterias)
|
||||
else:
|
||||
return len(cls.get_actionable_ids(user_roles))
|
||||
statuses = ['wf-%s' % x.id for x in cls._formdef.workflow.get_not_endpoint_status()]
|
||||
criterias = [
|
||||
Intersects('actions_roles_array', user_roles),
|
||||
Contains('status', statuses),
|
||||
Null('anonymised'),
|
||||
]
|
||||
return cls.count(criterias)
|
||||
|
||||
@classmethod
|
||||
def get_actionable_ids_criteria(cls, user_roles):
|
||||
|
@ -442,17 +439,7 @@ class FormData(StorableObject):
|
|||
|
||||
@classmethod
|
||||
def get_actionable_ids(cls, user_roles):
|
||||
if get_publisher().is_using_postgresql():
|
||||
return cls.keys([cls.get_actionable_ids_criteria(user_roles)])
|
||||
else:
|
||||
statuses = ['wf-%s' % x.id for x in cls._formdef.workflow.get_not_endpoint_status()]
|
||||
actions_ids = set()
|
||||
for role in user_roles:
|
||||
actions_ids |= set(cls.get_ids_with_indexed_value('actions_roles', str(role)))
|
||||
open_ids = []
|
||||
for status_id in statuses:
|
||||
open_ids.extend(cls.get_ids_with_indexed_value('status', status_id))
|
||||
return list(actions_ids.intersection(open_ids))
|
||||
return cls.keys([cls.get_actionable_ids_criteria(user_roles)])
|
||||
|
||||
@classmethod
|
||||
def get_submission_channels(cls):
|
||||
|
|
|
@ -44,7 +44,7 @@ from .qommon.cron import CronJob
|
|||
from .qommon.form import Form, HtmlWidget, UploadedFile
|
||||
from .qommon.misc import JSONEncoder, get_as_datetime, is_attachment, is_upload, simplify, xml_node_text
|
||||
from .qommon.publisher import get_publisher_class
|
||||
from .qommon.storage import Equal, StorableObject, StrictNotEqual, fix_key
|
||||
from .qommon.storage import Equal, StorableObject, fix_key
|
||||
from .qommon.substitution import Substitutions
|
||||
from .qommon.template import Template
|
||||
from .roles import logged_users_role
|
||||
|
@ -329,7 +329,7 @@ class FormDef(StorableObject):
|
|||
@classmethod
|
||||
def remove_object(cls, id):
|
||||
super().remove_object(id)
|
||||
if get_publisher().is_using_postgresql() and cls == FormDef:
|
||||
if cls == FormDef:
|
||||
# recreate global views so they don't reference formdata from
|
||||
# deleted formefs
|
||||
from . import sql
|
||||
|
@ -352,7 +352,7 @@ class FormDef(StorableObject):
|
|||
# formdef
|
||||
if data_class._formdef is self:
|
||||
return data_class
|
||||
if (get_publisher().is_using_postgresql() and not mode == 'files') or mode == 'sql':
|
||||
if (not mode == 'files') or mode == 'sql':
|
||||
from . import sql
|
||||
|
||||
table_name = sql.get_formdef_table_name(self)
|
||||
|
@ -411,8 +411,7 @@ class FormDef(StorableObject):
|
|||
@classmethod
|
||||
def get_new_id(cls, create=False):
|
||||
id = super().get_new_id(create=False)
|
||||
if get_publisher().is_using_postgresql():
|
||||
id = cls.get_sql_new_id(id_start=int(id))
|
||||
id = cls.get_sql_new_id(id_start=int(id))
|
||||
if create:
|
||||
objects_dir = cls.get_objects_dir()
|
||||
object_filename = os.path.join(objects_dir, fix_key(id))
|
||||
|
@ -443,8 +442,7 @@ class FormDef(StorableObject):
|
|||
@classmethod
|
||||
def wipe(cls):
|
||||
super().wipe()
|
||||
if get_publisher().is_using_postgresql():
|
||||
cls.sql_wipe()
|
||||
cls.sql_wipe()
|
||||
|
||||
@classmethod
|
||||
def sql_wipe(cls):
|
||||
|
@ -457,6 +455,9 @@ class FormDef(StorableObject):
|
|||
if self.url_name is None:
|
||||
# set url name if it's not yet there
|
||||
self.url_name = self.get_new_url_name()
|
||||
|
||||
object_only = kwargs.pop('object_only', False)
|
||||
|
||||
new_internal_identifier = self.get_new_internal_identifier()
|
||||
if not self.internal_identifier:
|
||||
self.internal_identifier = new_internal_identifier
|
||||
|
@ -466,9 +467,8 @@ class FormDef(StorableObject):
|
|||
# or if there are not yet any submitted forms (or if site
|
||||
# is using the SQL storage as internal identifier is not used
|
||||
# in that mode.
|
||||
if self.id is None or get_publisher().is_using_postgresql() or not (self.data_class().exists()):
|
||||
if self.id is None or not self.data_class().exists():
|
||||
self.internal_identifier = new_internal_identifier
|
||||
object_only = kwargs.pop('object_only', False)
|
||||
|
||||
if not object_only:
|
||||
self.update_relations()
|
||||
|
@ -487,9 +487,6 @@ class FormDef(StorableObject):
|
|||
self.store_related_custom_views()
|
||||
|
||||
def update_storage(self):
|
||||
if not get_publisher().is_using_postgresql():
|
||||
return
|
||||
|
||||
from . import sql
|
||||
|
||||
actions = sql.do_formdef_tables(self, rebuild_views=True, rebuild_global_views=True)
|
||||
|
@ -1919,7 +1916,6 @@ class FormDef(StorableObject):
|
|||
status.id in status_mapping for status in old_workflow.possible_status
|
||||
), 'a status was not mapped'
|
||||
|
||||
unmapped_status_suffix = '-invalid-%s' % str(self.workflow_id or 'default')
|
||||
mapping = {}
|
||||
for old_status, new_status in status_mapping.items():
|
||||
mapping['wf-%s' % old_status] = 'wf-%s' % new_status
|
||||
|
@ -1927,28 +1923,9 @@ class FormDef(StorableObject):
|
|||
|
||||
if any(x[0] != x[1] for x in mapping.items()):
|
||||
# if there are status changes, update all formdatas (except drafts)
|
||||
if get_publisher().is_using_postgresql():
|
||||
from . import sql
|
||||
from . import sql
|
||||
|
||||
sql.formdef_remap_statuses(self, mapping)
|
||||
else:
|
||||
|
||||
def map_status(status):
|
||||
if status is None:
|
||||
return None
|
||||
elif status in mapping:
|
||||
return mapping[status]
|
||||
elif '-invalid-' in status:
|
||||
return status
|
||||
else:
|
||||
return '%s%s' % (status, unmapped_status_suffix)
|
||||
|
||||
for formdata in self.data_class().select([StrictNotEqual('status', 'draft')]):
|
||||
formdata.status = map_status(formdata.status)
|
||||
if formdata.evolution:
|
||||
for evo in formdata.evolution:
|
||||
evo.status = map_status(evo.status)
|
||||
formdata.store()
|
||||
sql.formdef_remap_statuses(self, mapping)
|
||||
|
||||
self.workflow = new_workflow
|
||||
if new_workflow.has_action('geolocate') and not self.geolocations:
|
||||
|
|
|
@ -46,15 +46,12 @@ class FormDefUI:
|
|||
):
|
||||
# noqa pylint: disable=too-many-arguments
|
||||
|
||||
using_postgresql = get_publisher().is_using_postgresql()
|
||||
|
||||
if not items:
|
||||
if offset and not limit:
|
||||
limit = int(get_publisher().get_site_option('default-page-size') or 20)
|
||||
if not criterias:
|
||||
criterias = []
|
||||
if using_postgresql:
|
||||
criterias.append(Null('anonymised'))
|
||||
criterias.append(Null('anonymised'))
|
||||
items, total_count = self.get_listing_items(
|
||||
fields,
|
||||
selected_filter,
|
||||
|
@ -101,7 +98,7 @@ class FormDefUI:
|
|||
r += htmltext('</colgroup>')
|
||||
|
||||
r += htmltext('<thead><tr>')
|
||||
if self.formdef.workflow.criticality_levels and using_postgresql:
|
||||
if self.formdef.workflow.criticality_levels:
|
||||
r += htmltext(
|
||||
'<th class="criticality-level-cell" data-field-sort-key="criticality_level"><span></span></th>'
|
||||
)
|
||||
|
@ -127,7 +124,7 @@ class FormDefUI:
|
|||
else:
|
||||
field_sort_key = 'f%s' % f.contextual_id
|
||||
|
||||
if field_sort_key and using_postgresql:
|
||||
if field_sort_key:
|
||||
r += htmltext('<th data-field-sort-key="%s">') % field_sort_key
|
||||
else:
|
||||
r += htmltext('<th>')
|
||||
|
@ -144,9 +141,8 @@ class FormDefUI:
|
|||
r += htmltext('</tbody>')
|
||||
r += htmltext('</table>')
|
||||
|
||||
if get_publisher().is_using_postgresql():
|
||||
# add links to paginate
|
||||
r += pagination_links(offset, limit, total_count)
|
||||
# add links to paginate
|
||||
r += pagination_links(offset, limit, total_count)
|
||||
|
||||
return r.getvalue()
|
||||
|
||||
|
@ -159,96 +155,6 @@ class FormDefUI:
|
|||
user=None,
|
||||
criterias=None,
|
||||
anonymise=False,
|
||||
):
|
||||
if get_publisher().is_using_postgresql():
|
||||
return self.get_listing_item_ids_sql(
|
||||
selected_filter, selected_filter_operator, query, order_by, user, criterias, anonymise
|
||||
)
|
||||
|
||||
def get_all_except_drafts():
|
||||
item_ids = formdata_class.keys()
|
||||
drafts = formdata_class.get_ids_with_indexed_value('status', 'draft')
|
||||
return [x for x in item_ids if x not in drafts]
|
||||
|
||||
formdata_class = self.formdef.data_class()
|
||||
# used for postgresql mode only
|
||||
if selected_filter == 'all':
|
||||
if selected_filter_operator == 'ne':
|
||||
# nothing
|
||||
item_ids = []
|
||||
else:
|
||||
# all except drafts
|
||||
item_ids = get_all_except_drafts()
|
||||
elif selected_filter == 'waiting':
|
||||
user_roles = [logged_users_role().id] + user.get_roles()
|
||||
waiting_item_ids = formdata_class.get_actionable_ids(user_roles)
|
||||
if selected_filter_operator == 'ne':
|
||||
# select all ids except drafts
|
||||
item_ids = get_all_except_drafts()
|
||||
# exclude waiting ids
|
||||
item_ids = [x for x in item_ids if x not in waiting_item_ids]
|
||||
else:
|
||||
# only waiting ids
|
||||
item_ids = waiting_item_ids
|
||||
else:
|
||||
# build selected status list
|
||||
applied_filters = []
|
||||
if selected_filter == 'pending':
|
||||
applied_filters = ['wf-%s' % x.id for x in self.formdef.workflow.get_not_endpoint_status()]
|
||||
elif selected_filter == 'done':
|
||||
applied_filters = ['wf-%s' % x.id for x in self.formdef.workflow.get_endpoint_status()]
|
||||
else:
|
||||
applied_filters = ['wf-%s' % selected_filter]
|
||||
|
||||
filtered_item_ids = []
|
||||
for status_id in applied_filters:
|
||||
filtered_item_ids.extend(
|
||||
formdata_class.get_ids_with_indexed_value(
|
||||
'status',
|
||||
status_id,
|
||||
)
|
||||
)
|
||||
if selected_filter_operator == 'ne':
|
||||
# select all ids except drafts
|
||||
item_ids = get_all_except_drafts()
|
||||
# exclude selected status list
|
||||
item_ids = [x for x in item_ids if x not in filtered_item_ids]
|
||||
else:
|
||||
# only selected status list
|
||||
item_ids = filtered_item_ids
|
||||
|
||||
if query:
|
||||
query_ids = formdata_class.get_ids_from_query(query)
|
||||
item_ids = list(set(item_ids).intersection(query_ids))
|
||||
|
||||
if criterias:
|
||||
select_ids = formdata_class.keys(clause=criterias)
|
||||
item_ids = list(set(item_ids).intersection(select_ids))
|
||||
|
||||
if item_ids and not anonymise:
|
||||
# as we are in the backoffice, we don't have to care about the
|
||||
# situation where the user is the submitter, and we limit ourselves
|
||||
# to consider treating roles.
|
||||
if not user.is_admin:
|
||||
user_roles = set(user.get_roles())
|
||||
concerned_ids = set()
|
||||
for role in user_roles:
|
||||
concerned_ids |= set(
|
||||
formdata_class.get_ids_with_indexed_value('concerned_roles', str(role))
|
||||
)
|
||||
item_ids = list(set(item_ids).intersection(concerned_ids))
|
||||
|
||||
return item_ids
|
||||
|
||||
def get_listing_item_ids_sql(
|
||||
self,
|
||||
selected_filter,
|
||||
selected_filter_operator,
|
||||
query,
|
||||
order_by,
|
||||
user,
|
||||
criterias,
|
||||
anonymise,
|
||||
):
|
||||
formdata_class = self.formdef.data_class()
|
||||
criterias = [] or criterias[:]
|
||||
|
@ -354,9 +260,6 @@ class FormDefUI:
|
|||
criterias=criterias,
|
||||
anonymise=anonymise,
|
||||
)
|
||||
if not get_publisher().is_using_postgresql():
|
||||
item_ids.sort(key=int)
|
||||
item_ids.reverse()
|
||||
|
||||
total_count = len(item_ids)
|
||||
|
||||
|
@ -364,8 +267,7 @@ class FormDefUI:
|
|||
offset = 0
|
||||
|
||||
kwargs = {}
|
||||
if get_publisher().is_using_postgresql():
|
||||
kwargs['fields'] = fields
|
||||
kwargs['fields'] = fields
|
||||
|
||||
if limit:
|
||||
items = formdata_class.get_ids(item_ids[offset : offset + limit], keep_order=True, **kwargs)
|
||||
|
|
|
@ -132,9 +132,6 @@ class WcsPublisher(QommonPublisher):
|
|||
|
||||
DeprecationsScanAfterJob().execute()
|
||||
|
||||
def is_using_postgresql(self):
|
||||
return True
|
||||
|
||||
def has_postgresql_config(self):
|
||||
return bool(self.cfg.get('postgresql', {}))
|
||||
|
||||
|
@ -467,10 +464,9 @@ class WcsPublisher(QommonPublisher):
|
|||
|
||||
def cleanup(self):
|
||||
self._cached_user_fields_formdef = None
|
||||
if self.is_using_postgresql():
|
||||
from . import sql
|
||||
from . import sql
|
||||
|
||||
sql.cleanup_connection()
|
||||
sql.cleanup_connection()
|
||||
|
||||
@contextmanager
|
||||
def complex_data(self):
|
||||
|
|
|
@ -84,8 +84,7 @@ def cron_worker(publisher, now, job_name=None, delayed_jobs=None):
|
|||
jobs = delayed_jobs
|
||||
else:
|
||||
# reindex user and formdata if needed (should only be run once)
|
||||
if publisher.is_using_postgresql():
|
||||
publisher.reindex_sql()
|
||||
publisher.reindex_sql()
|
||||
|
||||
jobs = []
|
||||
|
||||
|
|
|
@ -19,7 +19,6 @@ import random
|
|||
import string
|
||||
|
||||
from django.utils.timezone import make_aware, now
|
||||
from quixote import get_publisher
|
||||
|
||||
from .storage import Equal, Less, StorableObject
|
||||
|
||||
|
@ -75,16 +74,5 @@ class Token(StorableObject):
|
|||
|
||||
@classmethod
|
||||
def clean(cls):
|
||||
if get_publisher().is_using_postgresql():
|
||||
# noqa pylint: disable=unexpected-keyword-arg
|
||||
cls.wipe(clause=[Less('expiration', now())])
|
||||
else:
|
||||
for token_id in cls.keys():
|
||||
try:
|
||||
cls.get(token_id) # will run migrate, will check expiration
|
||||
except KeyError:
|
||||
pass
|
||||
except AttributeError:
|
||||
# old python2 tokens:
|
||||
# AttributeError: module 'builtins' has no attribute 'unicode'
|
||||
cls.remove_object(token_id)
|
||||
# noqa pylint: disable=unexpected-keyword-arg
|
||||
cls.wipe(clause=[Less('expiration', now())])
|
||||
|
|
13
wcs/sql.py
13
wcs/sql.py
|
@ -1581,9 +1581,6 @@ def do_meta_table(conn=None, cur=None, insert_current_sql_level=True):
|
|||
|
||||
@guard_postgres
|
||||
def redo_views(conn, cur, formdef, rebuild_global_views=False):
|
||||
if get_publisher().get_site_option('postgresql_views') == 'false':
|
||||
return
|
||||
|
||||
if formdef.id is None:
|
||||
return
|
||||
|
||||
|
@ -1961,9 +1958,13 @@ class SqlMixin:
|
|||
sql_statement += ' WHERE ' + ' AND '.join(where_clauses)
|
||||
sql_statement += ' LIMIT 1'
|
||||
conn, cur = get_connection_and_cursor()
|
||||
cur.execute(sql_statement, parameters)
|
||||
check = cur.fetchone()
|
||||
result = check is not None
|
||||
try:
|
||||
cur.execute(sql_statement, parameters)
|
||||
except psycopg2.errors.UndefinedTable:
|
||||
result = False
|
||||
else:
|
||||
check = cur.fetchone()
|
||||
result = check is not None
|
||||
conn.commit()
|
||||
cur.close()
|
||||
return result
|
||||
|
|
|
@ -19,7 +19,6 @@ import collections
|
|||
from django.http import HttpResponseBadRequest, HttpResponseForbidden, JsonResponse
|
||||
from django.urls import reverse
|
||||
from django.views.generic import View
|
||||
from quixote import get_publisher
|
||||
|
||||
from wcs import sql
|
||||
from wcs.api_utils import is_url_signed
|
||||
|
@ -42,9 +41,6 @@ class RestrictedView(View):
|
|||
|
||||
class IndexView(RestrictedView):
|
||||
def get(self, request, *args, **kwargs):
|
||||
if not get_publisher().is_using_postgresql():
|
||||
return JsonResponse({'data': [], 'err': 0})
|
||||
|
||||
categories = Category.select()
|
||||
categories.sort(key=lambda x: misc.simplify(x.name))
|
||||
category_options = [{'id': '_all', 'label': C_('categories|All')}] + [
|
||||
|
@ -306,8 +302,6 @@ class FormsCountView(RestrictedView):
|
|||
for status in waitpoint_status:
|
||||
options.append((status.id, status.name))
|
||||
elif field.type in ('item', 'items'):
|
||||
if not get_publisher().is_using_postgresql():
|
||||
continue
|
||||
options = form_page.get_item_filter_options(field, selected_filter='all', anonymised=True)
|
||||
if not options:
|
||||
continue
|
||||
|
|
|
@ -188,9 +188,6 @@ class User(StorableObject):
|
|||
|
||||
@classmethod
|
||||
def get_users_with_roles(cls, included_roles=None, excluded_roles=None, order_by=None):
|
||||
if not get_publisher().is_using_postgresql():
|
||||
return []
|
||||
|
||||
from wcs import sql
|
||||
|
||||
criterias = [sql.Null('deleted_timestamp')]
|
||||
|
|
|
@ -69,10 +69,7 @@ class LazyFormDefObjectsManager:
|
|||
)
|
||||
|
||||
def order_by(self, attribute):
|
||||
if get_publisher().is_using_postgresql():
|
||||
field = self.get_field(attribute)
|
||||
else:
|
||||
field = None # no support for field search
|
||||
field = self.get_field(attribute)
|
||||
return self._clone(self._criterias, order_by=field or attribute)
|
||||
|
||||
def limit(self, limit):
|
||||
|
|
|
@ -340,28 +340,25 @@ def _apply_timeouts(publisher, **kwargs):
|
|||
) if job else contextlib.ExitStack():
|
||||
formdata_class = formdef.data_class()
|
||||
for status_id in status_ids:
|
||||
if publisher.is_using_postgresql():
|
||||
# get minimum delay for jumps in this status
|
||||
delay = math.inf
|
||||
for jump_action in wfs_status[str(formdef.workflow_id)][status_id]:
|
||||
if Template.is_template_string(jump_action.timeout):
|
||||
delay = 0
|
||||
break
|
||||
delay = min(delay, int(jump_action.timeout))
|
||||
# limit delay to minimal delay
|
||||
if delay < JUMP_TIMEOUT_INTERVAL * 60:
|
||||
delay = JUMP_TIMEOUT_INTERVAL * 60
|
||||
# get minimum delay for jumps in this status
|
||||
delay = math.inf
|
||||
for jump_action in wfs_status[str(formdef.workflow_id)][status_id]:
|
||||
if Template.is_template_string(jump_action.timeout):
|
||||
delay = 0
|
||||
break
|
||||
delay = min(delay, int(jump_action.timeout))
|
||||
# limit delay to minimal delay
|
||||
if delay < JUMP_TIMEOUT_INTERVAL * 60:
|
||||
delay = JUMP_TIMEOUT_INTERVAL * 60
|
||||
|
||||
criterias = [
|
||||
Equal('status', status_id),
|
||||
LessOrEqual(
|
||||
'last_update_time',
|
||||
(datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(),
|
||||
),
|
||||
]
|
||||
formdatas = formdata_class.select_iterator(criterias, ignore_errors=True, itersize=200)
|
||||
else:
|
||||
formdatas = formdata_class.get_with_indexed_value('status', status_id, ignore_errors=True)
|
||||
criterias = [
|
||||
Equal('status', status_id),
|
||||
LessOrEqual(
|
||||
'last_update_time',
|
||||
(datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(),
|
||||
),
|
||||
]
|
||||
formdatas = formdata_class.select_iterator(criterias, ignore_errors=True, itersize=200)
|
||||
|
||||
for formdata in formdatas:
|
||||
for jump_action in wfs_status[str(formdef.workflow_id)][formdata.status]:
|
||||
|
|
Loading…
Reference in New Issue