2020-08-10 13:10:04 +02:00
|
|
|
# w.c.s. - web application for online forms
|
|
|
|
# Copyright (C) 2005-2020 Entr'ouvert
|
|
|
|
#
|
|
|
|
# This program is free software; you can redistribute it and/or modify
|
|
|
|
# it under the terms of the GNU General Public License as published by
|
|
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
|
|
# (at your option) any later version.
|
|
|
|
#
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
# GNU General Public License for more details.
|
|
|
|
#
|
|
|
|
# You should have received a copy of the GNU General Public License
|
|
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
2021-10-05 11:38:28 +02:00
|
|
|
import difflib
|
|
|
|
import re
|
2023-06-21 17:00:49 +02:00
|
|
|
import xml.etree.ElementTree as ET
|
2020-08-10 13:10:04 +02:00
|
|
|
|
|
|
|
from django.utils.timezone import now
|
2023-02-13 14:44:03 +01:00
|
|
|
from quixote import get_publisher, get_response, get_session
|
2020-08-10 13:10:04 +02:00
|
|
|
|
2020-08-11 15:16:05 +02:00
|
|
|
from wcs.qommon import _, misc
|
2020-08-10 20:29:52 +02:00
|
|
|
|
|
|
|
|
|
|
|
class UnknownUser:
    """Stand-in object used when a snapshot author cannot be looked up."""

    def __str__(self):
        # translate first, then force a plain str out of the result
        label = _('unknown user')
        return str(label)
|
2020-08-10 13:10:04 +02:00
|
|
|
|
|
|
|
|
2023-06-21 17:00:49 +02:00
|
|
|
def indent(tree, space=' ', level=0):
    """Pretty-print an XML tree in place.

    Whitespace-only (or missing) ``text`` and ``tail`` values are replaced by
    a newline followed by the indentation matching the element depth; text
    and tails holding real content are left alone.

    tree: an ``ET.Element`` or an ``ET.ElementTree`` (its root is used).
    space: string added once per depth level.
    level: initial depth; a negative value raises ``ValueError``.
    """
    # backport from Lib/xml/etree/ElementTree.py python 3.9
    # NOTE(review): the standard library version defaults to two spaces;
    # confirm the one-space default here is intended.
    root = tree.getroot() if isinstance(tree, ET.ElementTree) else tree
    if level < 0:
        raise ValueError(f'Initial indentation level must be >= 0, got {level}')
    if not len(root):
        # nothing to indent without children
        return

    # One string per depth, created on first use and then shared by every
    # element at that depth.
    prefixes = ['\n' + level * space]

    def _indent_children(parent, depth):
        inner_depth = depth + 1
        if inner_depth < len(prefixes):
            inner_prefix = prefixes[inner_depth]
        else:
            # first visit at this depth: derive the prefix from the parent one
            inner_prefix = prefixes[depth] + space
            prefixes.append(inner_prefix)

        if not (parent.text and parent.text.strip()):
            parent.text = inner_prefix

        for node in parent:
            if len(node):
                _indent_children(node, inner_depth)
            if not (node.tail and node.tail.strip()):
                node.tail = inner_prefix

        # The last child closes the parent: its tail goes back one level.
        if not node.tail.strip():
            node.tail = prefixes[depth]

    _indent_children(root, 0)
|
|
|
|
|
|
|
|
|
2021-10-05 11:38:28 +02:00
|
|
|
# marker line written after a diff line that has no trailing newline
_no_eol = '\\ No newline at end of file'
# unified diff hunk header, "@@ -start[,count] +start[,count] @@";
# groups 1/2 are the start/count of the old side, groups 3/4 of the new side
_hdr_pat = re.compile(r'^@@ -(\d+),?(\d+)? \+(\d+),?(\d+)? @@$')
|
|
|
|
|
|
|
|
|
|
|
|
def make_patch(a, b):
    """
    Build a unified diff, without context lines, going from string a to string b.

    The two leading file header lines of the diff are dropped. A diff line that
    does not end with a newline gets one, followed by a "no newline" marker line.
    Identical strings give an empty string.
    """
    entries = difflib.unified_diff(a.splitlines(True), b.splitlines(True), n=0)
    # discard the two header lines; the default avoids StopIteration when
    # there is no difference at all
    next(entries, None)
    next(entries, None)

    pieces = []
    for entry in entries:
        if entry.endswith('\n'):
            pieces.append(entry)
        else:
            pieces.append(entry + '\n' + _no_eol + '\n')
    return ''.join(pieces)
