Spawn new thread for different pid
This spawns a thread again first time on queue if the process was forked by the time the enqueueing happened.
This commit is contained in:
parent
2fc64e8056
commit
6f0ad8232a
|
@ -30,18 +30,26 @@ class AsyncWorker(object):
|
|||
self._queue = Queue(-1)
|
||||
self._lock = threading.Lock()
|
||||
self._thread = None
|
||||
self._thread_for_pid = None
|
||||
self.options = {
|
||||
'shutdown_timeout': shutdown_timeout,
|
||||
}
|
||||
self.start()
|
||||
|
||||
def is_alive(self):
|
||||
return self._thread.is_alive()
|
||||
if self._thread_for_pid != os.getpid():
|
||||
return False
|
||||
return self._thread and self._thread.is_alive()
|
||||
|
||||
def _ensure_thread(self):
|
||||
if self.is_alive():
|
||||
return
|
||||
self.start()
|
||||
|
||||
def main_thread_terminated(self):
|
||||
self._lock.acquire()
|
||||
try:
|
||||
if not self._thread:
|
||||
if not self.is_alive():
|
||||
# thread not started or already stopped - nothing to do
|
||||
return
|
||||
|
||||
|
@ -107,10 +115,11 @@ class AsyncWorker(object):
|
|||
"""
|
||||
self._lock.acquire()
|
||||
try:
|
||||
if not self._thread:
|
||||
if not self.is_alive():
|
||||
self._thread = threading.Thread(target=self._target)
|
||||
self._thread.setDaemon(True)
|
||||
self._thread.start()
|
||||
self._thread_for_pid = os.getpid()
|
||||
finally:
|
||||
self._lock.release()
|
||||
atexit.register(self.main_thread_terminated)
|
||||
|
@ -125,10 +134,12 @@ class AsyncWorker(object):
|
|||
self._queue.put_nowait(self._terminator)
|
||||
self._thread.join(timeout=timeout)
|
||||
self._thread = None
|
||||
self._thread_for_pid = None
|
||||
finally:
|
||||
self._lock.release()
|
||||
|
||||
def queue(self, callback, *args, **kwargs):
|
||||
self._ensure_thread()
|
||||
self._queue.put_nowait((callback, args, kwargs))
|
||||
|
||||
def _target(self):
|
||||
|
|
|
@ -64,6 +64,29 @@ class ThreadedTransportTest(TestCase):
|
|||
|
||||
self.assertEqual(len(transport.events), 1)
|
||||
|
||||
def test_fork_spawns_anew(self):
|
||||
url = urlparse(self.url)
|
||||
transport = DummyThreadedScheme(url)
|
||||
transport.send_delay = 0.5
|
||||
|
||||
data = self.client.build_msg('raven.events.Message', message='foo')
|
||||
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
time.sleep(0.1)
|
||||
|
||||
transport.async_send(data, None, None, None)
|
||||
|
||||
# this should wait for the message to get sent
|
||||
transport.get_worker().main_thread_terminated()
|
||||
|
||||
self.assertEqual(len(transport.events), 1)
|
||||
# Use os._exit here so that py.test gets not confused about
|
||||
# what the hell we're doing here.
|
||||
os._exit(0)
|
||||
else:
|
||||
os.waitpid(pid, 0)
|
||||
|
||||
def test_fork_with_active_worker(self):
|
||||
# Test threaded transport when forking with an active worker.
|
||||
# Forking a process doesn't clone the worker thread - make sure
|
||||
|
|
Loading…
Reference in New Issue