122 lines
4.3 KiB
Python
122 lines
4.3 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2020 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import xml.etree.ElementTree as ET
|
|
|
|
from django.utils.timezone import now
|
|
|
|
from quixote import get_publisher, get_session
|
|
|
|
from wcs.qommon import _, misc
|
|
from wcs.qommon.storage import Null
|
|
|
|
|
|
class UnknownUser:
    """Stand-in object used in place of a user that cannot be looked up."""

    def __str__(self):
        # `_` is the translation helper imported from wcs.qommon.
        label = _('unknown user')
        return label
|
|
|
|
|
|
class Snapshot:
    """Serialized copy of an object taken at a given point in time.

    A snapshot keeps the XML export of an object, identified by
    ``object_type`` and ``object_id``, along with the timestamp, the author
    and optional comment/label.

    NOTE(review): ``store``, ``get_latest``, ``select_object_history`` and
    ``remove_object`` are used below but not defined in this class;
    presumably a storage-backed subclass or mixin provides them.
    """

    id = None
    object_type = None  # (formdef, carddef, blockdef, workflow, data_source, etc.)
    object_id = None
    timestamp = None
    user_id = None
    comment = None
    serialization = None
    label = None  # (named snapshot)

    # cache
    _instance = None
    _user = None

    # serializations longer than this (in characters) are considered "big";
    # see snap() for how they are handled.
    WCS_MAX_LEN = 1000000

    @classmethod
    def snap(cls, instance, comment=None, label=None):
        """Record a snapshot of *instance* when there is something to record.

        The snapshot is stored if an explicit *label* is given, if there is
        no previous snapshot for the object, or if the serialization differs
        from the one of the latest snapshot.  Otherwise nothing is stored.

        :param instance: object to serialize; must expose ``xml_root_node``,
            ``id`` and ``export_to_xml(include_id=True)``.
        :param comment: optional comment stored with the snapshot.
        :param label: optional label, making this a named snapshot.
        """
        obj = cls()
        obj.object_type = instance.xml_root_node
        obj.object_id = instance.id
        obj.timestamp = now()
        session = get_session()
        if session:
            obj.user_id = session.user
        obj.serialization = ET.tostring(instance.export_to_xml(include_id=True)).decode('utf-8')
        obj.comment = comment
        obj.label = label

        latest = cls.get_latest(obj.object_type, obj.object_id)
        if label is None and latest is not None and obj.serialization == latest.serialization:
            # nothing changed and no explicit label was given: do not
            # create a duplicate snapshot.
            return

        if label is None and len(obj.serialization) > cls.WCS_MAX_LEN:
            # keep only latest snapshot for big objects
            # (typically workflows with embedded documents);
            # labelled snapshots are excluded from the cleanup.
            for old_snapshot in cls.select_object_history(instance, clause=[Null('label')]):
                cls.remove_object(old_snapshot.id)
        obj.store()

    def get_object_class(self):
        """Return the class whose ``xml_root_node`` matches ``object_type``.

        :raises KeyError: if no known class matches ``self.object_type``.
        """
        # imported locally rather than at module level
        # (presumably to avoid circular imports -- TODO confirm).
        from wcs.blocks import BlockDef
        from wcs.carddef import CardDef
        from wcs.data_sources import NamedDataSource
        from wcs.formdef import FormDef
        from wcs.workflows import Workflow
        from wcs.wscalls import NamedWsCall

        for klass in (BlockDef, CardDef, NamedDataSource, FormDef, Workflow, NamedWsCall):
            if klass.xml_root_node == self.object_type:
                return klass
        raise KeyError('no class for object type: %s' % self.object_type)

    @property
    def instance(self):
        """Object rebuilt from the stored serialization.

        The object is built on first access, flagged with ``readonly = True``
        and cached; later accesses return the same object.
        """
        if self._instance is None:
            tree = ET.fromstring(self.serialization)
            self._instance = self.get_object_class().import_from_xml_tree(
                tree,
                include_id=True,
                snapshot=True,
            )
            self._instance.readonly = True
        return self._instance

    @property
    def user(self):
        """Author of the snapshot.

        Returns ``None`` when no ``user_id`` was recorded, the user object
        returned by the publisher's user class otherwise, or an
        ``UnknownUser`` placeholder if that lookup raises ``KeyError``.
        The result is cached after the first lookup.
        """
        if not self.user_id:
            return None
        if self._user is None:
            try:
                self._user = get_publisher().user_class.get(self.user_id)
            except KeyError:
                self._user = UnknownUser()
        return self._user

    def restore(self, as_new=True):
        """Store the snapshotted object again.

        :param as_new: when true, identifiers are reset so the object is
            stored as a new one (and marked disabled if it has a
            ``disabled`` attribute); when false, ``max_field_id`` and
            ``table_name`` are taken from the current object with the same
            id before storing.
        :raises KeyError: NOTE(review): with ``as_new=False`` the lookup of
            the current object presumably raises if it no longer exists --
            confirm against the object class ``get()``.
        """
        instance = self.instance
        if as_new:
            for attr in ('id', 'url_name', 'internal_identifier', 'slug'):
                setattr(instance, attr, None)
            if hasattr(instance, 'disabled'):
                instance.disabled = True
        else:
            # keep table and max field id from current object
            current_object = self.get_object_class().get(instance.id)
            for attr in ('max_field_id', 'table_name'):
                if hasattr(current_object, attr):
                    setattr(instance, attr, getattr(current_object, attr))

        # The instance property sets a `readonly` marker; drop it before
        # storing.  The cached instance is reused across calls, so on a
        # repeated restore() the marker is already gone: tolerate that
        # instead of failing with AttributeError.
        try:
            delattr(instance, 'readonly')
        except AttributeError:
            pass
        instance.store(comment=_('Restored snapshot %(id)s (%(timestamp)s)') % {
            'id': self.id,
            'timestamp': misc.localstrftime(self.timestamp)})
|