From 6c4ce9207764369fa02a4e70da2d577af90fb3d0 Mon Sep 17 00:00:00 2001 From: Adrian Sampson Date: Mon, 2 Aug 2010 19:40:35 -0700 Subject: [PATCH] don't bother poisoning the queue on exception Previously, we tried to shut down everything very nicely by sending along a channel poison message when an exception occurred. That, of course, was disastrous because some of the pipeline was no longer running and the poison was unlikely to get all the way through. Now we just abort every thread and clear every queue (to force the abort even when blocking on enqueues). This problem manifested as a deadlock when an exception occurred in the final stage. --- beets/ui/pipeline.py | 96 +++++++++++++++++++++++--------------------- 1 file changed, 51 insertions(+), 45 deletions(-) diff --git a/beets/ui/pipeline.py b/beets/ui/pipeline.py index 572cff533..512276afe 100644 --- a/beets/ui/pipeline.py +++ b/beets/ui/pipeline.py @@ -34,6 +34,20 @@ POISON = '__PIPELINE_POISON__' DEFAULT_QUEUE_SIZE = 16 +def clear_queue(q): + """Safely empty a queue.""" + # This very hacky approach to clearing the queue + # compliments of Tim Peters: + # http://www.mail-archive.com/python-list@python.org/msg95322.html + q.mutex.acquire() + try: + q.queue.clear() + q.unfinished_tasks = 0 + q.not_full.notify() + q.all_tasks_done.notifyAll() + finally: + q.mutex.release() + class PipelineError(object): """An indication that an exception occurred in the pipeline. The object is passed through the pipeline to shut down all threads @@ -42,7 +56,21 @@ class PipelineError(object): def __init__(self, exc_info): self.exc_info = exc_info -class FirstPipelineThread(Thread): +class PipelineThread(Thread): + """Abstract base class for pipeline-stage threads.""" + def __init__(self): + super(PipelineThread, self).__init__() + self.abort_lock = Lock() + self.abort_flag = False + + def abort(self): + """Shut down the thread at the next chance possible. 
+ """ + with self.abort_lock: + self.abort_flag = True + # Each stage loop checks this flag and returns; nothing is poisoned here. + +class FirstPipelineThread(PipelineThread): """The thread running the first stage in a parallel pipeline setup. The coroutine should just be a generator. """ @@ -59,16 +87,7 @@ class FirstPipelineThread(Thread): # Time to abort? with self.abort_lock: if self.abort_flag: - # We may have accidentally added one more object - # to the queue *after* it was cleared by the - # abort() method. Remove it if present. - try: - self.out_queue.get_nowait() - except Queue.Empty: - pass - - # Stop generating and poison. - break + return # Get the value from the generator. try: @@ -87,28 +106,7 @@ class FirstPipelineThread(Thread): # Generator finished; shut down the pipeline. self.out_queue.put(POISON) - def abort(self): - """Shut down the pipeline by canceling this thread and - poisoning out_channel. - """ - with self.abort_lock: - # Empty the channel before poisoning it. - # This very hacky approach to clearing the queue is - # compliments of Tim Peters: - # http://www.mail-archive.com/python-list@python.org/msg95322.html - self.out_queue.mutex.acquire() - try: - self.out_queue.queue.clear() - self.out_queue.unfinished_tasks = 0 - self.out_queue.not_full.notify() - self.out_queue.all_tasks_done.notifyAll() - finally: - self.out_queue.mutex.release() - - # Notify the generator thread. - self.abort_flag = True - -class MiddlePipelineThread(Thread): +class MiddlePipelineThread(PipelineThread): """A thread running any stage in the pipeline except the first or last. """ @@ -123,6 +121,10 @@ class MiddlePipelineThread(Thread): self.coro.next() while True: + with self.abort_lock: + if self.abort_flag: + return + # Get the message from the previous stage. msg = self.in_queue.get() if msg is POISON: @@ -146,7 +148,7 @@ class MiddlePipelineThread(Thread): # Pipeline is shutting down normally. 
self.out_queue.put(POISON) -class LastPipelineThread(Thread): +class LastPipelineThread(PipelineThread): """A thread running the last stage in a pipeline. The coroutine should yield nothing. """ @@ -160,6 +162,10 @@ class LastPipelineThread(Thread): self.coro.next() while True: + with self.abort_lock: + if self.abort_flag: + return + # Get the message from the previous stage. msg = self.in_queue.get() if msg is POISON: @@ -228,14 +234,12 @@ class Pipeline(object): try: # The final thread lasts the longest. threads[-1].join() - except: - # Shut down the pipeline by telling the first thread to - # poison its channel. - threads[0].abort() - raise - - # Halt the pipeline in case there was an exception. - threads[0].abort() + finally: + # Halt the pipeline in case there was an exception. + for thread in threads: + thread.abort() + for queue in queues: + clear_queue(queue) # Make completely sure that all the threads have finished # before we return. @@ -270,9 +274,9 @@ if __name__ == '__main__': time.sleep(1) print 'received', num ts_start = time.time() - # Pipeline([produce(), work(), consume()]).run_sequential() + Pipeline([produce(), work(), consume()]).run_sequential() ts_middle = time.time() - # Pipeline([produce(), work(), consume()]).run_parallel() + Pipeline([produce(), work(), consume()]).run_parallel() ts_end = time.time() print 'Sequential time:', ts_middle - ts_start print 'Parallel time:', ts_end - ts_middle @@ -290,10 +294,12 @@ if __name__ == '__main__': print 'processing', num time.sleep(3) if num == 3: - raise Exception() + raise Exception() num = yield num * 2 def exc_consume(): while True: num = yield + #if num == 4: + # raise Exception() print 'received', num Pipeline([exc_produce(), exc_work(), exc_consume()]).run_parallel(1)