handle KeyboardInterrupts in pipeline.py

This commit is contained in:
Adrian Sampson 2011-04-12 00:04:11 -07:00
parent 7600a8c0ad
commit 494c0f4d0e

View file

@ -22,7 +22,13 @@ BUBBLE constant from any stage coroutine except the last.
In the parallel case, the implementation transparently handles thread
shutdown when the processing is complete and when a stage raises an
exception.
exception. KeyboardInterrupts (^C) are also handled.
When running a parallel pipeline, it is also possible to use
multiple coroutines for the same pipeline stage; this lets you speed
up a bottleneck stage by dividing its work among multiple threads.
To do so, pass an iterable of coroutines to the Pipeline constructor
in place of any single coroutine.
"""
from __future__ import with_statement # for Python 2.5
import Queue
@ -271,13 +277,24 @@ class Pipeline(object):
thread.start()
# Wait for termination. The final thread lasts the longest.
threads[-1].join()
try:
# Using a timeout allows us to receive KeyboardInterrupt
# exceptions during the join().
while threads[-1].isAlive():
threads[-1].join(1)
except:
# Stop all the threads immediately.
for thread in threads:
thread.abort()
raise
# Make completely sure that all the threads have finished
# before we return. They should already be either finished,
# in normal operation, or aborted, in case of an exception.
for thread in threads[:-1]:
thread.join()
finally:
# Make completely sure that all the threads have finished
# before we return. They should already be either finished,
# in normal operation, or aborted, in case of an exception.
for thread in threads[:-1]:
thread.join()
for thread in threads:
exc_info = thread.exc_info