storage: add lock around index update (#7818)

This commit is contained in:
Frédéric Péters 2015-07-11 11:44:03 +02:00
parent 60f6663b24
commit fa801967ef
4 changed files with 236 additions and 13 deletions

4
README
View File

@ -113,6 +113,10 @@ Timeout Socket:
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.
Locket (wcs/qommon/vendor/locket.py):
# Copyright (c) 2012, Michael Williamson
# Licensed under the BSD 2-clause license (http://opensource.org/licenses/BSD-2-Clause)
jQuery JavaScript Library:
# Copyright (c) 2009 John Resig
# Dual licensed under the MIT and GPL licenses.

View File

@ -331,7 +331,6 @@ def test_store_async():
test.value = 'value'
test.unique_value = 'unique-value'
test.store(async=True)
assert test.id == '1'
t0 = time.time()
while not Foobar.keys():
time.sleep(0.1)
@ -373,3 +372,28 @@ def test_destroy_rebuild_index():
# the indexes should have been rebuilt automatically
assert os.path.exists(os.path.join(Foobar.get_objects_dir(), '.indexes'))
def test_concurrent_hashed_indexes():
Foobar.wipe()
children_pids = []
for i in range(10):
pid = os.fork()
if pid:
children_pids.append(pid)
else:
for j in range(10):
test = Foobar()
test.value = 'x'
test.dict_value = {'plop': random.randint(0, 10)}
test.store(async=False)
os._exit(0)
for pid in children_pids:
os.waitpid(pid, 0)
assert Foobar.count() == 100
for i in range(10):
manual_selection = set([x.id for x in Foobar.select() if x.dict_value == {'plop': i}])
index_selection = set([x.id for x in Foobar.get_with_indexed_value('dict_value', i)])
assert manual_selection == index_selection

View File

@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import errno
import operator
import os
import time
@ -27,6 +28,7 @@ try:
except ImportError:
import dummy_thread as _thread
from vendor import locket
from quixote import get_publisher
@ -181,8 +183,6 @@ class StorableObject(object):
def __init__(self, id = None):
self.id = id
if get_publisher() and not self.id:
self.id = self.get_new_id()
def get_table_name(cls):
return cls._names
@ -330,12 +330,25 @@ class StorableObject(object):
def get_filename(cls, filename, ignore_errors=False, ignore_migration=False):
try:
o = cls.storage_load(file(filename))
fd = open(filename)
o = cls.storage_load(fd)
except IOError:
if ignore_errors:
return None
raise KeyError()
except (EOFError, ImportError), e:
except ImportError as e:
if ignore_errors:
return None
raise KeyError()
except EOFError as e:
# maybe it's being written to, loop for a while to see
current_position = fd.tell()
fd.close()
for i in range(10):
time.sleep(0.01)
if current_position != os.stat(filename).st_size:
return cls.get_filename(filename, ignore_errors=ignore_errors,
ignore_migration=ignore_migration)
if ignore_errors:
return None
raise KeyError()
@ -476,13 +489,14 @@ class StorableObject(object):
if os.path.exists(objects_dir):
os.utime(objects_dir, None)
try:
self.update_indexes(previous_object_value, relative_object_filename)
except:
# something failed, we can't keep using possibly broken indexes, so
# we notify of the bug and remove the indexes
get_publisher().notify_of_exception(sys.exc_info(), context='[STORAGE]')
self.destroy_indexes()
with locket.lock_file(objects_dir + '.lock.index'):
try:
self.update_indexes(previous_object_value, relative_object_filename)
except:
# something failed, we can't keep using possibly broken indexes, so
# we notify of the bug and remove the indexes
get_publisher().notify_of_exception(sys.exc_info(), context='[STORAGE]')
self.destroy_indexes()
def destroy_indexes(cls):
objects_dir = cls.get_objects_dir()
@ -562,7 +576,11 @@ class StorableObject(object):
for index in self._hashed_indexes or []:
index_dir = os.path.join(objects_dir, '.indexes', index)
if not os.path.exists(index_dir):
os.makedirs(index_dir)
try:
os.makedirs(index_dir)
except OSError as e:
if e.errno == errno.EEXIST: # File exists
pass
old_value = []
if type(getattr(self, index)) is dict:

177
wcs/qommon/vendor/locket.py vendored Normal file
View File

@ -0,0 +1,177 @@
# derived from https://pypi.python.org/pypi/locket/
#
# Copyright (c) 2012, Michael Williamson
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of the FreeBSD Project.
import time
import errno
import threading
import weakref
__all__ = ["lock_file"]
import fcntl
def _lock_file_blocking(file_):
    """Acquire an exclusive flock on *file_*, waiting as long as necessary."""
    descriptor = file_.fileno()
    fcntl.flock(descriptor, fcntl.LOCK_EX)
def _lock_file_non_blocking(file_):
    """Try to take an exclusive flock on *file_* without blocking.

    Returns True on success, False when the lock is currently held
    elsewhere; any other error is propagated to the caller.
    """
    try:
        fcntl.flock(file_.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError as error:
        # EACCES/EAGAIN simply mean "already locked"; everything else is real.
        if error.errno not in (errno.EACCES, errno.EAGAIN):
            raise
        return False
    return True
def _unlock_file(file_):
    """Release the flock currently held on *file_*."""
    descriptor = file_.fileno()
    fcntl.flock(descriptor, fcntl.LOCK_UN)
# Process-wide registry mapping a lock-file path to its shared lock object.
# _locks_lock serialises access to the registry; the WeakValueDictionary
# lets an entry disappear as soon as no caller references its lock anymore.
_locks_lock = threading.Lock()
_locks = weakref.WeakValueDictionary()
def lock_file(path, **kwargs):
    """Return the process-wide lock object associated with *path*.

    The same object is handed out for identical paths so the in-process
    thread lock is shared; a fresh combined lock is built on first use.
    Extra keyword arguments (timeout, retry_period) are forwarded to the
    lock constructors.
    """
    with _locks_lock:
        lock = _locks.get(path)
        if lock is None:
            lock = _create_lock_file(path, **kwargs)
            _locks[path] = lock
        return lock
def _create_lock_file(path, **kwargs):
    """Build the combined lock for *path*: a thread lock (serialises threads
    of this process) paired with a file lock (serialises processes)."""
    return _LockSet([
        _ThreadLock(path, **kwargs),
        _LockFile(path, **kwargs),
    ])
class LockError(Exception):
    """Raised when a lock could not be acquired before its timeout expired."""
    pass
def _acquire_non_blocking(acquire, timeout, retry_period, path):
    """Poll *acquire* until it returns a truthy value.

    Sleeps *retry_period* seconds (default 0.05) between attempts and
    raises LockError once more than *timeout* seconds have elapsed; with
    timeout=None the polling never gives up.  *path* only appears in the
    error message.
    """
    if retry_period is None:
        retry_period = 0.05
    deadline = None if timeout is None else time.time() + timeout
    while not acquire():
        if deadline is not None and time.time() - deadline > 0:
            raise LockError("Couldn't lock {0}".format(path))
        time.sleep(retry_period)
class _LockSet(object):
    """Composite lock: acquires its members in order, releases in reverse.

    Usable as a context manager; on a failed acquisition the locks taken
    so far are backed out before the error propagates.
    """

    def __init__(self, locks):
        self._locks = locks

    def acquire(self):
        held = []
        try:
            for lock in self._locks:
                lock.acquire()
                held.append(lock)
        except:
            # Back out whatever was already taken, most recent first.
            for lock in reversed(held):
                # TODO: handle exceptions
                lock.release()
            raise

    def release(self):
        for lock in reversed(self._locks):
            # TODO: Handle exceptions
            lock.release()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *args):
        self.release()
class _ThreadLock(object):
    """In-process half of the lock: serialises the threads of this process.

    With timeout=None the acquisition blocks indefinitely; otherwise the
    underlying lock is polled until obtained or the timeout expires
    (raising LockError).
    """

    def __init__(self, path, timeout=None, retry_period=None):
        self._path = path
        self._timeout = timeout
        self._retry_period = retry_period
        self._lock = threading.Lock()

    def acquire(self):
        if self._timeout is None:
            # Plain blocking acquire.
            self._lock.acquire()
            return
        _acquire_non_blocking(
            acquire=lambda: self._lock.acquire(False),
            timeout=self._timeout,
            retry_period=self._retry_period,
            path=self._path,
        )

    def release(self):
        self._lock.release()
class _LockFile(object):
    """File-system half of the lock: an exclusive flock on *path*.

    The lock file is opened lazily on the first acquire and stays open
    across failed (LockError) attempts; release unlocks and closes it.
    """

    def __init__(self, path, timeout=None, retry_period=None):
        self._path = path
        self._timeout = timeout
        self._retry_period = retry_period
        self._file = None
        # NOTE(review): this lock is never used within the class; kept so
        # the attribute stays present for any external inspection.
        self._thread_lock = threading.Lock()

    def acquire(self):
        if self._file is None:
            self._file = open(self._path, "w")
        if self._timeout is None:
            # Wait as long as needed for the flock.
            _lock_file_blocking(self._file)
            return
        _acquire_non_blocking(
            acquire=lambda: _lock_file_non_blocking(self._file),
            timeout=self._timeout,
            retry_period=self._retry_period,
            path=self._path,
        )

    def release(self):
        _unlock_file(self._file)
        self._file.close()
        self._file = None