mirror of
https://github.com/beetbox/beets.git
synced 2026-02-08 08:25:23 +01:00
Merge 5eabd4c68e into cdfb813910
This commit is contained in:
commit
933fd5e1a9
5 changed files with 387 additions and 220 deletions
|
|
@ -24,76 +24,23 @@ https://gist.github.com/1241307
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import traceback
|
||||
from functools import singledispatchmethod
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
import pylast
|
||||
import yaml
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from beets import config, library, plugins, ui
|
||||
from beets.library import Album, Item
|
||||
from beets.util import plurality, unique_list
|
||||
|
||||
from .client import LastFmClient
|
||||
from .loaders import DataFileLoader
|
||||
from .utils import make_tunelog
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import optparse
|
||||
from collections.abc import Callable
|
||||
|
||||
from beets.library import LibModel
|
||||
|
||||
LASTFM = pylast.LastFMNetwork(api_key=plugins.LASTFM_KEY)
|
||||
|
||||
PYLAST_EXCEPTIONS = (
|
||||
pylast.WSError,
|
||||
pylast.MalformedResponseError,
|
||||
pylast.NetworkError,
|
||||
)
|
||||
|
||||
|
||||
# Canonicalization tree processing.
|
||||
|
||||
|
||||
def flatten_tree(
|
||||
elem: dict[Any, Any] | list[Any] | str,
|
||||
path: list[str],
|
||||
branches: list[list[str]],
|
||||
) -> None:
|
||||
"""Flatten nested lists/dictionaries into lists of strings
|
||||
(branches).
|
||||
"""
|
||||
if not path:
|
||||
path = []
|
||||
|
||||
if isinstance(elem, dict):
|
||||
for k, v in elem.items():
|
||||
flatten_tree(v, [*path, k], branches)
|
||||
elif isinstance(elem, list):
|
||||
for sub in elem:
|
||||
flatten_tree(sub, path, branches)
|
||||
else:
|
||||
branches.append([*path, str(elem)])
|
||||
|
||||
|
||||
def find_parents(candidate: str, branches: list[list[str]]) -> list[str]:
|
||||
"""Find parents genre of a given genre, ordered from the closest to
|
||||
the further parent.
|
||||
"""
|
||||
for branch in branches:
|
||||
try:
|
||||
idx = branch.index(candidate.lower())
|
||||
return list(reversed(branch[: idx + 1]))
|
||||
except ValueError:
|
||||
continue
|
||||
return [candidate]
|
||||
|
||||
|
||||
# Main plugin logic.
|
||||
|
||||
WHITELIST = os.path.join(os.path.dirname(__file__), "genres.txt")
|
||||
C14N_TREE = os.path.join(os.path.dirname(__file__), "genres-tree.yaml")
|
||||
|
||||
|
||||
class LastGenrePlugin(plugins.BeetsPlugin):
|
||||
def __init__(self) -> None:
|
||||
|
|
@ -123,55 +70,17 @@ class LastGenrePlugin(plugins.BeetsPlugin):
|
|||
if self.config["auto"]:
|
||||
self.import_stages = [self.imported]
|
||||
|
||||
self._genre_cache: dict[str, list[str]] = {}
|
||||
self.whitelist = self._load_whitelist()
|
||||
self.c14n_branches, self.canonicalize = self._load_c14n_tree()
|
||||
self._tunelog = make_tunelog(self._log)
|
||||
self.client = LastFmClient(
|
||||
self._log, self.config["min_weight"].get(int)
|
||||
)
|
||||
|
||||
def _load_whitelist(self) -> set[str]:
|
||||
"""Load the whitelist from a text file.
|
||||
|
||||
Default whitelist is used if config is True, empty string or set to "nothing".
|
||||
"""
|
||||
whitelist = set()
|
||||
wl_filename = self.config["whitelist"].get()
|
||||
if wl_filename in (True, "", None): # Indicates the default whitelist.
|
||||
wl_filename = WHITELIST
|
||||
if wl_filename:
|
||||
self._log.debug("Loading whitelist {}", wl_filename)
|
||||
text = Path(wl_filename).expanduser().read_text(encoding="utf-8")
|
||||
for line in text.splitlines():
|
||||
if (line := line.strip().lower()) and not line.startswith("#"):
|
||||
whitelist.add(line)
|
||||
|
||||
return whitelist
|
||||
|
||||
def _load_c14n_tree(self) -> tuple[list[list[str]], bool]:
|
||||
"""Load the canonicalization tree from a YAML file.
|
||||
|
||||
Default tree is used if config is True, empty string, set to "nothing"
|
||||
or if prefer_specific is enabled.
|
||||
"""
|
||||
c14n_branches: list[list[str]] = []
|
||||
c14n_filename = self.config["canonical"].get()
|
||||
canonicalize = c14n_filename is not False
|
||||
# Default tree
|
||||
if c14n_filename in (True, "", None) or (
|
||||
# prefer_specific requires a tree, load default tree
|
||||
not canonicalize and self.config["prefer_specific"].get()
|
||||
):
|
||||
c14n_filename = C14N_TREE
|
||||
# Read the tree
|
||||
if c14n_filename:
|
||||
self._log.debug("Loading canonicalization tree {}", c14n_filename)
|
||||
with Path(c14n_filename).expanduser().open(encoding="utf-8") as f:
|
||||
genres_tree = yaml.safe_load(f)
|
||||
flatten_tree(genres_tree, [], c14n_branches)
|
||||
return c14n_branches, canonicalize
|
||||
|
||||
def _tunelog(self, msg: str, *args: Any, **kwargs: Any) -> None:
|
||||
"""Log tuning messages at DEBUG level when verbosity level is high enough."""
|
||||
if config["verbose"].as_number() >= 3:
|
||||
self._log.debug(msg, *args, **kwargs)
|
||||
loader = DataFileLoader.from_config(
|
||||
self.config, self._log, Path(__file__).parent
|
||||
)
|
||||
self.whitelist = loader.whitelist
|
||||
self.c14n_branches = loader.c14n_branches
|
||||
self.canonicalize = loader.canonicalize
|
||||
|
||||
@property
|
||||
def sources(self) -> tuple[str, ...]:
|
||||
|
|
@ -187,7 +96,7 @@ class LastGenrePlugin(plugins.BeetsPlugin):
|
|||
return ("artist",)
|
||||
return tuple()
|
||||
|
||||
# More canonicalization and general helpers.
|
||||
# Canonicalization and filtering.
|
||||
|
||||
def _get_depth(self, tag: str) -> int | None:
|
||||
"""Find the depth of a tag in the genres tree."""
|
||||
|
|
@ -207,6 +116,17 @@ class LastGenrePlugin(plugins.BeetsPlugin):
|
|||
depth_tag_pairs.sort(reverse=True)
|
||||
return [p[1] for p in depth_tag_pairs]
|
||||
|
||||
@staticmethod
|
||||
def find_parents(candidate: str, branches: list[list[str]]) -> list[str]:
|
||||
"""Find parent genres of a given genre, ordered from closest to furthest."""
|
||||
for branch in branches:
|
||||
try:
|
||||
idx = branch.index(candidate.lower())
|
||||
return list(reversed(branch[: idx + 1]))
|
||||
except ValueError:
|
||||
continue
|
||||
return [candidate]
|
||||
|
||||
def _resolve_genres(self, tags: list[str]) -> list[str]:
|
||||
"""Canonicalize, sort and filter a list of genres.
|
||||
|
||||
|
|
@ -239,11 +159,11 @@ class LastGenrePlugin(plugins.BeetsPlugin):
|
|||
if self.whitelist:
|
||||
parents = [
|
||||
x
|
||||
for x in find_parents(tag, self.c14n_branches)
|
||||
for x in self.find_parents(tag, self.c14n_branches)
|
||||
if self._is_valid(x)
|
||||
]
|
||||
else:
|
||||
parents = [find_parents(tag, self.c14n_branches)[-1]]
|
||||
parents = [self.find_parents(tag, self.c14n_branches)[-1]]
|
||||
|
||||
tags_all += parents
|
||||
# Stop if we have enough tags already, unless we need to find
|
||||
|
|
@ -266,15 +186,6 @@ class LastGenrePlugin(plugins.BeetsPlugin):
|
|||
valid_tags = [t for t in tags if self._is_valid(t)]
|
||||
return valid_tags[:count]
|
||||
|
||||
def fetch_genre(
|
||||
self, lastfm_obj: pylast.Album | pylast.Artist | pylast.Track
|
||||
) -> list[str]:
|
||||
"""Return genres for a pylast entity. Returns an empty list if
|
||||
no suitable genres are found.
|
||||
"""
|
||||
min_weight = self.config["min_weight"].get(int)
|
||||
return self._tags_for(lastfm_obj, min_weight)
|
||||
|
||||
def _is_valid(self, genre: str) -> bool:
|
||||
"""Check if the genre is valid.
|
||||
|
||||
|
|
@ -285,48 +196,6 @@ class LastGenrePlugin(plugins.BeetsPlugin):
|
|||
return True
|
||||
return False
|
||||
|
||||
# Cached last.fm entity lookups.
|
||||
|
||||
def _last_lookup(
|
||||
self, entity: str, method: Callable[..., Any], *args: str
|
||||
) -> list[str]:
|
||||
"""Get genres based on the named entity using the callable `method`
|
||||
whose arguments are given in the sequence `args`. The genre lookup
|
||||
is cached based on the entity name and the arguments.
|
||||
|
||||
Before the lookup, each argument has the "-" Unicode character replaced
|
||||
with its rough ASCII equivalents in order to return better results from
|
||||
the Last.fm database.
|
||||
"""
|
||||
# Shortcut if we're missing metadata.
|
||||
if any(not s for s in args):
|
||||
return []
|
||||
|
||||
key = f"{entity}.{'-'.join(str(a) for a in args)}"
|
||||
if key not in self._genre_cache:
|
||||
args_replaced = [a.replace("\u2010", "-") for a in args]
|
||||
self._genre_cache[key] = self.fetch_genre(method(*args_replaced))
|
||||
|
||||
genre = self._genre_cache[key]
|
||||
self._tunelog("last.fm (unfiltered) {} tags: {}", entity, genre)
|
||||
return genre
|
||||
|
||||
def fetch_album_genre(self, albumartist: str, albumtitle: str) -> list[str]:
|
||||
"""Return genres from Last.fm for the album by albumartist."""
|
||||
return self._last_lookup(
|
||||
"album", LASTFM.get_album, albumartist, albumtitle
|
||||
)
|
||||
|
||||
def fetch_artist_genre(self, artist: str) -> list[str]:
|
||||
"""Return genres from Last.fm for the artist."""
|
||||
return self._last_lookup("artist", LASTFM.get_artist, artist)
|
||||
|
||||
def fetch_track_genre(self, trackartist: str, tracktitle: str) -> list[str]:
|
||||
"""Return genres from Last.fm for the track by artist."""
|
||||
return self._last_lookup(
|
||||
"track", LASTFM.get_track, trackartist, tracktitle
|
||||
)
|
||||
|
||||
# Main processing: _get_genre() and helpers.
|
||||
|
||||
def _format_and_stringify(self, tags: list[str]) -> str:
|
||||
|
|
@ -414,14 +283,18 @@ class LastGenrePlugin(plugins.BeetsPlugin):
|
|||
# Run through stages: track, album, artist,
|
||||
# album artist, or most popular track genre.
|
||||
if isinstance(obj, library.Item) and "track" in self.sources:
|
||||
if new_genres := self.fetch_track_genre(obj.artist, obj.title):
|
||||
if new_genres := self.client.fetch_track_genre(
|
||||
obj.artist, obj.title
|
||||
):
|
||||
if result := _try_resolve_stage(
|
||||
"track", keep_genres, new_genres
|
||||
):
|
||||
return result
|
||||
|
||||
if "album" in self.sources:
|
||||
if new_genres := self.fetch_album_genre(obj.albumartist, obj.album):
|
||||
if new_genres := self.client.fetch_album_genre(
|
||||
obj.albumartist, obj.album
|
||||
):
|
||||
if result := _try_resolve_stage(
|
||||
"album", keep_genres, new_genres
|
||||
):
|
||||
|
|
@ -430,10 +303,10 @@ class LastGenrePlugin(plugins.BeetsPlugin):
|
|||
if "artist" in self.sources:
|
||||
new_genres = []
|
||||
if isinstance(obj, library.Item):
|
||||
new_genres = self.fetch_artist_genre(obj.artist)
|
||||
new_genres = self.client.fetch_artist_genre(obj.artist)
|
||||
stage_label = "artist"
|
||||
elif obj.albumartist != config["va_name"].as_str():
|
||||
new_genres = self.fetch_artist_genre(obj.albumartist)
|
||||
new_genres = self.client.fetch_artist_genre(obj.albumartist)
|
||||
stage_label = "album artist"
|
||||
if not new_genres:
|
||||
self._tunelog(
|
||||
|
|
@ -443,9 +316,12 @@ class LastGenrePlugin(plugins.BeetsPlugin):
|
|||
)
|
||||
for albumartist in obj.albumartists:
|
||||
self._tunelog(
|
||||
'Fetching artist genre for "{}"', albumartist
|
||||
'Fetching artist genre for "{}"',
|
||||
albumartist,
|
||||
)
|
||||
new_genres += self.client.fetch_artist_genre(
|
||||
albumartist
|
||||
)
|
||||
new_genres += self.fetch_artist_genre(albumartist)
|
||||
if new_genres:
|
||||
stage_label = "multi-valued album artist"
|
||||
else:
|
||||
|
|
@ -455,11 +331,11 @@ class LastGenrePlugin(plugins.BeetsPlugin):
|
|||
for item in obj.items():
|
||||
item_genre = None
|
||||
if "track" in self.sources:
|
||||
item_genre = self.fetch_track_genre(
|
||||
item_genre = self.client.fetch_track_genre(
|
||||
item.artist, item.title
|
||||
)
|
||||
if not item_genre:
|
||||
item_genre = self.fetch_artist_genre(item.artist)
|
||||
item_genre = self.client.fetch_artist_genre(item.artist)
|
||||
if item_genre:
|
||||
item_genres += item_genre
|
||||
if item_genres:
|
||||
|
|
@ -482,13 +358,12 @@ class LastGenrePlugin(plugins.BeetsPlugin):
|
|||
if obj.genre and self.config["keep_existing"]:
|
||||
if not self.whitelist or self._is_valid(obj.genre.lower()):
|
||||
return obj.genre, "original fallback"
|
||||
else:
|
||||
# If the original genre doesn't match a whitelisted genre, check
|
||||
# if we can canonicalize it to find a matching, whitelisted genre!
|
||||
if result := _try_resolve_stage(
|
||||
"original fallback", keep_genres, []
|
||||
):
|
||||
return result
|
||||
# If the original genre doesn't match a whitelisted genre, check
|
||||
# if we can canonicalize it to find a matching, whitelisted genre!
|
||||
if result := _try_resolve_stage(
|
||||
"original fallback", keep_genres, []
|
||||
):
|
||||
return result
|
||||
|
||||
# Return fallback string.
|
||||
if fallback := self.config["fallback"].get():
|
||||
|
|
@ -607,43 +482,3 @@ class LastGenrePlugin(plugins.BeetsPlugin):
|
|||
self, session: library.Session, task: library.ImportTask
|
||||
) -> None:
|
||||
self._process(task.album if task.is_album else task.item, write=False)
|
||||
|
||||
def _tags_for(
|
||||
self,
|
||||
obj: pylast.Album | pylast.Artist | pylast.Track,
|
||||
min_weight: int | None = None,
|
||||
) -> list[str]:
|
||||
"""Core genre identification routine.
|
||||
|
||||
Given a pylast entity (album or track), return a list of
|
||||
tag names for that entity. Return an empty list if the entity is
|
||||
not found or another error occurs.
|
||||
|
||||
If `min_weight` is specified, tags are filtered by weight.
|
||||
"""
|
||||
# Work around an inconsistency in pylast where
|
||||
# Album.get_top_tags() does not return TopItem instances.
|
||||
# https://github.com/pylast/pylast/issues/86
|
||||
obj_to_query: Any = obj
|
||||
if isinstance(obj, pylast.Album):
|
||||
obj_to_query = super(pylast.Album, obj)
|
||||
|
||||
try:
|
||||
res: Any = obj_to_query.get_top_tags()
|
||||
except PYLAST_EXCEPTIONS as exc:
|
||||
self._log.debug("last.fm error: {}", exc)
|
||||
return []
|
||||
except Exception as exc:
|
||||
# Isolate bugs in pylast.
|
||||
self._log.debug("{}", traceback.format_exc())
|
||||
self._log.error("error in pylast library: {}", exc)
|
||||
return []
|
||||
|
||||
# Filter by weight (optionally).
|
||||
if min_weight:
|
||||
res = [el for el in res if (int(el.weight or 0)) >= min_weight]
|
||||
|
||||
# Get strings from tags.
|
||||
tags: list[str] = [el.item.get_name().lower() for el in res]
|
||||
|
||||
return tags
|
||||
|
|
|
|||
142
beetsplug/lastgenre/client.py
Normal file
142
beetsplug/lastgenre/client.py
Normal file
|
|
@ -0,0 +1,142 @@
|
|||
# This file is part of beets.
|
||||
# Copyright 2016, Adrian Sampson.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining
|
||||
# a copy of this software and associated documentation files (the
|
||||
# "Software"), to deal in the Software without restriction, including
|
||||
# without limitation the rights to use, copy, modify, merge, publish,
|
||||
# distribute, sublicense, and/or sell copies of the Software, and to
|
||||
# permit persons to whom the Software is furnished to do so, subject to
|
||||
# the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be
|
||||
# included in all copies or substantial portions of the Software.
|
||||
|
||||
|
||||
"""Last.fm API client for genre lookups."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import traceback
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
import pylast
|
||||
|
||||
from beets import plugins
|
||||
|
||||
from .utils import make_tunelog
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Callable
|
||||
|
||||
from beets.logging import Logger
|
||||
|
||||
LASTFM = pylast.LastFMNetwork(api_key=plugins.LASTFM_KEY)
|
||||
|
||||
PYLAST_EXCEPTIONS = (
|
||||
pylast.WSError,
|
||||
pylast.MalformedResponseError,
|
||||
pylast.NetworkError,
|
||||
)
|
||||
|
||||
|
||||
class LastFmClient:
|
||||
"""Client for fetching genres from Last.fm."""
|
||||
|
||||
def __init__(self, log: Logger, min_weight: int):
|
||||
"""Initialize the client.
|
||||
|
||||
The min_weight parameter filters tags by their minimum weight.
|
||||
"""
|
||||
self._log = log
|
||||
self._tunelog = make_tunelog(log)
|
||||
self._min_weight = min_weight
|
||||
self._genre_cache: dict[str, list[str]] = {}
|
||||
|
||||
def fetch_genre(
|
||||
self, lastfm_obj: pylast.Album | pylast.Artist | pylast.Track
|
||||
) -> list[str]:
|
||||
"""Return genres for a pylast entity. Returns an empty list if
|
||||
no suitable genres are found.
|
||||
"""
|
||||
return self._tags_for(lastfm_obj, self._min_weight)
|
||||
|
||||
def _tags_for(
|
||||
self,
|
||||
obj: pylast.Album | pylast.Artist | pylast.Track,
|
||||
min_weight: int | None = None,
|
||||
) -> list[str]:
|
||||
"""Core genre identification routine.
|
||||
|
||||
Given a pylast entity (album or track), return a list of
|
||||
tag names for that entity. Return an empty list if the entity is
|
||||
not found or another error occurs.
|
||||
|
||||
If `min_weight` is specified, tags are filtered by weight.
|
||||
"""
|
||||
# Work around an inconsistency in pylast where
|
||||
# Album.get_top_tags() does not return TopItem instances.
|
||||
# https://github.com/pylast/pylast/issues/86
|
||||
obj_to_query: Any = obj
|
||||
if isinstance(obj, pylast.Album):
|
||||
obj_to_query = super(pylast.Album, obj)
|
||||
|
||||
try:
|
||||
res: Any = obj_to_query.get_top_tags()
|
||||
except PYLAST_EXCEPTIONS as exc:
|
||||
self._log.debug("last.fm error: {}", exc)
|
||||
return []
|
||||
except Exception as exc:
|
||||
# Isolate bugs in pylast.
|
||||
self._log.debug("{}", traceback.format_exc())
|
||||
self._log.error("error in pylast library: {}", exc)
|
||||
return []
|
||||
|
||||
# Filter by weight (optionally).
|
||||
if min_weight:
|
||||
res = [el for el in res if (int(el.weight or 0)) >= min_weight]
|
||||
|
||||
# Get strings from tags.
|
||||
tags: list[str] = [el.item.get_name().lower() for el in res]
|
||||
|
||||
return tags
|
||||
|
||||
def _last_lookup(
|
||||
self, entity: str, method: Callable[..., Any], *args: str
|
||||
) -> list[str]:
|
||||
"""Get genres based on the named entity using the callable `method`
|
||||
whose arguments are given in the sequence `args`. The genre lookup
|
||||
is cached based on the entity name and the arguments.
|
||||
|
||||
Before the lookup, each argument has the "-" Unicode character replaced
|
||||
with its rough ASCII equivalents in order to return better results from
|
||||
the Last.fm database.
|
||||
"""
|
||||
# Shortcut if we're missing metadata.
|
||||
if any(not s for s in args):
|
||||
return []
|
||||
|
||||
key = f"{entity}.{'-'.join(str(a) for a in args)}"
|
||||
if key not in self._genre_cache:
|
||||
args_replaced = [a.replace("\u2010", "-") for a in args]
|
||||
self._genre_cache[key] = self.fetch_genre(method(*args_replaced))
|
||||
|
||||
genre = self._genre_cache[key]
|
||||
self._tunelog("last.fm (unfiltered) {} tags: {}", entity, genre)
|
||||
return genre
|
||||
|
||||
def fetch_album_genre(self, albumartist: str, albumtitle: str) -> list[str]:
|
||||
"""Return genres from Last.fm for the album by albumartist."""
|
||||
return self._last_lookup(
|
||||
"album", LASTFM.get_album, albumartist, albumtitle
|
||||
)
|
||||
|
||||
def fetch_artist_genre(self, artist: str) -> list[str]:
|
||||
"""Return genres from Last.fm for the artist."""
|
||||
return self._last_lookup("artist", LASTFM.get_artist, artist)
|
||||
|
||||
def fetch_track_genre(self, trackartist: str, tracktitle: str) -> list[str]:
|
||||
"""Return genres from Last.fm for the track by artist."""
|
||||
return self._last_lookup(
|
||||
"track", LASTFM.get_track, trackartist, tracktitle
|
||||
)
|
||||
149
beetsplug/lastgenre/loaders.py
Normal file
149
beetsplug/lastgenre/loaders.py
Normal file
|
|
@ -0,0 +1,149 @@
|
|||
# This file is part of beets.
|
||||
# Copyright 2016, Adrian Sampson.
|
||||
# Copyright 2026, J0J0 Todos.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining
|
||||
# a copy of this software and associated documentation files (the
|
||||
# "Software"), to deal in the Software without restriction, including
|
||||
# without limitation the rights to use, copy, modify, merge, publish,
|
||||
# distribute, sublicense, and/or sell copies of the Software, and to
|
||||
# permit persons to whom the Software is furnished to do so, subject to
|
||||
# the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be
|
||||
# included in all copies or substantial portions of the Software.
|
||||
|
||||
|
||||
"""Data file loaders for the lastgenre plugin."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import yaml
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from confuse import ConfigView
|
||||
|
||||
from beets.logging import Logger
|
||||
|
||||
|
||||
class DataFileLoader:
|
||||
"""Loads genre-related data files for the lastgenre plugin."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
log: Logger,
|
||||
plugin_dir: Path,
|
||||
whitelist: set[str],
|
||||
c14n_branches: list[list[str]],
|
||||
canonicalize: bool,
|
||||
):
|
||||
"""Initialize with pre-loaded data.
|
||||
|
||||
Use from_config() classmethod to construct from plugin config.
|
||||
"""
|
||||
self._log = log
|
||||
self._plugin_dir = plugin_dir
|
||||
self.whitelist = whitelist
|
||||
self.c14n_branches = c14n_branches
|
||||
self.canonicalize = canonicalize
|
||||
|
||||
@classmethod
|
||||
def from_config(
|
||||
cls,
|
||||
config: ConfigView,
|
||||
log: Logger,
|
||||
plugin_dir: Path,
|
||||
) -> DataFileLoader:
|
||||
"""Create a DataFileLoader from plugin configuration.
|
||||
|
||||
Reads config values and loads all data files during construction.
|
||||
"""
|
||||
# Default paths
|
||||
default_whitelist = str(plugin_dir / "genres.txt")
|
||||
default_tree = str(plugin_dir / "genres-tree.yaml")
|
||||
|
||||
# Load whitelist
|
||||
whitelist = cls._load_whitelist(
|
||||
log, config["whitelist"].get(), default_whitelist
|
||||
)
|
||||
|
||||
# Load tree
|
||||
c14n_branches, canonicalize = cls._load_tree(
|
||||
log,
|
||||
config["canonical"].get(),
|
||||
default_tree,
|
||||
config["prefer_specific"].get(),
|
||||
)
|
||||
|
||||
return cls(log, plugin_dir, whitelist, c14n_branches, canonicalize)
|
||||
|
||||
@staticmethod
|
||||
def _load_whitelist(
|
||||
log: Logger, config_value: str | bool | None, default_path: str
|
||||
) -> set[str]:
|
||||
"""Load the whitelist from a text file.
|
||||
|
||||
Returns set of valid genre names (lowercase).
|
||||
"""
|
||||
whitelist = set()
|
||||
wl_filename = config_value
|
||||
if wl_filename in (True, "", None): # Indicates the default whitelist.
|
||||
wl_filename = default_path
|
||||
if wl_filename:
|
||||
log.debug("Loading whitelist {}", wl_filename)
|
||||
text = Path(wl_filename).expanduser().read_text(encoding="utf-8")
|
||||
for line in text.splitlines():
|
||||
if (line := line.strip().lower()) and not line.startswith("#"):
|
||||
whitelist.add(line)
|
||||
|
||||
return whitelist
|
||||
|
||||
@staticmethod
|
||||
def _load_tree(
|
||||
log: Logger,
|
||||
config_value: str | bool | None,
|
||||
default_path: str,
|
||||
prefer_specific: bool,
|
||||
) -> tuple[list[list[str]], bool]:
|
||||
"""Load the canonicalization tree from a YAML file.
|
||||
|
||||
Returns tuple of (branches, canonicalize_enabled).
|
||||
"""
|
||||
c14n_branches: list[list[str]] = []
|
||||
c14n_filename = config_value
|
||||
canonicalize = c14n_filename is not False
|
||||
# Default tree
|
||||
if c14n_filename in (True, "", None) or (
|
||||
# prefer_specific requires a tree, load default tree
|
||||
not canonicalize and prefer_specific
|
||||
):
|
||||
c14n_filename = default_path
|
||||
# Read the tree
|
||||
if c14n_filename:
|
||||
log.debug("Loading canonicalization tree {}", c14n_filename)
|
||||
with Path(c14n_filename).expanduser().open(encoding="utf-8") as f:
|
||||
genres_tree = yaml.safe_load(f)
|
||||
DataFileLoader.flatten_tree(genres_tree, [], c14n_branches)
|
||||
return c14n_branches, canonicalize
|
||||
|
||||
@staticmethod
|
||||
def flatten_tree(
|
||||
elem: dict | list | str,
|
||||
path: list[str],
|
||||
branches: list[list[str]],
|
||||
) -> None:
|
||||
"""Flatten nested lists/dictionaries into lists of strings (branches)."""
|
||||
if not path:
|
||||
path = []
|
||||
|
||||
if isinstance(elem, dict):
|
||||
for k, v in elem.items():
|
||||
DataFileLoader.flatten_tree(v, [*path, k], branches)
|
||||
elif isinstance(elem, list):
|
||||
for sub in elem:
|
||||
DataFileLoader.flatten_tree(sub, path, branches)
|
||||
else:
|
||||
branches.append([*path, str(elem)])
|
||||
41
beetsplug/lastgenre/utils.py
Normal file
41
beetsplug/lastgenre/utils.py
Normal file
|
|
@ -0,0 +1,41 @@
|
|||
# This file is part of beets.
|
||||
# Copyright 2026, J0J0 Todos.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining
|
||||
# a copy of this software and associated documentation files (the
|
||||
# "Software"), to deal in the Software without restriction, including
|
||||
# without limitation the rights to use, copy, modify, merge, publish,
|
||||
# distribute, sublicense, and/or sell copies of the Software, and to
|
||||
# permit persons to whom the Software is furnished to do so, subject to
|
||||
# the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be
|
||||
# included in all copies or substantial portions of the Software.
|
||||
|
||||
|
||||
"""Shared utility functions for the lastgenre plugin."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from beets import config
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Callable
|
||||
|
||||
from beets.logging import Logger
|
||||
|
||||
|
||||
def make_tunelog(log: Logger) -> Callable[..., None]:
|
||||
"""Create a tunelog function bound to a specific logger.
|
||||
|
||||
Returns a callable that logs tuning messages at DEBUG level when
|
||||
verbosity is high enough.
|
||||
"""
|
||||
|
||||
def tunelog(msg: str, *args: Any, **kwargs: Any) -> None:
|
||||
if config["verbose"].as_number() >= 3:
|
||||
log.debug(msg, *args, **kwargs)
|
||||
|
||||
return tunelog
|
||||
|
|
@ -184,9 +184,9 @@ class LastGenrePluginTest(PluginTestCase):
|
|||
return [tag1, tag2]
|
||||
|
||||
plugin = lastgenre.LastGenrePlugin()
|
||||
res = plugin._tags_for(MockPylastObj())
|
||||
res = plugin.client._tags_for(MockPylastObj())
|
||||
assert res == ["pop", "rap"]
|
||||
res = plugin._tags_for(MockPylastObj(), min_weight=50)
|
||||
res = plugin.client._tags_for(MockPylastObj(), min_weight=50)
|
||||
assert res == ["pop"]
|
||||
|
||||
def test_sort_by_depth(self):
|
||||
|
|
@ -583,9 +583,9 @@ def test_get_genre(config_values, item_genre, mock_genres, expected_result):
|
|||
# Mock the last.fm fetchers. When whitelist enabled, we can assume only
|
||||
# whitelisted genres get returned, the plugin's _resolve_genre method
|
||||
# ensures it.
|
||||
lastgenre.LastGenrePlugin.fetch_track_genre = mock_fetch_track_genre
|
||||
lastgenre.LastGenrePlugin.fetch_album_genre = mock_fetch_album_genre
|
||||
lastgenre.LastGenrePlugin.fetch_artist_genre = mock_fetch_artist_genre
|
||||
lastgenre.client.LastFmClient.fetch_track_genre = mock_fetch_track_genre
|
||||
lastgenre.client.LastFmClient.fetch_album_genre = mock_fetch_album_genre
|
||||
lastgenre.client.LastFmClient.fetch_artist_genre = mock_fetch_artist_genre
|
||||
|
||||
# Initialize plugin instance and item
|
||||
plugin = lastgenre.LastGenrePlugin()
|
||||
|
|
|
|||
Loading…
Reference in a new issue