Analyze replaygain in parallel with multiprocessing.pool.ThreadPool

* Add a `--jobs` (`-j`) option to `replaygain` that sets the worker pool size
* Single-threaded execution by default, if `--jobs` is unset

* If multithreaded, call `Backend.compute_album_gain` or `Backend.compute_track_gain` asynchronously, storing and writing the metadata in the completion callback
This commit is contained in:
ybnd 2020-01-28 21:19:39 +01:00
parent a4a0a4bd28
commit d95bb5683b

View file

@ -104,6 +104,8 @@ class Backend(object):
"""An abstract class representing engine for calculating RG values.
"""
do_parallel = False
def __init__(self, config, log):
"""Initialize the backend with the configuration view for the
plugin.
@ -135,6 +137,8 @@ class Bs1770gainBackend(Backend):
-18: "replaygain",
}
do_parallel = True
def __init__(self, config, log):
super(Bs1770gainBackend, self).__init__(config, log)
config.add({
@ -346,6 +350,9 @@ class Bs1770gainBackend(Backend):
class FfmpegBackend(Backend):
"""A replaygain backend using ffmpeg's ebur128 filter.
"""
do_parallel = True
def __init__(self, config, log):
super(FfmpegBackend, self).__init__(config, log)
self._ffmpeg_path = "ffmpeg"
@ -588,6 +595,7 @@ class FfmpegBackend(Backend):
# mpgain/aacgain CLI tool backend.
class CommandBackend(Backend):
do_parallel = True
def __init__(self, config, log):
super(CommandBackend, self).__init__(config, log)
@ -716,7 +724,6 @@ class CommandBackend(Backend):
# GStreamer-based backend.
class GStreamerBackend(Backend):
def __init__(self, config, log):
super(GStreamerBackend, self).__init__(config, log)
self._import_gst()
@ -1173,6 +1180,7 @@ class ReplayGainPlugin(BeetsPlugin):
self.overwrite = self.config['overwrite'].get(bool)
self.per_disc = self.config['per_disc'].get(bool)
backend_name = self.config['backend'].as_str()
if backend_name not in self.backends:
raise ui.UserError(
u"Selected ReplayGain backend {0} is not supported. "
@ -1290,8 +1298,6 @@ class ReplayGainPlugin(BeetsPlugin):
self._log.info(u'Skipping album {0}', album)
return
self._log.info(u'analyzing {0}', album)
if (any([self.should_use_r128(item) for item in album.items()]) and not
all(([self.should_use_r128(item) for item in album.items()]))):
raise ReplayGainError(
@ -1299,6 +1305,8 @@ class ReplayGainPlugin(BeetsPlugin):
u" for some tracks in album {0}".format(album)
)
self._log.info(u'analyzing {0}', album)
tag_vals = self.tag_specific_values(album.items())
store_track_gain, store_album_gain, target_level, peak = tag_vals
@ -1311,27 +1319,56 @@ class ReplayGainPlugin(BeetsPlugin):
else:
discs[1] = album.items()
for discnumber, items in discs.items():
try:
album_gain = self.backend_instance.compute_album_gain(
items, target_level, peak
if hasattr(self, 'pool'):
for discnumber, items in discs.items():
def _store_album(album_gain):
if len(album_gain.track_gains) != len(items):
raise ReplayGainError(
u"ReplayGain backend failed "
u"for some tracks in album {0}".format(album)
)
for item, track_gain in zip(items,
album_gain.track_gains):
store_track_gain(item, track_gain)
store_album_gain(item, album_gain.album_gain)
if write:
item.try_write()
self._log.debug(u'done analyzing {0}', item)
self.pool.apply_async(
self.backend_instance.compute_album_gain, args=(),
kwds={
"items": [i for i in items],
"target_level": target_level,
"peak": peak
},
callback=_store_album
)
if len(album_gain.track_gains) != len(items):
raise ReplayGainError(
u"ReplayGain backend failed "
u"for some tracks in album {0}".format(album)
else:
for discnumber, items in discs.items():
try:
album_gain = self.backend_instance.compute_album_gain(
items, target_level, peak
)
for item, track_gain in zip(items, album_gain.track_gains):
store_track_gain(item, track_gain)
store_album_gain(item, album_gain.album_gain)
if write:
item.try_write()
except ReplayGainError as e:
self._log.info(u"ReplayGain error: {0}", e)
except FatalReplayGainError as e:
raise ui.UserError(
u"Fatal replay gain error: {0}".format(e))
if len(album_gain.track_gains) != len(items):
raise ReplayGainError(
u"ReplayGain backend failed "
u"for some tracks in album {0}".format(album)
)
for item, track_gain in zip(items,
album_gain.track_gains):
store_track_gain(item, track_gain)
store_album_gain(item, album_gain.album_gain)
if write:
item.try_write()
except ReplayGainError as e:
self._log.info(u"ReplayGain error: {0}", e)
except FatalReplayGainError as e:
raise ui.UserError(
u"Fatal replay gain error: {0}".format(e))
def handle_track(self, item, write, force=False):
"""Compute track replay gain and store it in the item.
@ -1344,23 +1381,38 @@ class ReplayGainPlugin(BeetsPlugin):
self._log.info(u'Skipping track {0}', item)
return
self._log.info(u'analyzing {0}', item)
tag_vals = self.tag_specific_values([item])
store_track_gain, store_album_gain, target_level, peak = tag_vals
try:
track_gains = self.backend_instance.compute_track_gain(
[item], target_level, peak
)
def _store_track(track_gains):
if len(track_gains) != 1:
raise ReplayGainError(
u"ReplayGain backend failed for track {0}".format(item)
u"ReplayGain backend failed for track {0}".format(
item)
)
store_track_gain(item, track_gains[0])
if write:
item.try_write()
self._log.debug(u'done analyzing {0}', item)
try:
if hasattr(self, 'pool'):
self.pool.apply_async(
self.backend_instance.compute_track_gain, args=(),
kwds={
"items": [item],
"target_level": target_level,
"peak": peak,
},
callback=_store_track
)
else:
_store_track(
self.backend_instance.compute_track_gain(
[item], target_level, peak
)
)
except ReplayGainError as e:
self._log.info(u"ReplayGain error: {0}", e)
except FatalReplayGainError as e:
@ -1381,17 +1433,30 @@ class ReplayGainPlugin(BeetsPlugin):
def func(lib, opts, args):
# Entry point for the `replaygain` subcommand (indentation is
# flattened by the diff rendering).
write = ui.should_write(opts.write)
force = opts.force
jobs = opts.jobs
# Create a thread pool only when the selected backend declares
# parallel support AND the user requested at least one job; the
# handle_album/handle_track paths check `hasattr(self, 'pool')`
# to decide between async and synchronous analysis.
if self.backend_instance.do_parallel and jobs > 0:
from multiprocessing.pool import ThreadPool
self.pool = ThreadPool(jobs)
if opts.album:
for album in lib.albums(ui.decargs(args)):
self.handle_album(album, write, force)
else:
for item in lib.items(ui.decargs(args)):
self.handle_track(item, write, force)
if hasattr(self, 'pool'):
# Drain the pool: close() refuses new tasks, join() blocks until
# every queued analysis and its store/write callback finished.
self.pool.close()
self.pool.join()
# NOTE(review): terminate() after a completed join() is redundant,
# and `self.pool` is never deleted afterwards, so a later call in
# the same session would see a closed pool through the
# hasattr(self, 'pool') checks — confirm the intended lifecycle.
self.pool.terminate()
cmd = ui.Subcommand('replaygain', help=u'analyze for ReplayGain')
cmd.parser.add_album_option()
cmd.parser.add_option(
"-j", "--jobs", dest="jobs", type=int, default=0,
help=u"worker pool size"
)
cmd.parser.add_option(
"-f", "--force", dest="force", action="store_true", default=False,
help=u"analyze all files, including those that "