Compare commits

...

4 Commits

9 changed files with 501 additions and 12 deletions

36
.pre-commit-config.yaml Normal file
View File

@ -0,0 +1,36 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: double-quote-string-fixer
- repo: https://github.com/psf/black
rev: 23.1.0
hooks:
- id: black
args: ['--target-version', 'py39', '--skip-string-normalization', '--line-length', '110']
- repo: https://github.com/PyCQA/isort
rev: 5.12.0
hooks:
- id: isort
args: ['--profile', 'black', '--line-length', '110']
- repo: https://github.com/asottile/pyupgrade
rev: v3.3.1
hooks:
- id: pyupgrade
args: ['--keep-percent-format', '--py39-plus']
- repo: https://github.com/adamchainz/django-upgrade
rev: 1.13.0
hooks:
- id: django-upgrade
args: ['--target-version', '3.2']
- repo: https://github.com/rtts/djhtml
rev: '3.0.5'
hooks:
- id: djhtml
args: ['--tabwidth', '2']
- repo: https://git.entrouvert.org/pre-commit-debian.git
rev: v0.3
hooks:
- id: pre-commit-debian

View File

@ -1,9 +1,8 @@
import logging
from django.contrib.contenttypes.models import ContentType
from authentic2.a2_rbac.models import Role
from authentic2.apps.authenticators.models import BaseAuthenticator
from django.contrib.contenttypes.models import ContentType
logger = logging.getLogger(__name__)

View File

@ -1,12 +1,11 @@
from django.db import connection
from chrono.agendas.models import Event
from django.db import connection
from django.db.models import Count
duplicated_events = (
Event.objects.values("primary_event", "start_datetime")
.annotate(count=Count("id"))
.values("pk")
Event.objects.values('primary_event', 'start_datetime')
.annotate(count=Count('id'))
.values('pk')
.order_by()
.filter(count__gt=1)
)

View File

@ -1,7 +1,5 @@
from django.db import connection
from chrono.agendas.models import TimePeriod
from django.db import connection
periods = TimePeriod.objects.filter(agenda__isnull=False, weekday_indexes__isnull=False)
if periods.exists():

View File

