diff --git a/beetsplug/replaygain.py b/beetsplug/replaygain.py index ab3db4575..5a2c4f7b9 100644 --- a/beetsplug/replaygain.py +++ b/beetsplug/replaygain.py @@ -21,7 +21,7 @@ import signal import subprocess import sys import warnings -from multiprocessing.pool import ThreadPool, RUN +from multiprocessing.pool import ThreadPool import queue from threading import Thread, Event @@ -1177,6 +1177,9 @@ class ReplayGainPlugin(BeetsPlugin): raise ui.UserError( f'replaygain initialization failed: {e}') + # Start threadpool lazily. + self.pool = None + def should_use_r128(self, item): """Checks the plugin setting to decide whether the calculation should be done using the EBU R128 standard and use R128_ tags instead. @@ -1314,18 +1317,10 @@ class ReplayGainPlugin(BeetsPlugin): except FatalReplayGainError as e: raise ui.UserError(f"Fatal replay gain error: {e}") - def _has_pool(self): - """Check whether a `ThreadPool` is running instance in `self.pool` - """ - if hasattr(self, 'pool'): - if isinstance(self.pool, ThreadPool) and self.pool._state == RUN: - return True - return False - def open_pool(self, threads): """Open a `ThreadPool` instance in `self.pool` """ - if not self._has_pool() and self.backend_instance.do_parallel: + if self.pool is None and self.backend_instance.do_parallel: self.pool = ThreadPool(threads) self.exc_queue = queue.Queue() @@ -1338,7 +1333,7 @@ class ReplayGainPlugin(BeetsPlugin): self.exc_watcher.start() def _apply(self, func, args, kwds, callback): - if self._has_pool(): + if self.pool is not None: def handle_exc(exc): """Handle exceptions in the async work. """ @@ -1353,15 +1348,17 @@ class ReplayGainPlugin(BeetsPlugin): callback(func(*args, **kwds)) def terminate_pool(self): - """Terminate the `ThreadPool` instance in `self.pool` - (e.g. stop execution in case of exception) + """Forcibly terminate the `ThreadPool` instance in `self.pool` + + Sends SIGTERM to all processes. """ - # Don't call self._as_pool() here, - # self.pool._state may not be == RUN - if hasattr(self, 'pool') and isinstance(self.pool, ThreadPool): + if self.pool is not None: self.pool.terminate() self.pool.join() + # Terminating the processes leaves the ExceptionWatcher's queues + # in an unknown state, so don't wait for it. # self.exc_watcher.join() + self.pool = None def _interrupt(self, signal, frame): try: @@ -1373,12 +1370,13 @@ class ReplayGainPlugin(BeetsPlugin): pass def close_pool(self): - """Close the `ThreadPool` instance in `self.pool` (if there is one) + """Regularly lose the `ThreadPool` instance in `self.pool`. """ - if self._has_pool(): + if self.pool is not None: self.pool.close() self.pool.join() self.exc_watcher.join() + self.pool = None def import_begin(self, session): """Handle `import_begin` event -> open pool