|
|
|
|
|
|
|
|
|
|
|
|
def apply_patch(s, patch, revert=False):
    """
    Apply a unified diff (as produced by make_patch) to string s.

    By default s is the older string and the newer one is returned; with
    revert set, s is the newer string and the older one is returned.
    Raises Exception when a hunk header cannot be parsed or points to an
    impossible line number.
    """
    source = s.splitlines(True)
    lines = patch.splitlines(True)
    output = []
    pos = consumed = 0
    # which header group gives the start line in s, and which diff marker
    # introduces lines to emit
    if revert:
        start_group, keep = 3, '-'
    else:
        start_group, keep = 1, '+'

    # skip header lines
    while pos < len(lines) and lines[pos].startswith(('---', '+++')):
        pos += 1

    while pos < len(lines):
        header = _hdr_pat.match(lines[pos])
        if not header:
            raise Exception('Bad patch -- regex mismatch [line ' + str(pos) + ']')
        target = int(header.group(start_group)) - 1
        if header.group(start_group + 1) == '0':
            # an empty range is numbered from the line just before it
            target += 1
        if consumed > target or target > len(source):
            raise Exception('Bad patch -- bad line num [line ' + str(pos) + ']')
        # copy the untouched lines standing before this hunk
        output.extend(source[consumed:target])
        consumed = target
        pos += 1
        while pos < len(lines) and lines[pos][0] != '@':
            if pos + 1 < len(lines) and lines[pos + 1][0] == '\\':
                # next line is the "no newline" marker: drop the newline
                # that was added to this one and skip the marker
                line = lines[pos][:-1]
                pos += 2
            else:
                line = lines[pos]
                pos += 1
            if not line:
                continue
            marker = line[0]
            if marker == keep or marker == ' ':
                output.append(line[1:])
            if marker != keep:
                # this line belongs to s, move past it
                consumed += 1
    output.extend(source[consumed:])
    return ''.join(output)
|
|
|
|
|
|
|
|
|
2020-08-10 13:10:04 +02:00
|
|
|
class Snapshot:
    """Saved state of an object, kept as an XML serialization or as a patch."""

    # stored attributes
    id = None
    object_type = None  # (formdef, carddef, blockdef, workflow, data_source, etc.)
    object_id = None
    timestamp = None
    user_id = None
    comment = None
    # complete XML serialization, None when only a patch is kept
    serialization = None
    # diff against the latest complete serialization, None when the
    # serialization is kept in full
    patch = None
    label = None  # (named snapshot)
    test_result_id = None

    # set from the application object given to snap(), if any
    application_slug = None
    application_version = None

    # cache
    _instance = None
    _user = None

    # object types handled as categories: their position is left out of
    # snapshots and set again on restore
    _category_types = [
        'block_category',
        'card_category',
        'data_source_category',
        'category',
        'mail_template_category',
        'comment_template_category',
        'workflow_category',
    ]
