Merge pull request #3478 from ybnd/parallel-replaygain

Implement parallel replaygain analysis
This commit is contained in:
Adrian Sampson 2020-12-14 17:41:11 -05:00 committed by GitHub
commit 8645f56512
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 274 additions and 45 deletions

View file

@ -24,12 +24,17 @@ import warnings
import enum
import re
import xml.parsers.expat
from six.moves import zip
from six.moves import zip, queue
import six
from multiprocessing.pool import ThreadPool, RUN
from threading import Thread, Event
import signal
from beets import ui
from beets.plugins import BeetsPlugin
from beets.util import (syspath, command_output, bytestring_path,
displayable_path, py3_path)
displayable_path, py3_path, cpu_count)
# Utilities.
@ -110,6 +115,8 @@ class Backend(object):
"""An abstract class representing engine for calculating RG values.
"""
do_parallel = False
def __init__(self, config, log):
"""Initialize the backend with the configuration view for the
plugin.
@ -141,6 +148,8 @@ class Bs1770gainBackend(Backend):
-18: "replaygain",
}
do_parallel = True
def __init__(self, config, log):
super(Bs1770gainBackend, self).__init__(config, log)
config.add({
@ -352,8 +361,7 @@ class Bs1770gainBackend(Backend):
except xml.parsers.expat.ExpatError:
raise ReplayGainError(
u'The bs1770gain tool produced malformed XML. '
'Using version >=0.4.10 may solve this problem.'
)
u'Using version >=0.4.10 may solve this problem.')
if len(per_file_gain) != len(path_list):
raise ReplayGainError(
@ -378,6 +386,9 @@ class Bs1770gainBackend(Backend):
class FfmpegBackend(Backend):
"""A replaygain backend using ffmpeg's ebur128 filter.
"""
do_parallel = True
def __init__(self, config, log):
super(FfmpegBackend, self).__init__(config, log)
self._ffmpeg_path = "ffmpeg"
@ -620,6 +631,7 @@ class FfmpegBackend(Backend):
# mpgain/aacgain CLI tool backend.
class CommandBackend(Backend):
do_parallel = True
def __init__(self, config, log):
super(CommandBackend, self).__init__(config, log)
@ -748,7 +760,6 @@ class CommandBackend(Backend):
# GStreamer-based backend.
class GStreamerBackend(Backend):
def __init__(self, config, log):
super(GStreamerBackend, self).__init__(config, log)
self._import_gst()
@ -1168,6 +1179,33 @@ class AudioToolsBackend(Backend):
)
class ExceptionWatcher(Thread):
    """Monitors a queue for exceptions asynchronously.

    Once an exception arrives, run the callback and re-raise the
    exception (which ends this watcher thread).
    """
    def __init__(self, queue, callback):
        """
        :param queue: a queue-like object onto which worker threads push
            ``sys.exc_info()`` tuples.
        :param callback: called once, before the exception is re-raised
            (e.g. to terminate a thread pool).
        """
        self._queue = queue
        self._callback = callback
        self._stopevent = Event()
        Thread.__init__(self)

    def run(self):
        while not self._stopevent.is_set():
            try:
                # Block for a short interval instead of polling with
                # get_nowait(): the previous busy-wait loop spun a CPU
                # core at 100% while no exceptions were queued.
                exc = self._queue.get(timeout=0.05)
                self._callback()
                six.reraise(exc[0], exc[1], exc[2])
            except queue.Empty:
                # No exceptions yet; loop back to check
                # whether `_stopevent` is set.
                pass

    def join(self, timeout=None):
        # Ask `run` to exit its polling loop, then join normally.
        self._stopevent.set()
        Thread.join(self, timeout)
# Main plugin logic.
class ReplayGainPlugin(BeetsPlugin):
@ -1195,6 +1233,7 @@ class ReplayGainPlugin(BeetsPlugin):
'overwrite': False,
'auto': True,
'backend': u'command',
'threads': cpu_count(),
'per_disc': False,
'peak': 'true',
'targetlevel': 89,
@ -1204,12 +1243,15 @@ class ReplayGainPlugin(BeetsPlugin):
self.overwrite = self.config['overwrite'].get(bool)
self.per_disc = self.config['per_disc'].get(bool)
backend_name = self.config['backend'].as_str()
if backend_name not in self.backends:
# Remember which backend is used for CLI feedback
self.backend_name = self.config['backend'].as_str()
if self.backend_name not in self.backends:
raise ui.UserError(
u"Selected ReplayGain backend {0} is not supported. "
u"Please select one of: {1}".format(
backend_name,
self.backend_name,
u', '.join(self.backends.keys())
)
)
@ -1226,13 +1268,15 @@ class ReplayGainPlugin(BeetsPlugin):
# On-import analysis.
if self.config['auto']:
self.register_listener('import_begin', self.import_begin)
self.register_listener('import', self.import_end)
self.import_stages = [self.imported]
# Formats to use R128.
self.r128_whitelist = self.config['r128'].as_str_seq()
try:
self.backend_instance = self.backends[backend_name](
self.backend_instance = self.backends[self.backend_name](
self.config, self._log
)
except (ReplayGainError, FatalReplayGainError) as e:
@ -1264,30 +1308,40 @@ class ReplayGainPlugin(BeetsPlugin):
(not item.rg_album_gain or not item.rg_album_peak)
for item in album.items()])
def _store(self, item):
    """Store an item to the database.

    When testing, item.store() sometimes fails non-destructively with
    sqlite3.OperationalError.
    This method is here to be patched to a retry-once helper function
    in test_replaygain.py, so that it can still fail appropriately
    outside of these tests. All ReplayGain tag writes in this plugin
    go through this single choke point for that reason.
    """
    item.store()
def store_track_gain(self, item, track_gain):
    """Write track-level ReplayGain values onto `item` and persist it."""
    gain, peak = track_gain.gain, track_gain.peak
    item.rg_track_gain = gain
    item.rg_track_peak = peak
    self._store(item)
    self._log.debug(u'applied track gain {0} LU, peak {1} of FS',
                    gain, peak)
def store_album_gain(self, item, album_gain):
    """Write album-level ReplayGain values onto `item` and persist it."""
    gain, peak = album_gain.gain, album_gain.peak
    item.rg_album_gain = gain
    item.rg_album_peak = peak
    self._store(item)
    self._log.debug(u'applied album gain {0} LU, peak {1} of FS',
                    gain, peak)
def store_track_r128_gain(self, item, track_gain):
    """Write the R128 track gain onto `item` and persist it."""
    gain = track_gain.gain
    item.r128_track_gain = gain
    self._store(item)
    self._log.debug(u'applied r128 track gain {0} LU', gain)
def store_album_r128_gain(self, item, album_gain):
    """Write the R128 album gain onto `item` and persist it."""
    gain = album_gain.gain
    item.r128_album_gain = gain
    self._store(item)
    self._log.debug(u'applied r128 album gain {0} LU', gain)
@ -1322,8 +1376,6 @@ class ReplayGainPlugin(BeetsPlugin):
self._log.info(u'Skipping album {0}', album)
return
self._log.info(u'analyzing {0}', album)
if (any([self.should_use_r128(item) for item in album.items()]) and not
all(([self.should_use_r128(item) for item in album.items()]))):
self._log.error(
@ -1331,6 +1383,8 @@ class ReplayGainPlugin(BeetsPlugin):
album)
return
self._log.info(u'analyzing {0}', album)
tag_vals = self.tag_specific_values(album.items())
store_track_gain, store_album_gain, target_level, peak = tag_vals
@ -1344,21 +1398,35 @@ class ReplayGainPlugin(BeetsPlugin):
discs[1] = album.items()
for discnumber, items in discs.items():
try:
album_gain = self.backend_instance.compute_album_gain(
items, target_level, peak
)
if len(album_gain.track_gains) != len(items):
def _store_album(album_gain):
if not album_gain or not album_gain.album_gain \
or len(album_gain.track_gains) != len(items):
# In some cases, backends fail to produce a valid
# `album_gain` without throwing FatalReplayGainError
# => raise non-fatal exception & continue
raise ReplayGainError(
u"ReplayGain backend failed "
u"for some tracks in album {0}".format(album)
u"ReplayGain backend `{}` failed "
u"for some tracks in album {}"
.format(self.backend_name, album)
)
for item, track_gain in zip(items, album_gain.track_gains):
for item, track_gain in zip(items,
album_gain.track_gains):
store_track_gain(item, track_gain)
store_album_gain(item, album_gain.album_gain)
if write:
item.try_write()
self._log.debug(u'done analyzing {0}', item)
try:
self._apply(
self.backend_instance.compute_album_gain, args=(),
kwds={
"items": [i for i in items],
"target_level": target_level,
"peak": peak
},
callback=_store_album
)
except ReplayGainError as e:
self._log.info(u"ReplayGain error: {0}", e)
except FatalReplayGainError as e:
@ -1376,28 +1444,121 @@ class ReplayGainPlugin(BeetsPlugin):
self._log.info(u'Skipping track {0}', item)
return
self._log.info(u'analyzing {0}', item)
tag_vals = self.tag_specific_values([item])
store_track_gain, store_album_gain, target_level, peak = tag_vals
try:
track_gains = self.backend_instance.compute_track_gain(
[item], target_level, peak
)
if len(track_gains) != 1:
def _store_track(track_gains):
if not track_gains or len(track_gains) != 1:
# In some cases, backends fail to produce a valid
# `track_gains` without throwing FatalReplayGainError
# => raise non-fatal exception & continue
raise ReplayGainError(
u"ReplayGain backend failed for track {0}".format(item)
u"ReplayGain backend `{}` failed for track {}"
.format(self.backend_name, item)
)
store_track_gain(item, track_gains[0])
if write:
item.try_write()
self._log.debug(u'done analyzing {0}', item)
try:
self._apply(
self.backend_instance.compute_track_gain, args=(),
kwds={
"items": [item],
"target_level": target_level,
"peak": peak,
},
callback=_store_track
)
except ReplayGainError as e:
self._log.info(u"ReplayGain error: {0}", e)
except FatalReplayGainError as e:
raise ui.UserError(
u"Fatal replay gain error: {0}".format(e))
raise ui.UserError(u"Fatal replay gain error: {0}".format(e))
def _has_pool(self):
"""Check whether a `ThreadPool` is running instance in `self.pool`
"""
if hasattr(self, 'pool'):
if isinstance(self.pool, ThreadPool) and self.pool._state == RUN:
return True
return False
def open_pool(self, threads):
    """Create a running `ThreadPool` in `self.pool` with `threads`
    workers, plus an exception queue and a watcher thread that aborts
    the pool when a worker raises. No-op when a pool is already open
    or the selected backend does not support parallel analysis.
    """
    if self._has_pool() or not self.backend_instance.do_parallel:
        return
    self.pool = ThreadPool(threads)
    self.exc_queue = queue.Queue()

    # Let Ctrl-C tear the pool down cleanly.
    signal.signal(signal.SIGINT, self._interrupt)

    # Worker threads push exceptions onto `exc_queue`; the watcher
    # terminates the pool as soon as one arrives.
    self.exc_watcher = ExceptionWatcher(self.exc_queue,
                                        self.terminate_pool)
    self.exc_watcher.start()
def _apply(self, func, args, kwds, callback):
    """Run `func(*args, **kwds)` and feed its result to `callback`.

    With an open `ThreadPool`, the call is submitted asynchronously;
    both `func` and `callback` are wrapped so exceptions raised inside
    worker threads are captured instead of silently lost. Without a
    pool, everything runs synchronously in the current thread.
    """
    if self._has_pool():
        def catch_exc(func, exc_queue, log):
            """Wrapper to catch raised exceptions in threads.
            """
            def wfunc(*args, **kwargs):
                try:
                    return func(*args, **kwargs)
                except ReplayGainError as e:
                    log.info(e.args[0])  # log non-fatal exceptions
                except Exception:
                    # Fatal/unexpected: hand exc_info to the watcher
                    # via the queue so it can abort the pool.
                    # NOTE(review): relies on `sys` being imported at
                    # module level -- not visible here, confirm.
                    exc_queue.put(sys.exc_info())
            return wfunc

        # Wrap function and callback to catch exceptions
        func = catch_exc(func, self.exc_queue, self._log)
        callback = catch_exc(callback, self.exc_queue, self._log)

        self.pool.apply_async(func, args, kwds, callback)
    else:
        callback(func(*args, **kwds))
def terminate_pool(self):
    """Forcefully stop the `ThreadPool` in `self.pool`, if one exists
    (used e.g. to stop execution when an exception occurs).
    """
    # Deliberately not using self._has_pool() here: the pool may have
    # already left the RUN state but still needs terminate()/join().
    pool = getattr(self, 'pool', None)
    if isinstance(pool, ThreadPool):
        pool.terminate()
        pool.join()
def _interrupt(self, signal, frame):
try:
self._log.info('interrupted')
self.terminate_pool()
exit(0)
except SystemExit:
# Silence raised SystemExit ~ exit(0)
pass
def close_pool(self):
    """Gracefully shut down `self.pool`, waiting for outstanding work
    and stopping the exception watcher (no-op without a running pool).
    """
    if not self._has_pool():
        return
    self.pool.close()
    self.pool.join()
    self.exc_watcher.join()
def import_begin(self, session):
    """Handle the `import_begin` event by opening the thread pool,
    sized from the plugin's `threads` config option.
    """
    self.open_pool(self.config['threads'].get(int))
def import_end(self, paths):
    """Handle the `import` event (end of the import) by closing the
    pool, waiting for any outstanding analysis to finish.
    """
    self.close_pool()
def imported(self, session, task):
"""Add replay gain info to items or albums of ``task``.
@ -1411,19 +1572,44 @@ class ReplayGainPlugin(BeetsPlugin):
"""Return the "replaygain" ui subcommand.
"""
def func(lib, opts, args):
try:
write = ui.should_write(opts.write)
force = opts.force
if opts.album:
for album in lib.albums(ui.decargs(args)):
self.handle_album(album, write, force)
# Bypass self.open_pool() if called with `--threads 0`
if opts.threads != 0:
threads = opts.threads or self.config['threads'].get(int)
self.open_pool(threads)
if opts.album:
albums = lib.albums(ui.decargs(args))
self._log.info(
"Analyzing {} albums ~ {} backend..."
.format(len(albums), self.backend_name)
)
for album in albums:
self.handle_album(album, write, force)
else:
for item in lib.items(ui.decargs(args)):
items = lib.items(ui.decargs(args))
self._log.info(
"Analyzing {} tracks ~ {} backend..."
.format(len(items), self.backend_name)
)
for item in items:
self.handle_track(item, write, force)
self.close_pool()
except (SystemExit, KeyboardInterrupt):
# Silence interrupt exceptions
pass
cmd = ui.Subcommand('replaygain', help=u'analyze for ReplayGain')
cmd.parser.add_album_option()
cmd.parser.add_option(
"-t", "--threads", dest="threads", type=int,
help=u'change the number of threads, \
defaults to maximum available processors'
)
cmd.parser.add_option(
"-f", "--force", dest="force", action="store_true", default=False,
help=u"analyze all files, including those that "

View file

@ -169,6 +169,9 @@ New features:
https://github.com/alastair/python-musicbrainzngs/pull/247 and
https://github.com/alastair/python-musicbrainzngs/pull/266 .
Thanks to :user:`aereaux`.
* :doc:`/plugins/replaygain` now does its analysis in parallel when using
the ``command``, ``ffmpeg`` or ``bs1770gain`` backends.
:bug:`3478`
Fixes:

View file

@ -13,12 +13,16 @@ Installation
This plugin can use one of many backends to compute the ReplayGain values:
GStreamer, mp3gain (and its cousin, aacgain), Python Audio Tools or ffmpeg.
ffmpeg and mp3gain can be easier to install. mp3gain supports less audio formats
then the other backend.
than the other backend.
Once installed, this plugin analyzes all files during the import process. This
can be a slow process; to instead analyze after the fact, disable automatic
analysis and use the ``beet replaygain`` command (see below).
To speed up analysis with some of the available backends, this plugin processes
tracks or albums (when using the ``-a`` option) in parallel. By default,
a single thread is used per logical core of your CPU.
GStreamer
`````````
@ -35,6 +39,8 @@ the GStreamer backend by adding this to your configuration file::
replaygain:
backend: gstreamer
The GStreamer backend does not support parallel analysis.
mp3gain and aacgain
```````````````````
@ -73,6 +79,8 @@ On OS X, most of the dependencies can be installed with `Homebrew`_::
brew install mpg123 mp3gain vorbisgain faad2 libvorbis
The Python Audio Tools backend does not support parallel analysis.
.. _Python Audio Tools: http://audiotools.sourceforge.net
ffmpeg
@ -92,6 +100,9 @@ configuration file. The available options are:
- **auto**: Enable ReplayGain analysis during import.
Default: ``yes``.
- **threads**: The number of parallel threads to run the analysis in. Overridden
by ``--threads`` at the command line.
Default: # of logical CPU cores
- **backend**: The analysis backend; either ``gstreamer``, ``command``, ``audiotools``
or ``ffmpeg``.
Default: ``command``.
@ -143,8 +154,15 @@ whether ReplayGain tags are written into the music files, or stored in the
beets database only (the default is to use :ref:`the importer's configuration
<config-import-write>`).
To execute with a different number of threads, call ``beet replaygain --threads N``::
$ beet replaygain --threads N [-Waf] [QUERY]
with N any integer. To disable parallelism, use ``--threads 0``.
ReplayGain analysis is not fast, so you may want to disable it during import.
Use the ``auto`` config option to control this::
replaygain:
auto: no

View file

@ -22,11 +22,15 @@ import six
from mock import patch
from test.helper import TestHelper, capture_log, has_program
from sqlite3 import OperationalError
from beets import config
from beets.util import CommandOutput
from mediafile import MediaFile
from beetsplug.replaygain import (FatalGstreamerPluginReplayGainError,
GStreamerBackend)
GStreamerBackend,
ReplayGainPlugin)
try:
import gi
@ -55,10 +59,28 @@ def reset_replaygain(item):
item['rg_album_gain'] = None
item.write()
item.store()
item.store()
item.store()
def _store_retry_once(self, item):
"""Helper method to retry item.store() once in case
of a sqlite3.OperationalError exception.
:param self: `ReplayGainPlugin` instance
:param item: a library item to store
"""
try:
item.store()
except OperationalError:
# test_replaygain.py :memory: library can fail with
# `sqlite3.OperationalError: no such table: items`
# but the second attempt succeeds
item.store()
@patch.object(ReplayGainPlugin, '_store', _store_retry_once)
class ReplayGainCliTestBase(TestHelper):
def setUp(self):
self.setup_beets()
self.config['replaygain']['backend'] = self.backend