diff --git a/beetsplug/lastgenre/__init__.py b/beetsplug/lastgenre/__init__.py index 1c91688a6..f75fa0312 100644 --- a/beetsplug/lastgenre/__init__.py +++ b/beetsplug/lastgenre/__init__.py @@ -25,32 +25,24 @@ https://gist.github.com/1241307 from __future__ import annotations import os -import traceback from functools import singledispatchmethod from pathlib import Path from typing import TYPE_CHECKING, Any -import pylast import yaml from beets import config, library, plugins, ui from beets.library import Album, Item from beets.util import plurality, unique_list +from .client import LastFmClient +from .utils import tunelog + if TYPE_CHECKING: import optparse - from collections.abc import Callable from beets.library import LibModel -LASTFM = pylast.LastFMNetwork(api_key=plugins.LASTFM_KEY) - -PYLAST_EXCEPTIONS = ( - pylast.WSError, - pylast.MalformedResponseError, - pylast.NetworkError, -) - # Canonicalization tree processing. @@ -123,7 +115,9 @@ class LastGenrePlugin(plugins.BeetsPlugin): if self.config["auto"]: self.import_stages = [self.imported] - self._genre_cache: dict[str, list[str]] = {} + self.client = LastFmClient( + self._log, self.config["min_weight"].get(int) + ) self.whitelist = self._load_whitelist() self.c14n_branches, self.canonicalize = self._load_c14n_tree() @@ -168,11 +162,6 @@ class LastGenrePlugin(plugins.BeetsPlugin): flatten_tree(genres_tree, [], c14n_branches) return c14n_branches, canonicalize - def _tunelog(self, msg: str, *args: Any, **kwargs: Any) -> None: - """Log tuning messages at DEBUG level when verbosity level is high enough.""" - if config["verbose"].as_number() >= 3: - self._log.debug(msg, *args, **kwargs) - @property def sources(self) -> tuple[str, ...]: """A tuple of allowed genre sources. May contain 'track', @@ -266,15 +255,6 @@ class LastGenrePlugin(plugins.BeetsPlugin): valid_tags = [t for t in tags if self._is_valid(t)] return valid_tags[:count] - def fetch_genre( - self, lastfm_obj: pylast.Album | pylast.Artist | pylast.Track - ) -> list[str]: - """Return genres for a pylast entity. Returns an empty list if - no suitable genres are found. - """ - min_weight = self.config["min_weight"].get(int) - return self._tags_for(lastfm_obj, min_weight) - def _is_valid(self, genre: str) -> bool: """Check if the genre is valid. @@ -285,48 +265,6 @@ class LastGenrePlugin(plugins.BeetsPlugin): return True return False - # Cached last.fm entity lookups. - - def _last_lookup( - self, entity: str, method: Callable[..., Any], *args: str - ) -> list[str]: - """Get genres based on the named entity using the callable `method` - whose arguments are given in the sequence `args`. The genre lookup - is cached based on the entity name and the arguments. - - Before the lookup, each argument has the "-" Unicode character replaced - with its rough ASCII equivalents in order to return better results from - the Last.fm database. - """ - # Shortcut if we're missing metadata. - if any(not s for s in args): - return [] - - key = f"{entity}.{'-'.join(str(a) for a in args)}" - if key not in self._genre_cache: - args_replaced = [a.replace("\u2010", "-") for a in args] - self._genre_cache[key] = self.fetch_genre(method(*args_replaced)) - - genre = self._genre_cache[key] - self._tunelog("last.fm (unfiltered) {} tags: {}", entity, genre) - return genre - - def fetch_album_genre(self, albumartist: str, albumtitle: str) -> list[str]: - """Return genres from Last.fm for the album by albumartist.""" - return self._last_lookup( - "album", LASTFM.get_album, albumartist, albumtitle - ) - - def fetch_artist_genre(self, artist: str) -> list[str]: - """Return genres from Last.fm for the artist.""" - return self._last_lookup("artist", LASTFM.get_artist, artist) - - def fetch_track_genre(self, trackartist: str, tracktitle: str) -> list[str]: - """Return genres from Last.fm for the track by artist.""" - return self._last_lookup( - "track", LASTFM.get_track, trackartist, tracktitle - ) - # Main processing: _get_genre() and helpers. def _format_and_stringify(self, tags: list[str]) -> str: @@ -414,14 +352,18 @@ class LastGenrePlugin(plugins.BeetsPlugin): # Run through stages: track, album, artist, # album artist, or most popular track genre. if isinstance(obj, library.Item) and "track" in self.sources: - if new_genres := self.fetch_track_genre(obj.artist, obj.title): + if new_genres := self.client.fetch_track_genre( + obj.artist, obj.title + ): if result := _try_resolve_stage( "track", keep_genres, new_genres ): return result if "album" in self.sources: - if new_genres := self.fetch_album_genre(obj.albumartist, obj.album): + if new_genres := self.client.fetch_album_genre( + obj.albumartist, obj.album + ): if result := _try_resolve_stage( "album", keep_genres, new_genres ): @@ -430,22 +372,27 @@ class LastGenrePlugin(plugins.BeetsPlugin): if "artist" in self.sources: new_genres = [] if isinstance(obj, library.Item): - new_genres = self.fetch_artist_genre(obj.artist) + new_genres = self.client.fetch_artist_genre(obj.artist) stage_label = "artist" elif obj.albumartist != config["va_name"].as_str(): - new_genres = self.fetch_artist_genre(obj.albumartist) + new_genres = self.client.fetch_artist_genre(obj.albumartist) stage_label = "album artist" if not new_genres: - self._tunelog( + tunelog( + self._log, 'No album artist genre found for "{}", ' "trying multi-valued field...", obj.albumartist, ) for albumartist in obj.albumartists: - self._tunelog( - 'Fetching artist genre for "{}"', albumartist + tunelog( + self._log, + 'Fetching artist genre for "{}"', + albumartist, + ) + new_genres += self.client.fetch_artist_genre( + albumartist ) - new_genres += self.fetch_artist_genre(albumartist) if new_genres: stage_label = "multi-valued album artist" else: @@ -455,11 +402,11 @@ class LastGenrePlugin(plugins.BeetsPlugin): for item in obj.items(): item_genre = None if "track" in self.sources: - item_genre = self.fetch_track_genre( + item_genre = self.client.fetch_track_genre( item.artist, item.title ) if not item_genre: - item_genre = self.fetch_artist_genre(item.artist) + item_genre = self.client.fetch_artist_genre(item.artist) if item_genre: item_genres += item_genre if item_genres: @@ -607,43 +554,3 @@ class LastGenrePlugin(plugins.BeetsPlugin): self, session: library.Session, task: library.ImportTask ) -> None: self._process(task.album if task.is_album else task.item, write=False) - - def _tags_for( - self, - obj: pylast.Album | pylast.Artist | pylast.Track, - min_weight: int | None = None, - ) -> list[str]: - """Core genre identification routine. - - Given a pylast entity (album or track), return a list of - tag names for that entity. Return an empty list if the entity is - not found or another error occurs. - - If `min_weight` is specified, tags are filtered by weight. - """ - # Work around an inconsistency in pylast where - # Album.get_top_tags() does not return TopItem instances. - # https://github.com/pylast/pylast/issues/86 - obj_to_query: Any = obj - if isinstance(obj, pylast.Album): - obj_to_query = super(pylast.Album, obj) - - try: - res: Any = obj_to_query.get_top_tags() - except PYLAST_EXCEPTIONS as exc: - self._log.debug("last.fm error: {}", exc) - return [] - except Exception as exc: - # Isolate bugs in pylast. - self._log.debug("{}", traceback.format_exc()) - self._log.error("error in pylast library: {}", exc) - return [] - - # Filter by weight (optionally). - if min_weight: - res = [el for el in res if (int(el.weight or 0)) >= min_weight] - - # Get strings from tags. - tags: list[str] = [el.item.get_name().lower() for el in res] - - return tags diff --git a/beetsplug/lastgenre/client.py b/beetsplug/lastgenre/client.py new file mode 100644 index 000000000..b7444cae8 --- /dev/null +++ b/beetsplug/lastgenre/client.py @@ -0,0 +1,141 @@ +# This file is part of beets. +# Copyright 2016, Adrian Sampson. +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. + + +"""Last.fm API client for genre lookups.""" + +from __future__ import annotations + +import traceback +from typing import TYPE_CHECKING, Any + +import pylast + +from beets import plugins + +from .utils import tunelog + +if TYPE_CHECKING: + from collections.abc import Callable + + from beets.logging import Logger + +LASTFM = pylast.LastFMNetwork(api_key=plugins.LASTFM_KEY) + +PYLAST_EXCEPTIONS = ( + pylast.WSError, + pylast.MalformedResponseError, + pylast.NetworkError, +) + + +class LastFmClient: + """Client for fetching genres from Last.fm.""" + + def __init__(self, log: Logger, min_weight: int): + """Initialize the client. + + The min_weight parameter filters tags by their minimum weight. + """ + self._log = log + self._min_weight = min_weight + self._genre_cache: dict[str, list[str]] = {} + + def fetch_genre( + self, lastfm_obj: pylast.Album | pylast.Artist | pylast.Track + ) -> list[str]: + """Return genres for a pylast entity. Returns an empty list if + no suitable genres are found. + """ + return self._tags_for(lastfm_obj, self._min_weight) + + def _tags_for( + self, + obj: pylast.Album | pylast.Artist | pylast.Track, + min_weight: int | None = None, + ) -> list[str]: + """Core genre identification routine. + + Given a pylast entity (album or track), return a list of + tag names for that entity. Return an empty list if the entity is + not found or another error occurs. + + If `min_weight` is specified, tags are filtered by weight. + """ + # Work around an inconsistency in pylast where + # Album.get_top_tags() does not return TopItem instances. + # https://github.com/pylast/pylast/issues/86 + obj_to_query: Any = obj + if isinstance(obj, pylast.Album): + obj_to_query = super(pylast.Album, obj) + + try: + res: Any = obj_to_query.get_top_tags() + except PYLAST_EXCEPTIONS as exc: + self._log.debug("last.fm error: {}", exc) + return [] + except Exception as exc: + # Isolate bugs in pylast. + self._log.debug("{}", traceback.format_exc()) + self._log.error("error in pylast library: {}", exc) + return [] + + # Filter by weight (optionally). + if min_weight: + res = [el for el in res if (int(el.weight or 0)) >= min_weight] + + # Get strings from tags. + tags: list[str] = [el.item.get_name().lower() for el in res] + + return tags + + def _last_lookup( + self, entity: str, method: Callable[..., Any], *args: str + ) -> list[str]: + """Get genres based on the named entity using the callable `method` + whose arguments are given in the sequence `args`. The genre lookup + is cached based on the entity name and the arguments. + + Before the lookup, each argument has the "-" Unicode character replaced + with its rough ASCII equivalents in order to return better results from + the Last.fm database. + """ + # Shortcut if we're missing metadata. + if any(not s for s in args): + return [] + + key = f"{entity}.{'-'.join(str(a) for a in args)}" + if key not in self._genre_cache: + args_replaced = [a.replace("\u2010", "-") for a in args] + self._genre_cache[key] = self.fetch_genre(method(*args_replaced)) + + genre = self._genre_cache[key] + tunelog(self._log, "last.fm (unfiltered) {} tags: {}", entity, genre) + return genre + + def fetch_album_genre(self, albumartist: str, albumtitle: str) -> list[str]: + """Return genres from Last.fm for the album by albumartist.""" + return self._last_lookup( + "album", LASTFM.get_album, albumartist, albumtitle + ) + + def fetch_artist_genre(self, artist: str) -> list[str]: + """Return genres from Last.fm for the artist.""" + return self._last_lookup("artist", LASTFM.get_artist, artist) + + def fetch_track_genre(self, trackartist: str, tracktitle: str) -> list[str]: + """Return genres from Last.fm for the track by artist.""" + return self._last_lookup( + "track", LASTFM.get_track, trackartist, tracktitle + ) diff --git a/beetsplug/lastgenre/utils.py b/beetsplug/lastgenre/utils.py new file mode 100644 index 000000000..4c7846f23 --- /dev/null +++ b/beetsplug/lastgenre/utils.py @@ -0,0 +1,31 @@ +# This file is part of beets. +# Copyright 2026, J0J0 Todos. +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. + + +"""Shared utility functions for the lastgenre plugin.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from beets import config + +if TYPE_CHECKING: + from beets.logging import Logger + + +def tunelog(log: Logger, msg: str, *args: Any, **kwargs: Any) -> None: + """Log tuning messages at DEBUG level when verbosity level is high enough.""" + if config["verbose"].as_number() >= 3: + log.debug(msg, *args, **kwargs)