authentic/src/authentic2/custom_user/managers.py

131 lines
5.0 KiB
Python

# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import logging
from django.db import models, transaction
from django.utils import six
from django.utils import timezone
from django.contrib.auth.models import BaseUserManager
from authentic2 import app_settings
from authentic2.models import Attribute
class UserQuerySet(models.QuerySet):
def free_text_search(self, search):
terms = search.split()
if not terms:
return self
searchable_attributes = Attribute.objects.filter(searchable=True)
queries = []
for term in terms:
q = None
specific_queries = []
for a in searchable_attributes:
kind = a.get_kind()
free_text_search_function = kind.get('free_text_search')
if free_text_search_function:
q = free_text_search_function(term)
if q is not None:
specific_queries.append(q & models.query.Q(attribute_values__attribute=a))
# if the term is recognized by some specific attribute type, like a
# date, does not use the later generic matcher
if specific_queries:
queries.append(six.moves.reduce(models.query.Q.__or__, specific_queries))
continue
q = (
models.query.Q(username__icontains=term)
| models.query.Q(first_name__icontains=term)
| models.query.Q(last_name__icontains=term)
| models.query.Q(email__icontains=term)
)
for a in searchable_attributes:
if a.name in ('first_name', 'last_name'):
continue
q = q | models.query.Q(
attribute_values__content__icontains=term, attribute_values__attribute=a)
queries.append(q)
self = self.filter(six.moves.reduce(models.query.Q.__and__, queries))
# search by attributes can match multiple times
if searchable_attributes:
self = self.distinct()
return self
@transaction.atomic
def cleanup(self, threshold=600, timestamp=None):
'''Delete all deleted users for more than 10 minutes.'''
from .models import DeletedUser
not_after = (timestamp or timezone.now()) - datetime.timedelta(seconds=threshold)
qs = self.filter(deleted__lt=not_after)
loaded = list(qs)
def log():
logger = logging.getLogger('authentic2')
for user in loaded:
logger.info(u'deleted account %s', user)
transaction.on_commit(log)
deleted_users = []
for user in qs:
deleted_user = DeletedUser(deleted=user.deleted, old_user_id=user.id)
if 'email' in app_settings.A2_USER_DELETED_KEEP_DATA:
deleted_user.old_email = user.email.rsplit('#', 1)[0]
if 'uuid' in app_settings.A2_USER_DELETED_KEEP_DATA:
deleted_user.old_uuid = user.uuid
deleted_users.append(deleted_user)
DeletedUser.objects.bulk_create(deleted_users)
qs.delete()
class UserManager(BaseUserManager):
def _create_user(self, username, email, password,
is_staff, is_superuser, **extra_fields):
"""
Creates and saves a User with the given username, email and password.
"""
now = timezone.now()
if not username:
raise ValueError('The given username must be set')
email = self.normalize_email(email)
user = self.model(username=username, email=email,
is_staff=is_staff, is_active=True,
is_superuser=is_superuser, last_login=now,
date_joined=now, **extra_fields)
user.set_password(password)
user.save(using=self._db)
return user
def create_user(self, username, email=None, password=None, **extra_fields):
return self._create_user(username, email, password, False, False,
**extra_fields)
def create_superuser(self, username, email, password, **extra_fields):
return self._create_user(username, email, password, True, True,
**extra_fields)
def get_by_natural_key(self, uuid):
return self.get(uuid=uuid)