doublons-cd91: add authentic script

This commit is contained in:
Valentin Deniaud 2024-02-26 16:32:33 +01:00
parent 576ad4c171
commit a65b4719e8
2 changed files with 243 additions and 0 deletions

View File

@ -0,0 +1,89 @@
import collections
import json
import logging
import sys
from django.contrib.auth import get_user_model
from django.db import connection, transaction
from django.utils import timezone
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler('authentic_fusion.log')
fh.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
logger.addHandler(fh)
logger.addHandler(ch)
User = get_user_model()
domain_url = ''
if hasattr(connection, 'tenant') and hasattr(connection.tenant, 'domain_url'):
domain_url = 'https://%s' % connection.tenant.domain_url
logger.info('=== Starting fusion at %s ===', timezone.now().strftime('%Y-%m-%dT%H:%M:%S'))
agent_users = User.objects.filter(email__icontains='@cd-essonne.fr', saml_identifiers__isnull=False).order_by(
'-last_login', 'first_name'
)
agent_users_by_email = collections.defaultdict(list)
for user in agent_users:
agent_users_by_email[user.email.lower()].append(user)
users_to_keep = []
for email, users in agent_users_by_email.items():
if len(users) == 1:
continue
roles = set()
for user in users:
roles.update(user.roles.all())
user_to_keep, users_to_disable = users[0], users[1:]
user_to_keep._roles_to_add = roles
user_to_keep._duplicated_users = users_to_disable
users_to_keep.append(user_to_keep)
def get_user_detail(user):
return f'{user.get_full_name()} {user.email} {user.uuid} {domain_url}{user.get_absolute_url()}'
def do_fusion(users):
disabled_users_uuid_by_user_uuid = collections.defaultdict(list)
for user in users:
logger.info('* Processing user %s', get_user_detail(user))
for role in sorted(user._roles_to_add, key=lambda x: x.name.lower()):
logger.info('Adding role %s', role)
user.roles.add(role)
for duplicated_user in user._duplicated_users:
logger.info('Disabling duplicate %s', get_user_detail(duplicated_user))
disabled_users_uuid_by_user_uuid[user.uuid].append(duplicated_user.uuid)
duplicated_user.mark_as_inactive(reason='Désactivation automatique des doublons')
result = json.dumps(disabled_users_uuid_by_user_uuid)
logger.info('Result %s', result)
with open('authentic_fusion_result.json', 'w') as f:
f.write(result)
try:
with transaction.atomic():
do_fusion(users_to_keep)
if len(sys.argv) < 2 or sys.argv[1] != '--proceed=true':
raise ValueError
logger.info('=== Success ===')
except ValueError:
logger.info('=== Did nothing ===')

View File

