lastgenre: Move fetching to client module

and tunelog helper to a utils module since we will need it everywhere.
This commit is contained in:
J0J0 Todos 2026-02-01 23:36:42 +01:00
parent 680473b9e5
commit 7aae9a9519
3 changed files with 197 additions and 118 deletions

View file

@ -25,32 +25,24 @@ https://gist.github.com/1241307
from __future__ import annotations
import os
import traceback
from functools import singledispatchmethod
from pathlib import Path
from typing import TYPE_CHECKING, Any
import pylast
import yaml
from beets import config, library, plugins, ui
from beets.library import Album, Item
from beets.util import plurality, unique_list
from .client import LastFmClient
from .utils import tunelog
if TYPE_CHECKING:
import optparse
from collections.abc import Callable
from beets.library import LibModel
LASTFM = pylast.LastFMNetwork(api_key=plugins.LASTFM_KEY)
PYLAST_EXCEPTIONS = (
pylast.WSError,
pylast.MalformedResponseError,
pylast.NetworkError,
)
# Canonicalization tree processing.
@ -123,7 +115,9 @@ class LastGenrePlugin(plugins.BeetsPlugin):
if self.config["auto"]:
self.import_stages = [self.imported]
self._genre_cache: dict[str, list[str]] = {}
self.client = LastFmClient(
self._log, self.config["min_weight"].get(int)
)
self.whitelist = self._load_whitelist()
self.c14n_branches, self.canonicalize = self._load_c14n_tree()
@ -168,11 +162,6 @@ class LastGenrePlugin(plugins.BeetsPlugin):
flatten_tree(genres_tree, [], c14n_branches)
return c14n_branches, canonicalize
def _tunelog(self, msg: str, *args: Any, **kwargs: Any) -> None:
"""Log tuning messages at DEBUG level when verbosity level is high enough."""
if config["verbose"].as_number() >= 3:
self._log.debug(msg, *args, **kwargs)
@property
def sources(self) -> tuple[str, ...]:
"""A tuple of allowed genre sources. May contain 'track',
@ -266,15 +255,6 @@ class LastGenrePlugin(plugins.BeetsPlugin):
valid_tags = [t for t in tags if self._is_valid(t)]
return valid_tags[:count]
def fetch_genre(
self, lastfm_obj: pylast.Album | pylast.Artist | pylast.Track
) -> list[str]:
"""Return genres for a pylast entity. Returns an empty list if
no suitable genres are found.
"""
min_weight = self.config["min_weight"].get(int)
return self._tags_for(lastfm_obj, min_weight)
def _is_valid(self, genre: str) -> bool:
"""Check if the genre is valid.
@ -285,48 +265,6 @@ class LastGenrePlugin(plugins.BeetsPlugin):
return True
return False
# Cached last.fm entity lookups.
def _last_lookup(
self, entity: str, method: Callable[..., Any], *args: str
) -> list[str]:
"""Get genres based on the named entity using the callable `method`
whose arguments are given in the sequence `args`. The genre lookup
is cached based on the entity name and the arguments.
Before the lookup, each argument has the "-" Unicode character replaced
with its rough ASCII equivalents in order to return better results from
the Last.fm database.
"""
# Shortcut if we're missing metadata.
if any(not s for s in args):
return []
key = f"{entity}.{'-'.join(str(a) for a in args)}"
if key not in self._genre_cache:
args_replaced = [a.replace("\u2010", "-") for a in args]
self._genre_cache[key] = self.fetch_genre(method(*args_replaced))
genre = self._genre_cache[key]
self._tunelog("last.fm (unfiltered) {} tags: {}", entity, genre)
return genre
def fetch_album_genre(self, albumartist: str, albumtitle: str) -> list[str]:
"""Return genres from Last.fm for the album by albumartist."""
return self._last_lookup(
"album", LASTFM.get_album, albumartist, albumtitle
)
def fetch_artist_genre(self, artist: str) -> list[str]:
"""Return genres from Last.fm for the artist."""
return self._last_lookup("artist", LASTFM.get_artist, artist)
def fetch_track_genre(self, trackartist: str, tracktitle: str) -> list[str]:
"""Return genres from Last.fm for the track by artist."""
return self._last_lookup(
"track", LASTFM.get_track, trackartist, tracktitle
)
# Main processing: _get_genre() and helpers.
def _format_and_stringify(self, tags: list[str]) -> str:
@ -414,14 +352,18 @@ class LastGenrePlugin(plugins.BeetsPlugin):
# Run through stages: track, album, artist,
# album artist, or most popular track genre.
if isinstance(obj, library.Item) and "track" in self.sources:
if new_genres := self.fetch_track_genre(obj.artist, obj.title):
if new_genres := self.client.fetch_track_genre(
obj.artist, obj.title
):
if result := _try_resolve_stage(
"track", keep_genres, new_genres
):
return result
if "album" in self.sources:
if new_genres := self.fetch_album_genre(obj.albumartist, obj.album):
if new_genres := self.client.fetch_album_genre(
obj.albumartist, obj.album
):
if result := _try_resolve_stage(
"album", keep_genres, new_genres
):
@ -430,22 +372,27 @@ class LastGenrePlugin(plugins.BeetsPlugin):
if "artist" in self.sources:
new_genres = []
if isinstance(obj, library.Item):
new_genres = self.fetch_artist_genre(obj.artist)
new_genres = self.client.fetch_artist_genre(obj.artist)
stage_label = "artist"
elif obj.albumartist != config["va_name"].as_str():
new_genres = self.fetch_artist_genre(obj.albumartist)
new_genres = self.client.fetch_artist_genre(obj.albumartist)
stage_label = "album artist"
if not new_genres:
self._tunelog(
tunelog(
self._log,
'No album artist genre found for "{}", '
"trying multi-valued field...",
obj.albumartist,
)
for albumartist in obj.albumartists:
self._tunelog(
'Fetching artist genre for "{}"', albumartist
tunelog(
self._log,
'Fetching artist genre for "{}"',
albumartist,
)
new_genres += self.client.fetch_artist_genre(
albumartist
)
new_genres += self.fetch_artist_genre(albumartist)
if new_genres:
stage_label = "multi-valued album artist"
else:
@ -455,11 +402,11 @@ class LastGenrePlugin(plugins.BeetsPlugin):
for item in obj.items():
item_genre = None
if "track" in self.sources:
item_genre = self.fetch_track_genre(
item_genre = self.client.fetch_track_genre(
item.artist, item.title
)
if not item_genre:
item_genre = self.fetch_artist_genre(item.artist)
item_genre = self.client.fetch_artist_genre(item.artist)
if item_genre:
item_genres += item_genre
if item_genres:
@ -607,43 +554,3 @@ class LastGenrePlugin(plugins.BeetsPlugin):
self, session: library.Session, task: library.ImportTask
) -> None:
self._process(task.album if task.is_album else task.item, write=False)
def _tags_for(
self,
obj: pylast.Album | pylast.Artist | pylast.Track,
min_weight: int | None = None,
) -> list[str]:
"""Core genre identification routine.
Given a pylast entity (album or track), return a list of
tag names for that entity. Return an empty list if the entity is
not found or another error occurs.
If `min_weight` is specified, tags are filtered by weight.
"""
# Work around an inconsistency in pylast where
# Album.get_top_tags() does not return TopItem instances.
# https://github.com/pylast/pylast/issues/86
obj_to_query: Any = obj
if isinstance(obj, pylast.Album):
obj_to_query = super(pylast.Album, obj)
try:
res: Any = obj_to_query.get_top_tags()
except PYLAST_EXCEPTIONS as exc:
self._log.debug("last.fm error: {}", exc)
return []
except Exception as exc:
# Isolate bugs in pylast.
self._log.debug("{}", traceback.format_exc())
self._log.error("error in pylast library: {}", exc)
return []
# Filter by weight (optionally).
if min_weight:
res = [el for el in res if (int(el.weight or 0)) >= min_weight]
# Get strings from tags.
tags: list[str] = [el.item.get_name().lower() for el in res]
return tags

View file

@ -0,0 +1,141 @@
# This file is part of beets.
# Copyright 2016, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Last.fm API client for genre lookups."""
from __future__ import annotations
import traceback
from typing import TYPE_CHECKING, Any
import pylast
from beets import plugins
from .utils import tunelog
if TYPE_CHECKING:
from collections.abc import Callable
from beets.logging import Logger
LASTFM = pylast.LastFMNetwork(api_key=plugins.LASTFM_KEY)
PYLAST_EXCEPTIONS = (
pylast.WSError,
pylast.MalformedResponseError,
pylast.NetworkError,
)
class LastFmClient:
"""Client for fetching genres from Last.fm."""
def __init__(self, log: Logger, min_weight: int):
"""Initialize the client.
The min_weight parameter filters tags by their minimum weight.
"""
self._log = log
self._min_weight = min_weight
self._genre_cache: dict[str, list[str]] = {}
def fetch_genre(
self, lastfm_obj: pylast.Album | pylast.Artist | pylast.Track
) -> list[str]:
"""Return genres for a pylast entity. Returns an empty list if
no suitable genres are found.
"""
return self._tags_for(lastfm_obj, self._min_weight)
def _tags_for(
self,
obj: pylast.Album | pylast.Artist | pylast.Track,
min_weight: int | None = None,
) -> list[str]:
"""Core genre identification routine.
Given a pylast entity (album or track), return a list of
tag names for that entity. Return an empty list if the entity is
not found or another error occurs.
If `min_weight` is specified, tags are filtered by weight.
"""
# Work around an inconsistency in pylast where
# Album.get_top_tags() does not return TopItem instances.
# https://github.com/pylast/pylast/issues/86
obj_to_query: Any = obj
if isinstance(obj, pylast.Album):
obj_to_query = super(pylast.Album, obj)
try:
res: Any = obj_to_query.get_top_tags()
except PYLAST_EXCEPTIONS as exc:
self._log.debug("last.fm error: {}", exc)
return []
except Exception as exc:
# Isolate bugs in pylast.
self._log.debug("{}", traceback.format_exc())
self._log.error("error in pylast library: {}", exc)
return []
# Filter by weight (optionally).
if min_weight:
res = [el for el in res if (int(el.weight or 0)) >= min_weight]
# Get strings from tags.
tags: list[str] = [el.item.get_name().lower() for el in res]
return tags
def _last_lookup(
self, entity: str, method: Callable[..., Any], *args: str
) -> list[str]:
"""Get genres based on the named entity using the callable `method`
whose arguments are given in the sequence `args`. The genre lookup
is cached based on the entity name and the arguments.
Before the lookup, each argument has the "-" Unicode character replaced
with its rough ASCII equivalents in order to return better results from
the Last.fm database.
"""
# Shortcut if we're missing metadata.
if any(not s for s in args):
return []
key = f"{entity}.{'-'.join(str(a) for a in args)}"
if key not in self._genre_cache:
args_replaced = [a.replace("\u2010", "-") for a in args]
self._genre_cache[key] = self.fetch_genre(method(*args_replaced))
genre = self._genre_cache[key]
tunelog(self._log, "last.fm (unfiltered) {} tags: {}", entity, genre)
return genre
def fetch_album_genre(self, albumartist: str, albumtitle: str) -> list[str]:
"""Return genres from Last.fm for the album by albumartist."""
return self._last_lookup(
"album", LASTFM.get_album, albumartist, albumtitle
)
def fetch_artist_genre(self, artist: str) -> list[str]:
"""Return genres from Last.fm for the artist."""
return self._last_lookup("artist", LASTFM.get_artist, artist)
def fetch_track_genre(self, trackartist: str, tracktitle: str) -> list[str]:
"""Return genres from Last.fm for the track by artist."""
return self._last_lookup(
"track", LASTFM.get_track, trackartist, tracktitle
)

View file

@ -0,0 +1,31 @@
# This file is part of beets.
# Copyright 2026, J0J0 Todos.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Shared utility functions for the lastgenre plugin."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from beets import config
if TYPE_CHECKING:
from beets.logging import Logger
def tunelog(log: Logger, msg: str, *args: Any, **kwargs: Any) -> None:
"""Log tuning messages at DEBUG level when verbosity level is high enough."""
if config["verbose"].as_number() >= 3:
log.debug(msg, *args, **kwargs)