more frequent and better-synchronized abort checks

Previously, if an abort occurred during a queue read, then the queue would
return None. With this extra check, the None is never exposed to the user code
(coroutine).
This commit is contained in:
Adrian Sampson 2011-04-12 14:04:30 -07:00
parent fcee8b2ab6
commit 0460d73e5e

View file

@ -82,11 +82,11 @@ class PipelineThread(Thread):
with self.abort_lock:
self.abort_flag = True
# Ensure that we are not blocking on a queue read or write.
if hasattr(self, 'in_queue'):
invalidate_queue(self.in_queue)
if hasattr(self, 'out_queue'):
invalidate_queue(self.out_queue)
# Ensure that we are not blocking on a queue read or write.
if hasattr(self, 'in_queue'):
invalidate_queue(self.in_queue)
if hasattr(self, 'out_queue'):
invalidate_queue(self.out_queue)
def abort_all(self, exc_info):
"""Abort all other threads in the system for an exception.
@ -158,6 +158,10 @@ class MiddlePipelineThread(PipelineThread):
if msg is POISON:
break
with self.abort_lock:
if self.abort_flag:
return
# Invoke the current stage.
out = self.coro.send(msg)
@ -198,6 +202,10 @@ class LastPipelineThread(PipelineThread):
if msg is POISON:
break
with self.abort_lock:
if self.abort_flag:
return
# Send to consumer.
self.coro.send(msg)