@ -1,5 +1,5 @@
from django.conf import settings
from combo.apps.dataviz.models import ChartNgCell
from django.conf import settings
base_url = settings.SITE_BASE_URL
qs = ChartNgCell.objects.filter(
@ -12,6 +12,9 @@ for cell in qs:
continue
transpose = bool(chart.axis_count == 2)
if transpose:
print('Switch to "table inverted" on cell %s from page %s' % (cell.pk, settings.SITE_BASE_URL + cell.page.get_online_url()))
print(
'Switch to "table inverted" on cell %s from page %s'
% (cell.pk, settings.SITE_BASE_URL + cell.page.get_online_url())
)
cell.chart_type = 'table-inverted'
cell.save()

View File

@ -0,0 +1,92 @@
import collections
import json
import logging
import sys
from django.contrib.auth import get_user_model
from django.db import connection, transaction
from django.utils import timezone
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler('authentic_fusion.log')
fh.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
logger.addHandler(fh)
logger.addHandler(ch)
User = get_user_model()
domain_url = ''
if hasattr(connection, 'tenant') and hasattr(connection.tenant, 'domain_url'):
domain_url = 'https://%s' % connection.tenant.domain_url
logger.info('=== Starting fusion at %s ===', timezone.now().strftime('%Y-%m-%dT%H:%M:%S'))
agent_users = (
User.objects.filter(email__icontains='@cd-essonne.fr', saml_identifiers__isnull=False)
.distinct()
.order_by('-last_login', 'first_name')
)
agent_users_by_email = collections.defaultdict(list)
for user in agent_users:
agent_users_by_email[user.email.lower()].append(user)
users_to_keep = []
for email, users in agent_users_by_email.items():
if len(users) == 1:
continue
roles = {}
for user in users:
for role in user.roles.all():
roles[role.id] = role
user_to_keep, users_to_disable = users[0], users[1:]
user_to_keep._roles_to_add = roles
user_to_keep._duplicated_users = users_to_disable
users_to_keep.append(user_to_keep)
def get_user_detail(user):
return f'{user.get_full_name()} {user.email} {user.uuid} {domain_url}{user.get_absolute_url()}'
def do_fusion(users):
disabled_users_uuid_by_user_uuid = collections.defaultdict(list)
for user in users:
logger.info('* Processing user %s', get_user_detail(user))
for role in sorted(user._roles_to_add.values(), key=lambda x: x.name.lower()):
logger.info('Adding role %s', role)
user.roles.add(role)
for duplicated_user in user._duplicated_users:
logger.info('Disabling duplicate %s', get_user_detail(duplicated_user))
disabled_users_uuid_by_user_uuid[user.uuid].append(duplicated_user.uuid)
duplicated_user.mark_as_inactive(reason='Désactivation automatique des doublons')
result = json.dumps(disabled_users_uuid_by_user_uuid)
logger.info('Result %s', result)
with open('authentic_fusion_result.json', 'w') as f:
f.write(result)
try:
with transaction.atomic():
do_fusion(users_to_keep)
if len(sys.argv) < 2 or sys.argv[1] != '--proceed=true':
raise ValueError
logger.info('=== Success ===')
except ValueError:
logger.info('=== Did nothing ===')

View File

@ -0,0 +1,154 @@
import datetime
import pytest
from authentic2.a2_rbac.models import Role
from django.contrib.auth import get_user_model
from django.utils.timezone import now
from mellon.models import Issuer, UserSAMLIdentifier
from .utils import call_command
User = get_user_model()
@pytest.mark.freeze_time('2022-04-19 14:00')
def test_authentic_fusion(db, caplog):
role1 = Role.objects.create(name='role1')
role2 = Role.objects.create(name='role2')
role3 = Role.objects.create(name='role3')
# duplicated users, but not agents
User.objects.create(first_name='Normal', last_name='User', email='normal.user@gmail.com')
User.objects.create(first_name='Normal', last_name='User', email='normal.user@gmail.com')
# duplicated users, agents but no saml link
User.objects.create(first_name='Agent', last_name='No SAML', email='agent.no.saml@cd-essonne.fr')
User.objects.create(first_name='Agent', last_name='No SAML', email='agent.no.saml@cd-essonne.fr')
# agent with saml link, no duplicate
issuer = Issuer.objects.create(entity_id='https://idp1.example.com/', slug='idp1')
saml_user = User.objects.create(
first_name='Agent',
last_name='No duplicate',
email='agent.no.dup@cd-essonne.fr',
)
UserSAMLIdentifier.objects.create(user=saml_user, issuer=issuer, name_id='anodup')
# duplicated users, agents with saml link
saml_user_old_connection = User.objects.create(
id=42,
uuid='uuid:42/agent@cd-essonne.fr',
first_name='Agent',
last_name='Duplicated',
email='agent@cd-essonne.fr',
last_login=now() - datetime.timedelta(days=10),
)
UserSAMLIdentifier.objects.create(user=saml_user_old_connection, issuer=issuer, name_id='aduplicated')
saml_user_old_connection.roles.add(role1)
saml_user_old_connection.roles.add(role2)
saml_user_old_connection_no_roles = User.objects.create(
id=43,
uuid='uuid:43/agent@cd-essonne.fr',
first_name='Agent',
last_name='Duplicated',
email='agent@cd-Essonne.fr',
last_login=now() - datetime.timedelta(days=5),
)
UserSAMLIdentifier.objects.create(
user=saml_user_old_connection_no_roles, issuer=issuer, name_id='Aduplicated'
)
saml_user_recent_connection = User.objects.create(
id=44,
uuid='uuid:44/agent@cd-essonne.fr',
first_name='Agent',
last_name='Duplicated',
email='Agent@cd-essonne.fr',
last_login=now() - datetime.timedelta(days=1),
)
UserSAMLIdentifier.objects.create(user=saml_user_recent_connection, issuer=issuer, name_id='ADUPLICATED')
saml_user_old_connection.roles.add(role1)
saml_user_old_connection.roles.add(role3)
# again duplicated users, agents with saml link
saml_user_recent_connection_no_roles_2 = User.objects.create(
id=45,
uuid='uuid:45/agent2@cd-essonne.fr',
first_name='Agent',
last_name='Duplicated 2',
email='agent2@cd-essonne.fr',
last_login=now() - datetime.timedelta(days=5),
)
UserSAMLIdentifier.objects.create(
user=saml_user_recent_connection_no_roles_2, issuer=issuer, name_id='Aduplicated2'
)
saml_user_old_connection_2 = User.objects.create(
id=46,
uuid='uuid:46/agent2@cd-essonne.fr',
first_name='Agent',
last_name='Duplicated 2',
email='agent2@cd-essonne.fr',
last_login=now() - datetime.timedelta(days=10),
)
UserSAMLIdentifier.objects.create(user=saml_user_old_connection_2, issuer=issuer, name_id='aduplicated2')
saml_user_old_connection_2.roles.add(role1)
saml_user_old_connection_2.roles.add(role2)
assert User.objects.count() == 10
assert User.objects.filter(is_active=True).count() == 10
call_command('runscript', 'tests/authentic_fusion.py')
log_messages = caplog.messages
assert log_messages == [
'=== Starting fusion at 2022-04-19T14:00:00 ===',
'* Processing user Agent Duplicated Agent@cd-essonne.fr uuid:44/agent@cd-essonne.fr /manage/users/44/',
'Adding role role1',
'Adding role role2',
'Adding role role3',
'Disabling duplicate Agent Duplicated agent@cd-Essonne.fr uuid:43/agent@cd-essonne.fr /manage/users/43/',
'Disabling duplicate Agent Duplicated agent@cd-essonne.fr uuid:42/agent@cd-essonne.fr /manage/users/42/',
'* Processing user Agent Duplicated 2 agent2@cd-essonne.fr uuid:45/agent2@cd-essonne.fr /manage/users/45/',
'Adding role role1',
'Adding role role2',
'Disabling duplicate Agent Duplicated 2 agent2@cd-essonne.fr uuid:46/agent2@cd-essonne.fr /manage/users/46/',
'Result {"uuid:44/agent@cd-essonne.fr": ["uuid:43/agent@cd-essonne.fr", "uuid:42/agent@cd-essonne.fr"], "uuid:45/agent2@cd-essonne.fr": ["uuid:46/agent2@cd-essonne.fr"]}',
'=== Did nothing ===',
]
# no changes in db
assert User.objects.count() == 10
assert User.objects.filter(is_active=True).count() == 10
assert saml_user_recent_connection_no_roles_2.roles.count() == 0
caplog.clear()
call_command('runscript', 'tests/authentic_fusion.py', '--proceed=true')
assert log_messages[:-1] == caplog.messages[:-1]
assert caplog.messages[-1] == '=== Success ==='
assert User.objects.count() == 10
assert User.objects.filter(is_active=True).count() == 7
assert User.objects.filter(email='normal.user@gmail.com', is_active=True).count() == 2
assert User.objects.filter(email='agent.no.saml@cd-essonne.fr', is_active=True).count() == 2
assert User.objects.filter(email='agent.no.dup@cd-essonne.fr', is_active=True).count() == 1
saml_user_old_connection.refresh_from_db()
assert saml_user_old_connection.is_active is False
saml_user_old_connection_no_roles.refresh_from_db()
assert saml_user_old_connection_no_roles.is_active is False
saml_user_recent_connection.refresh_from_db()
assert saml_user_recent_connection.is_active is True
assert set(saml_user_recent_connection.roles.all()) == {role1, role2, role3}
saml_user_old_connection_2.refresh_from_db()
assert saml_user_old_connection_2.is_active is False
saml_user_recent_connection_no_roles_2.refresh_from_db()
assert saml_user_recent_connection_no_roles_2.is_active is True
assert set(saml_user_recent_connection_no_roles_2.roles.all()) == {role1, role2}

View File

@ -0,0 +1,134 @@
import json
import pytest
from django.core.management import call_command
from wcs import sql
from wcs.formdef import FormDef
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.storage import Equal
from .utilities import clean_temporary_pub, create_temporary_pub
@pytest.fixture
def pub():
pub = create_temporary_pub()
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
pub.set_app_dir(req)
pub.write_cfg()
return pub
def teardown_module(module):
clean_temporary_pub()
@pytest.mark.freeze_time('2022-04-19 14:00')
def test_fusion(pub, caplog):
formdef = FormDef()
formdef.name = 'test title'
formdef.store()
# normal user
normal_user = pub.user_class()
normal_user.name = 'Normal user'
normal_user.email = 'normal-user@gmail.com'
normal_user.name_identifiers = ['a' * 32]
normal_user.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.user_id = normal_user.id
formdata.store()
# duplicated user
duplicated_user_no_forms = pub.user_class()
duplicated_user_no_forms.name = 'Duplicated user no forms'
duplicated_user_no_forms.email = 'duplicated-user@cd-essonne.fr'
duplicated_user_no_forms.name_identifiers = ['b' * 32]
duplicated_user_no_forms.store()
duplicated_user_with_form = pub.user_class()
duplicated_user_with_form.name = 'Duplicated user with forms'
duplicated_user_with_form.email = 'duplicated-user@cd-essonne.fr'
duplicated_user_with_form.name_identifiers = ['c' * 32]
duplicated_user_with_form.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.user_id = duplicated_user_with_form.id
formdata.store()
# another duplicated user
duplicated_user_with_form_2 = pub.user_class()
duplicated_user_with_form_2.name = 'Duplicated user with forms 2'
duplicated_user_with_form_2.email = 'duplicated-user-2@cd-essonne.fr'
duplicated_user_with_form_2.name_identifiers = ['d' * 32]
duplicated_user_with_form_2.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.user_id = duplicated_user_with_form_2.id
formdata.store()
duplicated_user_with_form_21 = pub.user_class()
duplicated_user_with_form_21.name = 'Duplicated user with forms 2'
duplicated_user_with_form_21.email = 'duplicated-user-2@cd-essonne.fr'
duplicated_user_with_form_21.name_identifiers = ['e' * 32]
duplicated_user_with_form_21.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.user_id = duplicated_user_with_form_21.id
formdata.store()
duplicated_user_with_form_22 = pub.user_class()
duplicated_user_with_form_22.name = 'Duplicated user with forms 2'
duplicated_user_with_form_22.email = 'duplicated-user-2@cd-essonne.fr'
duplicated_user_with_form_22.name_identifiers = ['f' * 32]
duplicated_user_with_form_22.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.user_id = duplicated_user_with_form_22.id
formdata.store()
authentic_fusion_result = {
duplicated_user_no_forms.name_identifiers[0]: [duplicated_user_with_form.name_identifiers[0]],
duplicated_user_with_form_2.name_identifiers[0]: [
duplicated_user_with_form_21.name_identifiers[0],
duplicated_user_with_form_22.name_identifiers[0],
],
}
with open('authentic_fusion_result.json', 'w') as f:
f.write(json.dumps(authentic_fusion_result))
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_no_forms.id))]) == 0
call_command('runscript', 'tests/wcs_fusion.py')
# no changes in db
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_no_forms.id))]) == 0
log_messages = caplog.messages
assert log_messages == [
'=== Starting form reattachment at 2022-04-19T14:00:00 ===',
'Attaching form 1-2 from user 3 to user 2 (Duplicated user no forms)',
'Attaching form 1-4 from user 5 to user 4 (Duplicated user with forms 2)',
'Attaching form 1-5 from user 6 to user 4 (Duplicated user with forms 2)',
'=== Did nothing ===',
]
caplog.clear()
call_command('runscript', 'tests/wcs_fusion.py', '--proceed=true')
assert log_messages[:-1] == caplog.messages[:-1]
assert caplog.messages[-1] == '=== Success ==='
assert sql.AnyFormData.count([Equal('user_id', str(normal_user.id))]) == 1
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_no_forms.id))]) == 1
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_with_form.id))]) == 0
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_with_form_2.id))]) == 3
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_with_form_21.id))]) == 0
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_with_form_22.id))]) == 0

