# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.

"""Filesystem-backed object storage.

Objects are stored as one pickle file per object under the application
directory, with optional secondary indexes (symlink directories and
pickled id lists).  A small Criteria class hierarchy provides in-Python
filtering for select().
"""

import errno
import operator
import os
import time
import pickle
import os.path
import shutil
import sys
import tempfile

from django.utils import six
from django.utils.encoding import force_bytes
from django.utils.six.moves import _thread

from .vendor import locket

from quixote import get_publisher

from . import PICKLE_KWARGS


def cache_umask():
    # Read the process umask (os.umask can only read by setting, hence the
    # set-then-restore dance) and keep it in the module-level process_umask,
    # used by atomic_write() to compute file permissions.
    global process_umask
    process_umask = os.umask(0)
    os.umask(process_umask)

# cache umask when loading up the module
cache_umask()


def _take(objects, limit, offset=0):
    # Generator implementing offset/limit pagination over any iterable:
    # skip `offset` items, then yield at most `limit` items (yield
    # everything when limit is None).
    for y in objects:
        if offset:
            offset -= 1
            continue
        if limit:
            limit -= 1
        elif limit == 0:
            # limit exhausted (and was not None): stop iterating
            break
        elif limit is None:
            pass
        yield y


def lax_int(s):
    # int() that returns -1 instead of raising on non-numeric input;
    # used to sort/compare object ids that may not be numeric.
    try:
        return int(s)
    except ValueError:
        return -1


def fix_key(k):
    # insure key can be inserted in filesystem ('/' would create a
    # sub-directory, so it is replaced); falsy keys are returned as-is.
    if not k:
        return k
    return str(k).replace('/', '-')


def atomic_write(path, content, async_op=False):
    '''Atomically replace the file at `path`: write to a temporary file in
    the same directory, fsync, then rename over the final name.

    `content` may be bytes or a file-like object (read in 100kB chunks).
    With async_op=True the whole operation runs in a new thread so the
    caller is not blocked.'''
    def doit():
        dirname = os.path.dirname(path)
        # temporary file in the same directory so the final rename is atomic
        # (same filesystem)
        fd, temp = tempfile.mkstemp(dir=dirname,
                prefix='.tmp-'+os.path.basename(path)+'-')
        # mkstemp creates the file 0600; widen to 0666 minus the process
        # umask cached at import time
        os.fchmod(fd, 0o666 & ~process_umask)
        f = os.fdopen(fd, "wb")
        if hasattr(content, 'read'):
            # file pointer
            def read100k():
                return content.read(100000)
            for piece in iter(read100k, b''):
                f.write(piece)
        else:
            f.write(content)
        f.flush()
        os.fsync(f.fileno())
        f.close()
        os.rename(temp, path)
    if async_op:
        _thread.start_new_thread(doit, ())
    else:
        doit()


class Criteria(object):
    '''Base filtering criterion: attribute <op> value, where `op` is an
    operator module function defined by subclasses.  build_lambda()
    returns a predicate usable on any object.'''

    def __init__(self, attribute, value):
        self.attribute = attribute
        self.value = value
        # Python 3 refuses comparisons between disparate types, this means we
        # need to create a null value of the appropriate type, so None values
        # can still be sorted.
        self.typed_none = ''
        if isinstance(self.value, bool):
            self.typed_none = False
        elif isinstance(self.value, six.integer_types + (float,)):
            self.typed_none = -sys.maxsize
        elif isinstance(self.value, time.struct_time):
            self.typed_none = time.gmtime(-10**10)  # 1653

    def build_lambda(self):
        # missing/None attributes are replaced by typed_none so comparison
        # does not raise under Python 3
        return lambda x: self.op(getattr(x, self.attribute, None) or self.typed_none, self.value)


class Less(Criteria):
    op = operator.lt


class Greater(Criteria):
    op = operator.gt


class Equal(Criteria):
    op = operator.eq


class NotEqual(Criteria):
    op = operator.ne


class LessOrEqual(Criteria):
    op = operator.le


class GreaterOrEqual(Criteria):
    op = operator.ge


class Contains(Criteria):
    # true when the object attribute is contained in self.value
    # (operator.contains(a, b) tests "b in a")
    op = operator.contains

    def build_lambda(self):
        return lambda x: self.op(self.value, getattr(x, self.attribute, ''))


