# w.c.s. - web application for online forms
# Copyright (C) 2005-2020 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.

import difflib
import re
import xml.etree.ElementTree as ET

from django.utils.timezone import now
from quixote import get_publisher, get_response, get_session

from wcs.qommon import _, misc


class UnknownUser:
    """Placeholder used by ``Snapshot.user`` when the stored user id cannot be resolved."""

    def __str__(self):
        return str(_('unknown user'))


def indent(tree, space=' ', level=0):
    """Indent an XML tree in place by rewriting whitespace-only ``text``/``tail`` values.

    Backport from Lib/xml/etree/ElementTree.py (python 3.9).

    :param tree: an ``ET.Element`` or ``ET.ElementTree`` (its root is used).
    :param space: string added for each indentation level.
    :param level: initial indentation level, must be >= 0.
    :raises ValueError: if ``level`` is negative.
    """
    # NOTE(review): CPython's ET.indent() defaults to two spaces; confirm the
    # default used here is the intended one, as patches produced by snap() are
    # computed against trees indented by this function.
    if isinstance(tree, ET.ElementTree):
        tree = tree.getroot()
    if level < 0:
        raise ValueError(f'Initial indentation level must be >= 0, got {level}')
    if len(tree) == 0:
        return

    # Reduce the memory consumption by reusing indentation strings.
    indentations = ['\n' + level * space]

    def _indent_children(elem, level):
        # Start a new indentation level for the first child.
        child_level = level + 1
        try:
            child_indentation = indentations[child_level]
        except IndexError:
            child_indentation = indentations[level] + space
            indentations.append(child_indentation)

        if not elem.text or not elem.text.strip():
            elem.text = child_indentation

        for child in elem:
            if len(child):
                _indent_children(child, child_level)
            if not child.tail or not child.tail.strip():
                child.tail = child_indentation

        # Dedent after the last child by overwriting the previous indentation.
        # (elem always has at least one child here, so ``child`` is bound.)
        if not child.tail.strip():
            child.tail = indentations[level]

    _indent_children(tree, 0)


_no_eol = '\\ No newline at end of file'
_hdr_pat = re.compile(r'^@@ -(\d+),?(\d+)? \+(\d+),?(\d+)? @@$')


def make_patch(a, b):
    """
    Get unified string diff between two strings. Trims top two lines.
    Returns empty string if strings are identical.
    """
    diffs = difflib.unified_diff(a.splitlines(True), b.splitlines(True), n=0)
    # drop the two header lines; there are none when both strings are identical
    next(diffs, None)
    next(diffs, None)
    # a diff line without trailing newline comes from an input not ending with
    # a newline, it is followed by the usual marker line.
    return ''.join(d if d[-1] == '\n' else d + '\n' + _no_eol + '\n' for d in diffs)


def apply_patch(s, patch, revert=False):
    """
    Apply patch to string s to recover newer string.
    If revert is True, treat s as the newer string, recover older string.

    Raises Exception if a hunk header cannot be parsed or refers to a line
    number that does not fit in s.
    """
    source_lines = s.splitlines(True)
    patch_lines = patch.splitlines(True)
    result = []  # pieces of the resulting string, joined at the end
    i = sl = 0  # current index in patch_lines / in source_lines
    # regex group holding the start line of the hunk in s, and the sign of the
    # patch lines that have to be added to the result.
    midx, sign = (3, '-') if revert else (1, '+')

    while i < len(patch_lines) and patch_lines[i].startswith(('---', '+++')):
        i += 1  # skip header lines

    while i < len(patch_lines):
        m = _hdr_pat.match(patch_lines[i])
        if not m:
            raise Exception('Bad patch -- regex mismatch [line ' + str(i) + ']')
        # 0-based start of the hunk; a hunk with a length of 0 points to the
        # line preceding the change, hence the +1 correction.
        hunk_start = int(m.group(midx)) - 1 + (m.group(midx + 1) == '0')
        if sl > hunk_start or hunk_start > len(source_lines):
            raise Exception('Bad patch -- bad line num [line ' + str(i) + ']')
        # copy untouched lines located before the hunk
        result.extend(source_lines[sl:hunk_start])
        sl = hunk_start
        i += 1

        while i < len(patch_lines) and patch_lines[i][0] != '@':
            if i + 1 < len(patch_lines) and patch_lines[i + 1][0] == '\\':
                # next line is the "no newline at end of file" marker, strip
                # the newline from current line and skip the marker.
                line = patch_lines[i][:-1]
                i += 2
            else:
                line = patch_lines[i]
                i += 1
            if line:
                if line[0] == sign or line[0] == ' ':
                    result.append(line[1:])
                if line[0] != sign:
                    # line exists in s, consume it
                    sl += 1

    result.extend(source_lines[sl:])
    return ''.join(result)


