mirror of
https://github.com/beetbox/beets.git
synced 2025-12-13 12:02:44 +01:00
Merge branch 'beetbox:master' into master
This commit is contained in:
commit
ca1edabffc
16 changed files with 433 additions and 167 deletions
|
|
@ -3,12 +3,12 @@
|
|||
|
||||
repos:
|
||||
- repo: https://github.com/psf/black
|
||||
rev: 22.12.0
|
||||
rev: 23.11.0
|
||||
hooks:
|
||||
- id: black
|
||||
|
||||
- repo: https://github.com/pycqa/isort
|
||||
rev: 5.11.4
|
||||
rev: 5.12.0
|
||||
hooks:
|
||||
- id: isort
|
||||
name: isort (python)
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ We love to get contributions from our community—you! There are many ways
|
|||
to contribute, whether you’re a programmer or not.
|
||||
|
||||
The first thing to do, regardless of how you'd like to contribute to the
|
||||
project, is to check out our :doc:`Code of Conduct </code_of_conduct>` and to
|
||||
project, is to check out our :doc:`Code of Conduct <code_of_conduct>` and to
|
||||
keep that in mind while interacting with other contributors and users.
|
||||
|
||||
Non-Programming
|
||||
|
|
@ -282,8 +282,8 @@ Running the Tests
|
|||
|
||||
To run the tests for multiple Python versions, compile the docs, and
|
||||
check style, use `tox`_. Just type ``tox`` or use something like
|
||||
``tox -e py27`` to test a specific configuration. `detox`_ makes this go
|
||||
faster.
|
||||
``tox -e py27`` to test a specific configuration. You can use the
|
||||
``--parallel`` flag to make this go faster.
|
||||
|
||||
You can disable a hand-selected set of "slow" tests by setting the
|
||||
environment variable SKIP_SLOW_TESTS before running them.
|
||||
|
|
@ -359,7 +359,6 @@ others. See `unittest.mock`_ for more info.
|
|||
.. _Codecov: https://codecov.io/github/beetbox/beets
|
||||
.. _pytest-random: https://github.com/klrmn/pytest-random
|
||||
.. _tox: https://tox.readthedocs.io/en/latest/
|
||||
.. _detox: https://pypi.org/project/detox/
|
||||
.. _pytest: https://docs.pytest.org/en/stable/
|
||||
.. _Linux: https://github.com/beetbox/beets/actions
|
||||
.. _Windows: https://ci.appveyor.com/project/beetbox/beets/
|
||||
|
|
|
|||
|
|
@ -100,11 +100,11 @@ programmer or not, you should be able to find all the info you need at
|
|||
Read More
|
||||
---------
|
||||
|
||||
Learn more about beets at `its Web site`_. Follow `@b33ts`_ on Twitter for
|
||||
Learn more about beets at `its Web site`_. Follow `@b33ts`_ on Mastodon for
|
||||
news and updates.
|
||||
|
||||
.. _its Web site: https://beets.io/
|
||||
.. _@b33ts: https://twitter.com/b33ts/
|
||||
.. _@b33ts: https://fosstodon.org/@beets
|
||||
|
||||
Contact
|
||||
-------
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@
|
|||
Library.
|
||||
"""
|
||||
|
||||
from .db import Database, Model
|
||||
from .db import Database, Model, Results
|
||||
from .query import (
|
||||
AndQuery,
|
||||
FieldQuery,
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ from mediafile import MediaFile, UnreadableFileError
|
|||
|
||||
import beets
|
||||
from beets import dbcore, logging, plugins, util
|
||||
from beets.dbcore import types
|
||||
from beets.dbcore import Results, types
|
||||
from beets.util import (
|
||||
MoveOperation,
|
||||
bytestring_path,
|
||||
|
|
@ -1665,11 +1665,11 @@ class Library(dbcore.Database):
|
|||
Item, beets.config["sort_item"].as_str_seq()
|
||||
)
|
||||
|
||||
def albums(self, query=None, sort=None):
|
||||
def albums(self, query=None, sort=None) -> Results[Album]:
|
||||
"""Get :class:`Album` objects matching the query."""
|
||||
return self._fetch(Album, query, sort or self.get_default_album_sort())
|
||||
|
||||
def items(self, query=None, sort=None):
|
||||
def items(self, query=None, sort=None) -> Results[Item]:
|
||||
"""Get :class:`Item` objects matching the query."""
|
||||
return self._fetch(Item, query, sort or self.get_default_item_sort())
|
||||
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ import sys
|
|||
import textwrap
|
||||
import traceback
|
||||
from difflib import SequenceMatcher
|
||||
from typing import Any, Callable, List
|
||||
|
||||
import confuse
|
||||
|
||||
|
|
@ -857,7 +858,7 @@ def split_into_lines(string, width_tuple):
|
|||
m.group("esc") + raw_word + RESET_COLOR
|
||||
for raw_word in raw_words
|
||||
]
|
||||
else:
|
||||
elif raw_words:
|
||||
# Pretext stops mid-word
|
||||
if m.group("esc") != RESET_COLOR:
|
||||
# Add the rest of the current word, with a reset after it
|
||||
|
|
@ -1450,6 +1451,8 @@ class Subcommand:
|
|||
invoked by a SubcommandOptionParser.
|
||||
"""
|
||||
|
||||
func: Callable[[library.Library, optparse.Values, List[str]], Any]
|
||||
|
||||
def __init__(self, name, parser=None, help="", aliases=(), hide=False):
|
||||
"""Creates a new subcommand. name is the primary way to invoke
|
||||
the subcommand; aliases are alternate names. parser is an
|
||||
|
|
|
|||
|
|
@ -680,10 +680,13 @@ class AlbumChange(ChangeRepresentation):
|
|||
# Save new medium details for future comparison.
|
||||
medium, disctitle = track_info.medium, track_info.disctitle
|
||||
|
||||
if config["import"]["detail"]:
|
||||
# Construct the line tuple for the track.
|
||||
left, right = self.make_line(item, track_info)
|
||||
# Construct the line tuple for the track.
|
||||
left, right = self.make_line(item, track_info)
|
||||
if right["contents"] != "":
|
||||
lines.append((left, right))
|
||||
else:
|
||||
if config["import"]["detail"]:
|
||||
lines.append((left, right))
|
||||
self.print_tracklist(lines)
|
||||
|
||||
# Missing and unmatched tracks.
|
||||
|
|
|
|||
|
|
@ -405,7 +405,10 @@ def bytestring_path(path: Bytes_or_String) -> bytes:
|
|||
PATH_SEP: bytes = bytestring_path(os.sep)
|
||||
|
||||
|
||||
def displayable_path(path: bytes, separator: str = "; ") -> str:
|
||||
def displayable_path(
|
||||
path: Union[bytes, str, Tuple[Union[bytes, str], ...]],
|
||||
separator: str = "; ",
|
||||
) -> str:
|
||||
"""Attempts to decode a bytestring path to a unicode object for the
|
||||
purpose of displaying it to the user. If the `path` argument is a
|
||||
list or a tuple, the elements are joined with `separator`.
|
||||
|
|
@ -801,7 +804,7 @@ def legalize_path(
|
|||
return second_stage_path, retruncated
|
||||
|
||||
|
||||
def py3_path(path: AnyStr) -> str:
|
||||
def py3_path(path: Union[bytes, str]) -> str:
|
||||
"""Convert a bytestring path to Unicode.
|
||||
|
||||
This helps deal with APIs on Python 3 that *only* accept Unicode
|
||||
|
|
|
|||
|
|
@ -63,6 +63,19 @@ class DeezerPlugin(MetadataSourcePlugin, BeetsPlugin):
|
|||
|
||||
return [deezer_update_cmd]
|
||||
|
||||
def fetch_data(self, url):
|
||||
try:
|
||||
response = requests.get(url, timeout=10)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
except requests.exceptions.RequestException as e:
|
||||
self._log.error("Error fetching data from {}\n Error: {}", url, e)
|
||||
return None
|
||||
if "error" in data:
|
||||
self._log.error("Deezer API error: {}", data["error"]["message"])
|
||||
return None
|
||||
return data
|
||||
|
||||
def album_for_id(self, album_id):
|
||||
"""Fetch an album by its Deezer ID or URL and return an
|
||||
AlbumInfo object or None if the album is not found.
|
||||
|
|
@ -75,13 +88,8 @@ class DeezerPlugin(MetadataSourcePlugin, BeetsPlugin):
|
|||
deezer_id = self._get_id("album", album_id, self.id_regex)
|
||||
if deezer_id is None:
|
||||
return None
|
||||
|
||||
album_data = requests.get(self.album_url + deezer_id).json()
|
||||
if "error" in album_data:
|
||||
self._log.debug(
|
||||
f"Error fetching album {album_id}: "
|
||||
f"{album_data['error']['message']}"
|
||||
)
|
||||
album_data = self.fetch_data(self.album_url + deezer_id)
|
||||
if album_data is None:
|
||||
return None
|
||||
contributors = album_data.get("contributors")
|
||||
if contributors is not None:
|
||||
|
|
@ -107,9 +115,14 @@ class DeezerPlugin(MetadataSourcePlugin, BeetsPlugin):
|
|||
"Invalid `release_date` returned "
|
||||
"by {} API: '{}'".format(self.data_source, release_date)
|
||||
)
|
||||
|
||||
tracks_obj = requests.get(self.album_url + deezer_id + "/tracks").json()
|
||||
tracks_data = tracks_obj["data"]
|
||||
tracks_obj = self.fetch_data(self.album_url + deezer_id + "/tracks")
|
||||
if tracks_obj is None:
|
||||
return None
|
||||
try:
|
||||
tracks_data = tracks_obj["data"]
|
||||
except KeyError:
|
||||
self._log.debug("Error fetching album tracks for {}", deezer_id)
|
||||
tracks_data = None
|
||||
if not tracks_data:
|
||||
return None
|
||||
while "next" in tracks_obj:
|
||||
|
|
@ -192,21 +205,26 @@ class DeezerPlugin(MetadataSourcePlugin, BeetsPlugin):
|
|||
deezer_id = self._get_id("track", track_id, self.id_regex)
|
||||
if deezer_id is None:
|
||||
return None
|
||||
track_data = requests.get(self.track_url + deezer_id).json()
|
||||
if "error" in track_data:
|
||||
self._log.debug(
|
||||
f"Error fetching track {track_id}: "
|
||||
f"{track_data['error']['message']}"
|
||||
)
|
||||
track_data = self.fetch_data(self.track_url + deezer_id)
|
||||
if track_data is None:
|
||||
return None
|
||||
track = self._get_track(track_data)
|
||||
|
||||
# Get album's tracks to set `track.index` (position on the entire
|
||||
# release) and `track.medium_total` (total number of tracks on
|
||||
# the track's disc).
|
||||
album_tracks_data = requests.get(
|
||||
album_tracks_obj = self.fetch_data(
|
||||
self.album_url + str(track_data["album"]["id"]) + "/tracks"
|
||||
).json()["data"]
|
||||
)
|
||||
if album_tracks_obj is None:
|
||||
return None
|
||||
try:
|
||||
album_tracks_data = album_tracks_obj["data"]
|
||||
except KeyError:
|
||||
self._log.debug(
|
||||
"Error fetching album tracks for {}", track_data["album"]["id"]
|
||||
)
|
||||
return None
|
||||
medium_total = 0
|
||||
for i, track_data in enumerate(album_tracks_data, start=1):
|
||||
if track_data["disk_number"] == track.medium:
|
||||
|
|
@ -283,11 +301,9 @@ class DeezerPlugin(MetadataSourcePlugin, BeetsPlugin):
|
|||
self._log.debug("No deezer_track_id present for: {}", item)
|
||||
continue
|
||||
try:
|
||||
rank = (
|
||||
requests.get(f"{self.track_url}{deezer_track_id}")
|
||||
.json()
|
||||
.get("rank")
|
||||
)
|
||||
rank = self.fetch_data(
|
||||
f"{self.track_url}{deezer_track_id}"
|
||||
).get("rank")
|
||||
self._log.debug(
|
||||
"Deezer track: {} has {} rank", deezer_track_id, rank
|
||||
)
|
||||
|
|
|
|||
|
|
@ -290,10 +290,34 @@ class Backend:
|
|||
self._log.debug("failed to fetch: {0} ({1})", url, r.status_code)
|
||||
return None
|
||||
|
||||
def fetch(self, artist, title):
|
||||
def fetch(self, artist, title, album=None, length=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class LRCLib(Backend):
|
||||
base_url = "https://lrclib.net/api/get"
|
||||
|
||||
def fetch(self, artist, title, album=None, length=None):
|
||||
params = {
|
||||
"artist_name": artist,
|
||||
"track_name": title,
|
||||
"album_name": album,
|
||||
"duration": length,
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.get(self.base_url, params=params)
|
||||
data = response.json()
|
||||
except (requests.RequestException, json.decoder.JSONDecodeError) as exc:
|
||||
self._log.debug("LRCLib API request failed: {0}", exc)
|
||||
return None
|
||||
|
||||
if self.config["synced"]:
|
||||
return data.get("syncedLyrics")
|
||||
|
||||
return data.get("plainLyrics")
|
||||
|
||||
|
||||
class MusiXmatch(Backend):
|
||||
REPLACEMENTS = {
|
||||
r"\s+": "-",
|
||||
|
|
@ -313,7 +337,7 @@ class MusiXmatch(Backend):
|
|||
|
||||
return super()._encode(s)
|
||||
|
||||
def fetch(self, artist, title):
|
||||
def fetch(self, artist, title, album=None, length=None):
|
||||
url = self.build_url(artist, title)
|
||||
|
||||
html = self.fetch_url(url)
|
||||
|
|
@ -361,7 +385,7 @@ class Genius(Backend):
|
|||
"User-Agent": USER_AGENT,
|
||||
}
|
||||
|
||||
def fetch(self, artist, title):
|
||||
def fetch(self, artist, title, album=None, length=None):
|
||||
"""Fetch lyrics from genius.com
|
||||
|
||||
Because genius doesn't allow accessing lyrics via the api,
|
||||
|
|
@ -473,7 +497,7 @@ class Tekstowo(Backend):
|
|||
BASE_URL = "http://www.tekstowo.pl"
|
||||
URL_PATTERN = BASE_URL + "/wyszukaj.html?search-title=%s&search-artist=%s"
|
||||
|
||||
def fetch(self, artist, title):
|
||||
def fetch(self, artist, title, album=None, length=None):
|
||||
url = self.build_url(title, artist)
|
||||
search_results = self.fetch_url(url)
|
||||
if not search_results:
|
||||
|
|
@ -706,7 +730,7 @@ class Google(Backend):
|
|||
ratio = difflib.SequenceMatcher(None, song_title, title).ratio()
|
||||
return ratio >= typo_ratio
|
||||
|
||||
def fetch(self, artist, title):
|
||||
def fetch(self, artist, title, album=None, length=None):
|
||||
query = f"{artist} {title}"
|
||||
url = "https://www.googleapis.com/customsearch/v1?key=%s&cx=%s&q=%s" % (
|
||||
self.api_key,
|
||||
|
|
@ -750,12 +774,13 @@ class Google(Backend):
|
|||
|
||||
|
||||
class LyricsPlugin(plugins.BeetsPlugin):
|
||||
SOURCES = ["google", "musixmatch", "genius", "tekstowo"]
|
||||
SOURCES = ["google", "musixmatch", "genius", "tekstowo", "lrclib"]
|
||||
SOURCE_BACKENDS = {
|
||||
"google": Google,
|
||||
"musixmatch": MusiXmatch,
|
||||
"genius": Genius,
|
||||
"tekstowo": Tekstowo,
|
||||
"lrclib": LRCLib,
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
|
|
@ -774,6 +799,7 @@ class LyricsPlugin(plugins.BeetsPlugin):
|
|||
"fallback": None,
|
||||
"force": False,
|
||||
"local": False,
|
||||
"synced": False,
|
||||
# Musixmatch is disabled by default as they are currently blocking
|
||||
# requests with the beets user agent.
|
||||
"sources": [s for s in self.SOURCES if s != "musixmatch"],
|
||||
|
|
@ -1019,8 +1045,13 @@ class LyricsPlugin(plugins.BeetsPlugin):
|
|||
return
|
||||
|
||||
lyrics = None
|
||||
album = item.album
|
||||
length = round(item.length)
|
||||
for artist, titles in search_pairs(item):
|
||||
lyrics = [self.get_lyrics(artist, title) for title in titles]
|
||||
lyrics = [
|
||||
self.get_lyrics(artist, title, album=album, length=length)
|
||||
for title in titles
|
||||
]
|
||||
if any(lyrics):
|
||||
break
|
||||
|
||||
|
|
@ -1049,12 +1080,12 @@ class LyricsPlugin(plugins.BeetsPlugin):
|
|||
item.try_write()
|
||||
item.store()
|
||||
|
||||
def get_lyrics(self, artist, title):
|
||||
def get_lyrics(self, artist, title, album=None, length=None):
|
||||
"""Fetch lyrics, trying each source in turn. Return a string or
|
||||
None if no lyrics were found.
|
||||
"""
|
||||
for backend in self.backends:
|
||||
lyrics = backend.fetch(artist, title)
|
||||
lyrics = backend.fetch(artist, title, album=album, length=length)
|
||||
if lyrics:
|
||||
self._log.debug(
|
||||
"got lyrics from backend: {0}", backend.__class__.__name__
|
||||
|
|
|
|||
|
|
@ -16,16 +16,38 @@
|
|||
import collections
|
||||
import enum
|
||||
import math
|
||||
import optparse
|
||||
import os
|
||||
import queue
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
import warnings
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from logging import Logger
|
||||
from multiprocessing.pool import ThreadPool
|
||||
from threading import Event, Thread
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
DefaultDict,
|
||||
Dict,
|
||||
List,
|
||||
Optional,
|
||||
Sequence,
|
||||
Tuple,
|
||||
Type,
|
||||
TypeVar,
|
||||
Union,
|
||||
cast,
|
||||
)
|
||||
|
||||
from confuse import ConfigView
|
||||
|
||||
from beets import ui
|
||||
from beets.importer import ImportSession, ImportTask
|
||||
from beets.library import Album, Item, Library
|
||||
from beets.plugins import BeetsPlugin
|
||||
from beets.util import (
|
||||
command_output,
|
||||
|
|
@ -53,7 +75,7 @@ class FatalGstreamerPluginReplayGainError(FatalReplayGainError):
|
|||
loading the required plugins."""
|
||||
|
||||
|
||||
def call(args, log, **kwargs):
|
||||
def call(args: List[Any], log: Logger, **kwargs: Any):
|
||||
"""Execute the command and return its output or raise a
|
||||
ReplayGainError on failure.
|
||||
"""
|
||||
|
|
@ -71,13 +93,7 @@ def call(args, log, **kwargs):
|
|||
raise ReplayGainError("argument encoding failed")
|
||||
|
||||
|
||||
def after_version(version_a, version_b):
|
||||
return tuple(int(s) for s in version_a.split(".")) >= tuple(
|
||||
int(s) for s in version_b.split(".")
|
||||
)
|
||||
|
||||
|
||||
def db_to_lufs(db):
|
||||
def db_to_lufs(db: float) -> float:
|
||||
"""Convert db to LUFS.
|
||||
|
||||
According to https://wiki.hydrogenaud.io/index.php?title=
|
||||
|
|
@ -86,7 +102,7 @@ def db_to_lufs(db):
|
|||
return db - 107
|
||||
|
||||
|
||||
def lufs_to_db(db):
|
||||
def lufs_to_db(db: float) -> float:
|
||||
"""Convert LUFS to db.
|
||||
|
||||
According to https://wiki.hydrogenaud.io/index.php?title=
|
||||
|
|
@ -97,9 +113,13 @@ def lufs_to_db(db):
|
|||
|
||||
# Backend base and plumbing classes.
|
||||
|
||||
# gain: in LU to reference level
|
||||
# peak: part of full scale (FS is 1.0)
|
||||
Gain = collections.namedtuple("Gain", "gain peak")
|
||||
|
||||
@dataclass
|
||||
class Gain:
|
||||
# gain: in LU to reference level
|
||||
gain: float
|
||||
# peak: part of full scale (FS is 1.0)
|
||||
peak: float
|
||||
|
||||
|
||||
class PeakMethod(enum.Enum):
|
||||
|
|
@ -118,7 +138,13 @@ class RgTask:
|
|||
"""
|
||||
|
||||
def __init__(
|
||||
self, items, album, target_level, peak_method, backend_name, log
|
||||
self,
|
||||
items: Sequence[Item],
|
||||
album: Optional[Album],
|
||||
target_level: float,
|
||||
peak_method: Optional[PeakMethod],
|
||||
backend_name: str,
|
||||
log: Logger,
|
||||
):
|
||||
self.items = items
|
||||
self.album = album
|
||||
|
|
@ -126,10 +152,10 @@ class RgTask:
|
|||
self.peak_method = peak_method
|
||||
self.backend_name = backend_name
|
||||
self._log = log
|
||||
self.album_gain = None
|
||||
self.track_gains = None
|
||||
self.album_gain: Optional[Gain] = None
|
||||
self.track_gains: Optional[List[Gain]] = None
|
||||
|
||||
def _store_track_gain(self, item, track_gain):
|
||||
def _store_track_gain(self, item: Item, track_gain: Gain):
|
||||
"""Store track gain for a single item in the database."""
|
||||
item.rg_track_gain = track_gain.gain
|
||||
item.rg_track_peak = track_gain.peak
|
||||
|
|
@ -140,13 +166,13 @@ class RgTask:
|
|||
item.rg_track_peak,
|
||||
)
|
||||
|
||||
def _store_album_gain(self, item):
|
||||
def _store_album_gain(self, item: Item, album_gain: Gain):
|
||||
"""Store album gain for a single item in the database.
|
||||
|
||||
The caller needs to ensure that `self.album_gain is not None`.
|
||||
"""
|
||||
item.rg_album_gain = self.album_gain.gain
|
||||
item.rg_album_peak = self.album_gain.peak
|
||||
item.rg_album_gain = album_gain.gain
|
||||
item.rg_album_peak = album_gain.peak
|
||||
item.store()
|
||||
self._log.debug(
|
||||
"applied album gain {0} LU, peak {1} of FS",
|
||||
|
|
@ -154,7 +180,7 @@ class RgTask:
|
|||
item.rg_album_peak,
|
||||
)
|
||||
|
||||
def _store_track(self, write):
|
||||
def _store_track(self, write: bool):
|
||||
"""Store track gain for the first track of the task in the database."""
|
||||
item = self.items[0]
|
||||
if self.track_gains is None or len(self.track_gains) != 1:
|
||||
|
|
@ -172,7 +198,7 @@ class RgTask:
|
|||
item.try_write()
|
||||
self._log.debug("done analyzing {0}", item)
|
||||
|
||||
def _store_album(self, write):
|
||||
def _store_album(self, write: bool):
|
||||
"""Store track/album gains for all tracks of the task in the database."""
|
||||
if (
|
||||
self.album_gain is None
|
||||
|
|
@ -190,12 +216,12 @@ class RgTask:
|
|||
)
|
||||
for item, track_gain in zip(self.items, self.track_gains):
|
||||
self._store_track_gain(item, track_gain)
|
||||
self._store_album_gain(item)
|
||||
self._store_album_gain(item, self.album_gain)
|
||||
if write:
|
||||
item.try_write()
|
||||
self._log.debug("done analyzing {0}", item)
|
||||
|
||||
def store(self, write):
|
||||
def store(self, write: bool):
|
||||
"""Store computed gains for the items of this task in the database."""
|
||||
if self.album is not None:
|
||||
self._store_album(write)
|
||||
|
|
@ -213,44 +239,56 @@ class R128Task(RgTask):
|
|||
tags.
|
||||
"""
|
||||
|
||||
def __init__(self, items, album, target_level, backend_name, log):
|
||||
def __init__(
|
||||
self,
|
||||
items: Sequence[Item],
|
||||
album: Optional[Album],
|
||||
target_level: float,
|
||||
backend_name: str,
|
||||
log: Logger,
|
||||
):
|
||||
# R128_* tags do not store the track/album peak
|
||||
super().__init__(items, album, target_level, None, backend_name, log)
|
||||
|
||||
def _store_track_gain(self, item, track_gain):
|
||||
def _store_track_gain(self, item: Item, track_gain: Gain):
|
||||
item.r128_track_gain = track_gain.gain
|
||||
item.store()
|
||||
self._log.debug("applied r128 track gain {0} LU", item.r128_track_gain)
|
||||
|
||||
def _store_album_gain(self, item):
|
||||
def _store_album_gain(self, item: Item, album_gain: Gain):
|
||||
"""
|
||||
|
||||
The caller needs to ensure that `self.album_gain is not None`.
|
||||
"""
|
||||
item.r128_album_gain = self.album_gain.gain
|
||||
item.r128_album_gain = album_gain.gain
|
||||
item.store()
|
||||
self._log.debug("applied r128 album gain {0} LU", item.r128_album_gain)
|
||||
|
||||
|
||||
class Backend:
|
||||
AnyRgTask = TypeVar("AnyRgTask", bound=RgTask)
|
||||
|
||||
|
||||
class Backend(ABC):
|
||||
"""An abstract class representing engine for calculating RG values."""
|
||||
|
||||
NAME = ""
|
||||
do_parallel = False
|
||||
|
||||
def __init__(self, config, log):
|
||||
def __init__(self, config: ConfigView, log: Logger):
|
||||
"""Initialize the backend with the configuration view for the
|
||||
plugin.
|
||||
"""
|
||||
self._log = log
|
||||
|
||||
def compute_track_gain(self, task):
|
||||
@abstractmethod
|
||||
def compute_track_gain(self, task: AnyRgTask) -> AnyRgTask:
|
||||
"""Computes the track gain for the tracks belonging to `task`, and sets
|
||||
the `track_gains` attribute on the task. Returns `task`.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def compute_album_gain(self, task):
|
||||
@abstractmethod
|
||||
def compute_album_gain(self, task: AnyRgTask) -> AnyRgTask:
|
||||
"""Computes the album gain for the album belonging to `task`, and sets
|
||||
the `album_gain` attribute on the task. Returns `task`.
|
||||
"""
|
||||
|
|
@ -264,7 +302,7 @@ class FfmpegBackend(Backend):
|
|||
NAME = "ffmpeg"
|
||||
do_parallel = True
|
||||
|
||||
def __init__(self, config, log):
|
||||
def __init__(self, config: ConfigView, log: Logger):
|
||||
super().__init__(config, log)
|
||||
self._ffmpeg_path = "ffmpeg"
|
||||
|
||||
|
|
@ -292,7 +330,7 @@ class FfmpegBackend(Backend):
|
|||
"the --enable-libebur128 configuration option is required."
|
||||
)
|
||||
|
||||
def compute_track_gain(self, task):
|
||||
def compute_track_gain(self, task: AnyRgTask) -> AnyRgTask:
|
||||
"""Computes the track gain for the tracks belonging to `task`, and sets
|
||||
the `track_gains` attribute on the task. Returns `task`.
|
||||
"""
|
||||
|
|
@ -310,7 +348,7 @@ class FfmpegBackend(Backend):
|
|||
|
||||
return task
|
||||
|
||||
def compute_album_gain(self, task):
|
||||
def compute_album_gain(self, task: AnyRgTask) -> AnyRgTask:
|
||||
"""Computes the album gain for the album belonging to `task`, and sets
|
||||
the `album_gain` attribute on the task. Returns `task`.
|
||||
"""
|
||||
|
|
@ -318,7 +356,7 @@ class FfmpegBackend(Backend):
|
|||
|
||||
# analyse tracks
|
||||
# Gives a list of tuples (track_gain, track_n_blocks)
|
||||
track_results = [
|
||||
track_results: List[Tuple[Gain, int]] = [
|
||||
self._analyse_item(
|
||||
item,
|
||||
task.target_level,
|
||||
|
|
@ -328,8 +366,7 @@ class FfmpegBackend(Backend):
|
|||
for item in task.items
|
||||
]
|
||||
|
||||
# list of track Gain objects
|
||||
track_gains = [tg for tg, _nb in track_results]
|
||||
track_gains: List[Gain] = [tg for tg, _nb in track_results]
|
||||
|
||||
# Album peak is maximum track peak
|
||||
album_peak = max(tg.peak for tg in track_gains)
|
||||
|
|
@ -337,7 +374,7 @@ class FfmpegBackend(Backend):
|
|||
# Total number of BS.1770 gating blocks
|
||||
n_blocks = sum(nb for _tg, nb in track_results)
|
||||
|
||||
def sum_of_track_powers(track_gain, track_n_blocks):
|
||||
def sum_of_track_powers(track_gain: Gain, track_n_blocks: int):
|
||||
# convert `LU to target_level` -> LUFS
|
||||
loudness = target_level_lufs - track_gain.gain
|
||||
|
||||
|
|
@ -363,6 +400,7 @@ class FfmpegBackend(Backend):
|
|||
album_gain = -0.691 + 10 * math.log10(sum_powers / n_blocks)
|
||||
else:
|
||||
album_gain = -70
|
||||
|
||||
# convert LUFS -> `LU to target_level`
|
||||
album_gain = target_level_lufs - album_gain
|
||||
|
||||
|
|
@ -378,7 +416,9 @@ class FfmpegBackend(Backend):
|
|||
|
||||
return task
|
||||
|
||||
def _construct_cmd(self, item, peak_method):
|
||||
def _construct_cmd(
|
||||
self, item: Item, peak_method: Optional[PeakMethod]
|
||||
) -> List[Union[str, bytes]]:
|
||||
"""Construct the shell command to analyse items."""
|
||||
return [
|
||||
self._ffmpeg_path,
|
||||
|
|
@ -397,7 +437,13 @@ class FfmpegBackend(Backend):
|
|||
"-",
|
||||
]
|
||||
|
||||
def _analyse_item(self, item, target_level, peak_method, count_blocks=True):
|
||||
def _analyse_item(
|
||||
self,
|
||||
item: Item,
|
||||
target_level: float,
|
||||
peak_method: Optional[PeakMethod],
|
||||
count_blocks: bool = True,
|
||||
) -> Tuple[Gain, int]:
|
||||
"""Analyse item. Return a pair of a Gain object and the number
|
||||
of gating blocks above the threshold.
|
||||
|
||||
|
|
@ -415,7 +461,7 @@ class FfmpegBackend(Backend):
|
|||
# parse output
|
||||
|
||||
if peak_method is None:
|
||||
peak = 0
|
||||
peak = 0.0
|
||||
else:
|
||||
line_peak = self._find_line(
|
||||
output,
|
||||
|
|
@ -486,7 +532,13 @@ class FfmpegBackend(Backend):
|
|||
|
||||
return Gain(gain, peak), n_blocks
|
||||
|
||||
def _find_line(self, output, search, start_line=0, step_size=1):
|
||||
def _find_line(
|
||||
self,
|
||||
output: Sequence[bytes],
|
||||
search: bytes,
|
||||
start_line: int = 0,
|
||||
step_size: int = 1,
|
||||
) -> int:
|
||||
"""Return index of line beginning with `search`.
|
||||
|
||||
Begins searching at index `start_line` in `output`.
|
||||
|
|
@ -501,19 +553,19 @@ class FfmpegBackend(Backend):
|
|||
)
|
||||
)
|
||||
|
||||
def _parse_float(self, line):
|
||||
def _parse_float(self, line: bytes) -> float:
|
||||
"""Extract a float from a key value pair in `line`.
|
||||
|
||||
This format is expected: /[^:]:[[:space:]]*value.*/, where `value` is
|
||||
the float.
|
||||
"""
|
||||
# extract value
|
||||
value = line.split(b":", 1)
|
||||
if len(value) < 2:
|
||||
parts = line.split(b":", 1)
|
||||
if len(parts) < 2:
|
||||
raise ReplayGainError(
|
||||
"ffmpeg output: expected key value pair, found {}".format(line)
|
||||
f"ffmpeg output: expected key value pair, found {line!r}"
|
||||
)
|
||||
value = value[1].lstrip()
|
||||
value = parts[1].lstrip()
|
||||
# strip unit
|
||||
value = value.split(b" ", 1)[0]
|
||||
# cast value to float
|
||||
|
|
@ -521,7 +573,7 @@ class FfmpegBackend(Backend):
|
|||
return float(value)
|
||||
except ValueError:
|
||||
raise ReplayGainError(
|
||||
"ffmpeg output: expected float value, found {}".format(value)
|
||||
f"ffmpeg output: expected float value, found {value!r}"
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -530,7 +582,7 @@ class CommandBackend(Backend):
|
|||
NAME = "command"
|
||||
do_parallel = True
|
||||
|
||||
def __init__(self, config, log):
|
||||
def __init__(self, config: ConfigView, log: Logger):
|
||||
super().__init__(config, log)
|
||||
config.add(
|
||||
{
|
||||
|
|
@ -539,7 +591,7 @@ class CommandBackend(Backend):
|
|||
}
|
||||
)
|
||||
|
||||
self.command = config["command"].as_str()
|
||||
self.command = cast(str, config["command"].as_str())
|
||||
|
||||
if self.command:
|
||||
# Explicit executable path.
|
||||
|
|
@ -562,7 +614,7 @@ class CommandBackend(Backend):
|
|||
|
||||
self.noclip = config["noclip"].get(bool)
|
||||
|
||||
def compute_track_gain(self, task):
|
||||
def compute_track_gain(self, task: AnyRgTask) -> AnyRgTask:
|
||||
"""Computes the track gain for the tracks belonging to `task`, and sets
|
||||
the `track_gains` attribute on the task. Returns `task`.
|
||||
"""
|
||||
|
|
@ -571,7 +623,7 @@ class CommandBackend(Backend):
|
|||
task.track_gains = output
|
||||
return task
|
||||
|
||||
def compute_album_gain(self, task):
|
||||
def compute_album_gain(self, task: AnyRgTask) -> AnyRgTask:
|
||||
"""Computes the album gain for the album belonging to `task`, and sets
|
||||
the `album_gain` attribute on the task. Returns `task`.
|
||||
"""
|
||||
|
|
@ -590,7 +642,7 @@ class CommandBackend(Backend):
|
|||
task.track_gains = output[:-1]
|
||||
return task
|
||||
|
||||
def format_supported(self, item):
|
||||
def format_supported(self, item: Item) -> bool:
|
||||
"""Checks whether the given item is supported by the selected tool."""
|
||||
if "mp3gain" in self.command and item.format != "MP3":
|
||||
return False
|
||||
|
|
@ -598,7 +650,12 @@ class CommandBackend(Backend):
|
|||
return False
|
||||
return True
|
||||
|
||||
def compute_gain(self, items, target_level, is_album):
|
||||
def compute_gain(
|
||||
self,
|
||||
items: Sequence[Item],
|
||||
target_level: float,
|
||||
is_album: bool,
|
||||
) -> List[Gain]:
|
||||
"""Computes the track or album gain of a list of items, returns
|
||||
a list of TrackGain objects.
|
||||
|
||||
|
|
@ -618,7 +675,7 @@ class CommandBackend(Backend):
|
|||
# tag-writing; this turns the mp3gain/aacgain tool into a gain
|
||||
# calculator rather than a tag manipulator because we take care
|
||||
# of changing tags ourselves.
|
||||
cmd = [self.command, "-o", "-s", "s"]
|
||||
cmd: List[Union[bytes, str]] = [self.command, "-o", "-s", "s"]
|
||||
if self.noclip:
|
||||
# Adjust to avoid clipping.
|
||||
cmd = cmd + ["-k"]
|
||||
|
|
@ -636,7 +693,7 @@ class CommandBackend(Backend):
|
|||
output, len(items) + (1 if is_album else 0)
|
||||
)
|
||||
|
||||
def parse_tool_output(self, text, num_lines):
|
||||
def parse_tool_output(self, text: bytes, num_lines: int) -> List[Gain]:
|
||||
"""Given the tab-delimited output from an invocation of mp3gain
|
||||
or aacgain, parse the text and return a list of dictionaries
|
||||
containing information about each analyzed file.
|
||||
|
|
@ -647,15 +704,15 @@ class CommandBackend(Backend):
|
|||
if len(parts) != 6 or parts[0] == b"File":
|
||||
self._log.debug("bad tool output: {0}", text)
|
||||
raise ReplayGainError("mp3gain failed")
|
||||
d = {
|
||||
"file": parts[0],
|
||||
"mp3gain": int(parts[1]),
|
||||
"gain": float(parts[2]),
|
||||
"peak": float(parts[3]) / (1 << 15),
|
||||
"maxgain": int(parts[4]),
|
||||
"mingain": int(parts[5]),
|
||||
}
|
||||
out.append(Gain(d["gain"], d["peak"]))
|
||||
|
||||
# _file = parts[0]
|
||||
# _mp3gain = int(parts[1])
|
||||
gain = float(parts[2])
|
||||
peak = float(parts[3]) / (1 << 15)
|
||||
# _maxgain = int(parts[4])
|
||||
# _mingain = int(parts[5])
|
||||
|
||||
out.append(Gain(gain, peak))
|
||||
return out
|
||||
|
||||
|
||||
|
|
@ -665,7 +722,7 @@ class CommandBackend(Backend):
|
|||
class GStreamerBackend(Backend):
|
||||
NAME = "gstreamer"
|
||||
|
||||
def __init__(self, config, log):
|
||||
def __init__(self, config: ConfigView, log: Logger):
|
||||
super().__init__(config, log)
|
||||
self._import_gst()
|
||||
|
||||
|
|
@ -722,7 +779,7 @@ class GStreamerBackend(Backend):
|
|||
|
||||
self._main_loop = self.GLib.MainLoop()
|
||||
|
||||
self._files = []
|
||||
self._files: List[bytes] = []
|
||||
|
||||
def _import_gst(self):
|
||||
"""Import the necessary GObject-related modules and assign `Gst`
|
||||
|
|
@ -754,14 +811,17 @@ class GStreamerBackend(Backend):
|
|||
self.GLib = GLib
|
||||
self.Gst = Gst
|
||||
|
||||
def compute(self, items, target_level, album):
|
||||
def compute(self, items: Sequence[Item], target_level: float, album: bool):
|
||||
if len(items) == 0:
|
||||
return
|
||||
|
||||
self._error = None
|
||||
self._files = [i.path for i in items]
|
||||
|
||||
self._file_tags = collections.defaultdict(dict)
|
||||
# FIXME: Turn this into DefaultDict[bytes, Gain]
|
||||
self._file_tags: DefaultDict[
|
||||
bytes, Dict[str, float]
|
||||
] = collections.defaultdict(dict)
|
||||
|
||||
self._rg.set_property("reference-level", target_level)
|
||||
|
||||
|
|
@ -773,7 +833,7 @@ class GStreamerBackend(Backend):
|
|||
if self._error is not None:
|
||||
raise self._error
|
||||
|
||||
def compute_track_gain(self, task):
|
||||
def compute_track_gain(self, task: AnyRgTask) -> AnyRgTask:
|
||||
"""Computes the track gain for the tracks belonging to `task`, and sets
|
||||
the `track_gains` attribute on the task. Returns `task`.
|
||||
"""
|
||||
|
|
@ -793,7 +853,7 @@ class GStreamerBackend(Backend):
|
|||
task.track_gains = ret
|
||||
return task
|
||||
|
||||
def compute_album_gain(self, task):
|
||||
def compute_album_gain(self, task: AnyRgTask) -> AnyRgTask:
|
||||
"""Computes the album gain for the album belonging to `task`, and sets
|
||||
the `album_gain` attribute on the task. Returns `task`.
|
||||
"""
|
||||
|
|
@ -876,7 +936,7 @@ class GStreamerBackend(Backend):
|
|||
|
||||
tags.foreach(handle_tag, None)
|
||||
|
||||
def _set_first_file(self):
|
||||
def _set_first_file(self) -> bool:
|
||||
if len(self._files) == 0:
|
||||
return False
|
||||
|
||||
|
|
@ -886,7 +946,7 @@ class GStreamerBackend(Backend):
|
|||
self._pipe.set_state(self.Gst.State.PLAYING)
|
||||
return True
|
||||
|
||||
def _set_file(self):
|
||||
def _set_file(self) -> bool:
|
||||
"""Initialize the filesrc element with the next file to be analyzed."""
|
||||
# No more files, we're done
|
||||
if len(self._files) == 0:
|
||||
|
|
@ -919,7 +979,7 @@ class GStreamerBackend(Backend):
|
|||
|
||||
return True
|
||||
|
||||
def _set_next_file(self):
|
||||
def _set_next_file(self) -> bool:
|
||||
"""Set the next file to be analyzed while keeping the pipeline
|
||||
in the PAUSED state so that the rganalysis element can correctly
|
||||
handle album gain.
|
||||
|
|
@ -960,7 +1020,7 @@ class AudioToolsBackend(Backend):
|
|||
|
||||
NAME = "audiotools"
|
||||
|
||||
def __init__(self, config, log):
|
||||
def __init__(self, config: ConfigView, log: Logger):
|
||||
super().__init__(config, log)
|
||||
self._import_audiotools()
|
||||
|
||||
|
|
@ -980,7 +1040,7 @@ class AudioToolsBackend(Backend):
|
|||
self._mod_audiotools = audiotools
|
||||
self._mod_replaygain = audiotools.replaygain
|
||||
|
||||
def open_audio_file(self, item):
|
||||
def open_audio_file(self, item: Item):
|
||||
"""Open the file to read the PCM stream from the using
|
||||
``item.path``.
|
||||
|
||||
|
|
@ -998,7 +1058,7 @@ class AudioToolsBackend(Backend):
|
|||
|
||||
return audiofile
|
||||
|
||||
def init_replaygain(self, audiofile, item):
|
||||
def init_replaygain(self, audiofile, item: Item):
|
||||
"""Return an initialized :class:`audiotools.replaygain.ReplayGain`
|
||||
instance, which requires the sample rate of the song(s) on which
|
||||
the ReplayGain values will be computed. The item is passed in case
|
||||
|
|
@ -1015,7 +1075,7 @@ class AudioToolsBackend(Backend):
|
|||
return
|
||||
return rg
|
||||
|
||||
def compute_track_gain(self, task):
|
||||
def compute_track_gain(self, task: AnyRgTask) -> AnyRgTask:
|
||||
"""Computes the track gain for the tracks belonging to `task`, and sets
|
||||
the `track_gains` attribute on the task. Returns `task`.
|
||||
"""
|
||||
|
|
@ -1025,14 +1085,14 @@ class AudioToolsBackend(Backend):
|
|||
task.track_gains = gains
|
||||
return task
|
||||
|
||||
def _with_target_level(self, gain, target_level):
|
||||
def _with_target_level(self, gain: float, target_level: float):
|
||||
"""Return `gain` relative to `target_level`.
|
||||
|
||||
Assumes `gain` is relative to 89 db.
|
||||
"""
|
||||
return gain + (target_level - 89)
|
||||
|
||||
def _title_gain(self, rg, audiofile, target_level):
|
||||
def _title_gain(self, rg, audiofile, target_level: float):
|
||||
"""Get the gain result pair from PyAudioTools using the `ReplayGain`
|
||||
instance `rg` for the given `audiofile`.
|
||||
|
||||
|
|
@ -1050,7 +1110,7 @@ class AudioToolsBackend(Backend):
|
|||
raise ReplayGainError("audiotools audio data error")
|
||||
return self._with_target_level(gain, target_level), peak
|
||||
|
||||
def _compute_track_gain(self, item, target_level):
|
||||
def _compute_track_gain(self, item: Item, target_level: float):
|
||||
"""Compute ReplayGain value for the requested item.
|
||||
|
||||
:rtype: :class:`Gain`
|
||||
|
|
@ -1073,7 +1133,7 @@ class AudioToolsBackend(Backend):
|
|||
)
|
||||
return Gain(gain=rg_track_gain, peak=rg_track_peak)
|
||||
|
||||
def compute_album_gain(self, task):
|
||||
def compute_album_gain(self, task: AnyRgTask) -> AnyRgTask:
|
||||
"""Computes the album gain for the album belonging to `task`, and sets
|
||||
the `album_gain` attribute on the task. Returns `task`.
|
||||
"""
|
||||
|
|
@ -1121,7 +1181,7 @@ class ExceptionWatcher(Thread):
|
|||
Once an exception occurs, raise it and execute a callback.
|
||||
"""
|
||||
|
||||
def __init__(self, queue, callback):
|
||||
def __init__(self, queue: queue.Queue, callback: Callable[[], None]):
|
||||
self._queue = queue
|
||||
self._callback = callback
|
||||
self._stopevent = Event()
|
||||
|
|
@ -1138,20 +1198,20 @@ class ExceptionWatcher(Thread):
|
|||
# whether `_stopevent` is set
|
||||
pass
|
||||
|
||||
def join(self, timeout=None):
|
||||
def join(self, timeout: Optional[float] = None):
|
||||
self._stopevent.set()
|
||||
Thread.join(self, timeout)
|
||||
|
||||
|
||||
# Main plugin logic.
|
||||
|
||||
BACKEND_CLASSES = [
|
||||
BACKEND_CLASSES: List[Type[Backend]] = [
|
||||
CommandBackend,
|
||||
GStreamerBackend,
|
||||
AudioToolsBackend,
|
||||
FfmpegBackend,
|
||||
]
|
||||
BACKENDS = {b.NAME: b for b in BACKEND_CLASSES}
|
||||
BACKENDS: Dict[str, Type[Backend]] = {b.NAME: b for b in BACKEND_CLASSES}
|
||||
|
||||
|
||||
class ReplayGainPlugin(BeetsPlugin):
|
||||
|
|
@ -1178,7 +1238,7 @@ class ReplayGainPlugin(BeetsPlugin):
|
|||
|
||||
# FIXME: Consider renaming the configuration option and deprecating the
|
||||
# old name 'overwrite'.
|
||||
self.force_on_import = self.config["overwrite"].get(bool)
|
||||
self.force_on_import = cast(bool, self.config["overwrite"].get(bool))
|
||||
|
||||
# Remember which backend is used for CLI feedback
|
||||
self.backend_name = self.config["backend"].as_str()
|
||||
|
|
@ -1224,21 +1284,21 @@ class ReplayGainPlugin(BeetsPlugin):
|
|||
# Start threadpool lazily.
|
||||
self.pool = None
|
||||
|
||||
def should_use_r128(self, item):
|
||||
def should_use_r128(self, item: Item) -> bool:
|
||||
"""Checks the plugin setting to decide whether the calculation
|
||||
should be done using the EBU R128 standard and use R128_ tags instead.
|
||||
"""
|
||||
return item.format in self.r128_whitelist
|
||||
|
||||
@staticmethod
|
||||
def has_r128_track_data(item):
|
||||
def has_r128_track_data(item: Item) -> bool:
|
||||
return item.r128_track_gain is not None
|
||||
|
||||
@staticmethod
|
||||
def has_rg_track_data(item):
|
||||
def has_rg_track_data(item: Item) -> bool:
|
||||
return item.rg_track_gain is not None and item.rg_track_peak is not None
|
||||
|
||||
def track_requires_gain(self, item):
|
||||
def track_requires_gain(self, item: Item) -> bool:
|
||||
if self.should_use_r128(item):
|
||||
if not self.has_r128_track_data(item):
|
||||
return True
|
||||
|
|
@ -1249,17 +1309,17 @@ class ReplayGainPlugin(BeetsPlugin):
|
|||
return False
|
||||
|
||||
@staticmethod
|
||||
def has_r128_album_data(item):
|
||||
def has_r128_album_data(item: Item) -> bool:
|
||||
return (
|
||||
item.r128_track_gain is not None
|
||||
and item.r128_album_gain is not None
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def has_rg_album_data(item):
|
||||
def has_rg_album_data(item: Item) -> bool:
|
||||
return item.rg_album_gain is not None and item.rg_album_peak is not None
|
||||
|
||||
def album_requires_gain(self, album):
|
||||
def album_requires_gain(self, album: Album) -> bool:
|
||||
# Skip calculating gain only when *all* files don't need
|
||||
# recalculation. This way, if any file among an album's tracks
|
||||
# needs recalculation, we still get an accurate album gain
|
||||
|
|
@ -1274,7 +1334,12 @@ class ReplayGainPlugin(BeetsPlugin):
|
|||
|
||||
return False
|
||||
|
||||
def create_task(self, items, use_r128, album=None):
|
||||
def create_task(
|
||||
self,
|
||||
items: Sequence[Item],
|
||||
use_r128: bool,
|
||||
album: Optional[Album] = None,
|
||||
) -> RgTask:
|
||||
if use_r128:
|
||||
return R128Task(
|
||||
items,
|
||||
|
|
@ -1293,7 +1358,7 @@ class ReplayGainPlugin(BeetsPlugin):
|
|||
self._log,
|
||||
)
|
||||
|
||||
def handle_album(self, album, write, force=False):
|
||||
def handle_album(self, album: Album, write: bool, force: bool = False):
|
||||
"""Compute album and track replay gain store it in all of the
|
||||
album's items.
|
||||
|
||||
|
|
@ -1316,7 +1381,7 @@ class ReplayGainPlugin(BeetsPlugin):
|
|||
|
||||
self._log.info("analyzing {0}", album)
|
||||
|
||||
discs = {}
|
||||
discs: Dict[int, List[Item]] = {}
|
||||
if self.config["per_disc"].get(bool):
|
||||
for item in album.items():
|
||||
if discs.get(item.disc) is None:
|
||||
|
|
@ -1325,6 +1390,9 @@ class ReplayGainPlugin(BeetsPlugin):
|
|||
else:
|
||||
discs[1] = album.items()
|
||||
|
||||
def store_cb(task: RgTask):
|
||||
task.store(write)
|
||||
|
||||
for discnumber, items in discs.items():
|
||||
task = self.create_task(items, use_r128, album=album)
|
||||
try:
|
||||
|
|
@ -1332,14 +1400,14 @@ class ReplayGainPlugin(BeetsPlugin):
|
|||
self.backend_instance.compute_album_gain,
|
||||
args=[task],
|
||||
kwds={},
|
||||
callback=lambda task: task.store(write),
|
||||
callback=store_cb,
|
||||
)
|
||||
except ReplayGainError as e:
|
||||
self._log.info("ReplayGain error: {0}", e)
|
||||
except FatalReplayGainError as e:
|
||||
raise ui.UserError(f"Fatal replay gain error: {e}")
|
||||
|
||||
def handle_track(self, item, write, force=False):
|
||||
def handle_track(self, item: Item, write: bool, force: bool = False):
|
||||
"""Compute track replay gain and store it in the item.
|
||||
|
||||
If ``write`` is truthy then ``item.write()`` is called to write
|
||||
|
|
@ -1352,24 +1420,27 @@ class ReplayGainPlugin(BeetsPlugin):
|
|||
|
||||
use_r128 = self.should_use_r128(item)
|
||||
|
||||
def store_cb(task: RgTask):
|
||||
task.store(write)
|
||||
|
||||
task = self.create_task([item], use_r128)
|
||||
try:
|
||||
self._apply(
|
||||
self.backend_instance.compute_track_gain,
|
||||
args=[task],
|
||||
kwds={},
|
||||
callback=lambda task: task.store(write),
|
||||
callback=store_cb,
|
||||
)
|
||||
except ReplayGainError as e:
|
||||
self._log.info("ReplayGain error: {0}", e)
|
||||
except FatalReplayGainError as e:
|
||||
raise ui.UserError(f"Fatal replay gain error: {e}")
|
||||
|
||||
def open_pool(self, threads):
|
||||
def open_pool(self, threads: int):
|
||||
"""Open a `ThreadPool` instance in `self.pool`"""
|
||||
if self.pool is None and self.backend_instance.do_parallel:
|
||||
self.pool = ThreadPool(threads)
|
||||
self.exc_queue = queue.Queue()
|
||||
self.exc_queue: queue.Queue[Exception] = queue.Queue()
|
||||
|
||||
signal.signal(signal.SIGINT, self._interrupt)
|
||||
|
||||
|
|
@ -1379,7 +1450,13 @@ class ReplayGainPlugin(BeetsPlugin):
|
|||
)
|
||||
self.exc_watcher.start()
|
||||
|
||||
def _apply(self, func, args, kwds, callback):
|
||||
def _apply(
|
||||
self,
|
||||
func: Callable[..., AnyRgTask],
|
||||
args: List[Any],
|
||||
kwds: Dict[str, Any],
|
||||
callback: Callable[[AnyRgTask], Any],
|
||||
):
|
||||
if self.pool is not None:
|
||||
|
||||
def handle_exc(exc):
|
||||
|
|
@ -1425,9 +1502,9 @@ class ReplayGainPlugin(BeetsPlugin):
|
|||
self.exc_watcher.join()
|
||||
self.pool = None
|
||||
|
||||
def import_begin(self, session):
|
||||
def import_begin(self, session: ImportSession):
|
||||
"""Handle `import_begin` event -> open pool"""
|
||||
threads = self.config["threads"].get(int)
|
||||
threads = cast(int, self.config["threads"].get(int))
|
||||
|
||||
if (
|
||||
self.config["parallel_on_import"]
|
||||
|
|
@ -1440,22 +1517,31 @@ class ReplayGainPlugin(BeetsPlugin):
|
|||
"""Handle `import` event -> close pool"""
|
||||
self.close_pool()
|
||||
|
||||
def imported(self, session, task):
|
||||
def imported(self, session: ImportSession, task: ImportTask):
|
||||
"""Add replay gain info to items or albums of ``task``."""
|
||||
if self.config["auto"]:
|
||||
if task.is_album:
|
||||
self.handle_album(task.album, False, self.force_on_import)
|
||||
else:
|
||||
# Should be a SingletonImportTask
|
||||
assert hasattr(task, "item")
|
||||
self.handle_track(task.item, False, self.force_on_import)
|
||||
|
||||
def command_func(self, lib, opts, args):
|
||||
def command_func(
|
||||
self,
|
||||
lib: Library,
|
||||
opts: optparse.Values,
|
||||
args: List[str],
|
||||
):
|
||||
try:
|
||||
write = ui.should_write(opts.write)
|
||||
force = opts.force
|
||||
|
||||
# Bypass self.open_pool() if called with `--threads 0`
|
||||
if opts.threads != 0:
|
||||
threads = opts.threads or self.config["threads"].get(int)
|
||||
threads = opts.threads or cast(
|
||||
int, self.config["threads"].get(int)
|
||||
)
|
||||
self.open_pool(threads)
|
||||
|
||||
if opts.album:
|
||||
|
|
@ -1482,7 +1568,7 @@ class ReplayGainPlugin(BeetsPlugin):
|
|||
# Silence interrupt exceptions
|
||||
pass
|
||||
|
||||
def commands(self):
|
||||
def commands(self) -> List[ui.Subcommand]:
|
||||
"""Return the "replaygain" ui subcommand."""
|
||||
cmd = ui.Subcommand("replaygain", help="analyze for ReplayGain")
|
||||
cmd.parser.add_album_option()
|
||||
|
|
|
|||
|
|
@ -217,6 +217,9 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
|
|||
elif e.response.status_code == 503:
|
||||
self._log.error("Service Unavailable.")
|
||||
raise SpotifyAPIError("Service Unavailable.")
|
||||
elif e.response.status_code == 502:
|
||||
self._log.error("Bad Gateway.")
|
||||
raise SpotifyAPIError("Bad Gateway.")
|
||||
elif e.response is not None:
|
||||
raise SpotifyAPIError(
|
||||
f"{self.data_source} API error:\n{e.response.text}\n"
|
||||
|
|
@ -658,8 +661,11 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
|
|||
self._log.debug("No track_id present for: {}", item)
|
||||
continue
|
||||
|
||||
popularity = self.track_popularity(spotify_track_id)
|
||||
popularity, isrc, ean, upc = self.track_info(spotify_track_id)
|
||||
item["spotify_track_popularity"] = popularity
|
||||
item["isrc"] = isrc
|
||||
item["ean"] = ean
|
||||
item["upc"] = upc
|
||||
audio_features = self.track_audio_features(spotify_track_id)
|
||||
if audio_features is None:
|
||||
self._log.info("No audio features found for: {}", item)
|
||||
|
|
@ -674,13 +680,22 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
|
|||
if write:
|
||||
item.try_write()
|
||||
|
||||
def track_popularity(self, track_id=None):
|
||||
"""Fetch a track popularity by its Spotify ID."""
|
||||
def track_info(self, track_id=None):
|
||||
"""Fetch a track's popularity and external IDs using its Spotify ID."""
|
||||
track_data = self._handle_response(
|
||||
requests.get, self.track_url + track_id
|
||||
)
|
||||
self._log.debug("track_data: {}", track_data.get("popularity"))
|
||||
return track_data.get("popularity")
|
||||
self._log.debug(
|
||||
"track_popularity: {} and track_isrc: {}",
|
||||
track_data.get("popularity"),
|
||||
track_data.get("external_ids").get("isrc"),
|
||||
)
|
||||
return (
|
||||
track_data.get("popularity"),
|
||||
track_data.get("external_ids").get("isrc"),
|
||||
track_data.get("external_ids").get("ean"),
|
||||
track_data.get("external_ids").get("upc"),
|
||||
)
|
||||
|
||||
def track_audio_features(self, track_id=None):
|
||||
"""Fetch track audio features by its Spotify ID."""
|
||||
|
|
|
|||
|
|
@ -17,6 +17,8 @@ Major new features:
|
|||
|
||||
New features:
|
||||
|
||||
* :doc:`plugins/spotify`: We now fetch track's ISRC, EAN, and UPC identifiers from Spotify when using the ``spotifysync`` command.
|
||||
:bug:`4992`
|
||||
* :doc:`plugins/discogs`: supply a value for the `cover_art_url` attribute, for use by `fetchart`.
|
||||
:bug:`429`
|
||||
* :ref:`update-cmd`: added ```-e``` flag for excluding fields from being updated.
|
||||
|
|
@ -141,9 +143,14 @@ New features:
|
|||
but no thumbnail is provided by CAA. We now fallback to the raw image.
|
||||
* :doc:`/plugins/advancedrewrite`: Add an advanced version of the `rewrite`
|
||||
plugin which allows to replace fields based on a given library query.
|
||||
* :doc:`/plugins/lyrics`: Add LRCLIB as a new lyrics provider and a new
|
||||
`synced` option to prefer synced lyrics over plain lyrics.
|
||||
|
||||
Bug fixes:
|
||||
|
||||
* :doc:`/plugins/deezer`: Improve Deezer plugin error handling and set requests timeout to 10 seconds.
|
||||
:bug:`4983`
|
||||
* :doc:`/plugins/spotify`: Add bad gateway (502) error handling.
|
||||
* :doc:`/plugins/spotify`: Add a limit of 3 retries, instead of retrying endlessly when the API is not available.
|
||||
* Fix a crash when the Spotify API timeouts or does not return a `Retry-After` interval.
|
||||
:bug:`4942`
|
||||
|
|
@ -259,6 +266,8 @@ Bug fixes:
|
|||
a null path that can't be removed.
|
||||
* Fix bug where empty artist and title fields would return None instead of an
|
||||
empty list in the discord plugin. :bug:`4973`
|
||||
* Fix bug regarding displaying tracks that have been changed not being
|
||||
displayed unless the detail configuration is enabled.
|
||||
|
||||
For packagers:
|
||||
|
||||
|
|
|
|||
|
|
@ -2,11 +2,12 @@ Lyrics Plugin
|
|||
=============
|
||||
|
||||
The ``lyrics`` plugin fetches and stores song lyrics from databases on the Web.
|
||||
Namely, the current version of the plugin uses `Genius.com`_, `Tekstowo.pl`_,
|
||||
Namely, the current version of the plugin uses `Genius.com`_, `Tekstowo.pl`_, `LRCLIB`_
|
||||
and, optionally, the Google custom search API.
|
||||
|
||||
.. _Genius.com: https://genius.com/
|
||||
.. _Tekstowo.pl: https://www.tekstowo.pl/
|
||||
.. _LRCLIB: https://lrclib.net/
|
||||
|
||||
|
||||
Fetch Lyrics During Import
|
||||
|
|
@ -58,11 +59,12 @@ configuration file. The available options are:
|
|||
sources known to be scrapeable.
|
||||
- **sources**: List of sources to search for lyrics. An asterisk ``*`` expands
|
||||
to all available sources.
|
||||
Default: ``google genius tekstowo``, i.e., all the available sources. The
|
||||
Default: ``google genius tekstowo lrclib``, i.e., all the available sources. The
|
||||
``google`` source will be automatically deactivated if no ``google_API_key``
|
||||
is setup.
|
||||
The ``google``, ``genius``, and ``tekstowo`` sources will only be enabled if
|
||||
BeautifulSoup is installed.
|
||||
- **synced**: Prefer synced lyrics over plain lyrics if a source offers them. Currently `lrclib` is the only source that provides them. Default: `no`.
|
||||
|
||||
Here's an example of ``config.yaml``::
|
||||
|
||||
|
|
|
|||
|
|
@ -466,7 +466,9 @@ example ``blue`` will become ``['blue']``.
|
|||
terminal_width
|
||||
~~~~~~~~~~~~~~
|
||||
|
||||
Controls line wrapping. Defaults to ``80`` characters::
|
||||
Controls line wrapping on non-Unix systems. On Unix systems, the width of the
|
||||
terminal is detected automatically. If this fails, or on non-Unix systems, the
|
||||
specified value is used as a fallback. Defaults to ``80`` characters::
|
||||
|
||||
ui:
|
||||
terminal_width: 80
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ from test import _common
|
|||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import confuse
|
||||
import requests
|
||||
|
||||
from beets import logging
|
||||
from beets.library import Item
|
||||
|
|
@ -34,6 +35,7 @@ raw_backend = lyrics.Backend({}, log)
|
|||
google = lyrics.Google(MagicMock(), log)
|
||||
genius = lyrics.Genius(MagicMock(), log)
|
||||
tekstowo = lyrics.Tekstowo(MagicMock(), log)
|
||||
lrclib = lyrics.LRCLib(MagicMock(), log)
|
||||
|
||||
|
||||
class LyricsPluginTest(unittest.TestCase):
|
||||
|
|
@ -677,6 +679,101 @@ class TekstowoIntegrationTest(TekstowoBaseTest, LyricsAssertions):
|
|||
self.assertEqual(lyrics, None)
|
||||
|
||||
|
||||
# test LRCLib backend
|
||||
|
||||
|
||||
class LRCLibLyricsTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.plugin = lyrics.LyricsPlugin()
|
||||
lrclib.config = self.plugin.config
|
||||
|
||||
@patch("beetsplug.lyrics.requests.get")
|
||||
def test_fetch_synced_lyrics(self, mock_get):
|
||||
mock_response = {
|
||||
"syncedLyrics": "[00:00.00] la la la",
|
||||
"plainLyrics": "la la la",
|
||||
}
|
||||
mock_get.return_value.json.return_value = mock_response
|
||||
mock_get.return_value.status_code = 200
|
||||
|
||||
lyrics = lrclib.fetch("la", "la", "la", 999)
|
||||
self.assertEqual(lyrics, mock_response["plainLyrics"])
|
||||
|
||||
self.plugin.config["synced"] = True
|
||||
lyrics = lrclib.fetch("la", "la", "la", 999)
|
||||
self.assertEqual(lyrics, mock_response["syncedLyrics"])
|
||||
|
||||
@patch("beetsplug.lyrics.requests.get")
|
||||
def test_fetch_plain_lyrics(self, mock_get):
|
||||
mock_response = {
|
||||
"syncedLyrics": "",
|
||||
"plainLyrics": "la la la",
|
||||
}
|
||||
mock_get.return_value.json.return_value = mock_response
|
||||
mock_get.return_value.status_code = 200
|
||||
|
||||
lyrics = lrclib.fetch("la", "la", "la", 999)
|
||||
|
||||
self.assertEqual(lyrics, mock_response["plainLyrics"])
|
||||
|
||||
@patch("beetsplug.lyrics.requests.get")
|
||||
def test_fetch_not_found(self, mock_get):
|
||||
mock_response = {
|
||||
"statusCode": 404,
|
||||
"error": "Not Found",
|
||||
"message": "Failed to find specified track",
|
||||
}
|
||||
mock_get.return_value.json.return_value = mock_response
|
||||
mock_get.return_value.status_code = 404
|
||||
|
||||
lyrics = lrclib.fetch("la", "la", "la", 999)
|
||||
|
||||
self.assertIsNone(lyrics)
|
||||
|
||||
@patch("beetsplug.lyrics.requests.get")
|
||||
def test_fetch_exception(self, mock_get):
|
||||
mock_get.side_effect = requests.RequestException
|
||||
|
||||
lyrics = lrclib.fetch("la", "la", "la", 999)
|
||||
|
||||
self.assertIsNone(lyrics)
|
||||
|
||||
|
||||
class LRCLibIntegrationTest(LyricsAssertions):
|
||||
def setUp(self):
|
||||
self.plugin = lyrics.LyricsPlugin()
|
||||
lrclib.config = self.plugin.config
|
||||
|
||||
@unittest.skipUnless(
|
||||
os.environ.get("INTEGRATION_TEST", "0") == "1",
|
||||
"integration testing not enabled",
|
||||
)
|
||||
def test_track_with_lyrics(self):
|
||||
lyrics = lrclib.fetch("Boy in Space", "u n eye", "Live EP", 160)
|
||||
self.assertLyricsContentOk("u n eye", lyrics)
|
||||
|
||||
@unittest.skipUnless(
|
||||
os.environ.get("INTEGRATION_TEST", "0") == "1",
|
||||
"integration testing not enabled",
|
||||
)
|
||||
def test_instrumental_track(self):
|
||||
lyrics = lrclib.fetch(
|
||||
"Kelly Bailey",
|
||||
"Black Mesa Inbound",
|
||||
"Half Life 2 Soundtrack",
|
||||
134,
|
||||
)
|
||||
self.assertIsNone(lyrics)
|
||||
|
||||
@unittest.skipUnless(
|
||||
os.environ.get("INTEGRATION_TEST", "0") == "1",
|
||||
"integration testing not enabled",
|
||||
)
|
||||
def test_nonexistent_track(self):
|
||||
lyrics = lrclib.fetch("blah", "blah", "blah", 999)
|
||||
self.assertIsNone(lyrics)
|
||||
|
||||
|
||||
# test utilities
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue