170 lines
6.7 KiB
Python
170 lines
6.7 KiB
Python
# authentic2 - versatile identity manager
|
|
# Copyright (C) 2010-2019 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import datetime
|
|
import logging
|
|
import unicodedata
|
|
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.contrib.postgres.search import TrigramDistance
|
|
from django.db import models, transaction, connection
|
|
from django.db.models import F, Value, FloatField, Subquery, OuterRef
|
|
from django.db.models.functions import Lower, Coalesce
|
|
from django.utils import six
|
|
from django.utils import timezone
|
|
from django.contrib.auth.models import BaseUserManager
|
|
|
|
from authentic2 import app_settings
|
|
from authentic2.models import Attribute, AttributeValue
|
|
from authentic2.utils.lookups import Unaccent, ImmutableConcat
|
|
|
|
|
|
class UserQuerySet(models.QuerySet):
    """User queryset with free-text search, duplicate lookup and cleanup."""

    def free_text_search(self, search):
        """Filter users matching every whitespace-separated term of `search`.

        For each term, the searchable attributes whose kind provides a
        'free_text_search' function are asked to build a query for it. When
        at least one of them returns a query, the term is matched only
        through those attributes. Otherwise the term is matched with
        `icontains` against username, first name, last name, email and the
        content of the searchable attributes.

        Returns the queryset unchanged when `search` contains no term.
        """
        words = search.split()
        if not words:
            return self

        Q = models.query.Q
        searchable_attributes = Attribute.objects.filter(searchable=True)
        per_term_filters = []

        for word in words:
            # queries built by attribute kinds that recognize this term
            typed_matches = []
            for attribute in searchable_attributes:
                matcher = attribute.get_kind().get('free_text_search')
                if not matcher:
                    continue
                typed_query = matcher(word)
                if typed_query is None:
                    continue
                typed_matches.append(typed_query & Q(attribute_values__attribute=attribute))

            # a term recognized by some specific attribute type (a date for
            # example) bypasses the generic matcher below
            if typed_matches:
                per_term_filters.append(six.moves.reduce(Q.__or__, typed_matches))
                continue

            generic = (
                Q(username__icontains=word)
                | Q(first_name__icontains=word)
                | Q(last_name__icontains=word)
                | Q(email__icontains=word)
            )
            for attribute in searchable_attributes:
                # first_name and last_name are already matched on the user columns
                if attribute.name not in ('first_name', 'last_name'):
                    generic = generic | Q(
                        attribute_values__content__icontains=word,
                        attribute_values__attribute=attribute,
                    )
            per_term_filters.append(generic)

        matching = self.filter(six.moves.reduce(Q.__and__, per_term_filters))
        # joining on attribute values can yield the same user several times
        if searchable_attributes:
            matching = matching.distinct()
        return matching

    def find_duplicates(self, first_name, last_name, birthdate=None):
        """Return at most five non-deleted users whose name resembles the given one.

        The given names are joined with a space, stripped of non-ASCII
        characters after NFKD normalization and lowercased, then compared by
        trigram similarity with the lowercased, unaccented concatenation of
        each user's first and last name. Results are ordered by the `dist`
        annotation (trigram distance).

        When `birthdate` is given, `dist` is multiplied by
        `1 - A2_DUPLICATES_BIRTHDATE_BONUS` for users having a 'birthdate'
        attribute value equal to it.

        A `SET pg_trgm.similarity_threshold` statement is issued on the
        connection using `A2_DUPLICATES_THRESHOLD`.
        """
        with connection.cursor() as cursor:
            cursor.execute(
                "SET pg_trgm.similarity_threshold = %f" % app_settings.A2_DUPLICATES_THRESHOLD
            )

        full_name = '%s %s' % (first_name, last_name)
        ascii_name = (
            unicodedata.normalize('NFKD', full_name)
            .encode('ascii', 'ignore')
            .decode('ascii')
            .lower()
        )

        stored_name = Lower(Unaccent(ImmutableConcat('first_name', Value(' '), 'last_name')))
        candidates = (
            self.filter(deleted__isnull=True)
            .annotate(name=stored_name)
            .filter(name__trigram_similar=ascii_name)
            .annotate(dist=TrigramDistance('name', ascii_name))
            .order_by('dist')[:5]
        )

        if not birthdate:
            return candidates

        # alter distance according to additional parameters
        bonus = app_settings.A2_DUPLICATES_BIRTHDATE_BONUS
        user_content_type = ContentType.objects.get_for_model(self.model)
        same_birthdate = AttributeValue.objects.filter(
            object_id=OuterRef('pk'),
            content_type=user_content_type,
            attribute__kind='birthdate',
            content=birthdate,
        ).annotate(bonus=Value(1 - bonus, output_field=FloatField()))
        weighted_dist = (
            Subquery(same_birthdate.values('bonus'), output_field=FloatField()) * F('dist')
        )
        # users without a matching birthdate keep their original distance
        return candidates.annotate(dist=Coalesce(weighted_dist, F('dist')))

    @transaction.atomic
    def cleanup(self, threshold=600, timestamp=None):
        """Remove users marked as deleted for more than `threshold` seconds.

        `timestamp` is the reference time; the current time is used when it
        is not given. A DeletedUser row is created for each removed user,
        keeping the email and/or uuid when listed in
        `A2_USER_DELETED_KEEP_DATA`, and each removal is logged once the
        transaction commits.
        """
        from .models import DeletedUser

        reference = timestamp or timezone.now()
        not_after = reference - datetime.timedelta(seconds=threshold)
        expired = self.filter(deleted__lt=not_after)

        # keep the instances around, they will be gone when the log is written
        loaded = list(expired)

        def log():
            logger = logging.getLogger('authentic2')
            for user in loaded:
                logger.info(u'deleted account %s', user)

        transaction.on_commit(log)

        def archive(user):
            record = DeletedUser(deleted=user.deleted, old_user_id=user.id)
            kept = app_settings.A2_USER_DELETED_KEEP_DATA
            if 'email' in kept:
                # drop anything after the last '#' of the stored email
                record.old_email = user.email.rsplit('#', 1)[0]
            if 'uuid' in kept:
                record.old_uuid = user.uuid
            return record

        DeletedUser.objects.bulk_create([archive(user) for user in expired])
        expired.delete()
|
|
|
|
|
|
class UserManager(BaseUserManager):
    """Manager creating users and retrieving them by their uuid."""

    def _create_user(self, username, email, password,
                     is_staff, is_superuser, **extra_fields):
        """Build, save and return an active user.

        The email is normalized, the password is set through
        `set_password`, and `last_login` and `date_joined` are both set to
        the current time.

        Raises ValueError when `username` is empty.
        """
        now = timezone.now()
        if not username:
            raise ValueError('The given username must be set')

        account = self.model(
            username=username,
            email=self.normalize_email(email),
            is_staff=is_staff,
            is_active=True,
            is_superuser=is_superuser,
            last_login=now,
            date_joined=now,
            **extra_fields
        )
        account.set_password(password)
        account.save(using=self._db)
        return account

    def create_user(self, username, email=None, password=None, **extra_fields):
        """Create a user that is neither staff nor superuser."""
        return self._create_user(
            username, email, password, False, False, **extra_fields)

    def create_superuser(self, username, email, password, **extra_fields):
        """Create a user that is both staff and superuser."""
        return self._create_user(
            username, email, password, True, True, **extra_fields)

    def get_by_natural_key(self, uuid):
        """Return the user whose uuid is `uuid`."""
        return self.get(uuid=uuid)
|