applification: ordre des catégories après import (#86624) #1120

Merged
lguerin merged 2 commits from wip/86624-applification-category-order into main 2024-02-13 16:19:37 +01:00
7 changed files with 347 additions and 30 deletions

View File

@ -7,6 +7,7 @@ import xml.etree.ElementTree as ET
import pytest
from wcs.api_export_import import klass_to_slug
from wcs.applications import Application, ApplicationElement
from wcs.blocks import BlockDef
from wcs.carddef import CardDef
@ -940,6 +941,195 @@ def test_export_import_bundle_import(pub):
assert formdef.workflow_roles == {'_receiver': extra_role.id}
@pytest.mark.parametrize(
'category_class',
[
Category,
CardDefCategory,
BlockCategory,
WorkflowCategory,
MailTemplateCategory,
CommentTemplateCategory,
DataSourceCategory,
],
)
def test_export_import_bundle_import_categories_ordering(pub, category_class):
category_class.wipe()
category = category_class(name='cat 1')
category.position = 1
category.store()
category = category_class(name='cat 2')
category.position = 2
category.store()
category = category_class(name='cat 3')
category.position = 3
category.store()
bundle = create_bundle(
[
{'type': klass_to_slug[category_class], 'slug': 'cat-1', 'name': 'cat 1'},
{'type': klass_to_slug[category_class], 'slug': 'cat-2', 'name': 'cat 2'},
{'type': klass_to_slug[category_class], 'slug': 'cat-3', 'name': 'cat 3'},
],
('%s/cat-1' % klass_to_slug[category_class], category_class.get(1)),
('%s/cat-2' % klass_to_slug[category_class], category_class.get(2)),
('%s/cat-3' % klass_to_slug[category_class], category_class.get(3)),
)
# delete categories
category_class.wipe()
# and recreate only cat 4 and 5 in first positions
category = category_class(name='cat 4')
category.position = 1
category.store()
category = category_class(name='cat 5')
category.position = 2
category.store()
# import bundle
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'completed'
# cat 1, 2, 3 are placed at the end
assert category_class.get_by_slug('cat-4').position == 1
assert category_class.get_by_slug('cat-5').position == 2
assert category_class.get_by_slug('cat-1').position == 3
assert category_class.get_by_slug('cat-2').position == 4
assert category_class.get_by_slug('cat-3').position == 5
# delete categories
category_class.wipe()
# recreate only cat 2, cat 4, cat 5 in this order
category = category_class(name='cat 2')
category.position = 1
category.store()
category = category_class(name='cat 4')
category.position = 2
category.store()
category = category_class(name='cat 5')
category.position = 3
category.store()
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'completed'
# cat 1, 2, 3 are placed after cat 4
assert category_class.get_by_slug('cat-1').position == 1
assert category_class.get_by_slug('cat-2').position == 2
assert category_class.get_by_slug('cat-3').position == 3
assert category_class.get_by_slug('cat-4').position == 4
assert category_class.get_by_slug('cat-5').position == 5
# delete categories
category_class.wipe()
# recreate only cat 4, cat 2, cat 5 in this order
category = category_class(name='cat 4')
category.position = 1
category.store()
category = category_class(name='cat 2')
category.position = 2
category.store()
category = category_class(name='cat 5')
category.position = 3
category.store()
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'completed'
# cat 1, 2, 3 are placed after cat 4
assert category_class.get_by_slug('cat-4').position == 1
assert category_class.get_by_slug('cat-1').position == 2
assert category_class.get_by_slug('cat-2').position == 3
assert category_class.get_by_slug('cat-3').position == 4
assert category_class.get_by_slug('cat-5').position == 5
# delete categories
category_class.wipe()
# recreate only cat 4, cat 5, cat 2 in this order
category = category_class(name='cat 4')
category.position = 1
category.store()
category = category_class(name='cat 5')
category.position = 2
category.store()
category = category_class(name='cat 2')
category.position = 3
category.store()
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'completed'
# cat 1, 2, 3 are placed after cat 4
assert category_class.get_by_slug('cat-4').position == 1
assert category_class.get_by_slug('cat-5').position == 2
assert category_class.get_by_slug('cat-1').position == 3
assert category_class.get_by_slug('cat-2').position == 4
assert category_class.get_by_slug('cat-3').position == 5
# delete categories
category_class.wipe()
# recreate only cat 4, cat 2, cat1 cat 5 in this order but with weird positions
category = category_class(name='cat 4')
category.position = 4
category.store()
category = category_class(name='cat 2')
category.position = 12
category.store()
category = category_class(name='cat 1')
category.position = 13
category.store()
category = category_class(name='cat 5')
category.position = 20
category.store()
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'completed'
# cat 1, 2, 3 are placed after cat 4
assert category_class.get_by_slug('cat-4').position == 1
assert category_class.get_by_slug('cat-1').position == 2
assert category_class.get_by_slug('cat-2').position == 3
assert category_class.get_by_slug('cat-3').position == 4
assert category_class.get_by_slug('cat-5').position == 5
# delete categories
category_class.wipe()
# recreate only cat 4, cat 2, cat1 cat 5 in this order but with weird positions
category = category_class(name='cat 4')
category.position = 1
category.store()
category = category_class(name='cat 2')
category.position = 2
category.store()
category = category_class(name='cat 1')
category.position = 2
category.store()
category = category_class(name='cat 5')
category.position = 2
category.store()
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), bundle)
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'completed'
# cat 1, 2, 3 are placed after cat 4
assert category_class.get_by_slug('cat-4').position == 1
assert category_class.get_by_slug('cat-1').position == 2
assert category_class.get_by_slug('cat-2').position == 3
assert category_class.get_by_slug('cat-3').position == 4
assert category_class.get_by_slug('cat-5').position == 5
def test_export_import_formdef_do_not_overwrite_table_name(pub):
formdef = FormDef()
formdef.name = 'Test2'

View File

@ -7,10 +7,28 @@ import pytest
from django.utils.encoding import force_bytes
from quixote import cleanup
from wcs.categories import CardDefCategory, Category
from wcs.categories import (
BlockCategory,
CardDefCategory,
Category,
CommentTemplateCategory,
DataSourceCategory,
MailTemplateCategory,
WorkflowCategory,
)
from .utilities import clean_temporary_pub, create_temporary_pub
category_classes = [
Category,
CardDefCategory,
BlockCategory,
WorkflowCategory,
MailTemplateCategory,
CommentTemplateCategory,
DataSourceCategory,
]
def setup_module(module):
cleanup()
@ -24,7 +42,7 @@ def teardown_module(module):
clean_temporary_pub()
@pytest.mark.parametrize('category_class', [Category, CardDefCategory])
@pytest.mark.parametrize('category_class', category_classes)
def test_store(category_class):
category_class.wipe()
test = category_class()
@ -37,7 +55,7 @@ def test_store(category_class):
assert test.description == test2.description
@pytest.mark.parametrize('category_class', [Category, CardDefCategory])
@pytest.mark.parametrize('category_class', category_classes)
def test_urlname(category_class):
category_class.wipe()
test = category_class()
@ -48,7 +66,7 @@ def test_urlname(category_class):
assert test.url_name == 'test'
@pytest.mark.parametrize('category_class', [Category, CardDefCategory])
@pytest.mark.parametrize('category_class', category_classes)
def test_duplicate_urlname(category_class):
category_class.wipe()
test = category_class()
@ -64,7 +82,7 @@ def test_duplicate_urlname(category_class):
assert test2.url_name == 'test-2'
@pytest.mark.parametrize('category_class', [Category, CardDefCategory])
@pytest.mark.parametrize('category_class', category_classes)
def test_name_giving_a_forbidden_slug(category_class):
category_class.wipe()
test = category_class()
@ -74,7 +92,7 @@ def test_name_giving_a_forbidden_slug(category_class):
assert test.url_name == 'cat-api'
@pytest.mark.parametrize('category_class', [Category, CardDefCategory])
@pytest.mark.parametrize('category_class', category_classes)
def test_sort_positions(category_class):
category_class.wipe()
@ -94,7 +112,7 @@ def test_sort_positions(category_class):
assert categories[-1].name in ('Test 8', 'Test 9')
@pytest.mark.parametrize('category_class', [Category, CardDefCategory])
@pytest.mark.parametrize('category_class', category_classes)
def test_xml_export(category_class):
category_class.wipe()
test = category_class()
@ -108,7 +126,7 @@ def test_xml_export(category_class):
assert b' id="1"' not in test.export_to_xml_string(include_id=False)
@pytest.mark.parametrize('category_class', [Category, CardDefCategory])
@pytest.mark.parametrize('category_class', category_classes)
def test_xml_import(category_class):
category_class.wipe()
test = category_class()
@ -190,7 +208,7 @@ def test_load_old_python2_pickle():
assert test2.description == 'Hello world'
@pytest.mark.parametrize('category_class', [Category, CardDefCategory])
@pytest.mark.parametrize('category_class', category_classes)
def test_get_by_urlname(category_class):
category_class.wipe()
test = category_class()
@ -202,7 +220,7 @@ def test_get_by_urlname(category_class):
assert test.id == test2.id
@pytest.mark.parametrize('category_class', [Category, CardDefCategory])
@pytest.mark.parametrize('category_class', category_classes)
def test_has_urlname(category_class):
category_class.wipe()
test = category_class()
@ -215,7 +233,7 @@ def test_has_urlname(category_class):
assert not category_class.has_urlname('foobar')
@pytest.mark.parametrize('category_class', [Category, CardDefCategory])
@pytest.mark.parametrize('category_class', category_classes)
def test_remove_self(category_class):
category_class.wipe()
test = category_class()

