sql: switch to autocommit (#81133) #667

Merged
fpeters merged 4 commits from wip/81133-psycopg-autocommit into main 2023-11-24 15:05:03 +01:00
12 changed files with 444 additions and 509 deletions

View File

@ -3278,7 +3278,6 @@ def test_management_views_with_no_formdefs(pub):
conn, cur = get_connection_and_cursor()
drop_global_views(conn, cur)
conn.commit()
cur.close()
app = login(get_app(pub))

View File

@ -42,7 +42,6 @@ def test_backoffice_statistics_with_no_formdefs(pub):
conn, cur = get_connection_and_cursor()
drop_global_views(conn, cur)
conn.commit()
cur.close()
app = login(get_app(pub))

View File

@ -1,6 +1,5 @@
import configparser
import os
from unittest import mock
import pytest
@ -84,37 +83,30 @@ def nocache(settings):
@pytest.fixture
def sql_queries(monkeypatch):
import psycopg2
import psycopg2.extensions
import wcs.sql
queries = []
wcs.sql.cleanup_connection()
old_connect = psycopg2.connect
def connect(*args, **kwargs):
conn = old_connect(*args, **kwargs)
mocked_conn = mock.Mock(wraps=conn)
old_cursor = conn.cursor
class LoggingCursor(psycopg2.extensions.cursor):
"""A cursor that logs queries using its connection logging facilities."""
def cursor(*args, **kwargs):
cur = old_cursor(*args, **kwargs)
mocked_cur = mock.MagicMock(wraps=cur)
old_execute = cur.execute
def execute(self, query, vars=None):
queries.append(query)
return super().execute(query, vars)
def execute(*args, **kwargs):
queries.append(args[0])
return old_execute(*args, **kwargs)
class MyLoggingConnection(wcs.sql.WcsPgConnection):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cursor_factory = LoggingCursor
mocked_cur.execute = execute
return mocked_cur
mocked_conn.cursor = cursor
return mocked_conn
monkeypatch.setattr(psycopg2, 'connect', connect)
backup_original_class = wcs.sql.WcsPgConnection
wcs.sql.WcsPgConnection = MyLoggingConnection
yield queries
wcs.sql.cleanup_connection()
wcs.sql.WcsPgConnection = backup_original_class
@pytest.fixture

View File

@ -554,6 +554,7 @@ def test_configure_postgresql(setuptest, alt_tempdir, deploy_setup, settings):
'user': 'fred',
'dbname': 'tests_wcs_wcs_example_net',
'application_name': 'wcs',
'connection_factory': sql.WcsPgConnection,
}
assert pub.initialize_sql.call_count == 1
@ -565,6 +566,7 @@ def test_configure_postgresql(setuptest, alt_tempdir, deploy_setup, settings):
'user': 'fred',
'dbname': 'tests_wcs_wcs_example_net',
'application_name': 'wcs',
'connection_factory': sql.WcsPgConnection,
}
pub.cfg['postgresql']['database-template-name'] = 'very_long_' * 10 + '%s'
@ -575,6 +577,7 @@ def test_configure_postgresql(setuptest, alt_tempdir, deploy_setup, settings):
'user': 'fred',
'dbname': 'very_long_very_long_very_long_c426_ng_very_long_wcs_example_net',
'application_name': 'wcs',
'connection_factory': sql.WcsPgConnection,
}
assert len(connect.call_args_list[1][1]['dbname']) == 63
assert pub.initialize_sql.call_count == 2
@ -587,6 +590,7 @@ def test_configure_postgresql(setuptest, alt_tempdir, deploy_setup, settings):
'user': 'fred',
'dbname': 'test_2_wcs_example_net',
'application_name': 'wcs',
'connection_factory': sql.WcsPgConnection,
}
assert pub.initialize_sql.call_count == 3

View File

@ -34,7 +34,7 @@ def test_activate_language_all_views(pub):
def test_translation_columns(pub):
conn, cur = sql.get_connection_and_cursor()
_, cur = sql.get_connection_and_cursor()
assert not column_exists_in_table(cur, 'translatable_messages', 'string_de')
assert not column_exists_in_table(cur, 'translatable_messages', 'string_fr')
pub.cfg['language'] = {'language': 'en', 'multilinguism': True, 'languages': ['fr', 'de']}
@ -46,7 +46,6 @@ def test_translation_columns(pub):
pub.cfg['language'] = {'language': 'en', 'multilinguism': True, 'languages': ['fr']}
assert column_exists_in_table(cur, 'translatable_messages', 'string_de')
assert column_exists_in_table(cur, 'translatable_messages', 'string_fr')
conn.commit()
cur.close()

View File

@ -238,10 +238,9 @@ def get_logs(hostname=None):
def get_sql_cron_statuses():
conn, cur = sql.get_connection_and_cursor()
_, cur = sql.get_connection_and_cursor()
cur.execute("SELECT key, value FROM wcs_meta WHERE key LIKE 'cron-status-%'")
rows = cur.fetchall()
conn.commit()
cur.close()
return dict(rows)
@ -458,9 +457,8 @@ def test_cron_command_rewind_jobs(settings, freezer):
def job3(pub, job=None):
jobs.append('job3')
conn, cur = sql.get_connection_and_cursor()
_, cur = sql.get_connection_and_cursor()
cur.execute("DELETE FROM wcs_meta WHERE key LIKE 'cron%%'")
conn.commit()
cur.close()
@classmethod
@ -478,24 +476,22 @@ def test_cron_command_rewind_jobs(settings, freezer):
assert jobs == []
# write down a past datetime in database
conn, cur = sql.get_connection_and_cursor()
_, cur = sql.get_connection_and_cursor()
cur.execute(
"UPDATE wcs_meta SET created_at = %s WHERE key LIKE 'cron%%'",
(localtime() - datetime.timedelta(hours=1),),
)
conn.commit()
cur.close()
call_command('cron')
assert sorted(jobs) == ['job1', 'job3']
# since past day
conn, cur = sql.get_connection_and_cursor()
_, cur = sql.get_connection_and_cursor()
cur.execute(
"UPDATE wcs_meta SET created_at = %s WHERE key LIKE 'cron%%'",
(localtime() - datetime.timedelta(days=1),),
)
conn.commit()
cur.close()
jobs = []
call_command('cron')

View File

@ -278,10 +278,9 @@ def test_magictoken_migration(pub, app):
session.magictokens[transient_data.id] = transient_data.data
sql.TransientData.wipe()
conn, cur = sql.get_connection_and_cursor()
_, cur = sql.get_connection_and_cursor()
sql_statement = 'UPDATE sessions SET session_data = %s WHERE id = %s'
cur.execute(sql_statement, (bytearray(pickle.dumps(session.__dict__, protocol=2)), session.id))
conn.commit()
cur.close()
# and get back to submitting form, it should run migration

View File

@ -524,6 +524,51 @@ def test_sql_rollback_on_error(formdef):
data_class.wipe()
def test_sql_atomic_rollback():
import psycopg2.extensions
conn, cur = sql.get_connection_and_cursor()
# check the basic behavior first, enter/exit transaction
assert conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_IDLE
with sql.atomic():
assert conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_IDLE
cur.execute('SELECT 1;')
assert conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_INTRANS
assert conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_IDLE
# now we can exit with a failure
with pytest.raises(psycopg2.Error):
with sql.atomic():
cur.execute('SELECT 1/0;')
assert conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_IDLE
# Do some changes in a sub transaction
with sql.atomic():
with sql.atomic():
cur.execute('CREATE TEMPORARY TABLE atomic_test_temp_ok();')
cur.execute("SELECT count(*) FROM pg_class WHERE relname = 'atomic_test_temp_ok'")
assert cur.fetchone()[0] == 1
cur.execute("SELECT count(*) FROM pg_class WHERE relname = 'atomic_test_temp_ok'")
assert cur.fetchone()[0] == 1
try:
with sql.atomic():
cur.execute('CREATE TEMPORARY TABLE atomic_test_temp_nok();')
cur.execute("SELECT count(*) FROM pg_class WHERE relname = 'atomic_test_temp_nok'")
assert cur.fetchone()[0] == 1
cur.execute('SELECT 1/0;')
except psycopg2.Error:
pass
# now make sure it's gone
cur.execute("SELECT count(*) FROM pg_class WHERE relname = 'atomic_test_temp_nok'")
assert cur.fetchone()[0] == 0
# and our change from the first part should still be visible
cur.execute("SELECT count(*) FROM pg_class WHERE relname = 'atomic_test_temp_ok'")
assert cur.fetchone()[0] == 1
def test_sql_get_ids_with_indexed_value_dict(formdef):
data_class = formdef.data_class(mode='sql')
data_class.wipe()

