don't bother poisoning the queue on exception

Previously, we tried to shut down everything very nicely by sending along a
channel poison message when an exception occurred. That, of course, was
disastrous because some of the pipeline was no longer running and the poison
was unlikely to get all the way through. Now we just abort every thread and
clear every queue (to force the abort even when blocking on enqueues). This
problem manifested as a deadlock when an exception occurred in the final
stage.
This commit is contained in:
Adrian Sampson 2010-08-02 19:40:35 -07:00
parent 3f34c5c9eb
commit 6c4ce92077

View file

@ -34,6 +34,20 @@ POISON = '__PIPELINE_POISON__'
DEFAULT_QUEUE_SIZE = 16
def clear_queue(q):
"""Safely empty a queue."""
# This very hacky approach to clearing the queue
# compliments of Tim Peters:
# http://www.mail-archive.com/python-list@python.org/msg95322.html
q.mutex.acquire()
try:
q.queue.clear()
q.unfinished_tasks = 0
q.not_full.notify()
q.all_tasks_done.notifyAll()
finally:
q.mutex.release()
class PipelineError(object):
"""An indication that an exception occurred in the pipeline. The
object is passed through the pipeline to shut down all threads
@ -42,7 +56,21 @@ class PipelineError(object):
def __init__(self, exc_info):
self.exc_info = exc_info
class FirstPipelineThread(Thread):
class PipelineThread(Thread):
"""Abstract base class for pipeline-stage threads."""
def __init__(self):
super(PipelineThread, self).__init__()
self.abort_lock = Lock()
self.abort_flag = False
def abort(self):
"""Shut down the thread at the next chance possible.
"""
with self.abort_lock:
self.abort_flag = True
# Empty the channel before poisoning it.
class FirstPipelineThread(PipelineThread):
"""The thread running the first stage in a parallel pipeline setup.
The coroutine should just be a generator.
"""
@ -59,16 +87,7 @@ class FirstPipelineThread(Thread):
# Time to abort?
with self.abort_lock:
if self.abort_flag:
# We may have accidentally added one more object
# to the queue *after* it was cleared by the
# abort() method. Remove it if present.
try:
self.out_queue.get_nowait()
except Queue.Empty:
pass
# Stop generating and poison.
break
return
# Get the value from the generator.
try:
@ -87,28 +106,7 @@ class FirstPipelineThread(Thread):
# Generator finished; shut down the pipeline.
self.out_queue.put(POISON)
def abort(self):
"""Shut down the pipeline by canceling this thread and
poisoning out_channel.
"""
with self.abort_lock:
# Empty the channel before poisoning it.
# This very hacky approach to clearing the queue is
# compliments of Tim Peters:
# http://www.mail-archive.com/python-list@python.org/msg95322.html
self.out_queue.mutex.acquire()
try:
self.out_queue.queue.clear()
self.out_queue.unfinished_tasks = 0
self.out_queue.not_full.notify()
self.out_queue.all_tasks_done.notifyAll()
finally:
self.out_queue.mutex.release()
# Notify the generator thread.
self.abort_flag = True
class MiddlePipelineThread(Thread):
class MiddlePipelineThread(PipelineThread):
"""A thread running any stage in the pipeline except the first or
last.
"""
@ -123,6 +121,10 @@ class MiddlePipelineThread(Thread):
self.coro.next()
while True:
with self.abort_lock:
if self.abort_flag:
return
# Get the message from the previous stage.
msg = self.in_queue.get()
if msg is POISON:
@ -146,7 +148,7 @@ class MiddlePipelineThread(Thread):
# Pipeline is shutting down normally.
self.out_queue.put(POISON)
class LastPipelineThread(Thread):
class LastPipelineThread(PipelineThread):
"""A thread running the last stage in a pipeline. The coroutine
should yield nothing.
"""
@ -160,6 +162,10 @@ class LastPipelineThread(Thread):
self.coro.next()
while True:
with self.abort_lock:
if self.abort_flag:
return
# Get the message from the previous stage.
msg = self.in_queue.get()
if msg is POISON:
@ -228,14 +234,12 @@ class Pipeline(object):
try:
# The final thread lasts the longest.
threads[-1].join()
except:
# Shut down the pipeline by telling the first thread to
# poison its channel.
threads[0].abort()
raise
# Halt the pipeline in case there was an exception.
threads[0].abort()
finally:
# Halt the pipeline in case there was an exception.
for thread in threads:
thread.abort()
for queue in queues:
clear_queue(queue)
# Make completely sure that all the threads have finished
# before we return.
@ -270,9 +274,9 @@ if __name__ == '__main__':
time.sleep(1)
print 'received', num
ts_start = time.time()
# Pipeline([produce(), work(), consume()]).run_sequential()
Pipeline([produce(), work(), consume()]).run_sequential()
ts_middle = time.time()
# Pipeline([produce(), work(), consume()]).run_parallel()
Pipeline([produce(), work(), consume()]).run_parallel()
ts_end = time.time()
print 'Sequential time:', ts_middle - ts_start
print 'Parallel time:', ts_end - ts_middle
@ -290,10 +294,12 @@ if __name__ == '__main__':
print 'processing', num
time.sleep(3)
if num == 3:
raise Exception()
raise Exception()
num = yield num * 2
def exc_consume():
while True:
num = yield
#if num == 4:
# raise Exception()
print 'received', num
Pipeline([exc_produce(), exc_work(), exc_consume()]).run_parallel(1)