diff --git a/beetsplug/lastgenre/__init__.py b/beetsplug/lastgenre/__init__.py index 1c91688a6..6a61589f7 100644 --- a/beetsplug/lastgenre/__init__.py +++ b/beetsplug/lastgenre/__init__.py @@ -24,76 +24,23 @@ https://gist.github.com/1241307 from __future__ import annotations -import os -import traceback from functools import singledispatchmethod from pathlib import Path -from typing import TYPE_CHECKING, Any - -import pylast -import yaml +from typing import TYPE_CHECKING from beets import config, library, plugins, ui from beets.library import Album, Item from beets.util import plurality, unique_list +from .client import LastFmClient +from .loaders import DataFileLoader +from .utils import make_tunelog + if TYPE_CHECKING: import optparse - from collections.abc import Callable from beets.library import LibModel -LASTFM = pylast.LastFMNetwork(api_key=plugins.LASTFM_KEY) - -PYLAST_EXCEPTIONS = ( - pylast.WSError, - pylast.MalformedResponseError, - pylast.NetworkError, -) - - -# Canonicalization tree processing. - - -def flatten_tree( - elem: dict[Any, Any] | list[Any] | str, - path: list[str], - branches: list[list[str]], -) -> None: - """Flatten nested lists/dictionaries into lists of strings - (branches). - """ - if not path: - path = [] - - if isinstance(elem, dict): - for k, v in elem.items(): - flatten_tree(v, [*path, k], branches) - elif isinstance(elem, list): - for sub in elem: - flatten_tree(sub, path, branches) - else: - branches.append([*path, str(elem)]) - - -def find_parents(candidate: str, branches: list[list[str]]) -> list[str]: - """Find parents genre of a given genre, ordered from the closest to - the further parent. - """ - for branch in branches: - try: - idx = branch.index(candidate.lower()) - return list(reversed(branch[: idx + 1])) - except ValueError: - continue - return [candidate] - - -# Main plugin logic. - -WHITELIST = os.path.join(os.path.dirname(__file__), "genres.txt") -C14N_TREE = os.path.join(os.path.dirname(__file__), "genres-tree.yaml") - class LastGenrePlugin(plugins.BeetsPlugin): def __init__(self) -> None: @@ -123,55 +70,17 @@ class LastGenrePlugin(plugins.BeetsPlugin): if self.config["auto"]: self.import_stages = [self.imported] - self._genre_cache: dict[str, list[str]] = {} - self.whitelist = self._load_whitelist() - self.c14n_branches, self.canonicalize = self._load_c14n_tree() + self._tunelog = make_tunelog(self._log) + self.client = LastFmClient( + self._log, self.config["min_weight"].get(int) + ) - def _load_whitelist(self) -> set[str]: - """Load the whitelist from a text file. - - Default whitelist is used if config is True, empty string or set to "nothing". - """ - whitelist = set() - wl_filename = self.config["whitelist"].get() - if wl_filename in (True, "", None): # Indicates the default whitelist. - wl_filename = WHITELIST - if wl_filename: - self._log.debug("Loading whitelist {}", wl_filename) - text = Path(wl_filename).expanduser().read_text(encoding="utf-8") - for line in text.splitlines(): - if (line := line.strip().lower()) and not line.startswith("#"): - whitelist.add(line) - - return whitelist - - def _load_c14n_tree(self) -> tuple[list[list[str]], bool]: - """Load the canonicalization tree from a YAML file. - - Default tree is used if config is True, empty string, set to "nothing" - or if prefer_specific is enabled. - """ - c14n_branches: list[list[str]] = [] - c14n_filename = self.config["canonical"].get() - canonicalize = c14n_filename is not False - # Default tree - if c14n_filename in (True, "", None) or ( - # prefer_specific requires a tree, load default tree - not canonicalize and self.config["prefer_specific"].get() - ): - c14n_filename = C14N_TREE - # Read the tree - if c14n_filename: - self._log.debug("Loading canonicalization tree {}", c14n_filename) - with Path(c14n_filename).expanduser().open(encoding="utf-8") as f: - genres_tree = yaml.safe_load(f) - flatten_tree(genres_tree, [], c14n_branches) - return c14n_branches, canonicalize - - def _tunelog(self, msg: str, *args: Any, **kwargs: Any) -> None: - """Log tuning messages at DEBUG level when verbosity level is high enough.""" - if config["verbose"].as_number() >= 3: - self._log.debug(msg, *args, **kwargs) + loader = DataFileLoader.from_config( + self.config, self._log, Path(__file__).parent + ) + self.whitelist = loader.whitelist + self.c14n_branches = loader.c14n_branches + self.canonicalize = loader.canonicalize @property def sources(self) -> tuple[str, ...]: @@ -187,7 +96,7 @@ class LastGenrePlugin(plugins.BeetsPlugin): return ("artist",) return tuple() - # More canonicalization and general helpers. + # Canonicalization and filtering. def _get_depth(self, tag: str) -> int | None: """Find the depth of a tag in the genres tree.""" @@ -207,6 +116,17 @@ class LastGenrePlugin(plugins.BeetsPlugin): depth_tag_pairs.sort(reverse=True) return [p[1] for p in depth_tag_pairs] + @staticmethod + def find_parents(candidate: str, branches: list[list[str]]) -> list[str]: + """Find parent genres of a given genre, ordered from closest to furthest.""" + for branch in branches: + try: + idx = branch.index(candidate.lower()) + return list(reversed(branch[: idx + 1])) + except ValueError: + continue + return [candidate] + def _resolve_genres(self, tags: list[str]) -> list[str]: """Canonicalize, sort and filter a list of genres. @@ -239,11 +159,11 @@ class LastGenrePlugin(plugins.BeetsPlugin): if self.whitelist: parents = [ x - for x in find_parents(tag, self.c14n_branches) + for x in self.find_parents(tag, self.c14n_branches) if self._is_valid(x) ] else: - parents = [find_parents(tag, self.c14n_branches)[-1]] + parents = [self.find_parents(tag, self.c14n_branches)[-1]] tags_all += parents # Stop if we have enough tags already, unless we need to find @@ -266,15 +186,6 @@ class LastGenrePlugin(plugins.BeetsPlugin): valid_tags = [t for t in tags if self._is_valid(t)] return valid_tags[:count] - def fetch_genre( - self, lastfm_obj: pylast.Album | pylast.Artist | pylast.Track - ) -> list[str]: - """Return genres for a pylast entity. Returns an empty list if - no suitable genres are found. - """ - min_weight = self.config["min_weight"].get(int) - return self._tags_for(lastfm_obj, min_weight) - def _is_valid(self, genre: str) -> bool: """Check if the genre is valid. @@ -285,48 +196,6 @@ class LastGenrePlugin(plugins.BeetsPlugin): return True return False - # Cached last.fm entity lookups. - - def _last_lookup( - self, entity: str, method: Callable[..., Any], *args: str - ) -> list[str]: - """Get genres based on the named entity using the callable `method` - whose arguments are given in the sequence `args`. The genre lookup - is cached based on the entity name and the arguments. - - Before the lookup, each argument has the "-" Unicode character replaced - with its rough ASCII equivalents in order to return better results from - the Last.fm database. - """ - # Shortcut if we're missing metadata. - if any(not s for s in args): - return [] - - key = f"{entity}.{'-'.join(str(a) for a in args)}" - if key not in self._genre_cache: - args_replaced = [a.replace("\u2010", "-") for a in args] - self._genre_cache[key] = self.fetch_genre(method(*args_replaced)) - - genre = self._genre_cache[key] - self._tunelog("last.fm (unfiltered) {} tags: {}", entity, genre) - return genre - - def fetch_album_genre(self, albumartist: str, albumtitle: str) -> list[str]: - """Return genres from Last.fm for the album by albumartist.""" - return self._last_lookup( - "album", LASTFM.get_album, albumartist, albumtitle - ) - - def fetch_artist_genre(self, artist: str) -> list[str]: - """Return genres from Last.fm for the artist.""" - return self._last_lookup("artist", LASTFM.get_artist, artist) - - def fetch_track_genre(self, trackartist: str, tracktitle: str) -> list[str]: - """Return genres from Last.fm for the track by artist.""" - return self._last_lookup( - "track", LASTFM.get_track, trackartist, tracktitle - ) - # Main processing: _get_genre() and helpers. def _format_and_stringify(self, tags: list[str]) -> str: @@ -414,14 +283,18 @@ class LastGenrePlugin(plugins.BeetsPlugin): # Run through stages: track, album, artist, # album artist, or most popular track genre. if isinstance(obj, library.Item) and "track" in self.sources: - if new_genres := self.fetch_track_genre(obj.artist, obj.title): + if new_genres := self.client.fetch_track_genre( + obj.artist, obj.title + ): if result := _try_resolve_stage( "track", keep_genres, new_genres ): return result if "album" in self.sources: - if new_genres := self.fetch_album_genre(obj.albumartist, obj.album): + if new_genres := self.client.fetch_album_genre( + obj.albumartist, obj.album + ): if result := _try_resolve_stage( "album", keep_genres, new_genres ): @@ -430,10 +303,10 @@ class LastGenrePlugin(plugins.BeetsPlugin): if "artist" in self.sources: new_genres = [] if isinstance(obj, library.Item): - new_genres = self.fetch_artist_genre(obj.artist) + new_genres = self.client.fetch_artist_genre(obj.artist) stage_label = "artist" elif obj.albumartist != config["va_name"].as_str(): - new_genres = self.fetch_artist_genre(obj.albumartist) + new_genres = self.client.fetch_artist_genre(obj.albumartist) stage_label = "album artist" if not new_genres: self._tunelog( @@ -443,9 +316,12 @@ class LastGenrePlugin(plugins.BeetsPlugin): ) for albumartist in obj.albumartists: self._tunelog( - 'Fetching artist genre for "{}"', albumartist + 'Fetching artist genre for "{}"', + albumartist, + ) + new_genres += self.client.fetch_artist_genre( + albumartist ) - new_genres += self.fetch_artist_genre(albumartist) if new_genres: stage_label = "multi-valued album artist" else: @@ -455,11 +331,11 @@ class LastGenrePlugin(plugins.BeetsPlugin): for item in obj.items(): item_genre = None if "track" in self.sources: - item_genre = self.fetch_track_genre( + item_genre = self.client.fetch_track_genre( item.artist, item.title ) if not item_genre: - item_genre = self.fetch_artist_genre(item.artist) + item_genre = self.client.fetch_artist_genre(item.artist) if item_genre: item_genres += item_genre if item_genres: @@ -482,13 +358,12 @@ class LastGenrePlugin(plugins.BeetsPlugin): if obj.genre and self.config["keep_existing"]: if not self.whitelist or self._is_valid(obj.genre.lower()): return obj.genre, "original fallback" - else: - # If the original genre doesn't match a whitelisted genre, check - # if we can canonicalize it to find a matching, whitelisted genre! - if result := _try_resolve_stage( - "original fallback", keep_genres, [] - ): - return result + # If the original genre doesn't match a whitelisted genre, check + # if we can canonicalize it to find a matching, whitelisted genre! + if result := _try_resolve_stage( + "original fallback", keep_genres, [] + ): + return result # Return fallback string. if fallback := self.config["fallback"].get(): @@ -607,43 +482,3 @@ class LastGenrePlugin(plugins.BeetsPlugin): self, session: library.Session, task: library.ImportTask ) -> None: self._process(task.album if task.is_album else task.item, write=False) - - def _tags_for( - self, - obj: pylast.Album | pylast.Artist | pylast.Track, - min_weight: int | None = None, - ) -> list[str]: - """Core genre identification routine. - - Given a pylast entity (album or track), return a list of - tag names for that entity. Return an empty list if the entity is - not found or another error occurs. - - If `min_weight` is specified, tags are filtered by weight. - """ - # Work around an inconsistency in pylast where - # Album.get_top_tags() does not return TopItem instances. - # https://github.com/pylast/pylast/issues/86 - obj_to_query: Any = obj - if isinstance(obj, pylast.Album): - obj_to_query = super(pylast.Album, obj) - - try: - res: Any = obj_to_query.get_top_tags() - except PYLAST_EXCEPTIONS as exc: - self._log.debug("last.fm error: {}", exc) - return [] - except Exception as exc: - # Isolate bugs in pylast. - self._log.debug("{}", traceback.format_exc()) - self._log.error("error in pylast library: {}", exc) - return [] - - # Filter by weight (optionally). - if min_weight: - res = [el for el in res if (int(el.weight or 0)) >= min_weight] - - # Get strings from tags. - tags: list[str] = [el.item.get_name().lower() for el in res] - - return tags diff --git a/beetsplug/lastgenre/client.py b/beetsplug/lastgenre/client.py new file mode 100644 index 000000000..21a0bff72 --- /dev/null +++ b/beetsplug/lastgenre/client.py @@ -0,0 +1,142 @@ +# This file is part of beets. +# Copyright 2016, Adrian Sampson. +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. + + +"""Last.fm API client for genre lookups.""" + +from __future__ import annotations + +import traceback +from typing import TYPE_CHECKING, Any + +import pylast + +from beets import plugins + +from .utils import make_tunelog + +if TYPE_CHECKING: + from collections.abc import Callable + + from beets.logging import Logger + +LASTFM = pylast.LastFMNetwork(api_key=plugins.LASTFM_KEY) + +PYLAST_EXCEPTIONS = ( + pylast.WSError, + pylast.MalformedResponseError, + pylast.NetworkError, +) + + +class LastFmClient: + """Client for fetching genres from Last.fm.""" + + def __init__(self, log: Logger, min_weight: int): + """Initialize the client. + + The min_weight parameter filters tags by their minimum weight. + """ + self._log = log + self._tunelog = make_tunelog(log) + self._min_weight = min_weight + self._genre_cache: dict[str, list[str]] = {} + + def fetch_genre( + self, lastfm_obj: pylast.Album | pylast.Artist | pylast.Track + ) -> list[str]: + """Return genres for a pylast entity. Returns an empty list if + no suitable genres are found. + """ + return self._tags_for(lastfm_obj, self._min_weight) + + def _tags_for( + self, + obj: pylast.Album | pylast.Artist | pylast.Track, + min_weight: int | None = None, + ) -> list[str]: + """Core genre identification routine. + + Given a pylast entity (album or track), return a list of + tag names for that entity. Return an empty list if the entity is + not found or another error occurs. + + If `min_weight` is specified, tags are filtered by weight. + """ + # Work around an inconsistency in pylast where + # Album.get_top_tags() does not return TopItem instances. + # https://github.com/pylast/pylast/issues/86 + obj_to_query: Any = obj + if isinstance(obj, pylast.Album): + obj_to_query = super(pylast.Album, obj) + + try: + res: Any = obj_to_query.get_top_tags() + except PYLAST_EXCEPTIONS as exc: + self._log.debug("last.fm error: {}", exc) + return [] + except Exception as exc: + # Isolate bugs in pylast. + self._log.debug("{}", traceback.format_exc()) + self._log.error("error in pylast library: {}", exc) + return [] + + # Filter by weight (optionally). + if min_weight: + res = [el for el in res if (int(el.weight or 0)) >= min_weight] + + # Get strings from tags. + tags: list[str] = [el.item.get_name().lower() for el in res] + + return tags + + def _last_lookup( + self, entity: str, method: Callable[..., Any], *args: str + ) -> list[str]: + """Get genres based on the named entity using the callable `method` + whose arguments are given in the sequence `args`. The genre lookup + is cached based on the entity name and the arguments. + + Before the lookup, each argument has the "-" Unicode character replaced + with its rough ASCII equivalents in order to return better results from + the Last.fm database. + """ + # Shortcut if we're missing metadata. + if any(not s for s in args): + return [] + + key = f"{entity}.{'-'.join(str(a) for a in args)}" + if key not in self._genre_cache: + args_replaced = [a.replace("\u2010", "-") for a in args] + self._genre_cache[key] = self.fetch_genre(method(*args_replaced)) + + genre = self._genre_cache[key] + self._tunelog("last.fm (unfiltered) {} tags: {}", entity, genre) + return genre + + def fetch_album_genre(self, albumartist: str, albumtitle: str) -> list[str]: + """Return genres from Last.fm for the album by albumartist.""" + return self._last_lookup( + "album", LASTFM.get_album, albumartist, albumtitle + ) + + def fetch_artist_genre(self, artist: str) -> list[str]: + """Return genres from Last.fm for the artist.""" + return self._last_lookup("artist", LASTFM.get_artist, artist) + + def fetch_track_genre(self, trackartist: str, tracktitle: str) -> list[str]: + """Return genres from Last.fm for the track by artist.""" + return self._last_lookup( + "track", LASTFM.get_track, trackartist, tracktitle + ) diff --git a/beetsplug/lastgenre/loaders.py b/beetsplug/lastgenre/loaders.py new file mode 100644 index 000000000..3d650bd5e --- /dev/null +++ b/beetsplug/lastgenre/loaders.py @@ -0,0 +1,149 @@ +# This file is part of beets. +# Copyright 2016, Adrian Sampson. +# Copyright 2026, J0J0 Todos. +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. + + +"""Data file loaders for the lastgenre plugin.""" + +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +import yaml + +if TYPE_CHECKING: + from confuse import ConfigView + + from beets.logging import Logger + + +class DataFileLoader: + """Loads genre-related data files for the lastgenre plugin.""" + + def __init__( + self, + log: Logger, + plugin_dir: Path, + whitelist: set[str], + c14n_branches: list[list[str]], + canonicalize: bool, + ): + """Initialize with pre-loaded data. + + Use from_config() classmethod to construct from plugin config. + """ + self._log = log + self._plugin_dir = plugin_dir + self.whitelist = whitelist + self.c14n_branches = c14n_branches + self.canonicalize = canonicalize + + @classmethod + def from_config( + cls, + config: ConfigView, + log: Logger, + plugin_dir: Path, + ) -> DataFileLoader: + """Create a DataFileLoader from plugin configuration. + + Reads config values and loads all data files during construction. + """ + # Default paths + default_whitelist = str(plugin_dir / "genres.txt") + default_tree = str(plugin_dir / "genres-tree.yaml") + + # Load whitelist + whitelist = cls._load_whitelist( + log, config["whitelist"].get(), default_whitelist + ) + + # Load tree + c14n_branches, canonicalize = cls._load_tree( + log, + config["canonical"].get(), + default_tree, + config["prefer_specific"].get(), + ) + + return cls(log, plugin_dir, whitelist, c14n_branches, canonicalize) + + @staticmethod + def _load_whitelist( + log: Logger, config_value: str | bool | None, default_path: str + ) -> set[str]: + """Load the whitelist from a text file. + + Returns set of valid genre names (lowercase). + """ + whitelist = set() + wl_filename = config_value + if wl_filename in (True, "", None): # Indicates the default whitelist. + wl_filename = default_path + if wl_filename: + log.debug("Loading whitelist {}", wl_filename) + text = Path(wl_filename).expanduser().read_text(encoding="utf-8") + for line in text.splitlines(): + if (line := line.strip().lower()) and not line.startswith("#"): + whitelist.add(line) + + return whitelist + + @staticmethod + def _load_tree( + log: Logger, + config_value: str | bool | None, + default_path: str, + prefer_specific: bool, + ) -> tuple[list[list[str]], bool]: + """Load the canonicalization tree from a YAML file. + + Returns tuple of (branches, canonicalize_enabled). + """ + c14n_branches: list[list[str]] = [] + c14n_filename = config_value + canonicalize = c14n_filename is not False + # Default tree + if c14n_filename in (True, "", None) or ( + # prefer_specific requires a tree, load default tree + not canonicalize and prefer_specific + ): + c14n_filename = default_path + # Read the tree + if c14n_filename: + log.debug("Loading canonicalization tree {}", c14n_filename) + with Path(c14n_filename).expanduser().open(encoding="utf-8") as f: + genres_tree = yaml.safe_load(f) + DataFileLoader.flatten_tree(genres_tree, [], c14n_branches) + return c14n_branches, canonicalize + + @staticmethod + def flatten_tree( + elem: dict | list | str, + path: list[str], + branches: list[list[str]], + ) -> None: + """Flatten nested lists/dictionaries into lists of strings (branches).""" + if not path: + path = [] + + if isinstance(elem, dict): + for k, v in elem.items(): + DataFileLoader.flatten_tree(v, [*path, k], branches) + elif isinstance(elem, list): + for sub in elem: + DataFileLoader.flatten_tree(sub, path, branches) + else: + branches.append([*path, str(elem)]) diff --git a/beetsplug/lastgenre/utils.py b/beetsplug/lastgenre/utils.py new file mode 100644 index 000000000..7ae96e11a --- /dev/null +++ b/beetsplug/lastgenre/utils.py @@ -0,0 +1,41 @@ +# This file is part of beets. +# Copyright 2026, J0J0 Todos. +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. + + +"""Shared utility functions for the lastgenre plugin.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from beets import config + +if TYPE_CHECKING: + from collections.abc import Callable + + from beets.logging import Logger + + +def make_tunelog(log: Logger) -> Callable[..., None]: + """Create a tunelog function bound to a specific logger. + + Returns a callable that logs tuning messages at DEBUG level when + verbosity is high enough. + """ + + def tunelog(msg: str, *args: Any, **kwargs: Any) -> None: + if config["verbose"].as_number() >= 3: + log.debug(msg, *args, **kwargs) + + return tunelog diff --git a/test/plugins/test_lastgenre.py b/test/plugins/test_lastgenre.py index 3de43d197..8d092f318 100644 --- a/test/plugins/test_lastgenre.py +++ b/test/plugins/test_lastgenre.py @@ -184,9 +184,9 @@ class LastGenrePluginTest(PluginTestCase): return [tag1, tag2] plugin = lastgenre.LastGenrePlugin() - res = plugin._tags_for(MockPylastObj()) + res = plugin.client._tags_for(MockPylastObj()) assert res == ["pop", "rap"] - res = plugin._tags_for(MockPylastObj(), min_weight=50) + res = plugin.client._tags_for(MockPylastObj(), min_weight=50) assert res == ["pop"] def test_sort_by_depth(self): @@ -583,9 +583,9 @@ def test_get_genre(config_values, item_genre, mock_genres, expected_result): # Mock the last.fm fetchers. When whitelist enabled, we can assume only # whitelisted genres get returned, the plugin's _resolve_genre method # ensures it. - lastgenre.LastGenrePlugin.fetch_track_genre = mock_fetch_track_genre - lastgenre.LastGenrePlugin.fetch_album_genre = mock_fetch_album_genre - lastgenre.LastGenrePlugin.fetch_artist_genre = mock_fetch_artist_genre + lastgenre.client.LastFmClient.fetch_track_genre = mock_fetch_track_genre + lastgenre.client.LastFmClient.fetch_album_genre = mock_fetch_album_genre + lastgenre.client.LastFmClient.fetch_artist_genre = mock_fetch_artist_genre # Initialize plugin instance and item plugin = lastgenre.LastGenrePlugin()