View File

@ -3528,9 +3528,8 @@ def test_geolocate_action_enable_geolocation(pub):
formdef.change_workflow(workflow)
assert formdef.geolocations
conn, cur = sql.get_connection_and_cursor()
_, cur = sql.get_connection_and_cursor()
assert column_exists_in_table(cur, formdef.table_name, 'geoloc_base')
conn.commit()
cur.close()
# change to current workflow
@ -3554,9 +3553,8 @@ def test_geolocate_action_enable_geolocation(pub):
formdef.refresh_from_storage()
assert formdef.geolocations
conn, cur = sql.get_connection_and_cursor()
_, cur = sql.get_connection_and_cursor()
assert column_exists_in_table(cur, formdef.table_name, 'geoloc_base')
conn.commit()
cur.close()
@ -4547,7 +4545,7 @@ def test_form_update_after_backoffice_fields(pub):
formdef.workflow_id = wf.id
formdef.store()
conn, cur = sql.get_connection_and_cursor()
_, cur = sql.get_connection_and_cursor()
assert column_exists_in_table(cur, formdef.table_name, 'fbo1')
wf.backoffice_fields_formdef.fields = [
@ -4568,7 +4566,6 @@ def test_form_update_after_backoffice_fields(pub):
assert not column_exists_in_table(cur, formdef.table_name, 'fbo1')
assert column_exists_in_table(cur, formdef.table_name, 'fbo3')
conn.commit()
cur.close()

View File

@ -280,8 +280,8 @@ class FormDef(StorableObject):
# recreate global views so they don't reference formdata from
# deleted formefs
conn, cur = sql.get_connection_and_cursor()
sql.clean_global_views(conn, cur)
conn.commit()
with sql.atomic():
sql.clean_global_views(conn, cur)
cur.close()
@property

View File

@ -425,33 +425,33 @@ class WcsPublisher(QommonPublisher):
from . import sql
sql.get_connection(new=True)
sql.do_session_table()
sql.do_user_table()
sql.do_role_table()
sql.do_tracking_code_table()
sql.do_custom_views_table()
sql.do_transient_data_table()
sql.do_snapshots_table()
sql.do_loggederrors_table()
sql.do_tokens_table()
sql.WorkflowTrace.do_table()
sql.Audit.do_table()
sql.TestDef.do_table()
sql.TestResult.do_table()
sql.Application.do_table()
sql.ApplicationElement.do_table()
sql.SearchableFormDef.do_table()
sql.do_meta_table()
from .carddef import CardDef
from .formdef import FormDef
with sql.atomic():
sql.do_session_table()
sql.do_user_table()
sql.do_role_table()
sql.do_tracking_code_table()
sql.do_custom_views_table()
sql.do_transient_data_table()
sql.do_snapshots_table()
sql.do_loggederrors_table()
sql.do_tokens_table()
sql.WorkflowTrace.do_table()
sql.Audit.do_table()
sql.TestDef.do_table()
sql.TestResult.do_table()
sql.Application.do_table()
sql.ApplicationElement.do_table()
sql.SearchableFormDef.do_table()
sql.do_meta_table()
from .carddef import CardDef
from .formdef import FormDef
conn, cur = sql.get_connection_and_cursor()
sql.drop_views(None, conn, cur)
for _formdef in FormDef.select() + CardDef.select():
sql.do_formdef_tables(_formdef)
sql.migrate_global_views(conn, cur)
conn.commit()
cur.close()
conn, cur = sql.get_connection_and_cursor()
sql.drop_views(None, conn, cur)
for _formdef in FormDef.select() + CardDef.select():
sql.do_formdef_tables(_formdef)
sql.migrate_global_views(conn, cur)
cur.close()
def record_deprecated_usage(self, *args, **kwargs):
return self.record_error(context='[DEPRECATED]', deprecated_usage=True, *args, **kwargs)

File diff suppressed because it is too large Load Diff