wcs/wcs/snapshots.py

176 lines
5.7 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2020 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import xml.etree.ElementTree as ET
from django.utils.timezone import now
from quixote import get_publisher, get_session
from wcs.qommon import _, misc
from wcs.qommon.storage import Null
class UnknownUser:
    """Placeholder object used when a snapshot's author cannot be loaded."""

    def __str__(self):
        """Return the translated 'unknown user' label."""
        label = _('unknown user')
        return label
class Snapshot:
    """Serialized copy of an object (form, card, workflow, ...) at a point in time.

    This class holds the snapshot fields and the logic around them.  The
    persistence primitives it calls (``store``, ``get_latest``,
    ``select_object_history``, ``remove_object``) are not defined here;
    they are expected to be provided elsewhere (presumably by a
    storage-backed subclass -- confirm against the rest of the project).
    """

    id = None
    object_type = None  # (formdef, carddef, blockdef, workflow, data_source, etc.)
    object_id = None
    timestamp = None
    user_id = None
    comment = None
    serialization = None
    label = None  # (named snapshot)

    # cache
    _instance = None
    _user = None

    # Length (in characters of the serialized XML) above which only the
    # latest unlabelled snapshot of an object is kept, see snap().
    WCS_MAX_LEN = 1000000

    @classmethod
    def snap(cls, instance, comment=None, label=None):
        """Record a snapshot of *instance* if it is worth recording.

        A snapshot is stored when a *label* is given, when there is no
        previous snapshot for the object, or when the XML serialization
        differs from the latest stored one.  Otherwise nothing is saved.

        :param instance: object to snapshot; must provide ``xml_root_node``,
            ``id`` and ``export_to_xml(include_id=True)``.
        :param comment: optional comment attached to the snapshot.
        :param label: optional label; a labelled snapshot is always stored.
        """
        obj = cls()
        obj.object_type = instance.xml_root_node
        obj.object_id = instance.id
        obj.timestamp = now()
        session = get_session()
        if session:
            obj.user_id = session.user
        obj.serialization = ET.tostring(instance.export_to_xml(include_id=True)).decode('utf-8')
        obj.comment = comment
        obj.label = label

        latest = cls.get_latest(obj.object_type, obj.object_id)
        unchanged = latest is not None and obj.serialization == latest.serialization
        if label is None and unchanged:
            # nothing changed and no explicit label was given: do not save.
            return

        if label is None and len(obj.serialization) > cls.WCS_MAX_LEN:
            # keep only latest snapshot for big objects
            # (typically workflows with embedded documents)
            for old_snapshot in cls.select_object_history(instance, clause=[Null('label')]):
                cls.remove_object(old_snapshot.id)
        obj.store()

    def get_object_class(self):
        """Return the class whose ``xml_root_node`` matches ``self.object_type``.

        :raises KeyError: if no known class matches the object type.
        """
        # imported at call time rather than at module level (presumably to
        # avoid circular imports -- confirm).
        from wcs.blocks import BlockDef
        from wcs.carddef import CardDef
        from wcs.data_sources import NamedDataSource
        from wcs.formdef import FormDef
        from wcs.mail_templates import MailTemplate
        from wcs.workflows import Workflow
        from wcs.wscalls import NamedWsCall

        for klass in (BlockDef, CardDef, NamedDataSource, FormDef, Workflow, NamedWsCall, MailTemplate):
            if klass.xml_root_node == self.object_type:
                return klass
        raise KeyError('no class for object type: %s' % self.object_type)

    @property
    def instance(self):
        """Object rebuilt from the serialization (built once, then cached).

        The rebuilt object is flagged ``readonly`` and gets a
        ``snapshot_object`` attribute pointing back to this snapshot.
        """
        if self._instance is None:
            tree = ET.fromstring(self.serialization)
            self._instance = self.get_object_class().import_from_xml_tree(
                tree,
                include_id=True,
                snapshot=True,
                # an optional _check_datasources attribute set on the
                # snapshot can disable the check; it defaults to True.
                check_datasources=getattr(self, '_check_datasources', True),
            )
            self._instance.readonly = True
            self._instance.snapshot_object = self
        return self._instance

    @property
    def user(self):
        """User who made the snapshot.

        Returns None when no user id was recorded, and an UnknownUser
        placeholder when the recorded id cannot be looked up.
        """
        if not self.user_id:
            return None
        if self._user is None:
            try:
                self._user = get_publisher().user_class.get(self.user_id)
            except KeyError:
                self._user = UnknownUser()
        return self._user

    def load_history(self):
        """Load into ``self._history`` the ids of the snapshots of the same object."""
        if not self.instance:
            self._history = []
            return
        history = get_publisher().snapshot_class.select_object_history(self.instance)
        self._history = [s.id for s in history]

    def _get_history(self):
        """Return the list of snapshot ids, loading it on first use."""
        if not hasattr(self, '_history'):
            self.load_history()
        return self._history

    def _history_position(self):
        """Return the index of this snapshot in the history, None if absent."""
        try:
            return self._get_history().index(self.id)
        except ValueError:
            return None

    @property
    def previous(self):
        """Id of the entry just before this snapshot in the history, or None."""
        idx = self._history_position()
        if idx is None or idx == 0:
            return None
        return self._history[idx - 1]

    @property
    def next(self):
        """Id of the entry just after this snapshot in the history, or None."""
        idx = self._history_position()
        if idx is None:
            return None
        history = self._history
        if idx + 1 < len(history):
            return history[idx + 1]
        return None

    @property
    def first(self):
        """Id of the first entry of the history, or None if it is empty."""
        history = self._get_history()
        # an empty history gives None (like previous/next) instead of
        # raising IndexError.
        return history[0] if history else None

    @property
    def last(self):
        """Id of the last entry of the history, or None if it is empty."""
        history = self._get_history()
        return history[-1] if history else None

    def restore(self, as_new=True):
        """Store the snapshotted object back as a live object.

        :param as_new: when true, identifiers (``id``, ``url_name``,
            ``internal_identifier``, ``slug``) are cleared before storing and
            the object is marked disabled if it has a ``disabled`` attribute.
            When false, ``max_field_id`` and ``table_name`` are copied from
            the current object with the same id, when it has them.
        :returns: the stored instance.

        Note that this works on the cached ``self.instance`` object, which
        loses its ``readonly`` and ``snapshot_object`` attributes.
        """
        instance = self.instance
        if as_new:
            for attr in ('id', 'url_name', 'internal_identifier', 'slug'):
                setattr(instance, attr, None)
            if hasattr(instance, 'disabled'):
                instance.disabled = True
        else:
            # keep table and max field id from current object
            current_object = self.get_object_class().get(instance.id)
            for attr in ('max_field_id', 'table_name'):
                if hasattr(current_object, attr):
                    setattr(instance, attr, getattr(current_object, attr))

        # remove the markers set by the instance property before storing.
        delattr(instance, 'readonly')
        delattr(instance, 'snapshot_object')
        instance.store(
            comment=_('Restored snapshot %(id)s (%(timestamp)s)')
            % {'id': self.id, 'timestamp': misc.localstrftime(self.timestamp)}
        )
        return instance