sql: migrate views on startup (#6877)
This introduces a "SQL level" to sql.py, that should be bumped whenever incompatible SQL changes have to be run at startup, it relies on a new key/value table (wcs_meta) to hold the database current sql level.
This commit is contained in:
parent
7f6972789e
commit
929c40d7d5
|
@ -53,8 +53,7 @@ def setup_module(module):
|
|||
]
|
||||
formdef.store()
|
||||
|
||||
sql.do_user_table()
|
||||
sql.do_tracking_code_table()
|
||||
pub.initialize_sql()
|
||||
|
||||
conn.close()
|
||||
|
||||
|
@ -689,3 +688,66 @@ def test_sql_criteria_ilike():
|
|||
|
||||
assert [x.id for x in data_class.select([st.ILike('f3', 'bar')], order_by='id')] == range(21, 51)
|
||||
assert [x.id for x in data_class.select([st.ILike('f3', 'BAR')], order_by='id')] == range(21, 51)
|
||||
|
||||
def table_exists(cur, table_name):
|
||||
cur.execute('''SELECT COUNT(*) FROM information_schema.tables
|
||||
WHERE table_name = %s''', (table_name,))
|
||||
return bool(cur.fetchone()[0] == 1)
|
||||
|
||||
def column_exists_in_table(cur, table_name, column_name):
|
||||
cur.execute('''SELECT COUNT(*) FROM information_schema.columns
|
||||
WHERE table_name = %s
|
||||
AND column_name = %s''', (table_name, column_name))
|
||||
return bool(cur.fetchone()[0] == 1)
|
||||
|
||||
@postgresql
|
||||
def test_sql_level():
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
cur.execute('DROP TABLE wcs_meta')
|
||||
assert sql.get_sql_level(conn, cur) == 0
|
||||
assert sql.get_sql_level(conn, cur) == sql.SQL_LEVEL
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
@postgresql
|
||||
def test_migration_1_tracking_code():
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
cur.execute('DROP TABLE wcs_meta')
|
||||
cur.execute('DROP TABLE tracking_codes')
|
||||
sql.migrate()
|
||||
assert table_exists(cur, 'tracking_codes')
|
||||
assert table_exists(cur, 'wcs_meta')
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
@postgresql
|
||||
def test_migration_2_formdef_id_in_views():
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
cur.execute('UPDATE wcs_meta SET value = 1 WHERE key = %s', ('sql_level',))
|
||||
cur.execute('DROP VIEW wcs_all_forms')
|
||||
|
||||
view_names = []
|
||||
cur.execute('''SELECT table_name FROM information_schema.views
|
||||
WHERE table_name LIKE %s''', ('wcs\_view\_%',))
|
||||
while True:
|
||||
row = cur.fetchone()
|
||||
if row is None:
|
||||
break
|
||||
view_names.append(row[0])
|
||||
|
||||
fake_formdef = FormDef()
|
||||
common_fields = sql.get_view_fields(fake_formdef)
|
||||
# remove formdef_id for the purpose of this test
|
||||
common_fields.remove([x for x in common_fields if x[1] == 'formdef_id'][0])
|
||||
|
||||
union = ' UNION '.join(['''SELECT %s FROM %s''' % (
|
||||
', '.join([y[1] for y in common_fields]), x) for x in view_names])
|
||||
assert not 'formdef_id' in union
|
||||
cur.execute('''CREATE VIEW wcs_all_forms AS %s''' % union)
|
||||
|
||||
sql.migrate()
|
||||
|
||||
assert column_exists_in_table(cur, 'wcs_all_forms', 'formdef_id')
|
||||
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
|
|
@ -109,7 +109,8 @@ class CmdConvertToSql(Command):
|
|||
errors = []
|
||||
for formdef in FormDef.select():
|
||||
print ('converting %s' % formdef.name).ljust(num_columns-1)
|
||||
sql.do_formdef_tables(formdef, rebuild_views=True)
|
||||
sql.do_formdef_tables(formdef, rebuild_views=True,
|
||||
rebuild_global_views=True)
|
||||
data_class = formdef.data_class(mode='files')
|
||||
count = data_class.count()
|
||||
|
||||
|
|
|
@ -101,7 +101,7 @@ class CmdStart(Command):
|
|||
pub.app_dir = tenant_path
|
||||
pub.set_config()
|
||||
if pub.is_using_postgresql():
|
||||
pub.initialize_sql()
|
||||
pub.migrate_sql()
|
||||
pub.cleanup()
|
||||
quixote.cleanup()
|
||||
|
||||
|
|
|
@ -233,7 +233,8 @@ class FormDef(StorableObject):
|
|||
t = StorableObject.store(self)
|
||||
if get_publisher().is_using_postgresql():
|
||||
import sql
|
||||
sql.do_formdef_tables(self, rebuild_views=True)
|
||||
sql.do_formdef_tables(self, rebuild_views=True,
|
||||
rebuild_global_views=True)
|
||||
return t
|
||||
|
||||
def get_category(self):
|
||||
|
|
|
@ -212,6 +212,19 @@ class WcsPublisher(StubWcsPublisher):
|
|||
sql.get_connection(new=True)
|
||||
sql.do_user_table()
|
||||
sql.do_tracking_code_table()
|
||||
sql.do_meta_table()
|
||||
from formdef import FormDef
|
||||
conn, cur = sql.get_connection_and_cursor()
|
||||
sql.drop_views(None, conn, cur)
|
||||
for formdef in FormDef.select():
|
||||
sql.do_formdef_tables(formdef)
|
||||
sql.migrate_global_views(conn, cur)
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
def migrate_sql(self):
|
||||
import sql
|
||||
sql.migrate()
|
||||
|
||||
def cleanup(self):
|
||||
if self.is_using_postgresql():
|
||||
|
|
142
wcs/sql.py
142
wcs/sql.py
|
@ -267,7 +267,7 @@ def guard_postgres(func):
|
|||
return f
|
||||
|
||||
@guard_postgres
|
||||
def do_formdef_tables(formdef, rebuild_views=False):
|
||||
def do_formdef_tables(formdef, rebuild_views=False, rebuild_global_views=False):
|
||||
if formdef.id is None:
|
||||
return []
|
||||
conn, cur = get_connection_and_cursor()
|
||||
|
@ -347,13 +347,13 @@ def do_formdef_tables(formdef, rebuild_views=False):
|
|||
for field in (existing_fields - needed_fields):
|
||||
cur.execute('''ALTER TABLE %s DROP COLUMN %s CASCADE''' % (table_name, field))
|
||||
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
if rebuild_views or len(existing_fields - needed_fields):
|
||||
# views may have been dropped when dropping columns, so we recreate
|
||||
# them even if not asked to.
|
||||
redo_views(formdef)
|
||||
redo_views(conn, cur, formdef, rebuild_global_views=rebuild_global_views)
|
||||
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
actions = []
|
||||
if not 'concerned_roles_array' in existing_fields:
|
||||
|
@ -449,35 +449,45 @@ def do_tracking_code_table():
|
|||
cur.close()
|
||||
|
||||
@guard_postgres
|
||||
def redo_views(formdef):
|
||||
def do_meta_table(conn=None, cur=None):
|
||||
own_conn = False
|
||||
if not conn:
|
||||
own_conn = True
|
||||
conn, cur = get_connection_and_cursor()
|
||||
|
||||
created = False
|
||||
cur.execute('''SELECT COUNT(*) FROM information_schema.tables
|
||||
WHERE table_name = %s''', ('wcs_meta',))
|
||||
if cur.fetchone()[0] == 0:
|
||||
cur.execute('''CREATE TABLE wcs_meta (id serial PRIMARY KEY,
|
||||
key varchar,
|
||||
value varchar)''')
|
||||
cur.execute('''INSERT INTO wcs_meta (id, key, value)
|
||||
VALUES (DEFAULT, %s, %s)''', ('sql_level', str(SQL_LEVEL)))
|
||||
created = True
|
||||
|
||||
if own_conn:
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
return created
|
||||
|
||||
|
||||
@guard_postgres
|
||||
def redo_views(conn, cur, formdef, rebuild_global_views=False):
|
||||
if get_publisher().get_site_option('postgresql_views') == 'false':
|
||||
return
|
||||
|
||||
if formdef.id is None:
|
||||
return
|
||||
|
||||
conn, cur = get_connection_and_cursor()
|
||||
drop_views(formdef, conn, cur)
|
||||
do_views(formdef, conn, cur)
|
||||
conn.commit()
|
||||
cur.close()
|
||||
do_views(formdef, conn, cur, rebuild_global_views=rebuild_global_views)
|
||||
|
||||
@guard_postgres
|
||||
def drop_views(formdef, conn, cur):
|
||||
# remove the global views
|
||||
cur.execute('''SELECT table_name FROM information_schema.views
|
||||
WHERE table_name LIKE %s''', ('wcs\_category\_%',))
|
||||
view_names = []
|
||||
while True:
|
||||
row = cur.fetchone()
|
||||
if row is None:
|
||||
break
|
||||
view_names.append(row[0])
|
||||
|
||||
for view_name in view_names:
|
||||
cur.execute('''DROP VIEW IF EXISTS %s''' % view_name)
|
||||
|
||||
cur.execute('''DROP VIEW IF EXISTS wcs_all_forms''')
|
||||
drop_global_views(conn, cur)
|
||||
|
||||
if formdef:
|
||||
# remove the form view itself
|
||||
|
@ -497,21 +507,23 @@ def drop_views(formdef, conn, cur):
|
|||
for view_name in view_names:
|
||||
cur.execute('''DROP VIEW IF EXISTS %s''' % view_name)
|
||||
|
||||
def get_view_fields(formdef):
|
||||
view_fields = []
|
||||
view_fields.append(("int '%s'" % (formdef.category_id or 0), 'category_id'))
|
||||
view_fields.append(("int '%s'" % (formdef.id or 0), 'formdef_id'))
|
||||
for field in ('id', 'user_id', 'user_hash', 'receipt_time', 'status', 'id_display'):
|
||||
view_fields.append((field, field))
|
||||
return view_fields
|
||||
|
||||
|
||||
@guard_postgres
|
||||
def do_views(formdef, conn, cur):
|
||||
def do_views(formdef, conn, cur, rebuild_global_views=True):
|
||||
# create new view
|
||||
table_name = get_formdef_table_name(formdef)
|
||||
view_name = 'wcs_view_%s_%s' % (formdef.id,
|
||||
get_name_as_sql_identifier(formdef.url_name)[:40])
|
||||
|
||||
view_fields = []
|
||||
view_fields.append(("int '%s'" % (formdef.category_id or 0), 'category_id'))
|
||||
view_fields.append(("int '%s'" % (formdef.id or 0), 'formdef_id'))
|
||||
|
||||
for field in ('id', 'user_id', 'user_hash', 'receipt_time', 'status', 'id_display'):
|
||||
view_fields.append((field, field))
|
||||
|
||||
common_fields = view_fields[:]
|
||||
view_fields = get_view_fields(formdef)
|
||||
|
||||
column_names = {}
|
||||
for field in formdef.fields:
|
||||
|
@ -549,6 +561,26 @@ def do_views(formdef, conn, cur):
|
|||
cur.execute('''CREATE VIEW %s AS SELECT %s FROM %s''' % (
|
||||
view_name, fields_list, table_name))
|
||||
|
||||
if rebuild_global_views:
|
||||
do_global_views(conn, cur) # recreate global views
|
||||
|
||||
|
||||
def drop_global_views(conn, cur):
|
||||
cur.execute('''SELECT table_name FROM information_schema.views
|
||||
WHERE table_name LIKE %s''', ('wcs\_category\_%',))
|
||||
view_names = []
|
||||
while True:
|
||||
row = cur.fetchone()
|
||||
if row is None:
|
||||
break
|
||||
view_names.append(row[0])
|
||||
|
||||
for view_name in view_names:
|
||||
cur.execute('''DROP VIEW IF EXISTS %s''' % view_name)
|
||||
|
||||
cur.execute('''DROP VIEW IF EXISTS wcs_all_forms''')
|
||||
|
||||
def do_global_views(conn, cur):
|
||||
# recreate global views
|
||||
view_names = []
|
||||
cur.execute('''SELECT table_name FROM information_schema.views
|
||||
|
@ -559,6 +591,13 @@ def do_views(formdef, conn, cur):
|
|||
break
|
||||
view_names.append(row[0])
|
||||
|
||||
if not view_names:
|
||||
return
|
||||
|
||||
from wcs.formdef import FormDef
|
||||
fake_formdef = FormDef()
|
||||
common_fields = get_view_fields(fake_formdef)
|
||||
|
||||
union = ' UNION '.join(['''SELECT %s FROM %s''' % (
|
||||
', '.join([y[1] for y in common_fields]), x) for x in view_names])
|
||||
cur.execute('''CREATE VIEW wcs_all_forms AS %s''' % union)
|
||||
|
@ -1517,3 +1556,42 @@ def get_yearly_totals(period_start=None, period_end=None, criterias=None):
|
|||
cur.close()
|
||||
|
||||
return result
|
||||
|
||||
|
||||
SQL_LEVEL = 2
|
||||
|
||||
def migrate_global_views(conn, cur):
|
||||
cur.execute('''SELECT COUNT(*) FROM information_schema.tables
|
||||
WHERE table_name = %s''', ('wcs_all_forms',))
|
||||
existing_fields = set([x[0] for x in cur.fetchall()])
|
||||
if 'formdef_id' not in existing_fields:
|
||||
drop_global_views(conn, cur)
|
||||
do_global_views(conn, cur)
|
||||
|
||||
@guard_postgres
|
||||
def get_sql_level(conn, cur):
|
||||
if do_meta_table(conn, cur):
|
||||
# table has just been created, return sql_level as 0
|
||||
return 0
|
||||
cur.execute('''SELECT value FROM wcs_meta WHERE key = %s''', ('sql_level', ))
|
||||
sql_level = int(cur.fetchone()[0])
|
||||
return sql_level
|
||||
|
||||
@guard_postgres
|
||||
def migrate():
|
||||
conn, cur = get_connection_and_cursor()
|
||||
sql_level = get_sql_level(conn, cur)
|
||||
if sql_level < 1: # 1: introduction of tracking_code table
|
||||
do_tracking_code_table()
|
||||
if sql_level < 2: # 2: introduction of formdef_id in views
|
||||
drop_views(None, conn, cur)
|
||||
from wcs.formdef import FormDef
|
||||
for formdef in FormDef.select():
|
||||
redo_views(conn, cur, formdef, rebuild_global_views=False)
|
||||
migrate_global_views(conn, cur)
|
||||
|
||||
cur.execute('''UPDATE wcs_meta SET value = %s WHERE key = %s''', (
|
||||
str(SQL_LEVEL), 'sql_level'))
|
||||
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
|
Loading…
Reference in New Issue