custom_user: search email by subtring or trigram in FTS search (#50732)

This commit is contained in:
Benjamin Dauvergne 2021-02-01 15:50:20 +01:00
parent 62654a29a7
commit e45f693512
3 changed files with 78 additions and 1 deletions

View File

@ -47,10 +47,18 @@ class UserQuerySet(models.QuerySet):
return wrap_qs(self.none())
if '@' in search and len(search.split()) == 1:
qs = self.filter(email__iexact=search).order_by('last_name', 'first_name')
qs = self.filter(email__icontains=search).order_by(Unaccent('last_name'), Unaccent('first_name'))
if qs.exists():
return wrap_qs(qs)
# not match, search by trigrams
qs = self.annotate(lower_email=Lower('email'))
value = Lower(Value(search))
qs = qs.filter(lower_email__trigram_similar=value)
qs = qs.annotate(dist=TrigramDistance('lower_email', value))
qs = qs.order_by('dist', 'last_name', 'first_name')
return qs
try:
guid = uuid.UUID(search)
except ValueError:

View File

@ -0,0 +1,66 @@
from django.db import migrations, transaction
from django.db.migrations.operations.base import Operation
from django.db.utils import InternalError, OperationalError, ProgrammingError
class SafeExtensionOperation(Operation):
reversible = True
def state_forwards(self, app_label, state):
pass
def database_forwards(self, app_label, schema_editor, from_state, to_state):
if schema_editor.connection.vendor != 'postgresql':
return
try:
with transaction.atomic():
try:
schema_editor.execute('CREATE EXTENSION IF NOT EXISTS %s SCHEMA public' % self.name)
except (OperationalError, ProgrammingError):
# OperationalError if the extension is not available
# ProgrammingError in case of denied permission
RunSQLIfExtension.extensions_installed = False
except InternalError:
# InternalError (current transaction is aborted, commands ignored
# until end of transaction block) would be raised when django-
# tenant-schemas set search_path.
RunSQLIfExtension.extensions_installed = False
def database_backwards(self, app_label, schema_editor, from_state, to_state):
try:
with transaction.atomic():
schema_editor.execute('DROP EXTENSION IF EXISTS %s' % self.name)
except InternalError:
# Raised when other objects depend on the extension. This happens in a multitenant
# context, where extension in installed in schema "public" but referenced in others (via
# public.gist_trgm_ops). In this case, do nothing, as the query should be successful
# when last tenant is processed.
pass
class RunSQLIfExtension(migrations.RunSQL):
extensions_installed = True
def __getattribute__(self, name):
if name == 'sql' and not self.extensions_installed:
return migrations.RunSQL.noop
return object.__getattribute__(self, name)
class TrigramExtension(SafeExtensionOperation):
name = 'pg_trgm'
class Migration(migrations.Migration):
dependencies = [
('custom_user', '0023_index_username'),
]
operations = [
TrigramExtension(),
RunSQLIfExtension(
sql=["CREATE INDEX IF NOT EXISTS custom_user_user_email_trgm_idx ON custom_user_user USING gist "
"(LOWER(email) public.gist_trgm_ops)"],
reverse_sql=['DROP INDEX custom_user_user_email_trgm_idx'],
),
]

View File

@ -114,7 +114,10 @@ def test_fts_dob(fts):
def test_fts_email(fts):
assert User.objects.free_text_search('jean.darmette@example.net').count() == 1
assert User.objects.free_text_search('jean.damrette@example.net').count() == 2 # no exact match, lookup by trigrams
assert User.objects.free_text_search('micheline.darmette@example.net').count() == 1
assert User.objects.free_text_search('@example.net').count() == 2
assert User.objects.free_text_search('@example').count() == 2
def test_fts_username(fts):