class NotContains(Criteria):
    # negation of Contains
    op = operator.contains

    def build_lambda(self):
        return lambda x: not self.op(self.value, getattr(x, self.attribute, ''))


class Intersects(Criteria):
    # true when the object attribute (a sequence) shares at least one
    # element with self.value
    def build_lambda(self):
        value = set(self.value)

        def func(x):
            try:
                # truthy non-empty set when there is an intersection
                return value.intersection(set(getattr(x, self.attribute, []) or []))
            except KeyError:
                # this may happen if used to check a formdata field that didn't
                # exist when the formdata was created.
                return False

        return func


class Or(Criteria):
    '''Disjunction of other criteria; matches when any of them matches
    (an empty criteria list matches nothing).'''

    def __init__(self, criterias, **kwargs):
        # **kwargs ignored; presumably kept for signature parity with the
        # SQL backend — TODO confirm
        self.criterias = criterias

    def build_lambda(self):
        func = lambda x: False

        def combine_callables(x1, x2):
            return lambda x: x1(x) or x2(x)

        for element in self.criterias:
            func = combine_callables(func, element.build_lambda())
        return func


class And(Criteria):
    '''Conjunction of other criteria; matches when all of them match
    (an empty criteria list matches everything).'''

    def __init__(self, criterias, **kwargs):
        # **kwargs ignored; presumably kept for signature parity with the
        # SQL backend — TODO confirm
        self.criterias = criterias

    def build_lambda(self):
        func = lambda x: True

        def combine_callables(x1, x2):
            return lambda x: x1(x) and x2(x)

        for element in self.criterias:
            func = combine_callables(func, element.build_lambda())
        return func


class ILike(Criteria):
    # case-insensitive substring match on a string attribute
    def build_lambda(self):
        return lambda x: self.value.lower() in (getattr(x, self.attribute, '') or '').lower()


class FtsMatch(Criteria):
    # full-text search criterion; only implemented by the SQL backend,
    # the pickle backend cannot evaluate it
    def __init__(self, value):
        self.value = value

    def build_lambda(self):
        raise NotImplementedError()


class NotNull(Criteria):
    def __init__(self, attribute):
        self.attribute = attribute

    def build_lambda(self):
        return lambda x: getattr(x, self.attribute, None) is not None


class Null(Criteria):
    def __init__(self, attribute):
        self.attribute = attribute

    def build_lambda(self):
        return lambda x: getattr(x, self.attribute, None) is None


def parse_clause(clause):
    # creates a callable out of a clause, i.e. a list whose elements are
    # Criteria instances or already-built callables; all elements are
    # AND-combined.  A bare callable is returned unchanged.
    if callable(clause):  # already a callable
        return clause

    def combine_callables(x1, x2):
        return lambda x: x1(x) and x2(x)

    func = lambda x: True
    for element in clause:
        if callable(element):
            func = combine_callables(func, element)
        else:
            func = combine_callables(func, element.build_lambda())
    return func


class StorageIndexException(Exception):
    # raised when an index directory cannot be created/used
    pass


