sql: switch to autocommit (#81133) #667
|
@ -3278,7 +3278,6 @@ def test_management_views_with_no_formdefs(pub):
|
|||
|
||||
conn, cur = get_connection_and_cursor()
|
||||
drop_global_views(conn, cur)
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
app = login(get_app(pub))
|
||||
|
|
|
@ -42,7 +42,6 @@ def test_backoffice_statistics_with_no_formdefs(pub):
|
|||
|
||||
conn, cur = get_connection_and_cursor()
|
||||
drop_global_views(conn, cur)
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
app = login(get_app(pub))
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
import configparser
|
||||
import os
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
|
||||
|
@ -84,37 +83,30 @@ def nocache(settings):
|
|||
|
||||
@pytest.fixture
|
||||
def sql_queries(monkeypatch):
|
||||
import psycopg2
|
||||
import psycopg2.extensions
|
||||
|
||||
import wcs.sql
|
||||
|
||||
queries = []
|
||||
wcs.sql.cleanup_connection()
|
||||
old_connect = psycopg2.connect
|
||||
|
||||
def connect(*args, **kwargs):
|
||||
conn = old_connect(*args, **kwargs)
|
||||
mocked_conn = mock.Mock(wraps=conn)
|
||||
old_cursor = conn.cursor
|
||||
class LoggingCursor(psycopg2.extensions.cursor):
|
||||
"""A cursor that logs queries using its connection logging facilities."""
|
||||
|
||||
def cursor(*args, **kwargs):
|
||||
cur = old_cursor(*args, **kwargs)
|
||||
mocked_cur = mock.MagicMock(wraps=cur)
|
||||
old_execute = cur.execute
|
||||
def execute(self, query, vars=None):
|
||||
queries.append(query)
|
||||
return super().execute(query, vars)
|
||||
|
||||
def execute(*args, **kwargs):
|
||||
queries.append(args[0])
|
||||
return old_execute(*args, **kwargs)
|
||||
class MyLoggingConnection(wcs.sql.WcsPgConnection):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.cursor_factory = LoggingCursor
|
||||
|
||||
mocked_cur.execute = execute
|
||||
return mocked_cur
|
||||
|
||||
mocked_conn.cursor = cursor
|
||||
return mocked_conn
|
||||
|
||||
monkeypatch.setattr(psycopg2, 'connect', connect)
|
||||
backup_original_class = wcs.sql.WcsPgConnection
|
||||
wcs.sql.WcsPgConnection = MyLoggingConnection
|
||||
yield queries
|
||||
wcs.sql.cleanup_connection()
|
||||
wcs.sql.WcsPgConnection = backup_original_class
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
|
|
@ -554,6 +554,7 @@ def test_configure_postgresql(setuptest, alt_tempdir, deploy_setup, settings):
|
|||
'user': 'fred',
|
||||
'dbname': 'tests_wcs_wcs_example_net',
|
||||
'application_name': 'wcs',
|
||||
'connection_factory': sql.WcsPgConnection,
|
||||
}
|
||||
assert pub.initialize_sql.call_count == 1
|
||||
|
||||
|
@ -565,6 +566,7 @@ def test_configure_postgresql(setuptest, alt_tempdir, deploy_setup, settings):
|
|||
'user': 'fred',
|
||||
'dbname': 'tests_wcs_wcs_example_net',
|
||||
'application_name': 'wcs',
|
||||
'connection_factory': sql.WcsPgConnection,
|
||||
}
|
||||
|
||||
pub.cfg['postgresql']['database-template-name'] = 'very_long_' * 10 + '%s'
|
||||
|
@ -575,6 +577,7 @@ def test_configure_postgresql(setuptest, alt_tempdir, deploy_setup, settings):
|
|||
'user': 'fred',
|
||||
'dbname': 'very_long_very_long_very_long_c426_ng_very_long_wcs_example_net',
|
||||
'application_name': 'wcs',
|
||||
'connection_factory': sql.WcsPgConnection,
|
||||
}
|
||||
assert len(connect.call_args_list[1][1]['dbname']) == 63
|
||||
assert pub.initialize_sql.call_count == 2
|
||||
|
@ -587,6 +590,7 @@ def test_configure_postgresql(setuptest, alt_tempdir, deploy_setup, settings):
|
|||
'user': 'fred',
|
||||
'dbname': 'test_2_wcs_example_net',
|
||||
'application_name': 'wcs',
|
||||
'connection_factory': sql.WcsPgConnection,
|
||||
}
|
||||
assert pub.initialize_sql.call_count == 3
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ def test_activate_language_all_views(pub):
|
|||
|
||||
|
||||
def test_translation_columns(pub):
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
_, cur = sql.get_connection_and_cursor()
|
||||
assert not column_exists_in_table(cur, 'translatable_messages', 'string_de')
|
||||
assert not column_exists_in_table(cur, 'translatable_messages', 'string_fr')
|
||||
pub.cfg['language'] = {'language': 'en', 'multilinguism': True, 'languages': ['fr', 'de']}
|
||||
|
@ -46,7 +46,6 @@ def test_translation_columns(pub):
|
|||
pub.cfg['language'] = {'language': 'en', 'multilinguism': True, 'languages': ['fr']}
|
||||
assert column_exists_in_table(cur, 'translatable_messages', 'string_de')
|
||||
assert column_exists_in_table(cur, 'translatable_messages', 'string_fr')
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
|
||||
|
|
|
@ -238,10 +238,9 @@ def get_logs(hostname=None):
|
|||
|
||||
|
||||
def get_sql_cron_statuses():
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
_, cur = sql.get_connection_and_cursor()
|
||||
cur.execute("SELECT key, value FROM wcs_meta WHERE key LIKE 'cron-status-%'")
|
||||
rows = cur.fetchall()
|
||||
conn.commit()
|
||||
cur.close()
|
||||
return dict(rows)
|
||||
|
||||
|
@ -458,9 +457,8 @@ def test_cron_command_rewind_jobs(settings, freezer):
|
|||
def job3(pub, job=None):
|
||||
jobs.append('job3')
|
||||
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
_, cur = sql.get_connection_and_cursor()
|
||||
cur.execute("DELETE FROM wcs_meta WHERE key LIKE 'cron%%'")
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
@classmethod
|
||||
|
@ -478,24 +476,22 @@ def test_cron_command_rewind_jobs(settings, freezer):
|
|||
assert jobs == []
|
||||
|
||||
# write down a past datetime in database
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
_, cur = sql.get_connection_and_cursor()
|
||||
cur.execute(
|
||||
"UPDATE wcs_meta SET created_at = %s WHERE key LIKE 'cron%%'",
|
||||
(localtime() - datetime.timedelta(hours=1),),
|
||||
)
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
call_command('cron')
|
||||
assert sorted(jobs) == ['job1', 'job3']
|
||||
|
||||
# since past day
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
_, cur = sql.get_connection_and_cursor()
|
||||
cur.execute(
|
||||
"UPDATE wcs_meta SET created_at = %s WHERE key LIKE 'cron%%'",
|
||||
(localtime() - datetime.timedelta(days=1),),
|
||||
)
|
||||
conn.commit()
|
||||
cur.close()
|
||||
jobs = []
|
||||
call_command('cron')
|
||||
|
|
|
@ -278,10 +278,9 @@ def test_magictoken_migration(pub, app):
|
|||
session.magictokens[transient_data.id] = transient_data.data
|
||||
sql.TransientData.wipe()
|
||||
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
_, cur = sql.get_connection_and_cursor()
|
||||
sql_statement = 'UPDATE sessions SET session_data = %s WHERE id = %s'
|
||||
cur.execute(sql_statement, (bytearray(pickle.dumps(session.__dict__, protocol=2)), session.id))
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
# and get back to submitting form, it should run migration
|
||||
|
|
|
@ -524,6 +524,51 @@ def test_sql_rollback_on_error(formdef):
|
|||
data_class.wipe()
|
||||
|
||||
|
||||
def test_sql_atomic_rollback():
|
||||
import psycopg2.extensions
|
||||
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
|
||||
# check the basic behavior first, enter/exit transaction
|
||||
assert conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_IDLE
|
||||
|
||||
with sql.atomic():
|
||||
assert conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_IDLE
|
||||
cur.execute('SELECT 1;')
|
||||
assert conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_INTRANS
|
||||
|
||||
assert conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_IDLE
|
||||
|
||||
# now we can exit with a failure
|
||||
with pytest.raises(psycopg2.Error):
|
||||
with sql.atomic():
|
||||
cur.execute('SELECT 1/0;')
|
||||
assert conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_IDLE
|
||||
|
||||
# Do some changes in a sub transaction
|
||||
with sql.atomic():
|
||||
with sql.atomic():
|
||||
cur.execute('CREATE TEMPORARY TABLE atomic_test_temp_ok();')
|
||||
cur.execute("SELECT count(*) FROM pg_class WHERE relname = 'atomic_test_temp_ok'")
|
||||
assert cur.fetchone()[0] == 1
|
||||
cur.execute("SELECT count(*) FROM pg_class WHERE relname = 'atomic_test_temp_ok'")
|
||||
assert cur.fetchone()[0] == 1
|
||||
try:
|
||||
with sql.atomic():
|
||||
cur.execute('CREATE TEMPORARY TABLE atomic_test_temp_nok();')
|
||||
cur.execute("SELECT count(*) FROM pg_class WHERE relname = 'atomic_test_temp_nok'")
|
||||
assert cur.fetchone()[0] == 1
|
||||
cur.execute('SELECT 1/0;')
|
||||
except psycopg2.Error:
|
||||
pass
|
||||
# now make sure it's gone
|
||||
cur.execute("SELECT count(*) FROM pg_class WHERE relname = 'atomic_test_temp_nok'")
|
||||
assert cur.fetchone()[0] == 0
|
||||
# and our change from the first part should still be visible
|
||||
cur.execute("SELECT count(*) FROM pg_class WHERE relname = 'atomic_test_temp_ok'")
|
||||
assert cur.fetchone()[0] == 1
|
||||
|
||||
|
||||
def test_sql_get_ids_with_indexed_value_dict(formdef):
|
||||
data_class = formdef.data_class(mode='sql')
|
||||
data_class.wipe()
|
||||
|
|
|
@ -3528,9 +3528,8 @@ def test_geolocate_action_enable_geolocation(pub):
|
|||
formdef.change_workflow(workflow)
|
||||
assert formdef.geolocations
|
||||
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
_, cur = sql.get_connection_and_cursor()
|
||||
assert column_exists_in_table(cur, formdef.table_name, 'geoloc_base')
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
# change to current workflow
|
||||
|
@ -3554,9 +3553,8 @@ def test_geolocate_action_enable_geolocation(pub):
|
|||
formdef.refresh_from_storage()
|
||||
assert formdef.geolocations
|
||||
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
_, cur = sql.get_connection_and_cursor()
|
||||
assert column_exists_in_table(cur, formdef.table_name, 'geoloc_base')
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
|
||||
|
@ -4547,7 +4545,7 @@ def test_form_update_after_backoffice_fields(pub):
|
|||
formdef.workflow_id = wf.id
|
||||
formdef.store()
|
||||
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
_, cur = sql.get_connection_and_cursor()
|
||||
assert column_exists_in_table(cur, formdef.table_name, 'fbo1')
|
||||
|
||||
wf.backoffice_fields_formdef.fields = [
|
||||
|
@ -4568,7 +4566,6 @@ def test_form_update_after_backoffice_fields(pub):
|
|||
assert not column_exists_in_table(cur, formdef.table_name, 'fbo1')
|
||||
assert column_exists_in_table(cur, formdef.table_name, 'fbo3')
|
||||
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
|
||||
|
|
|
@ -280,8 +280,8 @@ class FormDef(StorableObject):
|
|||
# recreate global views so they don't reference formdata from
|
||||
# deleted formefs
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
sql.clean_global_views(conn, cur)
|
||||
conn.commit()
|
||||
with sql.atomic():
|
||||
sql.clean_global_views(conn, cur)
|
||||
cur.close()
|
||||
|
||||
@property
|
||||
|
|
|
@ -425,33 +425,33 @@ class WcsPublisher(QommonPublisher):
|
|||
from . import sql
|
||||
|
||||
sql.get_connection(new=True)
|
||||
sql.do_session_table()
|
||||
sql.do_user_table()
|
||||
sql.do_role_table()
|
||||
sql.do_tracking_code_table()
|
||||
sql.do_custom_views_table()
|
||||
sql.do_transient_data_table()
|
||||
sql.do_snapshots_table()
|
||||
sql.do_loggederrors_table()
|
||||
sql.do_tokens_table()
|
||||
sql.WorkflowTrace.do_table()
|
||||
sql.Audit.do_table()
|
||||
sql.TestDef.do_table()
|
||||
sql.TestResult.do_table()
|
||||
sql.Application.do_table()
|
||||
sql.ApplicationElement.do_table()
|
||||
sql.SearchableFormDef.do_table()
|
||||
sql.do_meta_table()
|
||||
from .carddef import CardDef
|
||||
from .formdef import FormDef
|
||||
with sql.atomic():
|
||||
sql.do_session_table()
|
||||
sql.do_user_table()
|
||||
sql.do_role_table()
|
||||
sql.do_tracking_code_table()
|
||||
sql.do_custom_views_table()
|
||||
sql.do_transient_data_table()
|
||||
sql.do_snapshots_table()
|
||||
sql.do_loggederrors_table()
|
||||
sql.do_tokens_table()
|
||||
sql.WorkflowTrace.do_table()
|
||||
sql.Audit.do_table()
|
||||
sql.TestDef.do_table()
|
||||
sql.TestResult.do_table()
|
||||
sql.Application.do_table()
|
||||
sql.ApplicationElement.do_table()
|
||||
sql.SearchableFormDef.do_table()
|
||||
sql.do_meta_table()
|
||||
from .carddef import CardDef
|
||||
from .formdef import FormDef
|
||||
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
sql.drop_views(None, conn, cur)
|
||||
for _formdef in FormDef.select() + CardDef.select():
|
||||
sql.do_formdef_tables(_formdef)
|
||||
sql.migrate_global_views(conn, cur)
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
sql.drop_views(None, conn, cur)
|
||||
for _formdef in FormDef.select() + CardDef.select():
|
||||
sql.do_formdef_tables(_formdef)
|
||||
sql.migrate_global_views(conn, cur)
|
||||
cur.close()
|
||||
|
||||
def record_deprecated_usage(self, *args, **kwargs):
|
||||
return self.record_error(context='[DEPRECATED]', deprecated_usage=True, *args, **kwargs)
|
||||
|
|
785
wcs/sql.py
785
wcs/sql.py
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue