wcs/wcs/snapshots.py

377 lines
13 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2020 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import difflib
import re
import xml.etree.ElementTree as ET
from django.utils.timezone import now
from quixote import get_publisher, get_response, get_session
from wcs.qommon import _, misc
class UnknownUser:
    """Stand-in object whose string form is the 'unknown user' label.

    Used where a user object is expected but the real one is not available
    (see ``Snapshot.user``, which returns it when the lookup raises KeyError).
    """

    def __str__(self):
        # `_` comes from wcs.qommon; presumably it returns a (possibly lazy)
        # translation, hence the explicit conversion to a plain str.
        label = _('unknown user')
        return str(label)
def indent(tree, space=' ', level=0):
    """Indent an XML tree in place so that it serializes one element per line.

    Backport of ``xml.etree.ElementTree.indent`` (Python 3.9).

    Only whitespace-only (or missing) ``text`` and ``tail`` values are
    rewritten; any text or tail with real content is left untouched.

    :param tree: an ``ET.Element`` or an ``ET.ElementTree`` (its root is used).
    :param space: string inserted once per indentation level.
    :param level: initial indentation level of the root element.
    :raises ValueError: if ``level`` is negative.
    :return: None, the tree is modified in place.
    """
    root = tree.getroot() if isinstance(tree, ET.ElementTree) else tree
    if level < 0:
        raise ValueError(f'Initial indentation level must be >= 0, got {level}')
    if not len(root):
        # nothing to indent on a childless element
        return

    # One prefix per depth, built lazily and shared by all elements of that
    # depth so the same strings are reused instead of being rebuilt.
    prefixes = ['\n' + level * space]

    def _indent_children(parent, depth):
        inner_depth = depth + 1
        if inner_depth < len(prefixes):
            inner_prefix = prefixes[inner_depth]
        else:
            # first time this depth is reached: extend the prefix table
            inner_prefix = prefixes[depth] + space
            prefixes.append(inner_prefix)

        # whitespace before the first child
        if not (parent.text and parent.text.strip()):
            parent.text = inner_prefix

        for node in parent:
            if len(node):
                _indent_children(node, inner_depth)
            if not (node.tail and node.tail.strip()):
                node.tail = inner_prefix

        # `node` is now the last child: dedent after it so the closing tag of
        # `parent` lines up with its opening tag.
        if not node.tail.strip():
            node.tail = prefixes[depth]

    _indent_children(root, 0)
# Marker line emitted after a diff line that has no trailing newline
# (same text as the one used by diff tools).
_no_eol = '\\ No newline at end of file'

# Hunk header of a unified diff: "@@ -<start>[,<count>] +<start>[,<count>] @@".
# Groups 1/2 describe the older text, groups 3/4 the newer text.
_hdr_pat = re.compile(r'^@@ -(\d+),?(\d+)? \+(\d+),?(\d+)? @@$')


def make_patch(a, b):
    """
    Get unified string diff between two strings. Trims top two lines.
    Returns empty string if strings are identical.

    The diff is computed without context lines (``n=0``).  A diff line that
    does not end with a newline is terminated and followed by the
    "\\ No newline at end of file" marker so that apply_patch() can restore
    the text exactly.
    """
    diffs = difflib.unified_diff(a.splitlines(True), b.splitlines(True), n=0)
    # Drop the '---' / '+++' file header lines; an empty diff has none, in
    # which case next() simply returns the default.
    for _unused in range(2):
        next(diffs, None)
    return ''.join(d if d.endswith('\n') else d + '\n' + _no_eol + '\n' for d in diffs)


def apply_patch(s, patch, revert=False):
    """
    Apply patch to string s to recover newer string.
    If revert is True, treat s as the newer string, recover older string.

    :param s: text the patch is applied to.
    :param patch: unified diff as produced by make_patch(); leading
        '---' / '+++' header lines, if present, are skipped.
    :param revert: apply the patch backwards.
    :raises ValueError: if a hunk header cannot be parsed or references a
        line outside of ``s`` (or before an already processed hunk).
    :return: the patched text.
    """
    source_lines = s.splitlines(True)
    patch_lines = patch.splitlines(True)
    patch_len = len(patch_lines)
    # Output is accumulated as a list of pieces and joined once at the end,
    # which avoids repeated string concatenation on large documents.
    output = []
    i = consumed = 0
    # midx: regex group holding the start line on the side matching `s`;
    # sign: prefix of the patch lines that must be copied to the output.
    midx, sign = (3, '-') if revert else (1, '+')

    while i < patch_len and patch_lines[i].startswith(('---', '+++')):
        i += 1  # skip header lines

    while i < patch_len:
        header = _hdr_pat.match(patch_lines[i])
        if not header:
            raise ValueError('Bad patch -- regex mismatch [line ' + str(i) + ']')
        # Start lines are 1-based; when the count is 0 the header gives the
        # line *before* the change, hence the +1 correction.
        hunk_start = int(header.group(midx)) - 1 + (header.group(midx + 1) == '0')
        if consumed > hunk_start or hunk_start > len(source_lines):
            raise ValueError('Bad patch -- bad line num [line ' + str(i) + ']')
        # copy the unchanged lines located before this hunk
        output.extend(source_lines[consumed:hunk_start])
        consumed = hunk_start
        i += 1

        while i < patch_len and patch_lines[i][0] != '@':
            if i + 1 < patch_len and patch_lines[i + 1][0] == '\\':
                # line followed by the "no newline" marker: drop the newline
                # that make_patch() added, and skip the marker itself
                line = patch_lines[i][:-1]
                i += 2
            else:
                line = patch_lines[i]
                i += 1
            if line:
                if line[0] == sign or line[0] == ' ':
                    output.append(line[1:])
                if line[0] != sign:
                    # context or opposite-sign line: it consumes a source line
                    consumed += 1

    # copy whatever remains after the last hunk
    output.extend(source_lines[consumed:])
    return ''.join(output)
class Snapshot:
    """Point-in-time record of an object definition.

    A snapshot holds either the complete XML ``serialization`` of the object
    or, when the difference with the latest complete snapshot is small, only
    a ``patch`` (see make_patch()) against that complete snapshot.

    Storage methods used here (``store``, ``get_latest``,
    ``_get_recent_changes``, ``select_object_history``) are not defined in
    this class; they are presumably provided by a storage-backed subclass.
    """

    id = None
    object_type = None  # (formdef, carddef, blockdef, workflow, data_source, etc.)
    object_id = None
    timestamp = None
    user_id = None
    comment = None
    serialization = None
    patch = None
    label = None  # (named snapshot)
    test_result_id = None
    application_slug = None
    application_version = None

    # cache
    _instance = None
    _user = None

    # object types for which the <position> node is left out of snapshots
    _category_types = [
        'block_category',
        'card_category',
        'data_source_category',
        'category',
        'mail_template_category',
        'comment_template_category',
        'workflow_category',
    ]

    @classmethod
    def snap(cls, instance, comment=None, label=None, store_user=None, application=None):
        """Record a snapshot of ``instance``, if there is something to record.

        :param instance: object to snapshot; it must provide
            ``xml_root_node``, ``id`` and ``export_to_xml(include_id=True)``.
        :param comment: optional comment, stored as a string.
        :param label: optional label (named snapshot); a labelled snapshot is
            stored even if nothing changed.
        :param store_user: None/True to take the user from the active
            session, False to store no user, any other truthy value is used
            as the user id.
        :param application: optional object with ``slug`` and
            ``version_number``; when given, a snapshot is always stored.
        :return: None.
        """
        obj = cls()
        obj.object_type = instance.xml_root_node
        obj.object_id = instance.id
        obj.timestamp = now()

        # store_user:
        #  None/True: get user from active session
        #  False: do not store user
        #  any value: consider it as user id
        # (`is True` is an identity check, so the integer 1 is treated as a
        # user id and not as the boolean flag)
        if store_user is None or store_user is True:
            session = get_session()
            if session:
                obj.user_id = session.user
        elif store_user:
            obj.user_id = store_user

        tree = instance.export_to_xml(include_id=True)
        # remove position for categories
        if obj.object_type in cls._category_types:
            for position in tree.findall('position'):
                tree.remove(position)

        obj.serialization = ET.tostring(tree).decode('utf-8')
        obj.comment = str(comment) if comment else None
        obj.label = label
        if application is not None:
            obj.application_slug = application.slug
            obj.application_version = application.version_number

        latest_complete = cls.get_latest(obj.object_type, obj.object_id, complete=True)
        if latest_complete is None:
            # no complete snapshot, store it, with serialization and no patch
            obj.store()
            return

        # should we store a snapshot ?
        store_snapshot = True

        # get patch between latest serialization and current instance
        # indent xml to minimize patch
        try:
            latest_tree = ET.fromstring(latest_complete.serialization)
        except ET.ParseError:
            # latest complete serialization is not usable as a base:
            # a full serialization will be stored
            patch = None
        else:
            indent(tree)
            indent(latest_tree)
            patch = make_patch(ET.tostring(latest_tree).decode('utf-8'), ET.tostring(tree).decode('utf-8'))
            if label is None:
                # compare with patch of latest snapshot
                latest = cls.get_latest(obj.object_type, obj.object_id)
                if latest.patch and patch == latest.patch:
                    # previous snapshot contains a patch (but no serialization)
                    # and the current patch is the same as in the previous snapshot
                    store_snapshot = False
                elif latest.serialization and not patch:
                    # previous snapshot contains a serialization (but no patch)
                    # and there is no difference (no patch)
                    store_snapshot = False

        if application is not None:
            # always store a snapshot on application import, we want to have a trace in history
            store_snapshot = True

        if store_snapshot:
            if patch is not None and len(patch) < min(len(obj.serialization) / 10, 1_000_000):
                # patch is small (compared to full serialization and an absolute value)
                # store patch instead of full serialization
                obj.serialization = None
                obj.patch = patch
            # else: keep serialization and ignore patch
            obj.store()

            if get_response() and obj.object_type in ('formdef', 'carddef'):
                # imported here, not at module level (presumably to avoid a
                # circular import)
                from wcs.admin.tests import TestsAfterJob

                get_response().add_after_job(
                    TestsAfterJob(instance, reason=obj.label or obj.comment, snapshot=obj)
                )

    @classmethod
    def get_recent_changes(cls, object_types=None, user=None, limit=5, offset=0):
        """Return the still existing objects behind the most recent snapshots.

        Each returned object gets a ``snapshot_timestamp`` attribute; entries
        whose object cannot be loaded anymore are skipped.
        """
        elements = cls._get_recent_changes(object_types=object_types, user=user, limit=limit, offset=offset)
        instances = []
        for object_type, object_id, snapshot_timestamp in elements:
            instance = cls.get_class(object_type).get(object_id, ignore_errors=True)
            if not instance:
                continue
            instance.snapshot_timestamp = snapshot_timestamp
            instances.append(instance)
        return instances

    def get_object_class(self):
        """Return the class registered on the publisher for this snapshot's object type."""
        return get_publisher().get_object_class(self.object_type)

    @classmethod
    def get_class(cls, object_type):
        """Return the class registered on the publisher for ``object_type``."""
        return get_publisher().get_object_class(object_type)

    def get_serialization(self, indented=True):
        """Return the XML serialization of the object at this snapshot.

        When the snapshot only holds a patch, the serialization is rebuilt by
        applying it to the latest complete snapshot that is not newer than
        this one; the result is then always indented, whatever ``indented``.
        """
        # there is a complete serialization
        if self.serialization:
            if not indented:
                return self.serialization
            tree = ET.fromstring(self.serialization)
            indent(tree)
            return ET.tostring(tree).decode('utf-8')

        # get latest version with serialization
        latest_complete = self.__class__.get_latest(
            self.object_type, self.object_id, complete=True, max_timestamp=self.timestamp
        )
        latest_tree = ET.fromstring(latest_complete.serialization)
        # patches are computed on indented XML (see snap()), the base must
        # be indented the same way
        indent(latest_tree)
        return apply_patch(ET.tostring(latest_tree).decode('utf-8'), self.patch or '')

    @property
    def instance(self):
        """Object rebuilt from the snapshot serialization (cached).

        The object is flagged ``readonly`` and gets a ``snapshot_object``
        attribute pointing back to this snapshot.
        """
        if self._instance is None:
            tree = ET.fromstring(self.get_serialization())
            self._instance = self.get_object_class().import_from_xml_tree(
                tree,
                include_id=True,
                snapshot=True,
                check_datasources=getattr(self, '_check_datasources', True),
                check_deprecated=False,
            )
            self._instance.readonly = True
            self._instance.snapshot_object = self
        return self._instance

    @property
    def user(self):
        """User who made the snapshot (cached).

        None when no user id was stored, an UnknownUser placeholder when the
        lookup raises KeyError.
        """
        if not self.user_id:
            return None
        if self._user is None:
            try:
                self._user = get_publisher().user_class.get(self.user_id)
            except KeyError:
                self._user = UnknownUser()
        return self._user

    def load_history(self):
        """Load into ``self._history`` the ids of the snapshots of the same object."""
        if not self.instance:
            self._history = []
            return
        history = get_publisher().snapshot_class.select_object_history(self.instance)
        self._history = [s.id for s in history]

    def _get_history(self):
        """Return the list of snapshot ids, loading it on first use."""
        if not hasattr(self, '_history'):
            self.load_history()
        return self._history

    @property
    def previous(self):
        """Id of the entry before this snapshot in the history, or None."""
        history = self._get_history()
        try:
            idx = history.index(self.id)
        except ValueError:
            return None
        # explicit check: index -1 would wrap around to the last entry
        if idx == 0:
            return None
        return history[idx - 1]

    @property
    def next(self):
        """Id of the entry after this snapshot in the history, or None."""
        history = self._get_history()
        try:
            return history[history.index(self.id) + 1]
        except (ValueError, IndexError):
            # not in history, or already the last entry
            return None

    @property
    def first(self):
        """Id of the first entry of the history, or None if it is empty."""
        history = self._get_history()
        return history[0] if history else None

    @property
    def last(self):
        """Id of the last entry of the history, or None if it is empty."""
        history = self._get_history()
        return history[-1] if history else None

    def restore(self, as_new=True):
        """Store the object of this snapshot again.

        :param as_new: if True, identifiers are reset so the object is stored
            as a new one (and marked disabled when it has that attribute);
            if False, the object keeps its id and takes ``table_name`` (and
            ``position`` for categories) from the current object.
        """
        instance = self.instance
        if as_new:
            for attr in ('id', 'url_name', 'internal_identifier', 'slug'):
                try:
                    setattr(instance, attr, None)
                except AttributeError:
                    # attribute can be a property without setter
                    pass
            if self.object_type in self._category_types:
                # set position after all existing ones; default=0 covers the
                # case where there is no existing category at all
                instance.position = (
                    max((i.position or 0 for i in self.get_object_class().select()), default=0) + 1
                )
            elif self.object_type == 'testdef':
                instance.workflow_tests.id = None
                for response in instance.get_webservice_responses():
                    response.id = None
            if hasattr(instance, 'disabled'):
                instance.disabled = True
        else:
            # keep table and position from current object
            current_object = self.get_object_class().get(instance.id)
            for attr in ('table_name', 'position'):
                if attr != 'position' or self.object_type in self._category_types:
                    if hasattr(current_object, attr):
                        setattr(instance, attr, getattr(current_object, attr))

        # remove the markers set by the `instance` property before storing
        delattr(instance, 'readonly')
        delattr(instance, 'snapshot_object')
        instance.store(
            comment=_('Restored snapshot %(id)s (%(timestamp)s)')
            % {'id': self.id, 'timestamp': misc.localstrftime(self.timestamp)}
        )