|
|
|
|
|
2020-08-10 13:10:04 +02:00
|
|
|
@classmethod
def snap(cls, instance, comment=None, label=None, store_user=None, application=None):
    """Record a snapshot of instance, if there is something worth recording.

    instance: object to serialize with export_to_xml().
    comment: optional comment, converted to str.
    label: optional name for the snapshot; a labelled snapshot is not
        compared with the latest one before being stored.
    store_user: see the comment in the body.
    application: optional object providing slug and version_number; when
        given a snapshot is always stored.

    The snapshot holds either the full serialization or, when it is small
    enough, a patch against the latest complete snapshot.
    """
    obj = cls()
    obj.object_type = instance.xml_root_node
    obj.object_id = instance.id
    obj.timestamp = now()
    # store_user:
    #  None/True: get user from active session
    #  False: do not store user
    #  any value: consider it as user id
    # (store_user is explicitly checked to be a boolean, to avoid the "1" integer being treated as True)
    if store_user is None or (isinstance(store_user, bool) and store_user is True):
        if get_session():
            obj.user_id = get_session().user
    elif store_user:
        obj.user_id = store_user

    tree = instance.export_to_xml(include_id=True)
    # remove position for categories
    if obj.object_type in cls._category_types:
        for position in tree.findall('position'):
            tree.remove(position)

    obj.serialization = ET.tostring(tree).decode('utf-8')
    obj.comment = str(comment) if comment else None
    obj.label = label
    if application is not None:
        obj.application_slug = application.slug
        obj.application_version = application.version_number

    latest_complete = cls.get_latest(obj.object_type, obj.object_id, complete=True)
    if latest_complete is None:
        # no complete snapshot, store it, with serialization and no patch
        obj.store()
        return

    # should we store a snapshot ?
    store_snapshot = True

    # get patch between latest serialization and current instance
    # indent xml to minimize patch
    try:
        latest_tree = ET.fromstring(latest_complete.serialization)
    except ET.ParseError:
        # latest complete serialization cannot be parsed: no patch, the
        # full serialization will be stored
        patch = None
    else:
        indent(tree)
        indent(latest_tree)
        patch = make_patch(ET.tostring(latest_tree).decode('utf-8'), ET.tostring(tree).decode('utf-8'))
        if label is None:
            # compare with patch of latest snapshot
            latest = cls.get_latest(obj.object_type, obj.object_id)
            if latest.patch and patch == latest.patch:
                # previous snapshot contains a patch (but no serialization)
                # and the current patch is the same as in the previous snapshot
                store_snapshot = False
            elif latest.serialization and not patch:
                # previous snapshot contains a serialization (but no patch)
                # and there is no difference (no patch)
                store_snapshot = False

    if application is not None:
        # always store a snapshot on application import, we want to have a trace in history
        store_snapshot = True

    if store_snapshot:
        if patch is not None and len(patch) < min(len(obj.serialization) / 10, 1_000_000):
            # patch is small (compared to full serialization and an absolute value)
            # store patch instead of full serialization
            obj.serialization = None
            obj.patch = patch
        # else: keep serialization and ignore patch
        obj.store()

        if get_response() and obj.object_type in ('formdef', 'carddef'):
            # imported here rather than at module level
            from wcs.admin.tests import TestsAfterJob

            get_response().add_after_job(
                TestsAfterJob(instance, reason=obj.label or obj.comment, snapshot=obj)
            )
|
|
|
|
|
2022-03-03 08:48:11 +01:00
|
|
|
@classmethod
def get_recent_changes(cls, object_types=None, user=None, limit=5, offset=0):
    """Return the objects behind the most recent changes.

    The (object type, object id, timestamp) rows come from
    _get_recent_changes(); rows whose object cannot be loaded are skipped.
    Each returned object carries the row timestamp as snapshot_timestamp.
    """
    rows = cls._get_recent_changes(object_types=object_types, user=user, limit=limit, offset=offset)
    found = []
    for kind, ident, stamp in rows:
        changed = cls.get_class(kind).get(ident, ignore_errors=True)
        if not changed:
            continue
        changed.snapshot_timestamp = stamp
        found.append(changed)
    return found
|
|
|
|
|
2020-08-10 20:29:52 +02:00
|
|
|
def get_object_class(self):
    """Return the class the publisher associates with this snapshot's object type."""
    publisher = get_publisher()
    return publisher.get_object_class(self.object_type)
|
2022-03-03 08:48:11 +01:00
|
|
|
|
|
|
|
@classmethod
def get_class(cls, object_type):
    """Return the class the publisher associates with the given object type."""
    publisher = get_publisher()
    return publisher.get_object_class(object_type)
|
2020-08-10 20:29:52 +02:00
|
|
|
|
2021-11-20 18:27:53 +01:00
|
|
|
def get_serialization(self, indented=True):
    """Return the XML serialization of this snapshot as a string.

    When the snapshot holds a complete serialization it is returned as
    stored (indented false) or re-indented (indented true). Otherwise the
    serialization is rebuilt by applying the snapshot patch to the indented
    latest complete serialization not newer than this snapshot; indented
    has no effect in that case.
    """
    if not self.serialization:
        # get latest version with serialization
        base = self.__class__.get_latest(
            self.object_type, self.object_id, complete=True, max_timestamp=self.timestamp
        )
        base_tree = ET.fromstring(base.serialization)
        indent(base_tree)
        base_xml = ET.tostring(base_tree).decode('utf-8')
        return apply_patch(base_xml, self.patch or '')

    # there is a complete serialization
    if not indented:
        return self.serialization

    own_tree = ET.fromstring(self.serialization)
    indent(own_tree)
    return ET.tostring(own_tree).decode('utf-8')
|
|
|
|
|
2020-08-10 20:29:52 +02:00
|
|
|
@property
def instance(self):
    """Object rebuilt from the snapshot serialization, built once then cached.

    The object is flagged readonly and keeps a reference to this snapshot
    as snapshot_object.
    """
    if self._instance is not None:
        return self._instance

    xml_tree = ET.fromstring(self.get_serialization())
    object_class = self.get_object_class()
    self._instance = object_class.import_from_xml_tree(
        xml_tree,
        include_id=True,
        snapshot=True,
        # checked unless the snapshot was given a _check_datasources attribute saying otherwise
        check_datasources=getattr(self, '_check_datasources', True),
        check_deprecated=False,
    )
    self._instance.readonly = True
    self._instance.snapshot_object = self
    return self._instance
