# File: authentic/src/authentic2/management/commands/clean-unused-accounts.py (Python; listed as 142 lines, 5.5 KiB)

# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
import logging
import urllib.parse
from datetime import timedelta
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.management.base import BaseCommand
from django.db import transaction
from django.db.models import F
from django.utils import timezone, translation
from authentic2 import app_settings
from authentic2.backends import get_user_queryset
from authentic2.backends.ldap_backend import LDAPBackend
from authentic2.utils import send_templated_mail
from django_rbac.utils import get_ou_model
# module-level logger named after this module; Command.handle() attaches a
# console StreamHandler to it
logger = logging.getLogger(__name__)
# user model class as returned by Django's get_user_model()
User = get_user_model()
class Command(BaseCommand):
    """Alert the owners of unused accounts, then delete those accounts.

    For every organizational unit (OU) that defines both an alert delay and
    a deletion delay (expressed in days), users whose last login is older
    than the alert delay receive an alert mail, and users whose last login
    is older than the deletion delay are deleted once enough time has passed
    since their alert.

    With ``--fake`` nothing is written to the database and no mail is sent;
    the command only logs what it would do.
    """

    help = '''Clean unused accounts'''

    # maps the value of the --verbosity option to the level of the console
    # handler installed by handle()
    verbosity_to_log_level = {
        0: logging.CRITICAL,
        1: logging.WARNING,
        2: logging.INFO,
        3: logging.DEBUG,
    }

    def add_arguments(self, parser):
        """Declare the ``--fake`` switch (dry run)."""
        parser.add_argument("--fake", action='store_true', help='do nothing', default=False)

    def handle(self, *args, **options):
        """Entry point of the command.

        Reads ``options['fake']`` and ``options['verbosity']``, builds the
        queryset of candidate users and runs :meth:`clean_unused_accounts`.
        Any exception raised by the cleaning itself is logged, not re-raised.
        """
        self.fake = options['fake']

        # add StreamHandler for console output
        handler = logging.StreamHandler()
        handler.setLevel(level=self.verbosity_to_log_level[options['verbosity']])
        logger.addHandler(handler)

        # prevent logging to external logs when fake
        previous_propagate = logger.propagate
        if self.fake:
            logger.propagate = False

        try:
            self.now = timezone.now()

            # exclude user from LDAP directories based on their source name (or realm)
            realms = [block['realm'] for block in LDAPBackend.get_config() if block.get('realm')]
            self.user_qs = (
                get_user_queryset()
                .exclude(oidc_account__isnull=False)
                .exclude(userexternalid__source__in=realms)
            )

            translation.activate(settings.LANGUAGE_CODE)
            try:
                self.clean_unused_accounts()
            except Exception:
                logger.exception('clean-unused-accounts failed')
        finally:
            # the logger is module-level state: undo our changes so that a
            # later invocation in the same process neither duplicates console
            # output nor stays cut off from the parent loggers
            logger.propagate = previous_propagate
            logger.removeHandler(handler)

    def clean_unused_accounts(self):
        """Send alerts and delete users, one OU at a time.

        ``A2_CLEAN_UNUSED_ACCOUNTS_MAX_MAIL_PER_PERIOD`` caps, separately for
        each OU, the number of users alerted and the number of users deleted
        during one run.
        """
        count = app_settings.A2_CLEAN_UNUSED_ACCOUNTS_MAX_MAIL_PER_PERIOD

        # both delays are needed below; an OU lacking the deletion delay would
        # make timedelta() raise and abort the processing of every other OU
        ous = get_ou_model().objects.filter(
            clean_unused_accounts_alert__isnull=False,
            clean_unused_accounts_deletion__isnull=False,
        )
        for ou in ous:
            alert_delay = timedelta(days=ou.clean_unused_accounts_alert)
            deletion_delay = timedelta(days=ou.clean_unused_accounts_deletion)
            ou_users = self.user_qs.filter(ou=ou)

            # reset last_account_deletion_alert for users which connected since last alert
            active_users = ou_users.filter(last_login__gte=F('last_account_deletion_alert'))
            active_users.update(last_account_deletion_alert=None)

            inactive_users = ou_users.filter(last_login__lte=self.now - alert_delay)

            # send first alert
            inactive_users_first_alert = inactive_users.filter(last_account_deletion_alert__isnull=True)
            days_to_deletion = ou.clean_unused_accounts_deletion - ou.clean_unused_accounts_alert
            for user in inactive_users_first_alert[:count]:
                # the logged number is the OU's alert delay, not the exact
                # number of days elapsed since the user's last login
                logger.info('%s last login %d days ago, sending alert', user, ou.clean_unused_accounts_alert)
                self.send_alert(user, days_to_deletion)

            inactive_users_to_delete = inactive_users.filter(
                last_login__lte=self.now - deletion_delay,
                # ensure respect of alert delay before deletion
                last_account_deletion_alert__lte=self.now - (deletion_delay - alert_delay),
            )
            for user in inactive_users_to_delete[:count]:
                logger.info(
                    '%s last login more than %d days ago, deleting user',
                    user,
                    ou.clean_unused_accounts_deletion,
                )
                self.delete_user(user)

    def send_alert(self, user, days_to_deletion):
        """Record the alert date on ``user`` and queue the alert mail.

        ``days_to_deletion`` is passed to the mail template together with the
        user and a login URL built by joining ``settings.SITE_BASE_URL`` and
        ``settings.LOGIN_URL``. In fake mode the alert date is not recorded.
        """
        ctx = {
            'user': user,
            'days_to_deletion': days_to_deletion,
            'login_url': urllib.parse.urljoin(settings.SITE_BASE_URL, settings.LOGIN_URL),
        }
        with transaction.atomic():
            if not self.fake:
                User.objects.filter(pk=user.pk).update(last_account_deletion_alert=self.now)
            self.send_mail('authentic2/unused_account_alert', user, ctx)

    def send_mail(self, prefix, user, ctx):
        """Queue a templated mail to ``user``, sent when the transaction commits.

        ``prefix`` and ``ctx`` are handed over to ``send_templated_mail``.
        Nothing is queued when the user has no email or in fake mode; both
        cases only produce a debug log line.
        """
        if not user.email:
            logger.debug('%s has no email, no mail sent', user)
            return

        logger.debug('sending mail to %s', user.email)
        if self.fake:
            return

        # capture the address now: the callback runs later, at commit time
        email = user.email

        def send_on_commit():
            send_templated_mail(email, prefix, ctx)

        transaction.on_commit(send_on_commit)

    def delete_user(self, user):
        """Queue the deletion notice, then delete ``user``.

        Both steps share one transaction, and the mail is registered through
        ``transaction.on_commit``. In fake mode the user is not deleted.
        """
        ctx = {'user': user}
        with transaction.atomic():
            self.send_mail('authentic2/unused_account_delete', user, ctx)
            if not self.fake:
                user.delete()