class StorableObject(object):
    '''Base class for objects persisted as one pickle file per object.

    Class attributes:
      _indexes: names of attributes indexed through symlink directories
      _hashed_indexes: names of attributes indexed through pickled id lists
      _filename: optional fixed storage path for singleton-style objects
    '''

    _indexes = None
    _hashed_indexes = None
    _filename = None  # None, unless must be saved to a specific location

    def __init__(self, id = None):
        self.id = id

    @classmethod
    def get_table_name(cls):
        # _names is set on concrete subclasses (not visible in this file)
        return cls._names

    @classmethod
    def get_objects_dir(cls):
        # directory holding one pickle file per object of this class
        return os.path.join(get_publisher().app_dir, cls.get_table_name())
    @classmethod
    def keys(cls):
        '''Return the list of stored object ids (dot-files are skipped).'''
        if not os.path.exists(cls.get_objects_dir()):
            return []
        return [fix_key(x) for x in os.listdir(cls.get_objects_dir()) if x[0] != '.']

    @classmethod
    def values(cls, ignore_errors=False, ignore_migration=True):
        '''Return all stored objects, skipping unloadable ones.

        NOTE(review): the ignore_migration parameter is not forwarded —
        cls.get() is always called with ignore_migration=True.
        '''
        values = [cls.get(x, ignore_errors=ignore_errors, ignore_migration=True) for x in cls.keys()]
        return [x for x in values if x is not None]

    @classmethod
    def items(cls):
        '''Return (id, object) pairs for all stored objects.'''
        return [(x, cls.get(x)) for x in cls.keys()]

    @classmethod
    def count(cls, clause=None):
        '''Return the number of stored objects, optionally filtered by clause.'''
        if clause:
            return len(cls.select(clause))
        return len(cls.keys())

    @classmethod
    def select(cls, clause=None, order_by=None, ignore_errors=False,
            ignore_migration=False, limit=None, offset=None,
            iterator=False, **kwargs):
        '''Load objects matching `clause` (see parse_clause), optionally
        sorted by `order_by` (prefix with '-' for descending) and
        paginated with limit/offset.  Always returns a list.'''
        # iterator: only for compatibility with sql select()
        keys = cls.keys()
        objects = (cls.get(k, ignore_errors=ignore_errors,
            ignore_migration=ignore_migration, **kwargs) for k in keys)
        if ignore_errors:
            objects = (x for x in objects if x is not None)
        if clause:
            clause_function = parse_clause(clause)
            objects = (x for x in objects if clause_function(x))
        if order_by:
            order_by = str(order_by)
            if order_by[0] == '-':
                reverse = True
                order_by = order_by[1:]
            else:
                reverse = False
            # only list can be sorted
            objects = list(objects)
            if order_by == 'id':
                # numeric ordering of ids (non-numeric ids sort as -1)
                key_function = lambda x: lax_int(x.id)
            elif order_by == 'name':
                # proper collation should be done but it's messy to get working
                # on all systems so we go the cheap and almost ok way.
                from .misc import simplify
                key_function = lambda x: simplify(x.name)
            elif order_by.endswith('_time'):
                # None timestamps sort first, replaced by a very old date
                typed_none = time.gmtime(-10**10)  # 1653
                key_function = lambda x: getattr(x, order_by) or typed_none
            else:
                key_function = lambda x: getattr(x, order_by)
            objects.sort(key=key_function)
            if reverse:
                objects.reverse()

        if limit or offset:
            objects = _take(objects, limit, offset)

        return list(objects)

    @classmethod
    def select_iterator(cls, **kwargs):
        '''Generator variant of select() (the underlying select() still
        materializes the full list first).'''
        for obj in cls.select(**kwargs):
            yield obj

    @classmethod
    def has_key(cls, id):
        '''Return True if an object with this id exists on disk.'''
        filename = os.path.join(cls.get_objects_dir(), fix_key(id))
        return os.path.exists(force_bytes(filename, 'utf-8'))

    @classmethod
    def get_new_id(cls, create=False):
        '''Return a new unused id (as a string); with create=True the
        corresponding file is reserved with O_CREAT|O_EXCL, retrying with
        a fresh id on collision with a concurrent process.'''
        keys = cls.keys()
        if not keys:
            id = 1
        else:
            id = max([lax_int(x) for x in keys]) + 1
            if id == 0:
                # all existing keys were non-numeric (lax_int gave -1)
                id = len(keys)+1
        if create:
            objects_dir = cls.get_objects_dir()
            object_filename = os.path.join(objects_dir, fix_key(id))
            try:
                fd = os.open(object_filename, os.O_CREAT | os.O_EXCL)
            except OSError:
                # file already exists: another process took this id, retry
                return cls.get_new_id(create=True)
            os.close(fd)
        return str(id)

    @classmethod
    def get(cls, id, ignore_errors=False, ignore_migration=False, **kwargs):
        '''Load the object with the given id; raises KeyError (or returns
        None with ignore_errors=True) when missing or unreadable.'''
        if id is None:
            if ignore_errors:
                return None
            else:
                raise KeyError()
        filename = os.path.join(cls.get_objects_dir(), fix_key(id))
        return cls.get_filename(filename, ignore_errors=ignore_errors,
                ignore_migration=ignore_migration, **kwargs)

    @classmethod
    def get_ids(cls, ids, ignore_errors=False, keep_order=False):
        '''Load the objects with the given ids, skipping missing ones
        when ignore_errors is True.

        NOTE(review): keep_order is accepted but unused here (objects come
        back in `ids` order anyway); presumably kept for signature parity
        with the SQL backend — confirm.
        '''
        objects = []
        for x in ids:
            obj = cls.get(x, ignore_errors=ignore_errors)
            if obj is not None:
                objects.append(obj)
        return objects

    @classmethod
    def get_on_index(cls, id, index, ignore_errors=False, ignore_migration=False):
        '''Load an object through one of the symlink indexes (_indexes),
        e.g. look a user up by an indexed attribute value.'''
        if not cls._indexes:
            raise KeyError()
        objects_dir = cls.get_objects_dir()
        index_dir = objects_dir + '-' + index
        if not os.path.exists(index_dir):
            cls.rebuild_indexes()
        filename = os.path.join(index_dir, str(fix_key(id)))
        return cls.get_filename(filename, ignore_errors=ignore_errors,
                ignore_migration=ignore_migration)

    @classmethod
    def get_ids_with_indexed_value(cls, index, value, auto_fallback=True):
        '''Return the list of ids whose `index` attribute equals `value`,
        using the hashed index files; falls back to a full scan if the
        index cannot be rebuilt (unless auto_fallback is False, which
        raises StorageIndexException instead).'''
        objects_dir = cls.get_objects_dir()
        index_dir = os.path.join(objects_dir, '.indexes', str(index))
        index_file = os.path.join(index_dir, '%s-%s' % (index, fix_key(value)))
        if not os.path.exists(index_dir):
            if auto_fallback is False:
                raise StorageIndexException()
            try:
                cls.rebuild_indexes()
            except StorageIndexException:
                # index directory cannot be created; scan all objects
                values = cls.select(ignore_errors=True)
                return [x for x in values if getattr(x, index) == value]
        if not os.path.exists(index_file):
            return []
        return pickle.load(open(index_file, 'rb'))

    @classmethod
    def get_with_indexed_value(cls, index, value, ignore_errors = False):
        '''Generator over the objects whose `index` attribute equals
        `value` (via the hashed index).'''
        ids = cls.get_ids_with_indexed_value(str(index), str(value))
        for x in ids:
            obj = cls.get(x, ignore_errors=ignore_errors)
            if obj is not None:
                yield obj

    @classmethod
    def storage_load(cls, fd):
        '''Unpickle one object from fd, using the publisher's custom
        unpickler class when one is configured.'''
        if get_publisher() and get_publisher().unpickler_class:
            unpickler = get_publisher().unpickler_class
        else:
            unpickler = pickle.Unpickler
        return unpickler(fd, **PICKLE_KWARGS).load()

    @classmethod
    def get_filename(cls, filename, ignore_errors=False, ignore_migration=False, **kwargs):
        '''Load an object from an explicit filename.

        EOFError is retried for a short while, as the file may be in the
        middle of being rewritten by another process.  Unless
        ignore_migration is set, the loaded object gets a string id and
        its migrate() hook (if any) is run.

        NOTE(review): **kwargs is forwarded to storage_load(), which only
        accepts fd — any extra keyword argument would raise TypeError;
        confirm callers never pass one.
        '''
        try:
            fd = open(force_bytes(filename, 'utf-8'), 'rb')
            o = cls.storage_load(fd, **kwargs)
        except IOError:
            if ignore_errors:
                return None
            raise KeyError()
        except ImportError as e:
            # pickled class no longer importable
            if ignore_errors:
                return None
            raise KeyError()
        except EOFError as e:
            # maybe it's being written to, loop for a while to see
            current_position = fd.tell()
            fd.close()
            for i in range(10):
                time.sleep(0.01)
                if current_position != os.stat(filename).st_size:
                    # file grew, try loading again
                    return cls.get_filename(filename, ignore_errors=ignore_errors,
                            ignore_migration=ignore_migration)
            if ignore_errors:
                return None
            raise KeyError()
        # force the object class, the pickle may hold an older/parent class
        o.__class__ = cls
        if not ignore_migration:
            o.id = str(o.id)  # makes sure 'id' is a string
            if hasattr(cls, 'migrate'):
                o.migrate()
        return o

    @classmethod
    def rebuild_indexes(cls, indexes=[]):
        '''Rebuild the given indexes (default: all declared indexes) from
        the objects on disk: symlink directories for _indexes, pickled id
        lists under .indexes/ for _hashed_indexes.  Stale hashed index
        files are removed at the end.'''
        # NOTE: indexes=[] is a mutable default, but it is only read, never
        # mutated, so it is safe here.
        if not (cls._indexes or cls._hashed_indexes):
            return
        if not indexes:
            indexes = (cls._hashed_indexes or []) + (cls._indexes or [])
        objects_dir = cls.get_objects_dir()

        hashed_indexes = {}
        for index in cls._hashed_indexes or []:
            if index not in indexes:
                continue
            index_dir = os.path.join(objects_dir, '.indexes', index)
            if not os.path.exists(index_dir):
                try:
                    os.makedirs(index_dir)
                except OSError:
                    raise StorageIndexException()

        for object in cls.values(ignore_errors=True, ignore_migration=True):
            object_filename = os.path.join(objects_dir, fix_key(object.id))
            # symlinks are created relative so the whole app dir can be moved
            relative_object_filename = os.path.join('..', cls.get_table_name(), fix_key(object.id))
            for index in cls._indexes or []:
                if index not in indexes:
                    continue
                if not hasattr(object, index) or getattr(object, index) is None:
                    continue
                index_dir = objects_dir + '-' + index
                link_name = os.path.join(index_dir, fix_key(str(getattr(object, index))))
                try:
                    if relative_object_filename:
                        os.symlink(relative_object_filename, link_name)
                    else:
                        os.symlink(object_filename, link_name)
                except OSError as exc:
                    if exc.errno == 2:  # ENOENT: index dir missing, create it
                        os.mkdir(index_dir)
                    elif exc.errno == 17:  # EEXIST: stale link, replace it
                        try:
                            os.unlink(link_name)
                        except OSError as exc2:
                            if exc2.errno != 2:  # no such file or directory
                                raise
                            # link got created in a different process, move
                            # along.
                            continue
                    else:
                        raise
                    # retry the symlink after mkdir/unlink recovery
                    if relative_object_filename:
                        os.symlink(relative_object_filename, link_name)
                    else:
                        os.symlink(object_filename, link_name)

            for index in cls._hashed_indexes or []:
                if index not in indexes:
                    continue
                if not hasattr(object, index) or getattr(object, index) is None:
                    continue
                attribute = getattr(object, index)
                # accumulate ids per (index, value) pair; dicts index their
                # values, scalars are wrapped in a list
                if type(attribute) is dict:
                    attribute = attribute.values()
                elif type(attribute) not in (tuple, list, set):
                    attribute = [attribute]
                for attr in attribute:
                    attr_value = fix_key(attr)
                    index_name = '%s-%s' % (index, attr_value)
                    if not index_name in hashed_indexes:
                        hashed_indexes[index_name] = []
                    hashed_indexes[index_name].append(str(object.id))

        for index, content in hashed_indexes.items():
            index_key = index.split('-')[0]
            if index_key not in indexes:
                continue
            index_file = os.path.join(objects_dir, '.indexes', index_key, index)
            pickle.dump(content, open(index_file, 'wb'), protocol=2)

        # remove hashed index files for values that no longer exist
        for index in cls._hashed_indexes or []:
            if index not in indexes:
                continue
            index_dir = os.path.join(objects_dir, '.indexes', index)
            for filename in os.listdir(index_dir):
                if filename not in hashed_indexes:
                    os.unlink(os.path.join(index_dir, filename))

    def get_object_filename(self):
        '''Return the absolute path of this object's pickle file.'''
        if self._filename:
            if self._filename[0] == '/':
                return self._filename
            else:
                return os.path.join(get_publisher().app_dir, self._filename)
        else:
            objects_dir = self.get_objects_dir()
            return os.path.join(objects_dir, fix_key(self.id))

    @classmethod
    def storage_dumps(cls, object):
        '''Serialize an object to bytes (protocol 2 keeps the pickles
        readable by Python 2).'''
        return pickle.dumps(object, protocol=2)

    def store(self, async_op=False):
        '''Persist this object: allocate an id if needed, atomically write
        the pickle, then update the indexes under a lock file.  If index
        updating fails the indexes are destroyed (they will be rebuilt on
        next access) and the exception is notified, not propagated.'''
        objects_dir = self.get_objects_dir()
        new_object = False
        if self._filename:
            if self._filename[0] == '/':
                object_filename = self._filename
                relative_object_filename = None
            else:
                object_filename = os.path.join(get_publisher().app_dir, self._filename)
                relative_object_filename = os.path.join('..', self._filename)
        else:
            if not os.path.exists(objects_dir):
                try:
                    os.mkdir(objects_dir)
                except OSError as error:
                    if error.errno != 17:  # 17 == Directory exists
                        raise
            if self.id is None:
                self.id = self.get_new_id(create=True)
                new_object = True
            object_filename = os.path.join(objects_dir, fix_key(self.id))
            relative_object_filename = os.path.join('..', self.get_table_name(), fix_key(self.id))

        # load the previous value to be able to remove its index entries
        previous_object_value = None
        if not new_object and (self._indexes or self._hashed_indexes):
            previous_object_value = self.get_filename(object_filename,
                    ignore_errors=True, ignore_migration=True)

        s = self.storage_dumps(self)
        atomic_write(object_filename, s, async_op)
        # update last modified time
        if os.path.exists(objects_dir):
            os.utime(objects_dir, None)

        with locket.lock_file(objects_dir + '.lock.index'):
            try:
                self.update_indexes(previous_object_value, relative_object_filename)
            except:
                # something failed, we can't keep using possibly broken indexes, so
                # we notify of the bug and remove the indexes
                get_publisher().notify_of_exception(sys.exc_info(), context='[STORAGE]')
                self.destroy_indexes()

    @classmethod
    def destroy_indexes(cls):
        '''Remove all index directories.  Each one is first renamed into a
        fresh .trash-N directory (rename is atomic, so readers never see a
        half-deleted index) and only then recursively deleted.'''
        objects_dir = cls.get_objects_dir()
        directories_to_trash = []
        directories_to_wipe = []
        for index in cls._indexes or []:
            index_dir = objects_dir + '-' + index
            directories_to_trash.append(index_dir)

        directories_to_trash.append(os.path.join(objects_dir, '.indexes'))

        for directory in directories_to_trash:
            if not os.path.exists(directory):
                continue
            i = 0
            while True:
                trashed_index_name = directory + '.trash-%s' % i
                i += 1
                try:
                    os.mkdir(trashed_index_name)
                except OSError:
                    # trash name taken by a concurrent destroy; try next i
                    continue
                try:
                    os.rename(directory, os.path.join(trashed_index_name, 'idx'))
                except OSError:
                    # directory vanished/moved concurrently; retry
                    continue
                directories_to_wipe.append(trashed_index_name)
                break

        for directory in directories_to_wipe:
            shutil.rmtree(directory)

    def update_indexes(self, previous_object_value, relative_object_filename):
        '''Update symlink and hashed indexes for this object, removing
        entries for the previous stored value and adding entries for the
        current one.  Called by store() with the index lock held.'''
        objects_dir = self.get_objects_dir()
        rebuilt_indexes = False
        for index in self._indexes or []:
            if not hasattr(self, index) or getattr(self, index) is None:
                continue
            index_dir = objects_dir + '-' + index
            link_name = os.path.join(index_dir, fix_key(str(getattr(self, index))))
            if previous_object_value:
                old_link_name = os.path.join(index_dir,
                        fix_key(str(getattr(previous_object_value, index))))
                if os.path.exists(old_link_name):
                    if old_link_name == link_name:
                        # indexed value unchanged, nothing to do
                        continue
                    os.unlink(old_link_name)
            try:
                if relative_object_filename:
                    os.symlink(relative_object_filename, link_name)
                else:
                    os.symlink(self.get_object_filename(), link_name)
            except OSError as exc:
                if exc.errno == 2:  # ENOENT: index dir missing
                    os.mkdir(index_dir)
                    if not rebuilt_indexes:
                        # perhaps index dir got removed; rebuild it before
                        # adding elements to it.
                        self.rebuild_indexes()
                        rebuilt_indexes = True
                elif exc.errno == 17:  # EEXIST: replace the stale link
                    os.unlink(link_name)
                else:
                    raise
                if not rebuilt_indexes:
                    # retry the symlink (rebuild_indexes already created it)
                    if relative_object_filename:
                        os.symlink(relative_object_filename, link_name)
                    else:
                        os.symlink(self.get_object_filename(), link_name)

        for index in self._hashed_indexes or []:
            index_dir = os.path.join(objects_dir, '.indexes', index)
            if not os.path.exists(index_dir):
                try:
                    os.makedirs(index_dir)
                except OSError as e:
                    if e.errno == errno.EEXIST:  # File exists
                        pass
            # compute old/new indexed value lists depending on the attribute
            # type (dict -> its values, sequences as-is, scalar -> [scalar])
            old_value = []
            if type(getattr(self, index)) is dict:
                new_value = getattr(self, index).values()
                if previous_object_value:
                    old_value = getattr(previous_object_value, index)
                    if old_value is None:
                        old_value = []
                    else:
                        old_value = old_value.values()
            elif type(getattr(self, index)) in (tuple, list, set):
                new_value = getattr(self, index)
                if previous_object_value:
                    old_value = getattr(previous_object_value, index)
                    if old_value is None:
                        old_value = []
            else:
                new_value = [getattr(self, index)]
                if previous_object_value:
                    old_raw_value = getattr(previous_object_value, index)
                    if type(old_raw_value) is dict:
                        old_value = old_raw_value.values()
                    elif type(old_raw_value) in (tuple, list, set):
                        old_value = old_raw_value
                    else:
                        old_value = [old_raw_value]

            # drop this id from index files of values that went away
            for oldv in old_value:
                if oldv in new_value:
                    continue
                old_index_name = '%s-%s' % (index, fix_key(oldv))
                old_index_file = os.path.join(index_dir, old_index_name)
                if os.path.exists(old_index_file):
                    ids = [str(x) for x in pickle.load(open(old_index_file, 'rb'))]
                    if str(self.id) in ids:
                        ids.remove(str(self.id))
                        pickle.dump(ids, open(old_index_file, 'wb'), protocol=2)

            # add this id to index files of new values
            for newv in new_value:
                if newv in old_value:
                    continue
                index_name = '%s-%s' % (index, fix_key(newv))
                index_file = os.path.join(index_dir, index_name)
                if os.path.exists(index_file):
                    ids = [str(x) for x in pickle.load(open(index_file, 'rb'))]
                else:
                    ids = []
                if not str(self.id) in ids:
                    ids.append(str(self.id))
                    pickle.dump(ids, open(index_file, 'wb'), protocol=2)

    @classmethod
    def volatile(cls):
        '''Return an instance that has no id and is not stored on disk.'''
        o = cls()
        o.id = None
        return o

    @classmethod
    def remove_object(cls, id):
        '''Delete the object with the given id, cleaning its entries from
        both symlink and hashed indexes first.'''
        objects_dir = cls.get_objects_dir()
        if cls._indexes or cls._hashed_indexes:
            # the object is only loaded when there are index entries to clean
            object = cls.get(id)
        for index in cls._indexes or []:
            if not hasattr(object, index) or getattr(object, index) is None:
                continue
            index_dir = objects_dir + '-' + index
            link_name = os.path.join(index_dir, fix_key(str(getattr(object, index))))
            try:
                os.unlink(link_name)
            except OSError:
                # best effort; a missing link is fine
                pass
        index_dir = os.path.join(objects_dir, '.indexes')
        for index in cls._hashed_indexes or []:
            attribute = getattr(object, index)
            if type(attribute) not in (tuple, list, set):
                attribute = [attribute]
            for attr in attribute:
                attr_value = fix_key(attr)
                index_name = '%s-%s' % (index, attr_value)
                index_file = os.path.join(index_dir, index, index_name)
                if os.path.exists(index_file):
                    ids = [str(x) for x in pickle.load(open(index_file, 'rb'))]
                    if str(object.id) in ids:
                        ids.remove(str(object.id))
                        pickle.dump(ids, open(index_file, 'wb'), protocol=2)
        os.unlink(os.path.join(objects_dir, fix_key(id)))

    def remove_self(self):
        self.remove_object(self.id)

    @classmethod
    def wipe(cls):
        '''Delete all objects of this class: the objects directory and the
        symlink index directories are first renamed into a temporary
        directory (atomic per directory), then removed.'''
        tmpdir = tempfile.mkdtemp(prefix='wiping', dir=os.path.join(get_publisher().app_dir))
        dirs_to_move = []
        objects_dir = cls.get_objects_dir()
        dirs_to_move.append(objects_dir)
        for index in cls._indexes or []:
            index_dir = objects_dir + '-' + index
            dirs_to_move.append(index_dir)

        for directory in dirs_to_move:
            if os.path.exists(directory):
                os.rename(directory, os.path.join(tmpdir, os.path.basename(directory)))

        shutil.rmtree(tmpdir)

    def __repr__(self):
        # include a display name when the object provides one, in priority
        # order: display_name attribute, get_display_name(), name attribute
        if hasattr(self, 'display_name'):
            display_name = '%r ' % self.display_name
        elif hasattr(self, 'get_display_name'):
            display_name = '%r ' % self.get_display_name()
        elif hasattr(self, 'name'):
            display_name = '%r ' % self.name
        else:
            display_name = ''
        return '<%s %sid:%s>' % (self.__class__.__name__, display_name, self.id)