sql: update wcs_all_forms category column on category change (#87800)
gitea/wcs/pipeline/head Build queued... Details

This commit is contained in:
Frédéric Péters 2024-03-05 18:28:55 +01:00
parent 5ec12c0c0e
commit 781e4e4c52
4 changed files with 71 additions and 1 deletions

View File

@ -17,6 +17,7 @@ import wcs.sql_criterias as st
from wcs import fields, sql
from wcs.blocks import BlockDef
from wcs.carddef import CardDef
from wcs.categories import Category
from wcs.data_sources import NamedDataSource
from wcs.formdata import Evolution
from wcs.formdef import FormDef
@ -1576,6 +1577,51 @@ def test_all_forms_user_name_change(pub, formdef):
conn.commit()
def test_all_forms_category_change(pub, formdef):
Category.wipe()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.store()
formdata = formdef.data_class()()
formdata.store()
conn, cur = sql.get_connection_and_cursor()
cur.execute('SELECT category_id FROM wcs_all_forms WHERE formdef_id = %s', (formdef.id,))
row = cur.fetchone()
assert row[0] is None
category = Category()
category.name = 'Test'
category.store()
formdef.category_id = category.id
formdef.store()
cur.execute('SELECT category_id FROM wcs_all_forms WHERE formdef_id = %s', (formdef.id,))
row = cur.fetchone()
assert row[0] == int(category.id)
category2 = Category()
category2.name = 'Test2'
category2.store()
formdef.category_id = category2.id
formdef.store()
cur.execute('SELECT category_id FROM wcs_all_forms WHERE formdef_id = %s', (formdef.id,))
row = cur.fetchone()
assert row[0] == int(category2.id)
formdef.category_id = None
formdef.store()
cur.execute('SELECT category_id FROM wcs_all_forms WHERE formdef_id = %s', (formdef.id,))
row = cur.fetchone()
assert row[0] is None
cur.close()
conn.commit()
def test_views_fts(pub):
drop_formdef_tables()
_, cur = sql.get_connection_and_cursor()
@ -2375,7 +2421,7 @@ def test_migration_59_all_forms_table(pub):
formdata.store()
conn, cur = sql.get_connection_and_cursor()
cur.execute('DROP TABLE wcs_all_forms')
cur.execute('DROP TABLE wcs_all_forms CASCADE')
cur.execute(
'DROP TRIGGER %s ON %s' % (sql.get_formdef_trigger_name(formdef), sql.get_formdef_table_name(formdef))
)

View File

@ -144,6 +144,10 @@ class CardDef(FormDef):
self.roles = self.backoffice_submission_roles
return super().store(comment=comment, *args, **kwargs)
def update_category_reference(self):
# only relevant for formdefs
pass
@classmethod
def get_carddefs_as_data_source(cls):
carddefs_by_id = {}

View File

@ -477,6 +477,14 @@ class FormDef(StorableObject):
self.update_storage()
self.store_related_custom_views()
self.update_searchable_formdefs_table()
self.update_category_reference()
def update_category_reference(self):
if getattr(self, '_onload_category_id', None) != self.category_id:
from . import sql
sql.update_global_view_formdef_category(self)
self._onload_category_id = self.category_id
def has_captcha_enabled(self):
return self.has_captcha and get_publisher().has_site_option('formdef-captcha-option')
@ -2032,6 +2040,8 @@ class FormDef(StorableObject):
del odict['_custom_views']
if '_import_orig_slug' in odict:
del odict['_import_orig_slug']
if '_onload_category_id' in odict:
del odict['_onload_category_id']
return odict
def __setstate__(self, dict):
@ -2046,6 +2056,7 @@ class FormDef(StorableObject):
@classmethod
def storage_load(cls, fd, **kwargs):
o = super().storage_load(fd)
o._onload_category_id = o.category_id # keep track of category, to update wcs_all_forms if changed
if kwargs.get('lightweight'):
o.fields = Ellipsis
return o

View File

@ -1508,6 +1508,15 @@ def drop_global_views(conn, cur):
cur.execute('''DROP VIEW IF EXISTS %s''' % view_name)
def update_global_view_formdef_category(formdef):
_, cur = get_connection_and_cursor()
with cur:
cur.execute(
'''UPDATE wcs_all_forms set category_id = %s WHERE formdef_id = %s''',
(formdef.category_id, formdef.id),
)
def do_global_views(conn, cur):
# recreate global views
# XXX TODO: make me dynamic, please ?