|
|
|
|
|
|
|
|
@property
def user(self):
    """User recorded for the snapshot.

    None when no user id is recorded; an UnknownUser when the lookup of
    the recorded id raises KeyError. The result is cached.
    """
    if not self.user_id:
        return None
    if self._user is not None:
        return self._user
    try:
        self._user = get_publisher().user_class.get(self.user_id)
    except KeyError:
        self._user = UnknownUser()
    return self._user
|
2020-08-11 15:16:05 +02:00
|
|
|
|
2021-01-12 15:39:09 +01:00
|
|
|
def load_history(self):
    """Fill self._history with the ids of the snapshots of the snapshot's object.

    The list is left empty when the snapshot has no usable instance.
    """
    if self.instance:
        snapshots = get_publisher().snapshot_class.select_object_history(self.instance)
        self._history = [snapshot.id for snapshot in snapshots]
    else:
        self._history = []
|
|
|
|
|
|
|
|
@property
def previous(self):
    """Id standing just before this snapshot in the history list, or None.

    None is returned when this snapshot is not in the history or is its
    first entry.
    """
    if not hasattr(self, '_history'):
        self.load_history()

    try:
        position = self._history.index(self.id)
    except ValueError:
        return None
    return self._history[position - 1] if position else None
|
|
|
|
|
|
|
|
@property
def next(self):
    """Id standing just after this snapshot in the history list, or None.

    None is returned when this snapshot is not in the history or is its
    last entry.
    """
    if not hasattr(self, '_history'):
        self.load_history()

    history = self._history
    try:
        following = history.index(self.id) + 1
    except ValueError:
        return None
    return history[following] if following < len(history) else None
|
|
|
|
|
|
|
|
@property
def first(self):
    """First id of the history list, or None when the history is empty."""
    if not hasattr(self, '_history'):
        self.load_history()

    # load_history() can leave an empty list; return None in that case, as
    # previous and next do, rather than raising IndexError
    return self._history[0] if self._history else None
|
|
|
|
|
|
|
|
@property
def last(self):
    """Last id of the history list, or None when the history is empty."""
    if not hasattr(self, '_history'):
        self.load_history()

    # load_history() can leave an empty list; return None in that case, as
    # previous and next do, rather than raising IndexError
    return self._history[-1] if self._history else None
|
|
|
|
|
2020-08-11 15:16:05 +02:00
|
|
|
def restore(self, as_new=True):
    """Store the object held by this snapshot.

    as_new true: the identifying attributes (id, url_name,
    internal_identifier, slug) are reset so the object is stored as a new
    one; a category gets a position after the existing ones, a testdef
    gets the ids of its workflow tests and webservice responses reset,
    and an object with a disabled attribute is stored disabled.

    as_new false: the object keeps its id; table_name (and position for
    categories) are taken from the current object having that id.

    The store comment mentions the snapshot id and timestamp.
    """
    instance = self.instance
    if as_new:
        for attr in ('id', 'url_name', 'internal_identifier', 'slug'):
            try:
                setattr(instance, attr, None)
            except AttributeError:
                # attribute can be a property without setter
                pass
        if self.object_type in self._category_types:
            # set position after the existing objects; default=0 avoids a
            # ValueError from max() when no object exists
            highest = max((i.position or 0 for i in self.get_object_class().select()), default=0)
            instance.position = highest + 1
        elif self.object_type == 'testdef':
            instance.workflow_tests.id = None
            for response in instance.get_webservice_responses():
                response.id = None
        if hasattr(instance, 'disabled'):
            instance.disabled = True
    else:
        # keep table and position from current object
        current_object = self.get_object_class().get(instance.id)
        for attr in ('table_name', 'position'):
            if attr != 'position' or self.object_type in self._category_types:
                if hasattr(current_object, attr):
                    setattr(instance, attr, getattr(current_object, attr))

    # drop the markers set when the object was built from the snapshot
    delattr(instance, 'readonly')
    delattr(instance, 'snapshot_object')
    instance.store(
        comment=_('Restored snapshot %(id)s (%(timestamp)s)')
        % {'id': self.id, 'timestamp': misc.localstrftime(self.timestamp)}
    )
|