misc-vdeniaud/scripts/doublons-cd91/authentic_fusion_local.py

108 lines
3.2 KiB
Python

import collections
import json
import logging
import sys
from django.contrib.auth import get_user_model
from django.db import connection, transaction
from django.utils import timezone
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler('authentic_fusion.log')
fh.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
logger.addHandler(fh)
logger.addHandler(ch)
User = get_user_model()
domain_url = ''
if hasattr(connection, 'tenant') and hasattr(connection.tenant, 'domain_url'):
domain_url = 'https://%s' % connection.tenant.domain_url
logger.info('=== Starting fusion at %s ===', timezone.now().strftime('%Y-%m-%dT%H:%M:%S'))
agent_users = (
User.objects.filter(email__icontains='@cd-essonne.fr', is_active=True)
.distinct()
.order_by('first_name')
.prefetch_related('saml_identifiers')
)
agent_users_by_email = collections.defaultdict(list)
for user in agent_users:
agent_users_by_email[user.email.lower()].append(user)
def get_user_detail(user):
return f'{user.get_full_name()} {user.email} {user.uuid} {domain_url}{user.get_absolute_url()}'
users_to_keep = []
for email, users in agent_users_by_email.items():
if len(users) == 1:
continue
if len(users) == 3:
logger.info('* SKIPPING USER %s (more than 2 duplicates)', get_user_detail(users[0]))
continue
if users[0].saml_identifiers.all() and users[1].saml_identifiers.all():
logger.info('* SKIPPING USER %s (duplicates are both saml accounts)', get_user_detail(users[0]))
continue
if not users[0].saml_identifiers.all() and not users[1].saml_identifiers.all():
logger.info('* SKIPPING USER %s (duplicates are both local accounts)', get_user_detail(users[0]))
continue
roles = {}
for user in users:
for role in user.roles.all():
roles[role.id] = role
user_to_keep = [x for x in users if x.saml_identifiers.all()][0]
users_to_disable = [x for x in users if not x.saml_identifiers.all()]
user_to_keep._roles_to_add = roles
user_to_keep._duplicated_users = users_to_disable
users_to_keep.append(user_to_keep)
def do_fusion(users):
disabled_users_uuid_by_user_uuid = collections.defaultdict(list)
for user in users:
logger.info('* Processing user %s', get_user_detail(user))
for role in sorted(user._roles_to_add.values(), key=lambda x: x.name.lower()):
logger.info('Adding role %s', role)
user.roles.add(role)
for duplicated_user in user._duplicated_users:
logger.info('Disabling duplicate %s', get_user_detail(duplicated_user))
disabled_users_uuid_by_user_uuid[user.uuid].append(duplicated_user.uuid)
duplicated_user.mark_as_inactive(reason='Désactivation automatique des doublons')
result = json.dumps(disabled_users_uuid_by_user_uuid)
logger.info('Result %s', result)
with open('authentic_fusion_result.json', 'w') as f:
f.write(result)
try:
with transaction.atomic():
do_fusion(users_to_keep)
if len(sys.argv) < 2 or sys.argv[1] != '--proceed=true':
raise ValueError
logger.info('=== Success ===')
except ValueError:
logger.info('=== Did nothing ===')