@ -0,0 +1,154 @@
import datetime
import pytest
from authentic2.a2_rbac.models import Role
from django.contrib.auth import get_user_model
from django.utils.timezone import now
from mellon.models import Issuer, UserSAMLIdentifier
from .utils import call_command
User = get_user_model()
@pytest.mark.freeze_time('2022-04-19 14:00')
def test_authentic_fusion(db, caplog):
role1 = Role.objects.create(name='role1')
role2 = Role.objects.create(name='role2')
role3 = Role.objects.create(name='role3')
# duplicated users, but not agents
User.objects.create(first_name='Normal', last_name='User', email='normal.user@gmail.com')
User.objects.create(first_name='Normal', last_name='User', email='normal.user@gmail.com')
# duplicated users, agents but no saml link
User.objects.create(first_name='Agent', last_name='No SAML', email='agent.no.saml@cd-essonne.fr')
User.objects.create(first_name='Agent', last_name='No SAML', email='agent.no.saml@cd-essonne.fr')
# agent with saml link, no duplicate
issuer = Issuer.objects.create(entity_id='https://idp1.example.com/', slug='idp1')
saml_user = User.objects.create(
first_name='Agent',
last_name='No duplicate',
email='agent.no.dup@cd-essonne.fr',
)
UserSAMLIdentifier.objects.create(user=saml_user, issuer=issuer, name_id='anodup')
# duplicated users, agents with saml link
saml_user_old_connection = User.objects.create(
id=42,
uuid='uuid:42/agent@cd-essonne.fr',
first_name='Agent',
last_name='Duplicated',
email='agent@cd-essonne.fr',
last_login=now() - datetime.timedelta(days=10),
)
UserSAMLIdentifier.objects.create(user=saml_user_old_connection, issuer=issuer, name_id='aduplicated')
saml_user_old_connection.roles.add(role1)
saml_user_old_connection.roles.add(role2)
saml_user_old_connection_no_roles = User.objects.create(
id=43,
uuid='uuid:43/agent@cd-essonne.fr',
first_name='Agent',
last_name='Duplicated',
email='agent@cd-Essonne.fr',
last_login=now() - datetime.timedelta(days=5),
)
UserSAMLIdentifier.objects.create(
user=saml_user_old_connection_no_roles, issuer=issuer, name_id='Aduplicated'
)
saml_user_recent_connection = User.objects.create(
id=44,
uuid='uuid:44/agent@cd-essonne.fr',
first_name='Agent',
last_name='Duplicated',
email='Agent@cd-essonne.fr',
last_login=now() - datetime.timedelta(days=1),
)
UserSAMLIdentifier.objects.create(user=saml_user_recent_connection, issuer=issuer, name_id='ADUPLICATED')
saml_user_old_connection.roles.add(role1)
saml_user_old_connection.roles.add(role3)
# again duplicated users, agents with saml link
saml_user_recent_connection_no_roles_2 = User.objects.create(
id=45,
uuid='uuid:45/agent2@cd-essonne.fr',
first_name='Agent',
last_name='Duplicated 2',
email='agent2@cd-essonne.fr',
last_login=now() - datetime.timedelta(days=5),
)
UserSAMLIdentifier.objects.create(
user=saml_user_recent_connection_no_roles_2, issuer=issuer, name_id='Aduplicated2'
)
saml_user_old_connection_2 = User.objects.create(
id=46,
uuid='uuid:46/agent2@cd-essonne.fr',
first_name='Agent',
last_name='Duplicated 2',
email='agent2@cd-essonne.fr',
last_login=now() - datetime.timedelta(days=10),
)
UserSAMLIdentifier.objects.create(user=saml_user_old_connection_2, issuer=issuer, name_id='aduplicated2')
saml_user_old_connection_2.roles.add(role1)
saml_user_old_connection_2.roles.add(role2)
assert User.objects.count() == 10
assert User.objects.filter(is_active=True).count() == 10
call_command('runscript', 'tests/authentic_fusion.py')
log_messages = caplog.messages
assert log_messages == [
'=== Starting fusion at 2022-04-19T14:00:00 ===',
'* Processing user Agent Duplicated Agent@cd-essonne.fr uuid:44/agent@cd-essonne.fr /manage/users/44/',
'Adding role role1',
'Adding role role2',
'Adding role role3',
'Disabling duplicate Agent Duplicated agent@cd-Essonne.fr uuid:43/agent@cd-essonne.fr /manage/users/43/',
'Disabling duplicate Agent Duplicated agent@cd-essonne.fr uuid:42/agent@cd-essonne.fr /manage/users/42/',
'* Processing user Agent Duplicated 2 agent2@cd-essonne.fr uuid:45/agent2@cd-essonne.fr /manage/users/45/',
'Adding role role1',
'Adding role role2',
'Disabling duplicate Agent Duplicated 2 agent2@cd-essonne.fr uuid:46/agent2@cd-essonne.fr /manage/users/46/',
'Result {"uuid:44/agent@cd-essonne.fr": ["uuid:43/agent@cd-essonne.fr", "uuid:42/agent@cd-essonne.fr"], "uuid:45/agent2@cd-essonne.fr": ["uuid:46/agent2@cd-essonne.fr"]}',
'=== Did nothing ===',
]
# no changes in db
assert User.objects.count() == 10
assert User.objects.filter(is_active=True).count() == 10
assert saml_user_recent_connection_no_roles_2.roles.count() == 0
caplog.clear()
call_command('runscript', 'tests/authentic_fusion.py', '--proceed=true')
assert log_messages[:-1] == caplog.messages[:-1]
assert caplog.messages[-1] == '=== Success ==='
assert User.objects.count() == 10
assert User.objects.filter(is_active=True).count() == 7
assert User.objects.filter(email='normal.user@gmail.com', is_active=True).count() == 2
assert User.objects.filter(email='agent.no.saml@cd-essonne.fr', is_active=True).count() == 2
assert User.objects.filter(email='agent.no.dup@cd-essonne.fr', is_active=True).count() == 1
saml_user_old_connection.refresh_from_db()
assert saml_user_old_connection.is_active is False
saml_user_old_connection_no_roles.refresh_from_db()
assert saml_user_old_connection_no_roles.is_active is False
saml_user_recent_connection.refresh_from_db()
assert saml_user_recent_connection.is_active is True
assert set(saml_user_recent_connection.roles.all()) == {role1, role2, role3}
saml_user_old_connection_2.refresh_from_db()
assert saml_user_old_connection_2.is_active is False
saml_user_recent_connection_no_roles_2.refresh_from_db()
assert saml_user_recent_connection_no_roles_2.is_active is True
assert set(saml_user_recent_connection_no_roles_2.roles.all()) == {role1, role2}