doublons-cd91: add wcs script

This commit is contained in:
Valentin Deniaud 2024-02-27 14:41:57 +01:00
parent a65b4719e8
commit aeba142aeb
2 changed files with 208 additions and 0 deletions

View File

@ -0,0 +1,134 @@
import json
import pytest
from django.core.management import call_command
from wcs import sql
from wcs.formdef import FormDef
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.storage import Equal
from .utilities import clean_temporary_pub, create_temporary_pub
@pytest.fixture
def pub():
pub = create_temporary_pub()
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
pub.set_app_dir(req)
pub.write_cfg()
return pub
def teardown_module(module):
clean_temporary_pub()
@pytest.mark.freeze_time('2022-04-19 14:00')
def test_fusion(pub, caplog):
formdef = FormDef()
formdef.name = 'test title'
formdef.store()
# normal user
normal_user = pub.user_class()
normal_user.name = 'Normal user'
normal_user.email = 'normal-user@gmail.com'
normal_user.name_identifiers = ['a' * 32]
normal_user.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.user_id = normal_user.id
formdata.store()
# duplicated user
duplicated_user_no_forms = pub.user_class()
duplicated_user_no_forms.name = 'Duplicated user no forms'
duplicated_user_no_forms.email = 'duplicated-user@cd-essonne.fr'
duplicated_user_no_forms.name_identifiers = ['b' * 32]
duplicated_user_no_forms.store()
duplicated_user_with_form = pub.user_class()
duplicated_user_with_form.name = 'Duplicated user with forms'
duplicated_user_with_form.email = 'duplicated-user@cd-essonne.fr'
duplicated_user_with_form.name_identifiers = ['c' * 32]
duplicated_user_with_form.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.user_id = duplicated_user_with_form.id
formdata.store()
# another duplicated user
duplicated_user_with_form_2 = pub.user_class()
duplicated_user_with_form_2.name = 'Duplicated user with forms 2'
duplicated_user_with_form_2.email = 'duplicated-user-2@cd-essonne.fr'
duplicated_user_with_form_2.name_identifiers = ['d' * 32]
duplicated_user_with_form_2.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.user_id = duplicated_user_with_form_2.id
formdata.store()
duplicated_user_with_form_21 = pub.user_class()
duplicated_user_with_form_21.name = 'Duplicated user with forms 2'
duplicated_user_with_form_21.email = 'duplicated-user-2@cd-essonne.fr'
duplicated_user_with_form_21.name_identifiers = ['e' * 32]
duplicated_user_with_form_21.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.user_id = duplicated_user_with_form_21.id
formdata.store()
duplicated_user_with_form_22 = pub.user_class()
duplicated_user_with_form_22.name = 'Duplicated user with forms 2'
duplicated_user_with_form_22.email = 'duplicated-user-2@cd-essonne.fr'
duplicated_user_with_form_22.name_identifiers = ['f' * 32]
duplicated_user_with_form_22.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.user_id = duplicated_user_with_form_22.id
formdata.store()
authentic_fusion_result = {
duplicated_user_no_forms.name_identifiers[0]: [duplicated_user_with_form.name_identifiers[0]],
duplicated_user_with_form_2.name_identifiers[0]: [
duplicated_user_with_form_21.name_identifiers[0],
duplicated_user_with_form_22.name_identifiers[0],
],
}
with open('authentic_fusion_result.json', 'w') as f:
f.write(json.dumps(authentic_fusion_result))
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_no_forms.id))]) == 0
call_command('runscript', 'tests/wcs_fusion.py')
# no changes in db
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_no_forms.id))]) == 0
log_messages = caplog.messages
assert log_messages == [
'=== Starting form reattachment at 2022-04-19T14:00:00 ===',
'Attaching form 1-2 from user 3 to user 2 (Duplicated user no forms)',
'Attaching form 1-4 from user 5 to user 4 (Duplicated user with forms 2)',
'Attaching form 1-5 from user 6 to user 4 (Duplicated user with forms 2)',
'=== Did nothing ===',
]
caplog.clear()
call_command('runscript', 'tests/wcs_fusion.py', '--proceed=true')
assert log_messages[:-1] == caplog.messages[:-1]
assert caplog.messages[-1] == '=== Success ==='
assert sql.AnyFormData.count([Equal('user_id', str(normal_user.id))]) == 1
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_no_forms.id))]) == 1
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_with_form.id))]) == 0
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_with_form_2.id))]) == 3
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_with_form_21.id))]) == 0
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_with_form_22.id))]) == 0

View File

@ -0,0 +1,74 @@
import collections
import json
import logging
import sys
from django.utils import timezone
from quixote import get_publisher
from wcs import sql
from wcs.sql_criterias import Equal
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler('wcs_fusion.log')
fh.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
logger.addHandler(fh)
logger.addHandler(ch)
logger.info('=== Starting form reattachment at %s ===', timezone.now().strftime('%Y-%m-%dT%H:%M:%S'))
with open('authentic_fusion_result.json') as f:
disabled_users_map = json.loads(f.read())
def get_user_by_uuid(uuid):
users = get_publisher().user_class.get_users_with_name_identifier(uuid)
if len(users) == 0:
raise ValueError('no user found for uuid %s' % uuid)
if len(users) > 1:
raise ValueError('multiple users found for uuid %s' % uuid)
return users[0]
disabled_users_by_user = collections.defaultdict(list)
for user_uuid, disabled_user_uuids in disabled_users_map.items():
user = get_user_by_uuid(user_uuid)
for disabled_user_uuid in disabled_user_uuids:
disabled_users_by_user[user].append(get_user_by_uuid(disabled_user_uuid))
forms_by_user = collections.defaultdict(list)
for user, disabled_users in disabled_users_by_user.items():
for disabled_user in disabled_users:
forms_by_user[user].extend(sql.AnyFormData.select([Equal('user_id', str(disabled_user.id))]))
def attach_forms_to_user(forms_by_user):
for user, forms in forms_by_user.items():
for form in forms:
logger.info(
'Attaching form %s from user %s to user %s (%s)', form.id_display, form.user_id, user.id, user
)
form.user_id = user.id
form.store()
try:
with sql.atomic():
attach_forms_to_user(forms_by_user)
if len(sys.argv) < 2 or sys.argv[1] != '--proceed=true':
raise ValueError
logger.info('=== Success ===')
except ValueError:
logger.info('=== Did nothing ===')