From 31531e7133a0141005a993d9bcdc34e3628845a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laur=C3=A9line=20Gu=C3=A9rin?= Date: Mon, 9 Jan 2023 16:18:38 +0100 Subject: [PATCH] data: snapshot migration, to replace slugs by uuids (#67710) --- combo/data/migrations/0064_snapshot_uuids.py | 102 +++++ tests/test_migrations.py | 380 +++++++++++++++++++ 2 files changed, 482 insertions(+) create mode 100644 combo/data/migrations/0064_snapshot_uuids.py diff --git a/combo/data/migrations/0064_snapshot_uuids.py b/combo/data/migrations/0064_snapshot_uuids.py new file mode 100644 index 00000000..e729bded --- /dev/null +++ b/combo/data/migrations/0064_snapshot_uuids.py @@ -0,0 +1,102 @@ +import uuid + +from django.db import migrations + + +def get_page_uuid(page_uuids_by_slugs, slug, default_index=False): + try: + uuid.UUID(slug) + except (ValueError, AttributeError): + pass + else: + # it's a uuid, don't change it + if slug in page_uuids_by_slugs.values(): + return slug + + slug = str(slug).strip('/').rsplit('/', maxsplit=1)[-1] + if not slug and default_index: + slug = 'index' + return page_uuids_by_slugs.get(slug) + + +def forward(apps, schema_editor): + PageSnapshot = apps.get_model('data', 'PageSnapshot') + Page = apps.get_model('data', 'Page') + page_uuids_by_slugs = {page.slug: str(page.uuid) for page in Page.objects.only('uuid', 'slug')} + for snapshot in PageSnapshot.objects.all(): + changed = False + + if (snapshot.serialization.get('fields') or {}).get('parent'): + new_uuid = get_page_uuid( + page_uuids_by_slugs, snapshot.serialization['fields']['parent'][0], default_index=True + ) + if new_uuid: + new_uuid = [new_uuid] + snapshot.serialization['fields']['parent'] = new_uuid + changed = True + + for cell in snapshot.serialization.get('cells') or []: + if cell.get('model') not in [ + 'data.linkcell', + 'data.linklistcell', + 'search.searchcell', + 'wcs.wcscardcell', + ]: + continue + + if cell['model'] == 'data.linkcell': + if not cell['fields'].get('link_page'): + continue + new_uuid = get_page_uuid(page_uuids_by_slugs, cell['fields']['link_page'][0]) + if new_uuid: + new_uuid = [new_uuid] + cell['fields']['link_page'] = new_uuid + changed = True + + elif cell['model'] == 'data.linklistcell': + for link in cell['fields'].get('links') or []: + if link.get('model') != 'data.linkcell': + continue + if not link['fields'].get('link_page'): + continue + new_uuid = get_page_uuid(page_uuids_by_slugs, link['fields']['link_page'][0]) + if new_uuid: + new_uuid = [new_uuid] + link['fields']['link_page'] = new_uuid + changed = True + + elif cell['model'] == 'search.searchcell': + if not cell['fields'].get('_search_services'): + continue + if not cell['fields']['_search_services'].get('options'): + continue + for option in cell['fields']['_search_services']['options'].values(): + if not option: + continue + if not option.get('target_page'): + continue + option['target_page'] = get_page_uuid(page_uuids_by_slugs, option['target_page']) + changed = True + + elif cell['model'] == 'wcs.wcscardcell': + if not cell['fields'].get('custom_schema'): + continue + for custom_cell in cell['fields']['custom_schema'].get('cells') or []: + if not custom_cell.get('page'): + continue + custom_cell['page'] = get_page_uuid(page_uuids_by_slugs, custom_cell['page']) + changed = True + + if changed: + snapshot.save() + + +class Migration(migrations.Migration): + + dependencies = [ + ('data', '0063_old_card_cells'), + ] + + operations = [ + migrations.RunPython(forward, reverse_code=migrations.RunPython.noop), + ] diff --git a/tests/test_migrations.py b/tests/test_migrations.py index 2bad3a1a..061a1544 100644 --- a/tests/test_migrations.py +++ b/tests/test_migrations.py @@ -1,3 +1,5 @@ +import uuid + from django.db import connection from django.db.migrations.executor import MigrationExecutor @@ -122,3 +124,381 @@ def test_page_snapshot_with_old_card_cells_migration(transactional_db): 'restricted_to_unlogged': False, }, } + + +def test_page_snapshot_uuids_migration(transactional_db): + migrate_from = [('data', '0063_old_card_cells')] + migrate_to = [('data', '0064_snapshot_uuids')] + + executor = MigrationExecutor(connection) + old_apps = executor.loader.project_state(migrate_from).apps + executor.migrate(migrate_from) + + page_class = old_apps.get_model('data', 'Page') + pagesnapshot_class = old_apps.get_model('data', 'PageSnapshot') + + root = page_class.objects.create(order=0, slug='index') + page = page_class.objects.create(order=1, slug='slug') + old_uuid = uuid.uuid4() + link_cells = [ + { + 'model': 'data.linkcell', + 'fields': { + 'link_page': None, + }, + }, + { + 'model': 'data.linkcell', + 'fields': { + 'link_page': [str(old_uuid)], + }, + }, + { + 'model': 'data.linkcell', + 'fields': { + 'link_page': [str(page.uuid)], # will not change + }, + }, + { + 'model': 'data.linkcell', + 'fields': { + 'link_page': ['unknown'], + }, + }, + { + 'model': 'data.linkcell', + 'fields': { + 'link_page': ['slug'], + }, + }, + { + 'model': 'data.linkcell', + 'fields': { + 'link_page': ['foo/bar/slug'], + }, + }, + ] + snapshot1 = pagesnapshot_class.objects.create( + serialization={ + 'fields': { + 'parent': None, + }, + 'cells': link_cells + + [ + { + 'model': 'data.linklistcell', + 'fields': { + 'links': link_cells, + }, + }, + { + 'model': 'search.searchcell', + 'fields': { + '_search_services': { + 'options': { + 'foobar': { + 'target_page': None, + } + } + } + }, + }, + { + 'model': 'search.searchcell', + 'fields': { + '_search_services': { + 'options': { + 'foobar': { + 'target_page': str(old_uuid), + } + } + } + }, + }, + { + 'model': 'search.searchcell', + 'fields': { + '_search_services': { + 'options': { + 'foobar': { + 'target_page': str(page.uuid), # will not change + } + } + } + }, + }, + { + 'model': 'search.searchcell', + 'fields': { + '_search_services': { + 'options': { + 'foobar': { + 'target_page': 'unknown', + } + } + } + }, + }, + { + 'model': 'search.searchcell', + 'fields': { + '_search_services': { + 'options': { + 'foobar': { + 'target_page': 'slug', + } + } + } + }, + }, + { + 'model': 'search.searchcell', + 'fields': { + '_search_services': { + 'options': { + 'foobar': { + 'target_page': 'foo/bar/slug', + } + } + } + }, + }, + { + 'model': 'wcs.wcscardcell', + 'fields': { + 'custom_schema': { + 'cells': [ + { + 'page': None, + }, + { + 'page': str(old_uuid), + }, + { + 'page': str(page.uuid), # will not change + }, + { + 'page': 'unknown', + }, + { + 'page': 'slug', + }, + { + 'page': 'foo/bar/slug', + }, + ], + }, + }, + }, + ], + } + ) + snapshot2 = pagesnapshot_class.objects.create( + serialization={ + 'fields': { + 'parent': [str(old_uuid)], + } + } + ) + snapshot3 = pagesnapshot_class.objects.create( + serialization={ + 'fields': { + 'parent': [str(page.uuid)], # will not change + } + } + ) + snapshot4 = pagesnapshot_class.objects.create( + serialization={ + 'fields': { + 'parent': ['unknown'], + } + } + ) + snapshot5 = pagesnapshot_class.objects.create( + serialization={ + 'fields': { + 'parent': ['slug'], + } + } + ) + snapshot6 = pagesnapshot_class.objects.create( + serialization={ + 'fields': { + 'parent': ['foo/bar/slug'], + } + } + ) + snapshot7 = pagesnapshot_class.objects.create( + serialization={ + 'fields': { + 'parent': [''], # index ! + } + } + ) + + executor = MigrationExecutor(connection) + executor.migrate(migrate_to) + executor.loader.build_graph() + + apps = executor.loader.project_state(migrate_to).apps + pagesnapshot_class = apps.get_model('data', 'PageSnapshot') + + snapshot1 = pagesnapshot_class.objects.get(pk=snapshot1.pk) + assert snapshot1.serialization['fields']['parent'] is None + new_link_cells = [ + { + 'model': 'data.linkcell', + 'fields': { + 'link_page': None, + }, + }, + { + 'model': 'data.linkcell', + 'fields': { + 'link_page': None, + }, + }, + { + 'model': 'data.linkcell', + 'fields': { + 'link_page': [str(page.uuid)], + }, + }, + { + 'model': 'data.linkcell', + 'fields': { + 'link_page': None, + }, + }, + { + 'model': 'data.linkcell', + 'fields': { + 'link_page': [str(page.uuid)], + }, + }, + { + 'model': 'data.linkcell', + 'fields': { + 'link_page': [str(page.uuid)], + }, + }, + ] + assert snapshot1.serialization['cells'] == new_link_cells + [ + { + 'model': 'data.linklistcell', + 'fields': { + 'links': new_link_cells, + }, + }, + { + 'model': 'search.searchcell', + 'fields': { + '_search_services': { + 'options': { + 'foobar': { + 'target_page': None, + } + } + } + }, + }, + { + 'model': 'search.searchcell', + 'fields': { + '_search_services': { + 'options': { + 'foobar': { + 'target_page': None, + } + } + } + }, + }, + { + 'model': 'search.searchcell', + 'fields': { + '_search_services': { + 'options': { + 'foobar': { + 'target_page': str(page.uuid), + } + } + } + }, + }, + { + 'model': 'search.searchcell', + 'fields': { + '_search_services': { + 'options': { + 'foobar': { + 'target_page': None, + } + } + } + }, + }, + { + 'model': 'search.searchcell', + 'fields': { + '_search_services': { + 'options': { + 'foobar': { + 'target_page': str(page.uuid), + } + } + } + }, + }, + { + 'model': 'search.searchcell', + 'fields': { + '_search_services': { + 'options': { + 'foobar': { + 'target_page': str(page.uuid), + } + } + } + }, + }, + { + 'model': 'wcs.wcscardcell', + 'fields': { + 'custom_schema': { + 'cells': [ + { + 'page': None, + }, + { + 'page': None, + }, + { + 'page': str(page.uuid), + }, + { + 'page': None, + }, + { + 'page': str(page.uuid), + }, + { + 'page': str(page.uuid), + }, + ], + }, + }, + }, + ] + snapshot2 = pagesnapshot_class.objects.get(pk=snapshot2.pk) + assert snapshot2.serialization['fields']['parent'] is None + snapshot3 = pagesnapshot_class.objects.get(pk=snapshot3.pk) + assert snapshot3.serialization['fields']['parent'] == [str(page.uuid)] + snapshot4 = pagesnapshot_class.objects.get(pk=snapshot4.pk) + assert snapshot4.serialization['fields']['parent'] is None + snapshot5 = pagesnapshot_class.objects.get(pk=snapshot5.pk) + assert snapshot5.serialization['fields']['parent'] == [str(page.uuid)] + snapshot6 = pagesnapshot_class.objects.get(pk=snapshot6.pk) + assert snapshot6.serialization['fields']['parent'] == [str(page.uuid)] + snapshot7 = pagesnapshot_class.objects.get(pk=snapshot7.pk) + assert snapshot7.serialization['fields']['parent'] == [str(root.uuid)]