make pipeline exceptions look more natural

This commit is contained in:
Adrian Sampson 2010-08-02 19:16:32 -07:00
parent df766abcb4
commit 3f34c5c9eb

View file

@ -27,6 +27,7 @@ exception.
from __future__ import with_statement # for Python 2.5
import Queue
from threading import Thread, Lock
import sys
BUBBLE = '__PIPELINE_BUBBLE__'
POISON = '__PIPELINE_POISON__'
@ -38,8 +39,8 @@ class PipelineError(object):
object is passed through the pipeline to shut down all threads
before it is raised again in the main thread.
"""
def __init__(self, exc):
self.exc = exc
def __init__(self, exc_info):
self.exc_info = exc_info
class FirstPipelineThread(Thread):
"""The thread running the first stage in a parallel pipeline setup.
@ -74,8 +75,8 @@ class FirstPipelineThread(Thread):
msg = self.coro.next()
except StopIteration:
break
except Exception, exc:
self.out_queue.put(PipelineError(exc))
except:
self.out_queue.put(PipelineError(sys.exc_info()))
return
# Send it to the next stage.
@ -133,8 +134,8 @@ class MiddlePipelineThread(Thread):
# Invoke the current stage.
try:
out = self.coro.send(msg)
except Exception, exc:
self.out_queue.put(PipelineError(exc))
except:
self.out_queue.put(PipelineError(sys.exc_info()))
return
# Send message to next stage.
@ -164,18 +165,18 @@ class LastPipelineThread(Thread):
if msg is POISON:
break
elif isinstance(msg, PipelineError):
self.exc = msg.exc
self.exc_info = msg.exc_info
return
# Send to consumer.
try:
self.coro.send(msg)
except Exception, exc:
self.exc = exc
except:
self.exc_info = sys.exc_info()
return
# No exception raised in pipeline.
self.exc = None
self.exc_info = None
class Pipeline(object):
"""Represents a staged pattern of work. Each stage in the pipeline
@ -241,9 +242,10 @@ class Pipeline(object):
for thread in threads[:-1]:
thread.join()
exc = threads[-1].exc
if exc:
raise exc
exc_info = threads[-1].exc_info
if exc_info:
# Make the exception appear as it was raised originally.
raise exc_info[0], exc_info[1], exc_info[2]
# Smoke test.
if __name__ == '__main__':
@ -295,4 +297,3 @@ if __name__ == '__main__':
num = yield
print 'received', num
Pipeline([exc_produce(), exc_work(), exc_consume()]).run_parallel(1)