sql: allow deploying/running in a specific postgresql schema (#50829) #906

Open
fpeters wants to merge 2 commits from wip/50829-sql-schema into main
7 changed files with 306 additions and 138 deletions
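The change adds an optional `schema` key to the `postgresql` section of the tenant configuration: either a fixed schema name, or `__auto__` to derive the schema from the tenant hostname (see `Tenant.get_schema_name` below). A minimal usage sketch, mirroring what the new tests exercise:

```python
# sketch only, based on the tests below; 'pub' is a wcs publisher instance
pub.cfg['postgresql']['schema'] = 'other'      # run in a fixed schema
# or:
pub.cfg['postgresql']['schema'] = '__auto__'   # derive it from the tenant hostname,
                                               # e.g. example.net -> example_net
pub.write_cfg()
```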

View File

@ -83,18 +83,17 @@ def nocache(settings):
@pytest.fixture
def sql_queries(monkeypatch):
import psycopg2.extensions
import wcs.sql
queries = []
wcs.sql.cleanup_connection()
class LoggingCursor(psycopg2.extensions.cursor):
class LoggingCursor(wcs.sql.WcsPgCursor):
"""A cursor that logs queries using its connection logging facilities."""
def execute(self, query, vars=None):
queries.append(query)
if not query.startswith('SET search_path'):
queries.append(query)
return super().execute(query, vars)
class MyLoggingConnection(wcs.sql.WcsPgConnection):

View File

@ -33,6 +33,8 @@ NEW_WCS_BASE_TENANT = 'wcsteststenant%d' % random.randint(0, 100000)
NEW_WCS_TENANT = '%s.net' % NEW_WCS_BASE_TENANT
NEW_WCS_DB_NAME = '%s_net' % NEW_WCS_BASE_TENANT
SCHEMA_WCS_DB_NAME = 'wcstestschema%d' % random.randint(0, 100000)
HOBO_JSON = {
'services': [
@ -85,14 +87,13 @@ HOBO_JSON = {
}
@pytest.fixture
def setuptest():
def do_setuptest(use_schema=False):
cleanup_connection()
createdb_cfg = CONFIG['postgresql'].get('createdb-connection-params')
conn = psycopg2.connect(**createdb_cfg)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cursor = conn.cursor()
for dbname in (WCS_DB_NAME, NEW_WCS_DB_NAME):
for dbname in (WCS_DB_NAME, NEW_WCS_DB_NAME, SCHEMA_WCS_DB_NAME):
cursor.execute('DROP DATABASE IF EXISTS %s' % dbname)
pub = create_temporary_pub()
@ -105,20 +106,39 @@ def setuptest():
os.mkdir(skeleton_dir)
with open(os.path.join(skeleton_dir, 'publik.zip'), 'wb') as f:
with zipfile.ZipFile(f, 'w') as z:
z.writestr('config.json', json.dumps(CONFIG))
cfg = copy.deepcopy(CONFIG)
if use_schema:
del cfg['postgresql']['createdb-connection-params']
cfg['postgresql']['database'] = SCHEMA_WCS_DB_NAME
cfg['postgresql']['schema'] = '__auto__'
z.writestr('config.json', json.dumps(cfg))
z.writestr('site-options.cfg', '[options]\npostgresql = true')
if use_schema:
cursor.execute(f'CREATE DATABASE {SCHEMA_WCS_DB_NAME}')
yield pub, hobo_cmd
clean_temporary_pub()
if os.path.exists(CompatWcsPublisher.APP_DIR):
shutil.rmtree(CompatWcsPublisher.APP_DIR)
cleanup_connection()
for dbname in (WCS_DB_NAME, NEW_WCS_DB_NAME):
for dbname in (WCS_DB_NAME, NEW_WCS_DB_NAME, SCHEMA_WCS_DB_NAME):
cursor.execute('DROP DATABASE IF EXISTS %s' % dbname)
conn.close()
@pytest.fixture
def setuptest():
yield from do_setuptest(use_schema=False)
@pytest.fixture
def setuptest_with_schema():
yield from do_setuptest(use_schema=True)
def database_exists(database):
res = False
cleanup_connection()
@ -133,6 +153,21 @@ def database_exists(database):
return res
def schema_exists(database, schema):
res = False
cleanup_connection()
createdb_cfg = CONFIG['postgresql'].get('createdb-connection-params')
createdb_cfg['database'] = database
conn = psycopg2.connect(**createdb_cfg)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cursor = conn.cursor()
cursor.execute('SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s', (schema,))
if cursor.fetchall():
res = True
conn.close()
return res
def test_deploy(setuptest):
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert not database_exists(WCS_DB_NAME)
@ -284,3 +319,22 @@ def test_deploy_url_change_old_tenant_dir(setuptest):
assert publisher.cfg['postgresql']['database'] == WCS_DB_NAME
# check that sp configuration is updated
assert publisher.cfg['sp']['saml2_providerid'] == 'http://%s/saml/metadata' % NEW_WCS_TENANT
def test_deploy_schema_name(setuptest_with_schema):
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert not database_exists(WCS_DB_NAME)
assert database_exists(SCHEMA_WCS_DB_NAME)
cleanup()
with open(os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
fd.write(json.dumps(HOBO_JSON))
call_command(
'hobo_deploy',
'--ignore-timestamp',
'http://%s/' % WCS_TENANT,
os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'),
)
assert os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert not database_exists(WCS_DB_NAME)
assert schema_exists(SCHEMA_WCS_DB_NAME, WCS_DB_NAME)

View File

@ -524,7 +524,7 @@ def test_sql_rollback_on_error(formdef):
data_class.wipe()
def test_sql_atomic_rollback():
def test_sql_atomic_rollback(pub):
import psycopg2.extensions
conn, cur = sql.get_connection_and_cursor()
@ -2856,3 +2856,46 @@ def test_sql_data_views(pub_with_views, formdef_class):
assert column_exists_in_table(cur, f'{prefix}_test', 'geoloc_base_x')
conn.commit()
cur.close()
def test_sql_schema(pub):
pub.set_tenant_by_hostname('example.net')
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [fields.StringField(id='1', label='x')]
formdef.store()
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.just_created()
formdata.data = {'1': 'value'}
formdata.store()
pub.cfg['postgresql']['schema'] = 'other'
pub.write_cfg()
sql.cleanup_connection()
with pytest.raises(psycopg2.errors.UndefinedTable):
assert formdef.data_class().count() == 1
_, cur = sql.get_connection_and_cursor()
cur.execute('ALTER SCHEMA public RENAME TO other')
cur.close()
sql.cleanup_connection()
assert formdef.data_class().count() == 1
pub.cfg['postgresql']['schema'] = '__auto__'
pub.write_cfg()
sql.cleanup_connection()
with pytest.raises(psycopg2.errors.UndefinedTable):
assert formdef.data_class().count() == 1
_, cur = sql.get_connection_and_cursor()
cur.execute('ALTER SCHEMA other RENAME TO example_net')
cur.close()
sql.cleanup_connection()
assert formdef.data_class().count() == 1

View File

@ -18,6 +18,7 @@ import collections
from django.core.management.base import CommandError
from django.db.backends.postgresql.client import DatabaseClient
from quixote import get_publisher
from . import TenantCommand
@ -32,6 +33,17 @@ class PsqlDatabaseClient(DatabaseClient):
self.connection.settings_dict[key.upper()] = value
super().__init__(self.connection)
@classmethod
def settings_to_cmd_args_env(cls, settings_dict, parameters):
args, env = super().settings_to_cmd_args_env(settings_dict, parameters)
schema = settings_dict.get('SCHEMA', 'public')
if schema == '__auto__':
schema = get_publisher().tenant.db_schema_name
if not env:
env = {}
env['PGOPTIONS'] = f'--search_path={schema}'
return args, env
class Command(TenantCommand):
def add_arguments(self, parser):

View File

@ -593,6 +593,8 @@ class Command(TenantCommand):
database_name = (database_template_name % {'domain_database_name': domain_database_name}).strip(
'_'
)
elif pub.cfg['postgresql'].get('schema') == '__auto__':
database_name = pub.cfg['postgresql'].get('database', 'wcs')
else:
# legacy way to create a database name, if it contained an
# underscore character, use the first part as a prefix
@ -611,6 +613,8 @@ class Command(TenantCommand):
if 'database' in createdb_cfg:
createdb_cfg['dbname'] = createdb_cfg.pop('database')
schema = createdb_cfg.pop('schema', None)
try:
pgconn = psycopg2.connect(**createdb_cfg)
except psycopg2.Error as e:
@ -621,38 +625,42 @@ class Command(TenantCommand):
pgconn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = pgconn.cursor()
new_database = True
try:
cur.execute('''CREATE DATABASE %s''' % database_name)
except psycopg2.Error as e:
if e.pgcode == psycopg2.errorcodes.DUPLICATE_DATABASE:
cur.execute(
"""SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'
AND table_name = 'wcs_meta'"""
)
if cur.fetchall():
new_database = False
else:
print(
'failed to create database (%s)' % psycopg2.errorcodes.lookup(e.pgcode), file=sys.stderr
)
return
if schema == '__auto__':
cur.execute(f'CREATE SCHEMA {pub.tenant.db_schema_name}')
must_init = True
else:
cur.close()
must_init = True
try:
cur.execute('''CREATE DATABASE %s''' % database_name)
except psycopg2.Error as e:
if e.pgcode == psycopg2.errorcodes.DUPLICATE_DATABASE:
cur.execute(
"""SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'
AND table_name = 'wcs_meta'"""
)
if cur.fetchall():
must_init = False
else:
print(
'failed to create database (%s)' % psycopg2.errorcodes.lookup(e.pgcode),
file=sys.stderr,
)
return
else:
cur.close()
pub.cfg['postgresql']['database'] = database_name
pub.cfg['postgresql']['database'] = database_name
pub.write_cfg()
pub.set_config(skip_sql=False)
if not new_database:
return
# create tables etc.
pub.initialize_sql()
if must_init:
# create tables etc.
pub.initialize_sql()
@classmethod
def shared_secret(cls, secret1, secret2):

View File

@ -71,6 +71,15 @@ class Tenant:
def __init__(self, directory):
self.directory = directory
self.hostname = os.path.basename(directory)
self.db_schema_name = self.get_schema_name(self.hostname)
@classmethod
def get_schema_name(cls, hostname):
schema = hostname.replace('.', '_').replace('-', '_')
if len(schema) > 63:
digest = hashlib.md5(schema.encode()).hexdigest()[:4]
schema = '%s_%s_%s' % (schema[:29], digest, schema[-28:])
return schema
class SiteOptionsBoolean:

View File

@ -95,11 +95,21 @@ SQL_TYPE_MAPPING = {
}
class WcsPgCursor(psycopg2.extensions.cursor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not self.connection.has_set_schema:
self.execute(f'SET search_path = {self.connection.schema}')
bdauvergne marked this conversation as resolved (outdated)

If I understand correctly, this means that a call to `SET search_path` is made every time a cursor is requested? I don't think we want it to be that heavy; it would increase the application's latency by multiplying the round trips to PostgreSQL. Couldn't we work at the level of a WSGI middleware, as in Django?

Yes, every time, because looking at what is produced by authentic/combo/etc. that's what seemed to be happening (though maybe my local configuration is missing something); excerpt:

2023-12-13 16:12:06 CET [359046-161] fred@authentic (authentic_fred_local_0d_be) LOG:  statement: SET search_path = authentic_fred_local_0d_be,public
2023-12-13 16:12:06 CET [359046-162] fred@authentic (authentic_fred_local_0d_be) LOG:  statement: SET application_name = authentic_fred_local_0d_be
2023-12-13 16:12:06 CET [359046-163] fred@authentic (authentic_fred_local_0d_be) LOG:  statement: SELECT "authentic2_attribute"."id", "authentic2_attribute"."label", "authentic2_attribute"."description", "authentic2_attribute"."name", "authentic2_attribute"."required", "authentic2_attribute"."asked_on_registration", "authentic2_attribute"."user_editable", "authentic2_attribute"."user_visible", "authentic2_attribute"."multiple", "authentic2_attribute"."kind", "authentic2_attribute"."disabled", "authentic2_attribute"."searchable", "authentic2_attribute"."required_on_login", "authentic2_attribute"."scopes", "authentic2_attribute"."order" FROM "authentic2_attribute" WHERE (NOT "authentic2_attribute"."disabled" AND "authentic2_attribute"."name" = 'address') LIMIT 21
2023-12-13 16:12:06 CET [359046-164] fred@authentic (authentic_fred_local_0d_be) LOG:  statement: SET search_path = authentic_fred_local_0d_be,public
2023-12-13 16:12:06 CET [359046-165] fred@authentic (authentic_fred_local_0d_be) LOG:  statement: SET application_name = authentic_fred_local_0d_be
2023-12-13 16:12:06 CET [359046-166] fred@authentic (authentic_fred_local_0d_be) LOG:  statement: SELECT "authentic2_attribute"."id", "authentic2_attribute"."label", "authentic2_attribute"."description", "authentic2_attribute"."name", "authentic2_attribute"."required", "authentic2_attribute"."asked_on_registration", "authentic2_attribute"."user_editable", "authentic2_attribute"."user_visible", "authentic2_attribute"."multiple", "authentic2_attribute"."kind", "authentic2_attribute"."disabled", "authentic2_attribute"."searchable", "authentic2_attribute"."required_on_login", "authentic2_attribute"."scopes", "authentic2_attribute"."order" FROM "authentic2_attribute" WHERE (NOT "authentic2_attribute"."disabled" AND "authentic2_attribute"."name" = 'zipcode') LIMIT 21
2023-12-13 16:12:06 CET [359046-167] fred@authentic (authentic_fred_local_0d_be) LOG:  statement: SET search_path = authentic_fred_local_0d_be,public
2023-12-13 16:12:06 CET [359046-168] fred@authentic (authentic_fred_local_0d_be) LOG:  statement: SET application_name = authentic_fred_local_0d_be
2023-12-13 16:12:06 CET [359046-169] fred@authentic (authentic_fred_local_0d_be) LOG:  statement: SELECT "authentic2_attribute"."id", "authentic2_attribute"."label", "authentic2_attribute"."description", "authentic2_attribute"."name", "authentic2_attribute"."required", "authentic2_attribute"."asked_on_registration", "authentic2_attribute"."user_editable", "authentic2_attribute"."user_visible", "authentic2_attribute"."multiple", "authentic2_attribute"."kind", "authentic2_attribute"."disabled", "authentic2_attribute"."searchable", "authentic2_attribute"."required_on_login", "authentic2_attribute"."scopes", "authentic2_attribute"."order" FROM "authentic2_attribute" WHERE (NOT "authentic2_attribute"."disabled" AND "authentic2_attribute"."name" = 'city') LIMIT 21

From that I assumed the "SET ..." statements were free (especially since there are two of them each time), that it was a sort of change of "operating mode" for the real statements that followed. If that isn't the case, and if there's no trick to send the SET and the query together in one go (a sketch of that idea follows below), it will be a bit repetitive but easy enough: it's just a matter of adding the schema to every query, and I can do things that way.

WSGI middleware

No, we're already past that point by the time the tenant has been determined.
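(Side note, a rough sketch of the "send them in one go" idea mentioned above — assuming psycopg2's behaviour of shipping the whole string it is given in a single round trip, with only the last statement's result kept on the cursor; this is hypothetical and not part of this patch:)

```python
import psycopg2.extensions

class PrefixingCursor(psycopg2.extensions.cursor):
    """Hypothetical sketch: prepend the SET to the real query so both travel
    in a single round trip instead of as two separate statements."""

    def execute(self, query, vars=None):
        if not getattr(self.connection, 'has_set_schema', False):
            query = f'SET search_path = {self.connection.schema}; ' + query
            self.connection.has_set_schema = True
        return super().execute(query, vars)
```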


On the SaaS we have a specific configuration so that set search_path= isn't called for every cursor:

bdauvergne@front1.test:/etc$ sudo grep -Ri TENANT_LIMIT_SET_C
welco/settings.d/connection.py:TENANT_LIMIT_SET_CALLS = True
authentic2-multitenant/settings.d/connection.py:TENANT_LIMIT_SET_CALLS = True

At the django-tenant-schemas level that sets a boolean on the connection object as soon as the schema has been set, so it isn't repeated; but we should move that into debian_common_config.py.
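(For comparison, a rough sketch of that pattern — not the actual django-tenant-schemas code, and essentially what the patch does with `has_set_schema`: the boolean lives on the connection, so the SET is only replayed after a reconnect or a rollback:)

```python
import psycopg2.extensions

class SchemaConnection(psycopg2.extensions.connection):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.schema = 'public'
        self.has_set_schema = False  # reset after rollback/reconnect to replay the SET

class SchemaCursor(psycopg2.extensions.cursor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if not self.connection.has_set_schema:
            # one SET search_path per connection, not one per cursor
            self.connection.has_set_schema = True
            self.execute(f'SET search_path = {self.connection.schema}')
```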

self.connection.has_set_schema = True
class WcsPgConnection(psycopg2.extensions.connection):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._wcs_in_transaction = False
self._wcs_savepoints = []
self.cursor_factory = WcsPgCursor
self.has_set_schema = False
class Atomic(ContextDecorator):
@ -138,6 +148,7 @@ class Atomic(ContextDecorator):
last_savepoint = conn._wcs_savepoints.pop()
cursor.execute("ROLLBACK TO SAVEPOINT \"%s\";" % last_savepoint)
cursor.execute("RELEASE SAVEPOINT \"%s\";" % last_savepoint)
conn.has_set_schema = False # replay it in case it was cancelled
def commit(self):
conn = get_connection()
@ -315,9 +326,13 @@ def get_connection(new=False):
if 'database' in postgresql_cfg:
postgresql_cfg['dbname'] = postgresql_cfg.pop('database')
postgresql_cfg['application_name'] = getattr(publisher, 'sql_application_name', None)
schema = get_cfg('postgresql', {}).get('schema') or 'public'
if schema == '__auto__':
schema = publisher.tenant.db_schema_name
try:
pgconn = psycopg2.connect(connection_factory=WcsPgConnection, **postgresql_cfg)
pgconn.autocommit = True
pgconn.schema = schema
except psycopg2.Error:
if new:
raise
@ -381,13 +396,16 @@ def get_formdef_trigger_name(formdef):
def get_formdef_new_id(id_start):
new_id = id_start
_, cur = get_connection_and_cursor()
conn, cur = get_connection_and_cursor()
while True:
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name LIKE %s''',
('formdata\\_%s\\_%%' % new_id,),
(
conn.schema,
'formdata\\_%s\\_%%' % new_id,
),
)
if cur.fetchone()[0] == 0:
break
@ -398,13 +416,16 @@ def get_formdef_new_id(id_start):
def get_carddef_new_id(id_start):
new_id = id_start
_, cur = get_connection_and_cursor()
conn, cur = get_connection_and_cursor()
while True:
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name LIKE %s''',
('carddata\\_%s\\_%%' % new_id,),
(
conn.schema,
'carddata\\_%s\\_%%' % new_id,
),
)
if cur.fetchone()[0] == 0:
break
@ -414,12 +435,15 @@ def get_carddef_new_id(id_start):
def formdef_wipe():
_, cur = get_connection_and_cursor()
conn, cur = get_connection_and_cursor()
cur.execute(
'''SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name LIKE %s''',
('formdata\\_%%\\_%%',),
(
conn.schema,
'formdata\\_%%\\_%%',
),
)
for table_name in [x[0] for x in cur.fetchall()]:
cur.execute('DELETE FROM %s' % table_name) # Force trigger execution
@ -433,12 +457,15 @@ def formdef_wipe():
def carddef_wipe():
_, cur = get_connection_and_cursor()
conn, cur = get_connection_and_cursor()
cur.execute(
'''SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name LIKE %s''',
('carddata\\_%%\\_%%',),
(
conn.schema,
'carddata\\_%%\\_%%',
),
)
for table_name in [x[0] for x in cur.fetchall()]:
cur.execute('DELETE FROM %s' % table_name) # Force trigger execution
@ -476,9 +503,12 @@ def do_formdef_tables(formdef, conn=None, cur=None, rebuild_views=False, rebuild
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(
conn.schema,
table_name,
),
)
if cur.fetchone()[0] == 0:
cur.execute(
@ -511,9 +541,12 @@ def do_formdef_tables(formdef, conn=None, cur=None, rebuild_views=False, rebuild
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(
conn.schema,
table_name,
),
)
existing_fields = {x[0] for x in cur.fetchall()}
@ -580,10 +613,10 @@ def do_formdef_tables(formdef, conn=None, cur=None, rebuild_views=False, rebuild
# migrations on _evolutions table
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = '%s_evolutions'
'''
% table_name
WHERE table_schema = %s
AND table_name = %s
''',
(conn.schema, f'{table_name}_evolutions'),
)
evo_existing_fields = {x[0] for x in cur.fetchall()}
if 'last_jump_datetime' not in evo_existing_fields:
@ -775,14 +808,14 @@ def do_formdef_indexes(formdef, created, conn, cur):
def do_user_table():
_, cur = get_connection_and_cursor()
conn, cur = get_connection_and_cursor()
table_name = 'users'
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
if cur.fetchone()[0] == 0:
cur.execute(
@ -804,9 +837,9 @@ def do_user_table():
)
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
existing_fields = {x[0] for x in cur.fetchall()}
@ -887,14 +920,14 @@ def do_user_table():
def do_role_table():
_, cur = get_connection_and_cursor()
conn, cur = get_connection_and_cursor()
table_name = 'roles'
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
if cur.fetchone()[0] == 0:
cur.execute(
@ -912,9 +945,9 @@ def do_role_table():
cur.execute('ALTER TABLE roles ALTER COLUMN uuid TYPE VARCHAR')
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
existing_fields = {x[0] for x in cur.fetchall()}
@ -936,14 +969,14 @@ def migrate_legacy_roles():
def do_tracking_code_table():
_, cur = get_connection_and_cursor()
conn, cur = get_connection_and_cursor()
table_name = 'tracking_codes'
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
if cur.fetchone()[0] == 0:
cur.execute(
@ -954,9 +987,9 @@ def do_tracking_code_table():
)
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
existing_fields = {x[0] for x in cur.fetchall()}
@ -970,14 +1003,14 @@ def do_tracking_code_table():
def do_session_table():
_, cur = get_connection_and_cursor()
conn, cur = get_connection_and_cursor()
table_name = 'sessions'
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
if cur.fetchone()[0] == 0:
cur.execute(
@ -990,9 +1023,9 @@ def do_session_table():
)
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
existing_fields = {x[0] for x in cur.fetchall()}
@ -1011,14 +1044,14 @@ def do_session_table():
def do_transient_data_table():
_, cur = get_connection_and_cursor()
conn, cur = get_connection_and_cursor()
table_name = TransientData._table_name
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
if cur.fetchone()[0] == 0:
cur.execute(
@ -1031,9 +1064,9 @@ def do_transient_data_table():
)
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
existing_fields = {x[0] for x in cur.fetchall()}
needed_fields = {x[0] for x in TransientData._table_static_fields}
@ -1046,14 +1079,14 @@ def do_transient_data_table():
def do_custom_views_table():
_, cur = get_connection_and_cursor()
conn, cur = get_connection_and_cursor()
table_name = 'custom_views'
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
if cur.fetchone()[0] == 0:
cur.execute(
@ -1074,9 +1107,9 @@ def do_custom_views_table():
)
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
existing_fields = {x[0] for x in cur.fetchall()}
@ -1104,14 +1137,14 @@ def do_custom_views_table():
def do_snapshots_table():
_, cur = get_connection_and_cursor()
conn, cur = get_connection_and_cursor()
table_name = 'snapshots'
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
if cur.fetchone()[0] == 0:
cur.execute(
@ -1132,9 +1165,9 @@ def do_snapshots_table():
)
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
existing_fields = {x[0] for x in cur.fetchall()}
@ -1158,14 +1191,14 @@ def do_snapshots_table():
def do_loggederrors_table():
_, cur = get_connection_and_cursor()
conn, cur = get_connection_and_cursor()
table_name = 'loggederrors'
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
if cur.fetchone()[0] == 0:
cur.execute(
@ -1192,9 +1225,9 @@ def do_loggederrors_table():
)
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
existing_fields = {x[0] for x in cur.fetchall()}
@ -1221,14 +1254,14 @@ def do_loggederrors_table():
def do_tokens_table():
_, cur = get_connection_and_cursor()
conn, cur = get_connection_and_cursor()
table_name = Token._table_name
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
if cur.fetchone()[0] == 0:
cur.execute(
@ -1241,9 +1274,9 @@ def do_tokens_table():
)
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
existing_fields = {x[0] for x in cur.fetchall()}
@ -1282,10 +1315,11 @@ def do_meta_table(conn=None, cur=None, insert_current_sql_level=True):
'''SELECT COUNT(t.*), COUNT(i.*) FROM information_schema.tables t
LEFT JOIN pg_indexes i
ON (i.schemaname, i.tablename, i.indexname) = (table_schema, table_name, %s)
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(
'wcs_meta_key',
conn.schema,
'wcs_meta',
),
)
@ -1316,9 +1350,12 @@ def do_meta_table(conn=None, cur=None, insert_current_sql_level=True):
else:
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
('wcs_meta',),
(
conn.schema,
'wcs_meta',
),
)
existing_fields = {x[0] for x in cur.fetchall()}
if 'created_at' not in existing_fields:
@ -1352,17 +1389,17 @@ def drop_views(formdef, conn, cur):
view_prefix = 'wcs\\_%s\\_view\\_%s\\_%%' % (formdef.data_sql_prefix, formdef.id)
cur.execute(
'''SELECT table_name FROM information_schema.views
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name LIKE %s''',
(view_prefix,),
(conn.schema, view_prefix),
)
else:
# if there's no formdef specified, remove all form views
cur.execute(
'''SELECT table_name FROM information_schema.views
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name LIKE %s''',
('wcs\\_view\\_%',),
(conn.schema, 'wcs\\_view\\_%'),
)
view_names = []
while True:
@ -1489,9 +1526,12 @@ def do_views(formdef, conn, cur, rebuild_global_views=True):
def drop_global_views(conn, cur):
cur.execute(
'''SELECT table_name FROM information_schema.views
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name LIKE %s''',
('wcs\\_category\\_%',),
(
conn.schema,
'wcs\\_category\\_%',
),
)
view_names = []
while True:
@ -1557,9 +1597,12 @@ def do_global_views(conn, cur):
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
('wcs_all_forms',),
(
conn.schema,
'wcs_all_forms',
),
)
existing_fields = {x[0] for x in cur.fetchall()}
if 'statistics_data' not in existing_fields:
@ -3828,9 +3871,9 @@ class TranslatableMessage(SqlMixin):
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
if cur.fetchone()[0] == 0:
cur.execute(
@ -3846,9 +3889,9 @@ class TranslatableMessage(SqlMixin):
)
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
existing_fields = {x[0] for x in cur.fetchall()}
@ -3939,9 +3982,9 @@ class TestDef(SqlMixin):
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
if cur.fetchone()[0] == 0:
cur.execute(
@ -3959,9 +4002,9 @@ class TestDef(SqlMixin):
)
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
existing_fields = {x[0] for x in cur.fetchall()}
@ -4041,9 +4084,9 @@ class TestResult(SqlMixin):
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
if cur.fetchone()[0] == 0:
cur.execute(
@ -4150,9 +4193,9 @@ class WorkflowTrace(SqlMixin):
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
if cur.fetchone()[0] == 0:
cur.execute(
@ -4296,14 +4339,14 @@ class Audit(SqlMixin):
@classmethod
def do_table(cls):
_, cur = get_connection_and_cursor()
conn, cur = get_connection_and_cursor()
table_name = cls._table_name
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
if cur.fetchone()[0] == 0:
cur.execute(
@ -4323,9 +4366,9 @@ class Audit(SqlMixin):
)
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
existing_fields = {x[0] for x in cur.fetchall()}
@ -4409,14 +4452,14 @@ class Application(SqlMixin):
@classmethod
def do_table(cls):
_, cur = get_connection_and_cursor()
conn, cur = get_connection_and_cursor()
table_name = cls._table_name
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
if cur.fetchone()[0] == 0:
cur.execute(
@ -4497,14 +4540,14 @@ class ApplicationElement(SqlMixin):
@classmethod
def do_table(cls):
_, cur = get_connection_and_cursor()
conn, cur = get_connection_and_cursor()
table_name = cls._table_name
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(table_name,),
(conn.schema, table_name),
)
if cur.fetchone()[0] == 0:
cur.execute(
@ -4682,13 +4725,13 @@ class SearchableFormDef(SqlMixin):
@classmethod
@atomic
def do_table(cls):
_, cur = get_connection_and_cursor()
conn, cur = get_connection_and_cursor()
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
WHERE table_schema = %s
AND table_name = %s''',
(cls._table_name,),
(conn.schema, cls._table_name),
)
if cur.fetchone()[0] == 0:
cur.execute(