View File

@ -0,0 +1,74 @@
import collections
import json
import logging
import sys
from django.utils import timezone
from quixote import get_publisher
from wcs import sql
from wcs.sql_criterias import Equal
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler('wcs_fusion.log')
fh.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
logger.addHandler(fh)
logger.addHandler(ch)
logger.info('=== Starting form reattachment at %s ===', timezone.now().strftime('%Y-%m-%dT%H:%M:%S'))
with open('authentic_fusion_result.json') as f:
disabled_users_map = json.loads(f.read())
def get_user_by_uuid(uuid):
users = get_publisher().user_class.get_users_with_name_identifier(uuid)
if len(users) == 0:
raise ValueError('no user found for uuid %s' % uuid)
if len(users) > 1:
raise ValueError('multiple users found for uuid %s' % uuid)
return users[0]
disabled_users_by_user = collections.defaultdict(list)
for user_uuid, disabled_user_uuids in disabled_users_map.items():
user = get_user_by_uuid(user_uuid)
for disabled_user_uuid in disabled_user_uuids:
disabled_users_by_user[user].append(get_user_by_uuid(disabled_user_uuid))
forms_by_user = collections.defaultdict(list)
for user, disabled_users in disabled_users_by_user.items():
for disabled_user in disabled_users:
forms_by_user[user].extend(sql.AnyFormData.select([Equal('user_id', str(disabled_user.id))]))
def attach_forms_to_user(forms_by_user):
for user, forms in forms_by_user.items():
for form in forms:
logger.info(
'Attaching form %s from user %s to user %s (%s)', form.id_display, form.user_id, user.id, user
)
form.user_id = user.id
form.store()
try:
with sql.atomic():
attach_forms_to_user(forms_by_user)
if len(sys.argv) < 2 or sys.argv[1] != '--proceed=true':
raise ValueError
logger.info('=== Success ===')
except ValueError:
logger.info('=== Did nothing ===')