authentic/src/authentic2/management/commands/clean-unused-accounts.py

114 lines
4.3 KiB
Python

# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function

import logging
import smtplib
from datetime import timedelta

from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.management.base import BaseCommand
from django.db.models import F, Q
from django.utils import timezone
from django.utils.six.moves.urllib import parse as urlparse
from django_rbac.utils import get_ou_model

from authentic2.models import DeletedUser
from authentic2.utils import send_templated_mail

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# User model class as returned by Django's get_user_model().
User = get_user_model()
class Command(BaseCommand):
    """Alert, then delete, users who have not logged in for a long time.

    For every organizational unit (OU) that defines
    ``clean_unused_accounts_alert``, users whose last login is older than
    that number of days receive an alert mail. Users whose last login is
    older than ``clean_unused_accounts_deletion`` days, and who were alerted
    long enough ago, are deleted.

    With ``--fake`` no mail is sent, no alert timestamp is saved and no user
    is deleted; only the log messages are produced.
    """

    help = '''Clean unused accounts'''

    # Django's --verbosity value mapped to the level set on this module's
    # logger. Values outside the table leave the logger level untouched.
    _VERBOSITY_LOG_LEVELS = {
        0: logging.CRITICAL,
        1: logging.WARNING,
        2: logging.INFO,
        3: logging.DEBUG,
    }

    def add_arguments(self, parser):
        """Register the ``--fake`` (dry-run) flag on the argument parser."""
        parser.add_argument("--fake", action='store_true', help='do nothing', default=False)

    def handle(self, *args, **options):
        """Entry point: set the log level from verbosity, then run the cleanup.

        ``options['verbosity']`` is parsed by Django as an integer; ``int()``
        is applied so that a string value such as ``'2'`` is accepted as well.
        """
        level = self._VERBOSITY_LOG_LEVELS.get(int(options['verbosity']))
        if level is not None:
            logger.setLevel(level)
        self.fake = options['fake']
        self.clean_unused_accounts()

    def clean_unused_accounts(self):
        """Send alerts and delete users, one organizational unit at a time."""
        now = timezone.now()
        if self.fake:
            logger.info('fake call to clean-unused-accounts')

        for ou in get_ou_model().objects.filter(clean_unused_accounts_alert__isnull=False):
            alert_delay = timedelta(days=ou.clean_unused_accounts_alert)
            deletion_delay = timedelta(days=ou.clean_unused_accounts_deletion)
            users = User.objects.filter(ou=ou, last_login__lte=now - alert_delay)

            # Alert users never alerted before, and also users whose last
            # alert is not newer than their last login: they came back after
            # that alert, so it must not count for the current inactivity
            # period (otherwise they could be deleted without a new warning).
            to_alert = users.filter(
                Q(last_account_deletion_alert__isnull=True)
                | Q(last_account_deletion_alert__lte=F('last_login'))
            )
            for user in to_alert:
                logger.info('%s last login %d days ago, sending alert', user, ou.clean_unused_accounts_alert)
                self.send_alert(user)

            # Querysets are lazy: this one is evaluated after the alert loop,
            # so users alerted during this run already carry a fresh alert
            # timestamp and are not selected for deletion.
            to_delete = users.filter(
                last_login__lte=now - deletion_delay,
                # ensure respect of alert delay before deletion
                last_account_deletion_alert__lte=now - (deletion_delay - alert_delay)
            )
            for user in to_delete:
                logger.info(
                    '%s last login more than %d days ago, deleting user', user,
                    ou.clean_unused_accounts_deletion)
                self.delete_user(user)

    def send_alert(self, user):
        """Mail the deletion warning to ``user`` and record when it was sent.

        The alert timestamp is saved only when sending raised no
        ``smtplib.SMTPException`` and the command is not in fake mode. A user
        without an email address gets no mail but is still timestamped, so
        the deletion delay keeps running for them.
        """
        days_to_deletion = user.ou.clean_unused_accounts_deletion - user.ou.clean_unused_accounts_alert
        ctx = {
            'user': user,
            'days_to_deletion': days_to_deletion,
            'login_url': urlparse.urljoin(settings.SITE_BASE_URL, settings.LOGIN_URL),
        }
        try:
            self.send_mail('authentic2/unused_account_alert', user, ctx)
        except smtplib.SMTPException as e:
            logger.exception('email sending failure: %s', e)
        else:
            if not self.fake:
                user.last_account_deletion_alert = timezone.now()
                user.save()

    def send_mail(self, prefix, user, ctx):
        """Send the templated mail ``prefix`` to ``user`` with context ``ctx``.

        Nothing is sent when the user has no email address or when the
        command runs in fake mode. Exceptions raised by
        ``send_templated_mail`` propagate to the caller.
        """
        if not user.email:
            logger.debug('%s has no email, no mail sent', user)
            # Without this return the mail would be sent to an empty address.
            return
        if not self.fake:
            logger.debug('sending mail to %s', user.email)
            send_templated_mail(user.email, prefix, ctx)

    def delete_user(self, user):
        """Notify ``user`` of the deletion, then delete the account.

        A mail failure is logged and does not prevent the deletion. In fake
        mode the user is left untouched.
        """
        ctx = {'user': user}
        try:
            self.send_mail('authentic2/unused_account_delete', user, ctx)
        except smtplib.SMTPException as e:
            logger.exception('email sending failure: %s', e)
        if not self.fake:
            DeletedUser.objects.delete_user(user)