View File

@ -1435,25 +1435,49 @@ def test_category_snapshot_browse(pub):
Category.wipe()
category = Category(name='test')
category.position = 42
category.store()
assert pub.snapshot_class.count() == 1
# check calling .store() without changes doesn't create snapshots
category.store()
assert pub.snapshot_class.count() == 1
category.name = 'foobar'
category.store()
assert pub.snapshot_class.count() == 2
app = login(get_app(pub))
resp = app.get('/backoffice/forms/categories/%s/' % category.id)
resp = resp.click('History')
snapshot = pub.snapshot_class.select_object_history(category)[0]
snapshot = pub.snapshot_class.select_object_history(category)[1]
snapshot = snapshot.get_latest(
snapshot.object_type, snapshot.object_id, complete=True, max_timestamp=snapshot.timestamp
)
assert snapshot.patch is None
assert 'position' not in snapshot.serialization
resp = resp.click(href='%s/view/' % snapshot.id)
assert 'This category is readonly' in resp.text
assert 'inspect' not in resp
assert '<p>%s</p>' % localstrftime(snapshot.timestamp) in resp.text
with pytest.raises(IndexError):
resp = resp.click('Edit')
resp.click('Edit')
resp = app.get('/backoffice/forms/categories/%s/' % category.id)
resp = resp.click('History')
resp = resp.click(href='%s/restore' % snapshot.id)
assert resp.form['action'].value == 'as-new'
resp = resp.form.submit('submit')
assert Category.count() == 2
new_category = Category.get(resp.location.split('/')[-2])
assert new_category.position == 43
resp = app.get('/backoffice/forms/categories/%s/history/%s/view/' % (category.id, snapshot.id))
assert 'inspect' not in resp
resp = app.get('/backoffice/forms/categories/%s/' % category.id)
resp = resp.click('History')
resp = resp.click(href='%s/restore' % snapshot.id)
resp.form['action'].value = 'overwrite'
resp = resp.form.submit('submit')
assert Category.count() == 2
category.refresh_from_storage()
assert category.position == 42
def test_snapshots_test_results(pub):

View File

@ -416,7 +416,7 @@ class CategoriesDirectory(Directory):
new_order = [o for o in new_order if o in categories_by_id]
for i, o in enumerate(new_order):
categories_by_id[o].position = i + 1
categories_by_id[o].store()
categories_by_id[o].store(store_snapshot=False)
return 'ok'
def new(self):

View File

