snapshots: keep only latest version for big objects (#48255)
This commit is contained in:
parent
6899480328
commit
bd7387f36b
|
@ -486,3 +486,85 @@ def test_snaphost_workflow_status_item_comments(pub):
|
|||
'Change in action "Webservice" in status "baz"',
|
||||
'New action "Webservice" in status "baz"',
|
||||
'']
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def size_limit(pub):
|
||||
pub.snapshot_class.WCS_MAX_LEN = 100
|
||||
yield
|
||||
pub.snapshot_class.WCS_MAX_LEN = 1000000
|
||||
|
||||
|
||||
def test_workflow_snapshot_max_len(pub, size_limit):
|
||||
formdef = FormDef()
|
||||
formdef.name = 'testform'
|
||||
formdef.fields = []
|
||||
formdef.store()
|
||||
|
||||
Workflow.wipe()
|
||||
workflow = Workflow(name='test')
|
||||
workflow.store()
|
||||
|
||||
another_workflow = Workflow(name='other test')
|
||||
another_workflow.store() # same object_type - check that other instances snapshots are not deleted
|
||||
|
||||
assert formdef.id == workflow.id # same id - check other object_type snapshots are not deleted
|
||||
|
||||
# first one: saved
|
||||
assert pub.snapshot_class.count() == 3
|
||||
first_id = pub.snapshot_class.select()[0].id
|
||||
assert pub.snapshot_class.get(first_id).object_type == 'formdef'
|
||||
assert pub.snapshot_class.get(first_id + 1).object_type == 'workflow'
|
||||
assert pub.snapshot_class.get(first_id + 1).object_id == '1'
|
||||
old_timestamp = pub.snapshot_class.get(first_id + 1).timestamp
|
||||
assert pub.snapshot_class.get(first_id + 2).object_type == 'workflow'
|
||||
assert pub.snapshot_class.get(first_id + 2).object_id == '2'
|
||||
|
||||
# save snapshot
|
||||
pub.snapshot_class.snap(instance=workflow, label="snapshot !")
|
||||
assert pub.snapshot_class.count() == 4
|
||||
assert pub.snapshot_class.get(first_id).object_type == 'formdef'
|
||||
assert pub.snapshot_class.get(first_id + 1).object_type == 'workflow'
|
||||
assert pub.snapshot_class.get(first_id + 1).object_id == '1'
|
||||
assert pub.snapshot_class.get(first_id + 1).label is None
|
||||
assert pub.snapshot_class.get(first_id + 1).timestamp == old_timestamp
|
||||
assert pub.snapshot_class.get(first_id + 1).instance.name == 'test'
|
||||
assert pub.snapshot_class.get(first_id + 2).object_type == 'workflow'
|
||||
assert pub.snapshot_class.get(first_id + 2).object_id == '2'
|
||||
assert pub.snapshot_class.get(first_id + 3).object_type == 'workflow'
|
||||
assert pub.snapshot_class.get(first_id + 3).object_id == '1'
|
||||
assert pub.snapshot_class.get(first_id + 3).label == "snapshot !"
|
||||
assert pub.snapshot_class.get(first_id + 3).instance.name == 'test'
|
||||
|
||||
# no changes
|
||||
workflow.store()
|
||||
assert pub.snapshot_class.count() == 4
|
||||
assert pub.snapshot_class.get(first_id).object_type == 'formdef'
|
||||
assert pub.snapshot_class.get(first_id + 1).object_type == 'workflow'
|
||||
assert pub.snapshot_class.get(first_id + 1).object_id == '1'
|
||||
assert pub.snapshot_class.get(first_id + 1).label is None
|
||||
assert pub.snapshot_class.get(first_id + 1).timestamp == old_timestamp
|
||||
assert pub.snapshot_class.get(first_id + 1).instance.name == 'test'
|
||||
assert pub.snapshot_class.get(first_id + 2).object_type == 'workflow'
|
||||
assert pub.snapshot_class.get(first_id + 2).object_id == '2'
|
||||
assert pub.snapshot_class.get(first_id + 3).object_type == 'workflow'
|
||||
assert pub.snapshot_class.get(first_id + 3).object_id == '1'
|
||||
assert pub.snapshot_class.get(first_id + 3).label == "snapshot !"
|
||||
assert pub.snapshot_class.get(first_id + 3).instance.name == 'test'
|
||||
|
||||
# with changes
|
||||
workflow.name = 'foo bar'
|
||||
workflow.store()
|
||||
assert pub.snapshot_class.count() == 4
|
||||
assert pub.snapshot_class.get(first_id).object_type == 'formdef'
|
||||
assert pub.snapshot_class.get(first_id + 2).object_type == 'workflow'
|
||||
assert pub.snapshot_class.get(first_id + 2).object_id == '2'
|
||||
assert pub.snapshot_class.get(first_id + 3).object_type == 'workflow'
|
||||
assert pub.snapshot_class.get(first_id + 3).object_id == '1'
|
||||
assert pub.snapshot_class.get(first_id + 3).label == "snapshot !"
|
||||
assert pub.snapshot_class.get(first_id + 3).instance.name == 'test'
|
||||
assert pub.snapshot_class.get(first_id + 4).object_type == 'workflow'
|
||||
assert pub.snapshot_class.get(first_id + 4).object_id == '1'
|
||||
assert pub.snapshot_class.get(first_id + 4).label is None
|
||||
assert pub.snapshot_class.get(first_id + 4).timestamp > old_timestamp
|
||||
assert pub.snapshot_class.get(first_id + 4).instance.name == 'foo bar'
|
||||
|
|
|
@ -21,6 +21,7 @@ from django.utils.timezone import now
|
|||
from quixote import get_publisher, get_session
|
||||
|
||||
from wcs.qommon import _, misc
|
||||
from wcs.qommon.storage import Null
|
||||
|
||||
|
||||
class UnknownUser:
|
||||
|
@ -42,6 +43,8 @@ class Snapshot:
|
|||
_instance = None
|
||||
_user = None
|
||||
|
||||
WCS_MAX_LEN = 1000000
|
||||
|
||||
@classmethod
|
||||
def snap(cls, instance, comment=None, label=None):
|
||||
obj = cls()
|
||||
|
@ -54,13 +57,14 @@ class Snapshot:
|
|||
obj.comment = comment
|
||||
obj.label = label
|
||||
latest = cls.get_latest(obj.object_type, obj.object_id)
|
||||
if label is None and len(obj.serialization) > 1000000:
|
||||
# save disk space by not automatically saving big objects
|
||||
# (typically workflows with embedded documents)
|
||||
return
|
||||
if label is not None or latest is None or obj.serialization != latest.serialization:
|
||||
# save snapshot if there are changes or an explicit label was
|
||||
# given.
|
||||
if label is None and len(obj.serialization) > cls.WCS_MAX_LEN:
|
||||
# keep only latest snapshot for big objects
|
||||
# (typically workflows with embedded documents)
|
||||
for old_snapshot in cls.select_object_history(instance, clause=[Null('label')]):
|
||||
cls.remove_object(old_snapshot.id)
|
||||
obj.store()
|
||||
|
||||
def get_object_class(self):
|
||||
|
|
Loading…
Reference in New Issue