From e153d74089ef9ebc73bd415afebc161777949a65 Mon Sep 17 00:00:00 2001 From: Pierre Ducroquet Date: Tue, 5 Mar 2024 10:42:12 +0100 Subject: [PATCH] wcs_search_tokens: new FTS mechanism with fuzzy-match (#86527) introduce a new mechanism to implement FTS with fuzzy-match. This is made possible by adding and maintaining a table of the FTS tokens, wcs_search_tokens, fed with searchable_formdefs and wcs_all_forms. When a query is issued, its tokens are matched against the tokens with a fuzzy match when no direct match is found, and the query is then rebuilt. --- wcs/publisher.py | 1 + wcs/qommon/publisher.py | 8 ++ wcs/sql.py | 170 +++++++++++++++++++++++++++++++++++++++- wcs/sql_criterias.py | 5 ++ 4 files changed, 181 insertions(+), 3 deletions(-) diff --git a/wcs/publisher.py b/wcs/publisher.py index de67314c9..45bcf3198 100644 --- a/wcs/publisher.py +++ b/wcs/publisher.py @@ -455,6 +455,7 @@ class WcsPublisher(QommonPublisher): for _formdef in FormDef.select() + CardDef.select(): sql.do_formdef_tables(_formdef) sql.migrate_global_views(conn, cur) + sql.init_search_tokens() cur.close() def record_deprecated_usage(self, *args, **kwargs): diff --git a/wcs/qommon/publisher.py b/wcs/qommon/publisher.py index 1270e5514..bb7faff07 100644 --- a/wcs/qommon/publisher.py +++ b/wcs/qommon/publisher.py @@ -690,6 +690,11 @@ class QommonPublisher(Publisher): for error in self.loggederror_class.select(clause=clauses): self.loggederror_class.remove_object(error.id) + def clean_search_tokens(self, **kwargs): + from wcs import sql + + sql.purge_obsolete_search_tokens() + @classmethod def register_cronjobs(cls): cls.register_cronjob(CronJob(cls.clean_sessions, minutes=[0], name='clean_sessions')) @@ -702,6 +707,9 @@ class QommonPublisher(Publisher): cls.register_cronjob( CronJob(cls.clean_loggederrors, hours=[3], minutes=[0], name='clean_loggederrors') ) + cls.register_cronjob( + CronJob(cls.clean_search_tokens, weekdays=[0], hours=[1], minutes=[0], name='clean_search_tokens') + ) 
_initialized = False diff --git a/wcs/sql.py b/wcs/sql.py index bb80c4714..57c59b756 100644 --- a/wcs/sql.py +++ b/wcs/sql.py @@ -96,6 +96,20 @@ SQL_TYPE_MAPPING = { } +def _table_exists(cur, table_name): + cur.execute('SELECT 1 FROM pg_class WHERE relname = %s;', (table_name,)) + rows = cur.fetchall() + return len(rows) > 0 + + +def _trigger_exists(cur, table_name, trigger_name): + cur.execute( + 'SELECT 1 FROM pg_trigger WHERE tgrelid = %s::regclass AND tgname = %s;', (table_name, trigger_name) + ) + rows = cur.fetchall() + return len(rows) > 0 + + class WcsPgConnection(psycopg2.extensions.connection): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -1579,6 +1593,8 @@ def do_global_views(conn, cur): % (name, category.id) ) + init_search_tokens_triggers(cur) + def clean_global_views(conn, cur): # Purge of any dead data @@ -1671,11 +1687,154 @@ def init_global_table(conn=None, cur=None): endpoint_status=endpoint_status_filter, ) ) + init_search_tokens_data(cur) if own_conn: cur.close() +def init_search_tokens(conn=None, cur=None): + own_cur = False + if cur is None: + own_cur = True + conn, cur = get_connection_and_cursor() + + # Create table + cur.execute('CREATE TABLE IF NOT EXISTS wcs_search_tokens(token TEXT PRIMARY KEY);') + + # Create triggers + init_search_tokens_triggers(cur) + + # Fill table + init_search_tokens_data(cur) + + # Index at the end, small performance trick... not that useful, but it's free... 
+ cur.execute('CREATE EXTENSION IF NOT EXISTS pg_trgm;') + cur.execute( + 'CREATE INDEX IF NOT EXISTS wcs_search_tokens_trgm ON wcs_search_tokens USING gin(token gin_trgm_ops);' + ) + + # And last: functions to use this brand new table + cur.execute('CREATE OR REPLACE AGGREGATE tsquery_agg_or (tsquery) (sfunc=tsquery_or, stype=tsquery);') + cur.execute('CREATE OR REPLACE AGGREGATE tsquery_agg_and (tsquery) (sfunc=tsquery_and, stype=tsquery);') + cur.execute( + r"""CREATE OR REPLACE FUNCTION public.wcs_tsquery(text) + RETURNS tsquery + LANGUAGE sql + STABLE +AS $function$ +with + tokenized as (select unnest(regexp_split_to_array($1, '\s+')) w), + super_tokenized as ( + select w, + coalesce((select plainto_tsquery(perfect.token) from wcs_search_tokens perfect where perfect.token = plainto_tsquery(w)::text), + tsquery_agg_or(plainto_tsquery(partial.token) order by partial.token <-> w desc), + plainto_tsquery(w)) tokens + from tokenized + left join wcs_search_tokens partial on partial.token % w and w not similar to '%[0-9]{2,}%' + group by w) +select tsquery_agg_and(tokens) from super_tokenized; +$function$;""" + ) + + if own_cur: + cur.close() + + +def init_search_tokens_triggers(cur): + # We define only appending triggers, ie on INSERT and UPDATE. + # It would be far heavier to maintain deletions here, and having extra data has + # no or marginal side effect on search performances, and absolutely no impact + # on search results. + # Instead, a weekly cron job will delete obsolete entries, thus making it sure no + # personal data is kept uselessly. 
+ + # First part: the appending function + cur.execute( + """CREATE OR REPLACE FUNCTION wcs_search_tokens_trigger_fn () + RETURNS trigger + LANGUAGE plpgsql +AS $function$ +BEGIN + INSERT INTO wcs_search_tokens SELECT unnest(tsvector_to_array(NEW.fts)) ON CONFLICT(token) DO NOTHING; + RETURN NEW; +END; +$function$;""" + ) + + if not (_table_exists(cur, 'wcs_search_tokens')): + # abort trigger creation if tokens table doesn't exist yet + return + + if _table_exists(cur, 'wcs_all_forms') and not _trigger_exists( + cur, 'wcs_all_forms', 'wcs_all_forms_fts_trg_upd' + ): + # Second part: insert and update triggers for wcs_all_forms + cur.execute( + """CREATE TRIGGER wcs_all_forms_fts_trg_ins + AFTER INSERT ON wcs_all_forms + FOR EACH ROW WHEN (NEW.fts IS NOT NULL) + EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();""" + ) + cur.execute( + """CREATE TRIGGER wcs_all_forms_fts_trg_upd + AFTER UPDATE OF fts ON wcs_all_forms + FOR EACH ROW WHEN (NEW.fts IS NOT NULL) + EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();""" + ) + + if _table_exists(cur, 'searchable_formdefs') and not _trigger_exists( + cur, 'searchable_formdefs', 'searchable_formdefs_fts_trg_upd' + ): + # Third part: insert and update triggers for searchable_formdefs + cur.execute( + """CREATE TRIGGER searchable_formdefs_fts_trg_ins + AFTER INSERT ON searchable_formdefs + FOR EACH ROW WHEN (NEW.fts IS NOT NULL) + EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();""" + ) + cur.execute( + """CREATE TRIGGER searchable_formdefs_fts_trg_upd + AFTER UPDATE OF fts ON searchable_formdefs + FOR EACH ROW WHEN (NEW.fts IS NOT NULL) + EXECUTE PROCEDURE wcs_search_tokens_trigger_fn();""" + ) + + +def init_search_tokens_data(cur): + if not (_table_exists(cur, 'wcs_search_tokens')): + # abort table data initialization if tokens table doesn't exist yet + return + + if _table_exists(cur, 'wcs_all_forms'): + cur.execute( + """INSERT INTO wcs_search_tokens + SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms + ON 
CONFLICT(token) DO NOTHING;""" ) + if _table_exists(cur, 'searchable_formdefs'): + cur.execute( + """INSERT INTO wcs_search_tokens + SELECT unnest(tsvector_to_array(fts)) FROM searchable_formdefs + ON CONFLICT(token) DO NOTHING;""" + ) + + +def purge_obsolete_search_tokens(cur=None): + own_cur = False + if cur is None: + own_cur = True + _, cur = get_connection_and_cursor() + + cur.execute( + """DELETE FROM wcs_search_tokens + WHERE token NOT IN (SELECT unnest(tsvector_to_array(fts)) FROM wcs_all_forms) + AND token NOT IN (SELECT unnest(tsvector_to_array(fts)) FROM searchable_formdefs);""" + ) + if own_cur: + cur.close() + + class SqlMixin: _table_name = None _numerical_id = True @@ -4796,7 +4955,6 @@ class SearchableFormDef(SqlMixin): % (cls._table_name, cls._table_name) ) cls.do_indexes(cur) - cur.close() from wcs.carddef import CardDef from wcs.formdef import FormDef @@ -4805,6 +4963,8 @@ class SearchableFormDef(SqlMixin): CardDef.select(ignore_errors=True), FormDef.select(ignore_errors=True) ): cls.update(obj=objectdef) + init_search_tokens(cur) + cur.close() @classmethod def update(cls, obj=None, removed_obj_type=None, removed_obj_id=None): @@ -4842,7 +5002,7 @@ class SearchableFormDef(SqlMixin): def search(cls, obj_type, string): _, cur = get_connection_and_cursor() cur.execute( - 'SELECT object_id FROM searchable_formdefs WHERE fts @@ plainto_tsquery(%s)', + 'SELECT object_id FROM searchable_formdefs WHERE fts @@ wcs_tsquery(%s)', (FtsMatch.get_fts_value(string),), ) ids = [x[0] for x in cur.fetchall()] @@ -5107,7 +5267,7 @@ def get_period_total( # latest migration, number + description (description is not used # programmaticaly but will make sure git conflicts if two migrations are # separately added with the same number) -SQL_LEVEL = (105, 'change test result json structure') +SQL_LEVEL = (106, 'improved fts method') def migrate_global_views(conn, cur): @@ -5440,6 +5600,10 @@ def migrate(): for formdef in FormDef.select() + CardDef.select():
do_formdef_tables(formdef, rebuild_views=False, rebuild_global_views=False) + if sql_level < 106: + # 106: new fts mechanism with tokens table + init_search_tokens() + if sql_level != SQL_LEVEL[0]: cur.execute( '''UPDATE wcs_meta SET value = %s, updated_at=NOW() WHERE key = %s''', diff --git a/wcs/sql_criterias.py b/wcs/sql_criterias.py index 37f189a87..d7d25ba40 100644 --- a/wcs/sql_criterias.py +++ b/wcs/sql_criterias.py @@ -373,6 +373,11 @@ class FtsMatch(Criteria): return 'fts @@ plainto_tsquery(%%(c%s)s)' % id(self.value) +class WcsFtsMatch(FtsMatch): + def as_sql(self): + return 'fts @@ wcs_tsquery(%%(c%s)s)' % id(self.value) + + class ElementEqual(Criteria): def __init__(self, attribute, key, value, **kwargs): super().__init__(attribute, value)