@ -70,6 +70,16 @@ klasses = {
klass_to_slug = {y: x for x, y in klasses.items()}
category_classes = [
Category,
CardDefCategory,
BlockCategory,
WorkflowCategory,
MailTemplateCategory,
CommentTemplateCategory,
DataSourceCategory,
]
def signature_required(func):
def f(*args, **kwargs):
@ -403,16 +413,16 @@ class BundleImportJob(AfterJob):
# first pass on formdef/carddef/blockdef/workflows to create them empty
# (name and slug); so they can be found for sure in import pass
for type in ('forms', 'cards', 'blocks', 'workflows'):
self.pre_install([x for x in manifest.get('elements') if x.get('type') == type])
for _type in ('forms', 'cards', 'blocks', 'workflows'):
self.pre_install([x for x in manifest.get('elements') if x.get('type') == _type])
# real installation pass
for type in object_types:
self.install([x for x in manifest.get('elements') if x.get('type') == type])
for _type in object_types:
self.install([x for x in manifest.get('elements') if x.get('type') == _type])
# again, to remove [pre-install] in dependencies labels
for type in object_types:
self.install([x for x in manifest.get('elements') if x.get('type') == type], finalize=True)
for _type in object_types:
self.install([x for x in manifest.get('elements') if x.get('type') == _type], finalize=True)
# remove obsolete application elements
self.unlink_obsolete_objects()
@ -447,12 +457,26 @@ class BundleImportJob(AfterJob):
get_response().process_after_jobs()
def install(self, elements, finalize=False):
if not elements:
return
element_klass = klasses[elements[0]['type']]
if not finalize and element_klass in category_classes:
# for categories, keep positions before install
objects_by_slug = {i.slug: i for i in element_klass.select()}
initial_positions = {i.slug: i.position for i in objects_by_slug.values()}
imported_positions = {}
for element in elements:
element_klass = klasses[element['type']]
element_content = self.tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
new_object = element_klass.import_from_xml_tree(
ET.fromstring(element_content), include_id=False, check_datasources=False
)
if not finalize and element_klass in category_classes:
# for categories, keep positions of imported objects

s/importer/imported/

s/importer/imported/
imported_positions[new_object.slug] = new_object.position
try:
existing_object = element_klass.get_by_slug(new_object.slug)
if existing_object is None:
@ -500,6 +524,41 @@ class BundleImportJob(AfterJob):
self.link_object(new_object)
self.increment_count()
# for categories, rebuild positions
if not finalize and element_klass in category_classes:
objects_by_slug = {i.slug: i for i in element_klass.select()}
# find imported objects from initials
existing_positions = {k: v for k, v in initial_positions.items() if k in imported_positions}
# find not imported objects from initials
not_imported_positions = {
k: v for k, v in initial_positions.items() if k not in imported_positions
}
# determine position of application objects
application_position = None
if existing_positions:
application_position = min(existing_positions.values())
# all objects placed before application objects
before_positions = {
k: v
for k, v in not_imported_positions.items()
if application_position is None or v < application_position
}
# all objects placed after application objects
after_positions = {
k: v
for k, v in not_imported_positions.items()
if application_position is not None and v >= application_position
}
# rebuild positions
position = 1
slugs = sorted(before_positions.keys(), key=lambda a: before_positions[a])
slugs += sorted(imported_positions.keys(), key=lambda a: imported_positions[a])
slugs += sorted(after_positions.keys(), key=lambda a: after_positions[a])
for slug in slugs:
objects_by_slug[slug].position = position
objects_by_slug[slug].store(store_snapshot=False)
position += 1
def link_object(self, obj):
element = ApplicationElement.update_or_create_for_object(self.application, obj)
self.application_elements.add((element.object_type, element.object_id))

View File

@ -90,7 +90,9 @@ class Category(XmlStorableObject):
def get_admin_url(self):
return '%s/%s%s/' % (get_publisher().get_backoffice_url(), self.backoffice_base_url, self.id)
def store(self, *args, comment=None, snapshot_store_user=True, application=None, **kwargs):
def store(
self, *args, comment=None, snapshot_store_user=True, application=None, store_snapshot=True, **kwargs
):
if not self.url_name:
existing_slugs = {
x.url_name: True for x in self.select(ignore_migration=True, ignore_errors=True)
@ -104,7 +106,7 @@ class Category(XmlStorableObject):
self.url_name = '%s-%s' % (base_slug, i)
i += 1
super().store(*args, **kwargs)
if get_publisher().snapshot_class:
if get_publisher().snapshot_class and store_snapshot:
get_publisher().snapshot_class.snap(
instance=self, comment=comment, store_user=snapshot_store_user, application=application
)

View File

@ -139,6 +139,16 @@ class Snapshot:
_instance = None
_user = None
_category_types = [
'block_category',
'card_category',
'data_source_category',
'category',
'mail_template_category',
'comment_template_category',
'workflow_category',
]
@classmethod
def snap(cls, instance, comment=None, label=None, store_user=True, application=None):
obj = cls()
@ -147,7 +157,13 @@ class Snapshot:
obj.timestamp = now()
if get_session() and store_user:
obj.user_id = get_session().user
tree = instance.export_to_xml(include_id=True)
# remove position for categories
if obj.object_type in cls._category_types:
for position in tree.findall('position'):
tree.remove(position)
obj.serialization = ET.tostring(tree).decode('utf-8')
obj.comment = str(comment) if comment else None
obj.label = label
@ -321,15 +337,23 @@ class Snapshot:
instance = self.instance
if as_new:
for attr in ('id', 'url_name', 'internal_identifier', 'slug'):
setattr(instance, attr, None)
try:
setattr(instance, attr, None)
except AttributeError:
# attribute can be a property without setter
pass
if self.object_type in self._category_types:
# set position
instance.position = max(i.position or 0 for i in self.get_object_class().select()) + 1
if hasattr(instance, 'disabled'):
instance.disabled = True
else:
# keep table and max field id from current object
# keep table and position from current object
current_object = self.get_object_class().get(instance.id)
for attr in ('table_name',):
if hasattr(current_object, attr):
setattr(instance, attr, getattr(current_object, attr))
for attr in ('table_name', 'position'):
if attr != 'position' or self.object_type in self._category_types:
if hasattr(current_object, attr):
setattr(instance, attr, getattr(current_object, attr))
delattr(instance, 'readonly')
delattr(instance, 'snapshot_object')