class Snapshot:
    """Snapshot of an object, stored as a full XML serialization or as a patch.

    Storage related methods used here (``store``, ``get_latest``,
    ``_get_recent_changes``, ``select_object_history``) are not defined in this
    class, they are expected to be provided elsewhere.
    """

    id = None
    object_type = None  # (formdef, carddef, blockdef, workflow, data_source, etc.)
    object_id = None
    timestamp = None
    user_id = None
    comment = None
    serialization = None
    patch = None
    label = None  # (named snapshot)
    test_result_id = None
    application_slug = None
    application_version = None

    # cache
    _instance = None
    _user = None

    _category_types = [
        'block_category',
        'card_category',
        'data_source_category',
        'category',
        'mail_template_category',
        'comment_template_category',
        'workflow_category',
    ]

    @classmethod
    def snap(cls, instance, comment=None, label=None, store_user=None, application=None):
        """Create (and store if relevant) a snapshot of ``instance``.

        :param instance: object to snapshot; it must provide ``xml_root_node``,
            ``id`` and ``export_to_xml(include_id=True)``.
        :param comment: optional comment, stored as a string.
        :param label: optional label (named snapshot); labelled snapshots are
            not compared with the latest snapshot.
        :param store_user: see comment in the body.
        :param application: optional object with ``slug`` and ``version_number``
            attributes; when given a snapshot is always stored.
        """
        obj = cls()
        obj.object_type = instance.xml_root_node
        obj.object_id = instance.id
        obj.timestamp = now()

        # store_user:
        #  None/True: get user from active session
        #  False: do not store user
        #  any value: consider it as user id
        # (store_user is explicitly compared to the True singleton, to avoid
        # the "1" integer being treated as True)
        if store_user is None or store_user is True:
            session = get_session()
            if session:
                obj.user_id = session.user
        elif store_user:
            obj.user_id = store_user

        tree = instance.export_to_xml(include_id=True)
        # remove position for categories
        if obj.object_type in cls._category_types:
            for position in tree.findall('position'):
                tree.remove(position)

        obj.serialization = ET.tostring(tree).decode('utf-8')
        obj.comment = str(comment) if comment else None
        obj.label = label
        if application is not None:
            obj.application_slug = application.slug
            obj.application_version = application.version_number

        latest_complete = cls.get_latest(obj.object_type, obj.object_id, complete=True)
        if latest_complete is None:
            # no complete snapshot, store it, with serialization and no patch
            obj.store()
            return

        # should we store a snapshot ?
        store_snapshot = True

        # get patch between latest serialization and current instance
        # indent xml to minimize patch
        try:
            latest_tree = ET.fromstring(latest_complete.serialization)
        except ET.ParseError:
            patch = None
        else:
            indent(tree)
            indent(latest_tree)
            patch = make_patch(ET.tostring(latest_tree).decode('utf-8'), ET.tostring(tree).decode('utf-8'))
            if label is None:
                # compare with patch of latest snapshot
                latest = cls.get_latest(obj.object_type, obj.object_id)
                if latest.patch and patch == latest.patch:
                    # previous snapshot contains a patch (but no serialization)
                    # and the current patch is the same as in the previous snapshot
                    store_snapshot = False
                elif latest.serialization and not patch:
                    # previous snapshot contains a serialization (but no patch)
                    # and there is no difference (no patch)
                    store_snapshot = False

        if application is not None:
            # always store a snapshot on application import, we want to have a trace in history
            store_snapshot = True

        if not store_snapshot:
            return

        if patch is not None and len(patch) < min(len(obj.serialization) / 10, 1_000_000):
            # patch is small (compared to full serialization and an absolute value)
            # store patch instead of full serialization
            obj.serialization = None
            obj.patch = patch
        # else: keep serialization and ignore patch
        obj.store()

        response = get_response()
        if response and obj.object_type in ('formdef', 'carddef'):
            # imported here and not at module level
            # (presumably to avoid a circular import -- to be confirmed)
            from wcs.admin.tests import TestsAfterJob

            response.add_after_job(TestsAfterJob(instance, reason=obj.label or obj.comment, snapshot=obj))

    @classmethod
    def get_recent_changes(cls, object_types=None, user=None, limit=5, offset=0):
        """Return existing objects for recent changes, with a ``snapshot_timestamp`` attribute.

        Entries whose object cannot be loaded anymore are skipped.
        """
        elements = cls._get_recent_changes(object_types=object_types, user=user, limit=limit, offset=offset)
        instances = []
        for object_type, object_id, snapshot_timestamp in elements:
            instance = cls.get_class(object_type).get(object_id, ignore_errors=True)
            if instance:
                instance.snapshot_timestamp = snapshot_timestamp
                instances.append(instance)
        return instances

    def get_object_class(self):
        """Return the class matching this snapshot object type, as given by the publisher."""
        return get_publisher().get_object_class(self.object_type)

    @classmethod
    def get_class(cls, object_type):
        """Return the class matching ``object_type``, as given by the publisher."""
        return get_publisher().get_object_class(object_type)

    def get_serialization(self, indented=True):
        """Return the XML serialization of this snapshot, as a string.

        If the snapshot holds a complete serialization it is returned, indented
        or not according to ``indented``.  Otherwise the patch is applied to the
        indented serialization of the latest complete snapshot.
        """
        # there is a complete serialization
        if self.serialization:
            if not indented:
                return self.serialization
            tree = ET.fromstring(self.serialization)
            indent(tree)
            return ET.tostring(tree).decode('utf-8')

        # get latest version with serialization
        latest_complete = self.__class__.get_latest(
            self.object_type, self.object_id, complete=True, max_timestamp=self.timestamp
        )
        latest_tree = ET.fromstring(latest_complete.serialization)
        indent(latest_tree)
        return apply_patch(ET.tostring(latest_tree).decode('utf-8'), self.patch or '')

    @property
    def instance(self):
        """Object rebuilt from the serialization, flagged as readonly (cached)."""
        if self._instance is None:
            tree = ET.fromstring(self.get_serialization())
            self._instance = self.get_object_class().import_from_xml_tree(
                tree,
                include_id=True,
                snapshot=True,
                check_datasources=getattr(self, '_check_datasources', True),
                check_deprecated=False,
            )
            self._instance.readonly = True
            self._instance.snapshot_object = self
        return self._instance

    @property
    def user(self):
        """User who made the snapshot: None if no user id, UnknownUser if lookup fails (cached)."""
        if not self.user_id:
            return None
        if self._user is None:
            try:
                self._user = get_publisher().user_class.get(self.user_id)
            except KeyError:
                self._user = UnknownUser()
        return self._user

    def load_history(self):
        """Load ids of the snapshots of the object history into ``self._history``."""
        if not self.instance:
            self._history = []
            return
        history = get_publisher().snapshot_class.select_object_history(self.instance)
        self._history = [s.id for s in history]

    @property
    def previous(self):
        """Id preceding this snapshot in the history, None if there is none."""
        if not hasattr(self, '_history'):
            self.load_history()
        try:
            idx = self._history.index(self.id)
        except ValueError:
            return None
        if idx == 0:
            return None
        return self._history[idx - 1]

    @property
    def next(self):
        """Id following this snapshot in the history, None if there is none."""
        if not hasattr(self, '_history'):
            self.load_history()
        try:
            idx = self._history.index(self.id)
        except ValueError:
            return None
        try:
            return self._history[idx + 1]
        except IndexError:
            return None

    @property
    def first(self):
        """First id of the history, None if the history is empty."""
        if not hasattr(self, '_history'):
            self.load_history()
        # history can be empty (see load_history), behave like previous/next
        # and return None instead of raising IndexError.
        return self._history[0] if self._history else None

    @property
    def last(self):
        """Last id of the history, None if the history is empty."""
        if not hasattr(self, '_history'):
            self.load_history()
        # same as for first, do not raise on an empty history.
        return self._history[-1] if self._history else None

    def restore(self, as_new=True):
        """Store the object held by this snapshot.

        :param as_new: if True identifiers are reset before storing (and the
            object is marked as disabled if it has a ``disabled`` attribute);
            if False table name and (for categories) position of the current
            object are kept.
        """
        instance = self.instance
        if as_new:
            for attr in ('id', 'url_name', 'internal_identifier', 'slug'):
                try:
                    setattr(instance, attr, None)
                except AttributeError:
                    # attribute can be a property without setter
                    pass
            if self.object_type in self._category_types:
                # set position, after all existing categories
                # (default=0 as there may be no category at all)
                instance.position = (
                    max((i.position or 0 for i in self.get_object_class().select()), default=0) + 1
                )
            elif self.object_type == 'testdef':
                instance.workflow_tests.id = None
                for response in instance.get_webservice_responses():
                    response.id = None
            if hasattr(instance, 'disabled'):
                instance.disabled = True
        else:
            # keep table and position from current object
            current_object = self.get_object_class().get(instance.id)
            for attr in ('table_name', 'position'):
                if attr != 'position' or self.object_type in self._category_types:
                    if hasattr(current_object, attr):
                        setattr(instance, attr, getattr(current_object, attr))

        # remove attributes set by the instance property
        delattr(instance, 'readonly')
        delattr(instance, 'snapshot_object')
        instance.store(
            comment=_('Restored snapshot %(id)s (%(timestamp)s)')
            % {'id': self.id, 'timestamp': misc.localstrftime(self.timestamp)}
        )