diff --git a/beets/util/pipeline.py b/beets/util/pipeline.py index 62dc415c5..61e045db0 100644 --- a/beets/util/pipeline.py +++ b/beets/util/pipeline.py @@ -82,11 +82,11 @@ class PipelineThread(Thread): with self.abort_lock: self.abort_flag = True - # Ensure that we are not blocking on a queue read or write. - if hasattr(self, 'in_queue'): - invalidate_queue(self.in_queue) - if hasattr(self, 'out_queue'): - invalidate_queue(self.out_queue) + # Ensure that we are not blocking on a queue read or write. + if hasattr(self, 'in_queue'): + invalidate_queue(self.in_queue) + if hasattr(self, 'out_queue'): + invalidate_queue(self.out_queue) def abort_all(self, exc_info): """Abort all other threads in the system for an exception. @@ -158,6 +158,10 @@ class MiddlePipelineThread(PipelineThread): if msg is POISON: break + with self.abort_lock: + if self.abort_flag: + return + # Invoke the current stage. out = self.coro.send(msg) @@ -198,6 +202,10 @@ class LastPipelineThread(PipelineThread): if msg is POISON: break + with self.abort_lock: + if self.abort_flag: + return + # Send to consumer. self.coro.send(msg)