Compare commits
4 Commits
main
...
doublons-c
Author | SHA1 | Date |
---|---|---|
Valentin Deniaud | f3d96794ea | |
Valentin Deniaud | 0ce6428521 | |
Valentin Deniaud | c7fa21b6c1 | |
Valentin Deniaud | bde1d770ec |
|
@ -0,0 +1,36 @@
|
|||
# See https://pre-commit.com for more information
|
||||
# See https://pre-commit.com/hooks.html for more hooks
|
||||
repos:
|
||||
- repo: https://github.com/pre-commit/pre-commit-hooks
|
||||
rev: v4.4.0
|
||||
hooks:
|
||||
- id: double-quote-string-fixer
|
||||
- repo: https://github.com/psf/black
|
||||
rev: 23.1.0
|
||||
hooks:
|
||||
- id: black
|
||||
args: ['--target-version', 'py39', '--skip-string-normalization', '--line-length', '110']
|
||||
- repo: https://github.com/PyCQA/isort
|
||||
rev: 5.12.0
|
||||
hooks:
|
||||
- id: isort
|
||||
args: ['--profile', 'black', '--line-length', '110']
|
||||
- repo: https://github.com/asottile/pyupgrade
|
||||
rev: v3.3.1
|
||||
hooks:
|
||||
- id: pyupgrade
|
||||
args: ['--keep-percent-format', '--py39-plus']
|
||||
- repo: https://github.com/adamchainz/django-upgrade
|
||||
rev: 1.13.0
|
||||
hooks:
|
||||
- id: django-upgrade
|
||||
args: ['--target-version', '3.2']
|
||||
- repo: https://github.com/rtts/djhtml
|
||||
rev: '3.0.5'
|
||||
hooks:
|
||||
- id: djhtml
|
||||
args: ['--tabwidth', '2']
|
||||
- repo: https://git.entrouvert.org/pre-commit-debian.git
|
||||
rev: v0.3
|
||||
hooks:
|
||||
- id: pre-commit-debian
|
|
@ -1,9 +1,8 @@
|
|||
import logging
|
||||
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
|
||||
from authentic2.a2_rbac.models import Role
|
||||
from authentic2.apps.authenticators.models import BaseAuthenticator
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
|
@ -1,12 +1,11 @@
|
|||
from django.db import connection
|
||||
|
||||
from chrono.agendas.models import Event
|
||||
from django.db import connection
|
||||
from django.db.models import Count
|
||||
|
||||
duplicated_events = (
|
||||
Event.objects.values("primary_event", "start_datetime")
|
||||
.annotate(count=Count("id"))
|
||||
.values("pk")
|
||||
Event.objects.values('primary_event', 'start_datetime')
|
||||
.annotate(count=Count('id'))
|
||||
.values('pk')
|
||||
.order_by()
|
||||
.filter(count__gt=1)
|
||||
)
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
from django.db import connection
|
||||
|
||||
from chrono.agendas.models import TimePeriod
|
||||
|
||||
from django.db import connection
|
||||
|
||||
periods = TimePeriod.objects.filter(agenda__isnull=False, weekday_indexes__isnull=False)
|
||||
if periods.exists():
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
from django.conf import settings
|
||||
from combo.apps.dataviz.models import ChartNgCell
|
||||
from django.conf import settings
|
||||
|
||||
base_url = settings.SITE_BASE_URL
|
||||
qs = ChartNgCell.objects.filter(
|
||||
|
@ -12,6 +12,9 @@ for cell in qs:
|
|||
continue
|
||||
transpose = bool(chart.axis_count == 2)
|
||||
if transpose:
|
||||
print('Switch to "table inverted" on cell %s from page %s' % (cell.pk, settings.SITE_BASE_URL + cell.page.get_online_url()))
|
||||
print(
|
||||
'Switch to "table inverted" on cell %s from page %s'
|
||||
% (cell.pk, settings.SITE_BASE_URL + cell.page.get_online_url())
|
||||
)
|
||||
cell.chart_type = 'table-inverted'
|
||||
cell.save()
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
import collections
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.db import connection, transaction
|
||||
from django.utils import timezone
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
fh = logging.FileHandler('authentic_fusion.log')
|
||||
fh.setLevel(logging.DEBUG)
|
||||
|
||||
ch = logging.StreamHandler()
|
||||
ch.setLevel(logging.DEBUG)
|
||||
|
||||
logger.addHandler(fh)
|
||||
logger.addHandler(ch)
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
domain_url = ''
|
||||
if hasattr(connection, 'tenant') and hasattr(connection.tenant, 'domain_url'):
|
||||
domain_url = 'https://%s' % connection.tenant.domain_url
|
||||
|
||||
|
||||
logger.info('=== Starting fusion at %s ===', timezone.now().strftime('%Y-%m-%dT%H:%M:%S'))
|
||||
|
||||
|
||||
agent_users = (
|
||||
User.objects.filter(email__icontains='@cd-essonne.fr', saml_identifiers__isnull=False)
|
||||
.distinct()
|
||||
.order_by('-last_login', 'first_name')
|
||||
)
|
||||
|
||||
agent_users_by_email = collections.defaultdict(list)
|
||||
for user in agent_users:
|
||||
agent_users_by_email[user.email.lower()].append(user)
|
||||
|
||||
users_to_keep = []
|
||||
for email, users in agent_users_by_email.items():
|
||||
if len(users) == 1:
|
||||
continue
|
||||
|
||||
roles = {}
|
||||
for user in users:
|
||||
for role in user.roles.all():
|
||||
roles[role.id] = role
|
||||
|
||||
user_to_keep, users_to_disable = users[0], users[1:]
|
||||
|
||||
user_to_keep._roles_to_add = roles
|
||||
user_to_keep._duplicated_users = users_to_disable
|
||||
users_to_keep.append(user_to_keep)
|
||||
|
||||
|
||||
def get_user_detail(user):
|
||||
return f'{user.get_full_name()} {user.email} {user.uuid} {domain_url}{user.get_absolute_url()}'
|
||||
|
||||
|
||||
def do_fusion(users):
|
||||
disabled_users_uuid_by_user_uuid = collections.defaultdict(list)
|
||||
for user in users:
|
||||
logger.info('* Processing user %s', get_user_detail(user))
|
||||
|
||||
for role in sorted(user._roles_to_add.values(), key=lambda x: x.name.lower()):
|
||||
logger.info('Adding role %s', role)
|
||||
user.roles.add(role)
|
||||
|
||||
for duplicated_user in user._duplicated_users:
|
||||
logger.info('Disabling duplicate %s', get_user_detail(duplicated_user))
|
||||
disabled_users_uuid_by_user_uuid[user.uuid].append(duplicated_user.uuid)
|
||||
duplicated_user.mark_as_inactive(reason='Désactivation automatique des doublons')
|
||||
|
||||
result = json.dumps(disabled_users_uuid_by_user_uuid)
|
||||
logger.info('Result %s', result)
|
||||
|
||||
with open('authentic_fusion_result.json', 'w') as f:
|
||||
f.write(result)
|
||||
|
||||
|
||||
try:
|
||||
with transaction.atomic():
|
||||
do_fusion(users_to_keep)
|
||||
|
||||
if len(sys.argv) < 2 or sys.argv[1] != '--proceed=true':
|
||||
raise ValueError
|
||||
logger.info('=== Success ===')
|
||||
except ValueError:
|
||||
logger.info('=== Did nothing ===')
|
|
@ -0,0 +1,154 @@
|
|||
import datetime
|
||||
|
||||
import pytest
|
||||
from authentic2.a2_rbac.models import Role
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.utils.timezone import now
|
||||
from mellon.models import Issuer, UserSAMLIdentifier
|
||||
|
||||
from .utils import call_command
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
|
||||
@pytest.mark.freeze_time('2022-04-19 14:00')
|
||||
def test_authentic_fusion(db, caplog):
|
||||
role1 = Role.objects.create(name='role1')
|
||||
role2 = Role.objects.create(name='role2')
|
||||
role3 = Role.objects.create(name='role3')
|
||||
|
||||
# duplicated users, but not agents
|
||||
User.objects.create(first_name='Normal', last_name='User', email='normal.user@gmail.com')
|
||||
User.objects.create(first_name='Normal', last_name='User', email='normal.user@gmail.com')
|
||||
|
||||
# duplicated users, agents but no saml link
|
||||
User.objects.create(first_name='Agent', last_name='No SAML', email='agent.no.saml@cd-essonne.fr')
|
||||
User.objects.create(first_name='Agent', last_name='No SAML', email='agent.no.saml@cd-essonne.fr')
|
||||
|
||||
# agent with saml link, no duplicate
|
||||
issuer = Issuer.objects.create(entity_id='https://idp1.example.com/', slug='idp1')
|
||||
saml_user = User.objects.create(
|
||||
first_name='Agent',
|
||||
last_name='No duplicate',
|
||||
email='agent.no.dup@cd-essonne.fr',
|
||||
)
|
||||
UserSAMLIdentifier.objects.create(user=saml_user, issuer=issuer, name_id='anodup')
|
||||
|
||||
# duplicated users, agents with saml link
|
||||
saml_user_old_connection = User.objects.create(
|
||||
id=42,
|
||||
uuid='uuid:42/agent@cd-essonne.fr',
|
||||
first_name='Agent',
|
||||
last_name='Duplicated',
|
||||
email='agent@cd-essonne.fr',
|
||||
last_login=now() - datetime.timedelta(days=10),
|
||||
)
|
||||
UserSAMLIdentifier.objects.create(user=saml_user_old_connection, issuer=issuer, name_id='aduplicated')
|
||||
saml_user_old_connection.roles.add(role1)
|
||||
saml_user_old_connection.roles.add(role2)
|
||||
|
||||
saml_user_old_connection_no_roles = User.objects.create(
|
||||
id=43,
|
||||
uuid='uuid:43/agent@cd-essonne.fr',
|
||||
first_name='Agent',
|
||||
last_name='Duplicated',
|
||||
email='agent@cd-Essonne.fr',
|
||||
last_login=now() - datetime.timedelta(days=5),
|
||||
)
|
||||
UserSAMLIdentifier.objects.create(
|
||||
user=saml_user_old_connection_no_roles, issuer=issuer, name_id='Aduplicated'
|
||||
)
|
||||
|
||||
saml_user_recent_connection = User.objects.create(
|
||||
id=44,
|
||||
uuid='uuid:44/agent@cd-essonne.fr',
|
||||
first_name='Agent',
|
||||
last_name='Duplicated',
|
||||
email='Agent@cd-essonne.fr',
|
||||
last_login=now() - datetime.timedelta(days=1),
|
||||
)
|
||||
UserSAMLIdentifier.objects.create(user=saml_user_recent_connection, issuer=issuer, name_id='ADUPLICATED')
|
||||
saml_user_old_connection.roles.add(role1)
|
||||
saml_user_old_connection.roles.add(role3)
|
||||
|
||||
# again duplicated users, agents with saml link
|
||||
saml_user_recent_connection_no_roles_2 = User.objects.create(
|
||||
id=45,
|
||||
uuid='uuid:45/agent2@cd-essonne.fr',
|
||||
first_name='Agent',
|
||||
last_name='Duplicated 2',
|
||||
email='agent2@cd-essonne.fr',
|
||||
last_login=now() - datetime.timedelta(days=5),
|
||||
)
|
||||
UserSAMLIdentifier.objects.create(
|
||||
user=saml_user_recent_connection_no_roles_2, issuer=issuer, name_id='Aduplicated2'
|
||||
)
|
||||
|
||||
saml_user_old_connection_2 = User.objects.create(
|
||||
id=46,
|
||||
uuid='uuid:46/agent2@cd-essonne.fr',
|
||||
first_name='Agent',
|
||||
last_name='Duplicated 2',
|
||||
email='agent2@cd-essonne.fr',
|
||||
last_login=now() - datetime.timedelta(days=10),
|
||||
)
|
||||
UserSAMLIdentifier.objects.create(user=saml_user_old_connection_2, issuer=issuer, name_id='aduplicated2')
|
||||
saml_user_old_connection_2.roles.add(role1)
|
||||
saml_user_old_connection_2.roles.add(role2)
|
||||
|
||||
assert User.objects.count() == 10
|
||||
assert User.objects.filter(is_active=True).count() == 10
|
||||
|
||||
call_command('runscript', 'tests/authentic_fusion.py')
|
||||
|
||||
log_messages = caplog.messages
|
||||
assert log_messages == [
|
||||
'=== Starting fusion at 2022-04-19T14:00:00 ===',
|
||||
'* Processing user Agent Duplicated Agent@cd-essonne.fr uuid:44/agent@cd-essonne.fr /manage/users/44/',
|
||||
'Adding role role1',
|
||||
'Adding role role2',
|
||||
'Adding role role3',
|
||||
'Disabling duplicate Agent Duplicated agent@cd-Essonne.fr uuid:43/agent@cd-essonne.fr /manage/users/43/',
|
||||
'Disabling duplicate Agent Duplicated agent@cd-essonne.fr uuid:42/agent@cd-essonne.fr /manage/users/42/',
|
||||
'* Processing user Agent Duplicated 2 agent2@cd-essonne.fr uuid:45/agent2@cd-essonne.fr /manage/users/45/',
|
||||
'Adding role role1',
|
||||
'Adding role role2',
|
||||
'Disabling duplicate Agent Duplicated 2 agent2@cd-essonne.fr uuid:46/agent2@cd-essonne.fr /manage/users/46/',
|
||||
'Result {"uuid:44/agent@cd-essonne.fr": ["uuid:43/agent@cd-essonne.fr", "uuid:42/agent@cd-essonne.fr"], "uuid:45/agent2@cd-essonne.fr": ["uuid:46/agent2@cd-essonne.fr"]}',
|
||||
'=== Did nothing ===',
|
||||
]
|
||||
|
||||
# no changes in db
|
||||
assert User.objects.count() == 10
|
||||
assert User.objects.filter(is_active=True).count() == 10
|
||||
assert saml_user_recent_connection_no_roles_2.roles.count() == 0
|
||||
|
||||
caplog.clear()
|
||||
call_command('runscript', 'tests/authentic_fusion.py', '--proceed=true')
|
||||
|
||||
assert log_messages[:-1] == caplog.messages[:-1]
|
||||
assert caplog.messages[-1] == '=== Success ==='
|
||||
|
||||
assert User.objects.count() == 10
|
||||
assert User.objects.filter(is_active=True).count() == 7
|
||||
|
||||
assert User.objects.filter(email='normal.user@gmail.com', is_active=True).count() == 2
|
||||
assert User.objects.filter(email='agent.no.saml@cd-essonne.fr', is_active=True).count() == 2
|
||||
assert User.objects.filter(email='agent.no.dup@cd-essonne.fr', is_active=True).count() == 1
|
||||
|
||||
saml_user_old_connection.refresh_from_db()
|
||||
assert saml_user_old_connection.is_active is False
|
||||
|
||||
saml_user_old_connection_no_roles.refresh_from_db()
|
||||
assert saml_user_old_connection_no_roles.is_active is False
|
||||
|
||||
saml_user_recent_connection.refresh_from_db()
|
||||
assert saml_user_recent_connection.is_active is True
|
||||
assert set(saml_user_recent_connection.roles.all()) == {role1, role2, role3}
|
||||
|
||||
saml_user_old_connection_2.refresh_from_db()
|
||||
assert saml_user_old_connection_2.is_active is False
|
||||
|
||||
saml_user_recent_connection_no_roles_2.refresh_from_db()
|
||||
assert saml_user_recent_connection_no_roles_2.is_active is True
|
||||
assert set(saml_user_recent_connection_no_roles_2.roles.all()) == {role1, role2}
|
|
@ -0,0 +1,134 @@
|
|||
import json
|
||||
|
||||
import pytest
|
||||
from django.core.management import call_command
|
||||
from wcs import sql
|
||||
from wcs.formdef import FormDef
|
||||
from wcs.qommon.http_request import HTTPRequest
|
||||
from wcs.qommon.storage import Equal
|
||||
|
||||
from .utilities import clean_temporary_pub, create_temporary_pub
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pub():
|
||||
pub = create_temporary_pub()
|
||||
|
||||
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
|
||||
pub.set_app_dir(req)
|
||||
pub.write_cfg()
|
||||
return pub
|
||||
|
||||
|
||||
def teardown_module(module):
|
||||
clean_temporary_pub()
|
||||
|
||||
|
||||
@pytest.mark.freeze_time('2022-04-19 14:00')
|
||||
def test_fusion(pub, caplog):
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test title'
|
||||
formdef.store()
|
||||
|
||||
# normal user
|
||||
normal_user = pub.user_class()
|
||||
normal_user.name = 'Normal user'
|
||||
normal_user.email = 'normal-user@gmail.com'
|
||||
normal_user.name_identifiers = ['a' * 32]
|
||||
normal_user.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
formdata.user_id = normal_user.id
|
||||
formdata.store()
|
||||
|
||||
# duplicated user
|
||||
duplicated_user_no_forms = pub.user_class()
|
||||
duplicated_user_no_forms.name = 'Duplicated user no forms'
|
||||
duplicated_user_no_forms.email = 'duplicated-user@cd-essonne.fr'
|
||||
duplicated_user_no_forms.name_identifiers = ['b' * 32]
|
||||
duplicated_user_no_forms.store()
|
||||
|
||||
duplicated_user_with_form = pub.user_class()
|
||||
duplicated_user_with_form.name = 'Duplicated user with forms'
|
||||
duplicated_user_with_form.email = 'duplicated-user@cd-essonne.fr'
|
||||
duplicated_user_with_form.name_identifiers = ['c' * 32]
|
||||
duplicated_user_with_form.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
formdata.user_id = duplicated_user_with_form.id
|
||||
formdata.store()
|
||||
|
||||
# another duplicated user
|
||||
duplicated_user_with_form_2 = pub.user_class()
|
||||
duplicated_user_with_form_2.name = 'Duplicated user with forms 2'
|
||||
duplicated_user_with_form_2.email = 'duplicated-user-2@cd-essonne.fr'
|
||||
duplicated_user_with_form_2.name_identifiers = ['d' * 32]
|
||||
duplicated_user_with_form_2.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
formdata.user_id = duplicated_user_with_form_2.id
|
||||
formdata.store()
|
||||
|
||||
duplicated_user_with_form_21 = pub.user_class()
|
||||
duplicated_user_with_form_21.name = 'Duplicated user with forms 2'
|
||||
duplicated_user_with_form_21.email = 'duplicated-user-2@cd-essonne.fr'
|
||||
duplicated_user_with_form_21.name_identifiers = ['e' * 32]
|
||||
duplicated_user_with_form_21.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
formdata.user_id = duplicated_user_with_form_21.id
|
||||
formdata.store()
|
||||
|
||||
duplicated_user_with_form_22 = pub.user_class()
|
||||
duplicated_user_with_form_22.name = 'Duplicated user with forms 2'
|
||||
duplicated_user_with_form_22.email = 'duplicated-user-2@cd-essonne.fr'
|
||||
duplicated_user_with_form_22.name_identifiers = ['f' * 32]
|
||||
duplicated_user_with_form_22.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
formdata.user_id = duplicated_user_with_form_22.id
|
||||
formdata.store()
|
||||
|
||||
authentic_fusion_result = {
|
||||
duplicated_user_no_forms.name_identifiers[0]: [duplicated_user_with_form.name_identifiers[0]],
|
||||
duplicated_user_with_form_2.name_identifiers[0]: [
|
||||
duplicated_user_with_form_21.name_identifiers[0],
|
||||
duplicated_user_with_form_22.name_identifiers[0],
|
||||
],
|
||||
}
|
||||
with open('authentic_fusion_result.json', 'w') as f:
|
||||
f.write(json.dumps(authentic_fusion_result))
|
||||
|
||||
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_no_forms.id))]) == 0
|
||||
|
||||
call_command('runscript', 'tests/wcs_fusion.py')
|
||||
|
||||
# no changes in db
|
||||
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_no_forms.id))]) == 0
|
||||
|
||||
log_messages = caplog.messages
|
||||
assert log_messages == [
|
||||
'=== Starting form reattachment at 2022-04-19T14:00:00 ===',
|
||||
'Attaching form 1-2 from user 3 to user 2 (Duplicated user no forms)',
|
||||
'Attaching form 1-4 from user 5 to user 4 (Duplicated user with forms 2)',
|
||||
'Attaching form 1-5 from user 6 to user 4 (Duplicated user with forms 2)',
|
||||
'=== Did nothing ===',
|
||||
]
|
||||
|
||||
caplog.clear()
|
||||
call_command('runscript', 'tests/wcs_fusion.py', '--proceed=true')
|
||||
|
||||
assert log_messages[:-1] == caplog.messages[:-1]
|
||||
assert caplog.messages[-1] == '=== Success ==='
|
||||
|
||||
assert sql.AnyFormData.count([Equal('user_id', str(normal_user.id))]) == 1
|
||||
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_no_forms.id))]) == 1
|
||||
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_with_form.id))]) == 0
|
||||
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_with_form_2.id))]) == 3
|
||||
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_with_form_21.id))]) == 0
|
||||
assert sql.AnyFormData.count([Equal('user_id', str(duplicated_user_with_form_22.id))]) == 0
|
|
@ -0,0 +1,74 @@
|
|||
import collections
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from django.utils import timezone
|
||||
from quixote import get_publisher
|
||||
from wcs import sql
|
||||
from wcs.sql_criterias import Equal
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
fh = logging.FileHandler('wcs_fusion.log')
|
||||
fh.setLevel(logging.DEBUG)
|
||||
|
||||
ch = logging.StreamHandler()
|
||||
ch.setLevel(logging.DEBUG)
|
||||
|
||||
logger.addHandler(fh)
|
||||
logger.addHandler(ch)
|
||||
|
||||
|
||||
logger.info('=== Starting form reattachment at %s ===', timezone.now().strftime('%Y-%m-%dT%H:%M:%S'))
|
||||
|
||||
with open('authentic_fusion_result.json') as f:
|
||||
disabled_users_map = json.loads(f.read())
|
||||
|
||||
|
||||
def get_user_by_uuid(uuid):
|
||||
users = get_publisher().user_class.get_users_with_name_identifier(uuid)
|
||||
|
||||
if len(users) == 0:
|
||||
raise ValueError('no user found for uuid %s' % uuid)
|
||||
|
||||
if len(users) > 1:
|
||||
raise ValueError('multiple users found for uuid %s' % uuid)
|
||||
|
||||
return users[0]
|
||||
|
||||
|
||||
disabled_users_by_user = collections.defaultdict(list)
|
||||
for user_uuid, disabled_user_uuids in disabled_users_map.items():
|
||||
user = get_user_by_uuid(user_uuid)
|
||||
|
||||
for disabled_user_uuid in disabled_user_uuids:
|
||||
disabled_users_by_user[user].append(get_user_by_uuid(disabled_user_uuid))
|
||||
|
||||
forms_by_user = collections.defaultdict(list)
|
||||
for user, disabled_users in disabled_users_by_user.items():
|
||||
for disabled_user in disabled_users:
|
||||
forms_by_user[user].extend(sql.AnyFormData.select([Equal('user_id', str(disabled_user.id))]))
|
||||
|
||||
|
||||
def attach_forms_to_user(forms_by_user):
|
||||
for user, forms in forms_by_user.items():
|
||||
for form in forms:
|
||||
logger.info(
|
||||
'Attaching form %s from user %s to user %s (%s)', form.id_display, form.user_id, user.id, user
|
||||
)
|
||||
form.user_id = user.id
|
||||
form.store()
|
||||
|
||||
|
||||
try:
|
||||
with sql.atomic():
|
||||
attach_forms_to_user(forms_by_user)
|
||||
|
||||
if len(sys.argv) < 2 or sys.argv[1] != '--proceed=true':
|
||||
raise ValueError
|
||||
|
||||
logger.info('=== Success ===')
|
||||
except ValueError:
|
||||
logger.info('=== Did nothing ===')
|
Loading…
Reference in New Issue