diff --git a/beets/autotag/distance.py b/beets/autotag/distance.py index d146c27f0..39d16858f 100644 --- a/beets/autotag/distance.py +++ b/beets/autotag/distance.py @@ -8,7 +8,7 @@ from typing import TYPE_CHECKING, Any from jellyfish import levenshtein_distance from unidecode import unidecode -from beets import config, plugins +from beets import config, metadata_plugins from beets.util import as_string, cached_classproperty, get_most_common_tags if TYPE_CHECKING: @@ -409,7 +409,7 @@ def track_distance( dist.add_expr("medium", item.disc != track_info.medium) # Plugins. - dist.update(plugins.track_distance(item, track_info)) + dist.update(metadata_plugins.track_distance(item, track_info)) return dist @@ -526,6 +526,6 @@ def distance( dist.add("unmatched_tracks", 1.0) # Plugins. - dist.update(plugins.album_distance(items, album_info, mapping)) + dist.update(metadata_plugins.album_distance(items, album_info, mapping)) return dist diff --git a/beets/autotag/match.py b/beets/autotag/match.py index 64572cf3b..e74d21755 100644 --- a/beets/autotag/match.py +++ b/beets/autotag/match.py @@ -24,7 +24,7 @@ from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar import lap import numpy as np -from beets import config, logging, plugins +from beets import config, logging, metadata_plugins from beets.autotag import AlbumInfo, AlbumMatch, TrackInfo, TrackMatch, hooks from beets.util import get_most_common_tags @@ -119,7 +119,7 @@ def match_by_id(items: Iterable[Item]) -> AlbumInfo | None: return None # If all album IDs are equal, look up the album. log.debug("Searching for discovered album ID: {0}", first) - return plugins.album_for_id(first) + return metadata_plugins.album_for_id(first) def _recommendation( @@ -274,7 +274,7 @@ def tag_album( if search_ids: for search_id in search_ids: log.debug("Searching for album ID: {0}", search_id) - if info := plugins.album_for_id(search_id): + if info := metadata_plugins.album_for_id(search_id): _add_candidate(items, candidates, info) # Use existing metadata or text search. @@ -311,7 +311,7 @@ def tag_album( log.debug("Album might be VA: {0}", va_likely) # Get the results from the data sources. - for matched_candidate in plugins.candidates( + for matched_candidate in metadata_plugins.candidates( items, search_artist, search_album, va_likely ): _add_candidate(items, candidates, matched_candidate) @@ -346,7 +346,7 @@ def tag_item( if trackids: for trackid in trackids: log.debug("Searching for track ID: {0}", trackid) - if info := plugins.track_for_id(trackid): + if info := metadata_plugins.track_for_id(trackid): dist = track_distance(item, info, incl_artist=True) candidates[info.track_id] = hooks.TrackMatch(dist, info) # If this is a good match, then don't keep searching. @@ -372,7 +372,7 @@ def tag_item( log.debug("Item search terms: {0} - {1}", search_artist, search_title) # Get and evaluate candidate metadata. - for track_info in plugins.item_candidates( + for track_info in metadata_plugins.item_candidates( item, search_artist, search_title ): dist = track_distance(item, track_info, incl_artist=True) diff --git a/beets/metadata_plugins.py b/beets/metadata_plugins.py new file mode 100644 index 000000000..5b11dc4ec --- /dev/null +++ b/beets/metadata_plugins.py @@ -0,0 +1,397 @@ +"""Metadata source plugin interface. + +This allows beets to lookup metadata from various sources. We define +a common interface for all metadata sources which need to be +implemented as plugins. +""" + +from __future__ import annotations + +import abc +import inspect +import re +import sys +import warnings +from typing import TYPE_CHECKING, Generic, Literal, Sequence, TypedDict, TypeVar + +from beets.util import cached_classproperty +from beets.util.id_extractors import extract_release_id + +from .plugins import BeetsPlugin, find_plugins, notify_info_yielded, send + +if sys.version_info >= (3, 11): + from typing import NotRequired +else: + from typing_extensions import NotRequired + +if TYPE_CHECKING: + from collections.abc import Iterable + + from confuse import ConfigView + + from .autotag import Distance + from .autotag.hooks import AlbumInfo, Item, TrackInfo + + +def find_metadata_source_plugins() -> list[MetadataSourcePlugin]: + """Returns a list of MetadataSourcePlugin subclass instances + + Resolved from all currently loaded beets plugins. + """ + + all_plugins = find_plugins() + metadata_plugins: list[MetadataSourcePlugin | BeetsPlugin] = [] + for plugin in all_plugins: + if isinstance(plugin, MetadataSourcePlugin): + metadata_plugins.append(plugin) + elif hasattr(plugin, "data_source"): + # TODO: Remove this in the future major release, v3.0.0 + warnings.warn( + f"{plugin.__class__.__name__} is used as a legacy metadata source. " + "It should extend MetadataSourcePlugin instead of BeetsPlugin. " + "Support for this will be removed in the v3.0.0 release!", + DeprecationWarning, + stacklevel=2, + ) + metadata_plugins.append(plugin) + + # typeignore: BeetsPlugin is not a MetadataSourcePlugin (legacy support) + return metadata_plugins # type: ignore[return-value] + + +@notify_info_yielded("albuminfo_received") +def candidates(*args, **kwargs) -> Iterable[AlbumInfo]: + """Return matching album candidates from all metadata source plugins.""" + for plugin in find_metadata_source_plugins(): + yield from plugin.candidates(*args, **kwargs) + + +@notify_info_yielded("trackinfo_received") +def item_candidates(*args, **kwargs) -> Iterable[TrackInfo]: + """Return matching track candidates fromm all metadata source plugins.""" + for plugin in find_metadata_source_plugins(): + yield from plugin.item_candidates(*args, **kwargs) + + +def album_for_id(_id: str) -> AlbumInfo | None: + """Get AlbumInfo object for the given ID string. + + A single ID can yield just a single album, so we return the first match. + """ + for plugin in find_metadata_source_plugins(): + if info := plugin.album_for_id(album_id=_id): + send("albuminfo_received", info=info) + return info + + return None + + +def track_for_id(_id: str) -> TrackInfo | None: + """Get TrackInfo object for the given ID string. + + A single ID can yield just a single track, so we return the first match. + """ + for plugin in find_metadata_source_plugins(): + if info := plugin.track_for_id(_id): + send("trackinfo_received", info=info) + return info + + return None + + +def track_distance(item: Item, info: TrackInfo) -> Distance: + """Returns the track distance for an item and trackinfo. + + Returns a Distance object is populated by all metadata source plugins + that implement the :py:meth:`MetadataSourcePlugin.track_distance` method. + """ + from beets.autotag.distance import Distance + + dist = Distance() + for plugin in find_metadata_source_plugins(): + dist.update(plugin.track_distance(item, info)) + return dist + + +def album_distance( + items: Sequence[Item], + album_info: AlbumInfo, + mapping: dict[Item, TrackInfo], +) -> Distance: + """Returns the album distance calculated by plugins.""" + from beets.autotag.distance import Distance + + dist = Distance() + for plugin in find_metadata_source_plugins(): + dist.update(plugin.album_distance(items, album_info, mapping)) + return dist + + +def _get_distance( + config: ConfigView, data_source: str, info: AlbumInfo | TrackInfo +) -> Distance: + """Returns the ``data_source`` weight and the maximum source weight + for albums or individual tracks. + """ + from beets.autotag.distance import Distance + + dist = Distance() + if info.data_source == data_source: + dist.add("source", config["source_weight"].as_number()) + return dist + + +class MetadataSourcePlugin(BeetsPlugin, metaclass=abc.ABCMeta): + """A plugin that provides metadata from a specific source. + + This base class implements a contract for plugins that provide metadata + from a specific source. The plugin must implement the methods to search for albums + and tracks, and to retrieve album and track information by ID. + """ + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.config.add({"source_weight": 0.5}) + + @abc.abstractmethod + def album_for_id(self, album_id: str) -> AlbumInfo | None: + """Return :py:class:`AlbumInfo` object or None if no matching release was + found.""" + raise NotImplementedError + + @abc.abstractmethod + def track_for_id(self, track_id: str) -> TrackInfo | None: + """Return a :py:class:`TrackInfo` object or None if no matching release was + found. + """ + raise NotImplementedError + + # ---------------------------------- search ---------------------------------- # + + @abc.abstractmethod + def candidates( + self, + items: Sequence[Item], + artist: str, + album: str, + va_likely: bool, + ) -> Iterable[AlbumInfo]: + """Return :py:class:`AlbumInfo` candidates that match the given album. + + Used in the autotag functionality to search for albums. + + :param items: List of items in the album + :param artist: Album artist + :param album: Album name + :param va_likely: Whether the album is likely to be by various artists + """ + raise NotImplementedError + + @abc.abstractmethod + def item_candidates( + self, item: Item, artist: str, title: str + ) -> Iterable[TrackInfo]: + """Return :py:class:`TrackInfo` candidates that match the given track. + + Used in the autotag functionality to search for tracks. + + :param item: Track item + :param artist: Track artist + :param title: Track title + """ + raise NotImplementedError + + def albums_for_ids(self, ids: Sequence[str]) -> Iterable[AlbumInfo | None]: + """Batch lookup of album metadata for a list of album IDs. + + Given a list of album identifiers, yields corresponding AlbumInfo objects. + Missing albums result in None values in the output iterator. + Plugins may implement this for optimized batched lookups instead of + single calls to album_for_id. + """ + + return (self.album_for_id(id) for id in ids) + + def tracks_for_ids(self, ids: Sequence[str]) -> Iterable[TrackInfo | None]: + """Batch lookup of track metadata for a list of track IDs. + + Given a list of track identifiers, yields corresponding TrackInfo objects. + Missing tracks result in None values in the output iterator. + Plugins may implement this for optimized batched lookups instead of + single calls to track_for_id. + """ + + return (self.track_for_id(id) for id in ids) + + def album_distance( + self, + items: Sequence[Item], + album_info: AlbumInfo, + mapping: dict[Item, TrackInfo], + ) -> Distance: + """Calculate the distance for an album based on its items and album info.""" + return _get_distance( + data_source=self.data_source, info=album_info, config=self.config + ) + + def track_distance( + self, + item: Item, + info: TrackInfo, + ) -> Distance: + """Calculate the distance for a track based on its item and track info.""" + return _get_distance( + data_source=self.data_source, info=info, config=self.config + ) + + @cached_classproperty + def data_source(cls) -> str: + """The data source name for this plugin. + + This is inferred from the plugin name. + """ + return cls.__name__.replace("Plugin", "") # type: ignore[attr-defined] + + def _extract_id(self, url: str) -> str | None: + """Extract an ID from a URL for this metadata source plugin. + + Uses the plugin's data source name to determine the ID format and + extracts the ID from a given URL. + """ + return extract_release_id(self.data_source, url) + + @staticmethod + def get_artist( + artists: Iterable[dict[str | int, str]], + id_key: str | int = "id", + name_key: str | int = "name", + join_key: str | int | None = None, + ) -> tuple[str, str | None]: + """Returns an artist string (all artists) and an artist_id (the main + artist) for a list of artist object dicts. + + For each artist, this function moves articles (such as 'a', 'an', + and 'the') to the front and strips trailing disambiguation numbers. It + returns a tuple containing the comma-separated string of all + normalized artists and the ``id`` of the main/first artist. + Alternatively a keyword can be used to combine artists together into a + single string by passing the join_key argument. + + :param artists: Iterable of artist dicts or lists returned by API. + :param id_key: Key or index corresponding to the value of ``id`` for + the main/first artist. Defaults to 'id'. + :param name_key: Key or index corresponding to values of names + to concatenate for the artist string (containing all artists). + Defaults to 'name'. + :param join_key: Key or index corresponding to a field containing a + keyword to use for combining artists into a single string, for + example "Feat.", "Vs.", "And" or similar. The default is None + which keeps the default behaviour (comma-separated). + :return: Normalized artist string. + """ + artist_id = None + artist_string = "" + artists = list(artists) # In case a generator was passed. + total = len(artists) + for idx, artist in enumerate(artists): + if not artist_id: + artist_id = artist[id_key] + name = artist[name_key] + # Strip disambiguation number. + name = re.sub(r" \(\d+\)$", "", name) + # Move articles to the front. + name = re.sub(r"^(.*?), (a|an|the)$", r"\2 \1", name, flags=re.I) + # Use a join keyword if requested and available. + if idx < (total - 1): # Skip joining on last. + if join_key and artist.get(join_key, None): + name += f" {artist[join_key]} " + else: + name += ", " + artist_string += name + + return artist_string, artist_id + + +class IDResponse(TypedDict): + """Response from the API containing an ID.""" + + id: str + + +class SearchFilter(TypedDict): + artist: NotRequired[str] + album: NotRequired[str] + + +R = TypeVar("R", bound=IDResponse) + + +class SearchApiMetadataSourcePlugin( + Generic[R], MetadataSourcePlugin, metaclass=abc.ABCMeta +): + """Helper class to implement a metadata source plugin with an API. + + Plugins using this ABC must implement an API search method to + retrieve album and track information by ID, + i.e. `album_for_id` and `track_for_id`, and a search method to + perform a search on the API. The search method should return a list + of identifiers for the requested type (album or track). + """ + + @abc.abstractmethod + def _search_api( + self, + query_type: Literal["album", "track"], + filters: SearchFilter, + keywords: str = "", + ) -> Sequence[R]: + """Perform a search on the API. + + :param query_type: The type of query to perform. + :param filters: A dictionary of filters to apply to the search. + :param keywords: Additional keywords to include in the search. + + Should return a list of identifiers for the requested type (album or track). + """ + raise NotImplementedError + + def candidates( + self, + items: Sequence[Item], + artist: str, + album: str, + va_likely: bool, + ) -> Iterable[AlbumInfo]: + query_filters: SearchFilter = {"album": album} + if not va_likely: + query_filters["artist"] = artist + + results = self._search_api("album", query_filters) + if not results: + return [] + + return filter( + None, self.albums_for_ids([result["id"] for result in results]) + ) + + def item_candidates( + self, item: Item, artist: str, title: str + ) -> Iterable[TrackInfo]: + results = self._search_api("track", {"artist": artist}, keywords=title) + if not results: + return [] + + return filter( + None, + self.tracks_for_ids([result["id"] for result in results if result]), + ) + + +# Dynamically copy methods to BeetsPlugin for legacy support +# TODO: Remove this in the future major release, v3.0.0 + +for name, method in inspect.getmembers( + MetadataSourcePlugin, predicate=inspect.isfunction +): + if not hasattr(BeetsPlugin, name): + setattr(BeetsPlugin, name, method) diff --git a/beets/plugins.py b/beets/plugins.py index 983d15402..821a96152 100644 --- a/beets/plugins.py +++ b/beets/plugins.py @@ -23,22 +23,13 @@ import sys import traceback from collections import defaultdict from functools import wraps -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Generic, - Literal, - Sequence, - TypedDict, - TypeVar, -) +from types import GenericAlias +from typing import TYPE_CHECKING, Any, Callable, Sequence, TypeVar import mediafile import beets from beets import logging -from beets.util.id_extractors import extract_release_id if TYPE_CHECKING: from beets.event_types import EventType @@ -54,8 +45,6 @@ if TYPE_CHECKING: from confuse import ConfigView - from beets.autotag import AlbumInfo, TrackInfo - from beets.autotag.distance import Distance from beets.dbcore import Query from beets.dbcore.db import FieldQueryType from beets.dbcore.types import Type @@ -115,7 +104,7 @@ class PluginLogFilter(logging.Filter): # Managing the plugins themselves. -class BeetsPlugin: +class BeetsPlugin(metaclass=abc.ABCMeta): """The base class for all beets plugins. Plugins provide functionality by defining a subclass of BeetsPlugin and overriding the abstract methods defined here. @@ -218,66 +207,6 @@ class BeetsPlugin: """Return a dict mapping prefixes to Query subclasses.""" return {} - def track_distance( - self, - item: Item, - info: TrackInfo, - ) -> Distance: - """Should return a Distance object to be added to the - distance for every track comparison. - """ - from beets.autotag.distance import Distance - - return Distance() - - def album_distance( - self, - items: Sequence[Item], - album_info: AlbumInfo, - mapping: dict[Item, TrackInfo], - ) -> Distance: - """Should return a Distance object to be added to the - distance for every album-level comparison. - """ - from beets.autotag.distance import Distance - - return Distance() - - def candidates( - self, items: list[Item], artist: str, album: str, va_likely: bool - ) -> Iterable[AlbumInfo]: - """Return :py:class:`AlbumInfo` candidates that match the given album. - - :param items: List of items in the album - :param artist: Album artist - :param album: Album name - :param va_likely: Whether the album is likely to be by various artists - """ - yield from () - - def item_candidates( - self, item: Item, artist: str, title: str - ) -> Iterable[TrackInfo]: - """Return :py:class:`TrackInfo` candidates that match the given track. - - :param item: Track item - :param artist: Track artist - :param title: Track title - """ - yield from () - - def album_for_id(self, album_id: str) -> AlbumInfo | None: - """Return an AlbumInfo object or None if no matching release was - found. - """ - return None - - def track_for_id(self, track_id: str) -> TrackInfo | None: - """Return a TrackInfo object or None if no matching release was - found. - """ - return None - def add_media_field( self, name: str, descriptor: mediafile.MediaField ) -> None: @@ -369,10 +298,13 @@ def load_plugins(names: Sequence[str] = ()) -> None: else: for obj in getattr(namespace, name).__dict__.values(): if ( - isinstance(obj, type) + inspect.isclass(obj) + and not isinstance( + obj, GenericAlias + ) # seems to be needed for python <= 3.9 only and issubclass(obj, BeetsPlugin) and obj != BeetsPlugin - and obj != MetadataSourcePlugin + and not inspect.isabstract(obj) and obj not in _classes ): _classes.add(obj) @@ -456,32 +388,6 @@ def named_queries(model_cls: type[AnyModel]) -> dict[str, FieldQueryType]: return queries -def track_distance(item: Item, info: TrackInfo) -> Distance: - """Gets the track distance calculated by all loaded plugins. - Returns a Distance object. - """ - from beets.autotag.distance import Distance - - dist = Distance() - for plugin in find_plugins(): - dist.update(plugin.track_distance(item, info)) - return dist - - -def album_distance( - items: Sequence[Item], - album_info: AlbumInfo, - mapping: dict[Item, TrackInfo], -) -> Distance: - """Returns the album distance calculated by plugins.""" - from beets.autotag.distance import Distance - - dist = Distance() - for plugin in find_plugins(): - dist.update(plugin.album_distance(items, album_info, mapping)) - return dist - - def notify_info_yielded(event: str) -> Callable[[IterF[P, Ret]], IterF[P, Ret]]: """Makes a generator send the event 'event' every time it yields. This decorator is supposed to decorate a generator, but any function @@ -502,46 +408,6 @@ def notify_info_yielded(event: str) -> Callable[[IterF[P, Ret]], IterF[P, Ret]]: return decorator -@notify_info_yielded("albuminfo_received") -def candidates(*args, **kwargs) -> Iterable[AlbumInfo]: - """Return matching album candidates from all plugins.""" - for plugin in find_plugins(): - yield from plugin.candidates(*args, **kwargs) - - -@notify_info_yielded("trackinfo_received") -def item_candidates(*args, **kwargs) -> Iterable[TrackInfo]: - """Return matching track candidates from all plugins.""" - for plugin in find_plugins(): - yield from plugin.item_candidates(*args, **kwargs) - - -def album_for_id(_id: str) -> AlbumInfo | None: - """Get AlbumInfo object for the given ID string. - - A single ID can yield just a single album, so we return the first match. - """ - for plugin in find_plugins(): - if info := plugin.album_for_id(_id): - send("albuminfo_received", info=info) - return info - - return None - - -def track_for_id(_id: str) -> TrackInfo | None: - """Get TrackInfo object for the given ID string. - - A single ID can yield just a single track, so we return the first match. - """ - for plugin in find_plugins(): - if info := plugin.track_for_id(_id): - send("trackinfo_received", info=info) - return info - - return None - - def template_funcs() -> TFuncMap[str]: """Get all the template functions declared by plugins as a dictionary. @@ -656,20 +522,6 @@ def feat_tokens(for_artist: bool = True) -> str: ) -def get_distance( - config: ConfigView, data_source: str, info: AlbumInfo | TrackInfo -) -> Distance: - """Returns the ``data_source`` weight and the maximum source weight - for albums or individual tracks. - """ - from beets.autotag.distance import Distance - - dist = Distance() - if info.data_source == data_source: - dist.add("source", config["source_weight"].as_number()) - return dist - - def apply_item_changes( lib: Library, item: Item, move: bool, pretend: bool, write: bool ) -> None: @@ -695,149 +547,3 @@ def apply_item_changes( item.try_write() item.store() - - -class Response(TypedDict): - """A dictionary with the response of a plugin API call. - - May be extended by plugins to include additional information, but `id` - is required. - """ - - id: str - - -R = TypeVar("R", bound=Response) - - -class MetadataSourcePlugin(Generic[R], BeetsPlugin, metaclass=abc.ABCMeta): - def __init__(self): - super().__init__() - self.config.add({"source_weight": 0.5}) - - @property - @abc.abstractmethod - def data_source(self) -> str: - raise NotImplementedError - - @property - @abc.abstractmethod - def search_url(self) -> str: - raise NotImplementedError - - @property - @abc.abstractmethod - def album_url(self) -> str: - raise NotImplementedError - - @property - @abc.abstractmethod - def track_url(self) -> str: - raise NotImplementedError - - @abc.abstractmethod - def _search_api( - self, - query_type: Literal["album", "track"], - filters: dict[str, str], - keywords: str = "", - ) -> Sequence[R]: - raise NotImplementedError - - @abc.abstractmethod - def album_for_id(self, album_id: str) -> AlbumInfo | None: - raise NotImplementedError - - @abc.abstractmethod - def track_for_id(self, track_id: str) -> TrackInfo | None: - raise NotImplementedError - - @staticmethod - def get_artist( - artists, - id_key: str | int = "id", - name_key: str | int = "name", - join_key: str | int | None = None, - ) -> tuple[str, str | None]: - """Returns an artist string (all artists) and an artist_id (the main - artist) for a list of artist object dicts. - - For each artist, this function moves articles (such as 'a', 'an', - and 'the') to the front and strips trailing disambiguation numbers. It - returns a tuple containing the comma-separated string of all - normalized artists and the ``id`` of the main/first artist. - Alternatively a keyword can be used to combine artists together into a - single string by passing the join_key argument. - - :param artists: Iterable of artist dicts or lists returned by API. - :type artists: list[dict] or list[list] - :param id_key: Key or index corresponding to the value of ``id`` for - the main/first artist. Defaults to 'id'. - :param name_key: Key or index corresponding to values of names - to concatenate for the artist string (containing all artists). - Defaults to 'name'. - :param join_key: Key or index corresponding to a field containing a - keyword to use for combining artists into a single string, for - example "Feat.", "Vs.", "And" or similar. The default is None - which keeps the default behaviour (comma-separated). - :return: Normalized artist string. - """ - artist_id = None - artist_string = "" - artists = list(artists) # In case a generator was passed. - total = len(artists) - for idx, artist in enumerate(artists): - if not artist_id: - artist_id = artist[id_key] - name = artist[name_key] - # Strip disambiguation number. - name = re.sub(r" \(\d+\)$", "", name) - # Move articles to the front. - name = re.sub(r"^(.*?), (a|an|the)$", r"\2 \1", name, flags=re.I) - # Use a join keyword if requested and available. - if idx < (total - 1): # Skip joining on last. - if join_key and artist.get(join_key, None): - name += f" {artist[join_key]} " - else: - name += ", " - artist_string += name - - return artist_string, artist_id - - def _get_id(self, id_string: str) -> str | None: - """Parse release ID from the given ID string.""" - return extract_release_id(self.data_source.lower(), id_string) - - def candidates( - self, items: list[Item], artist: str, album: str, va_likely: bool - ) -> Iterable[AlbumInfo]: - query_filters = {"album": album} - if not va_likely: - query_filters["artist"] = artist - for result in self._search_api("album", query_filters): - if info := self.album_for_id(result["id"]): - yield info - - def item_candidates( - self, item: Item, artist: str, title: str - ) -> Iterable[TrackInfo]: - for result in self._search_api( - "track", {"artist": artist}, keywords=title - ): - if info := self.track_for_id(result["id"]): - yield info - - def album_distance( - self, - items: Sequence[Item], - album_info: AlbumInfo, - mapping: dict[Item, TrackInfo], - ) -> Distance: - return get_distance( - data_source=self.data_source, info=album_info, config=self.config - ) - - def track_distance(self, item: Item, info: TrackInfo) -> Distance: - return get_distance( - data_source=self.data_source, info=info, config=self.config - ) diff --git a/beets/test/helper.py b/beets/test/helper.py index db753a760..4f26e8448 100644 --- a/beets/test/helper.py +++ b/beets/test/helper.py @@ -799,10 +799,12 @@ class AutotagStub: def install(self): self.patchers = [ - patch("beets.plugins.album_for_id", lambda *_: None), - patch("beets.plugins.track_for_id", lambda *_: None), - patch("beets.plugins.candidates", self.candidates), - patch("beets.plugins.item_candidates", self.item_candidates), + patch("beets.metadata_plugins.album_for_id", lambda *_: None), + patch("beets.metadata_plugins.track_for_id", lambda *_: None), + patch("beets.metadata_plugins.candidates", self.candidates), + patch( + "beets.metadata_plugins.item_candidates", self.item_candidates + ), ] for p in self.patchers: p.start() diff --git a/beets/util/id_extractors.py b/beets/util/id_extractors.py index bbe2c32a4..6cdb787d1 100644 --- a/beets/util/id_extractors.py +++ b/beets/util/id_extractors.py @@ -18,6 +18,11 @@ from __future__ import annotations import re +from beets import logging + +log = logging.getLogger("beets") + + PATTERN_BY_SOURCE = { "spotify": re.compile(r"(?:^|open\.spotify\.com/[^/]+/)([0-9A-Za-z]{22})"), "deezer": re.compile(r"(?:^|deezer\.com/)(?:[a-z]*/)?(?:[^/]+/)?(\d+)"), @@ -43,6 +48,21 @@ PATTERN_BY_SOURCE = { def extract_release_id(source: str, id_: str) -> str | None: - if m := PATTERN_BY_SOURCE[source].search(str(id_)): + """Extract the release ID from a given source and ID. + + Normally, the `id_` is a url string which contains the ID of the + release. This function extracts the ID from the URL based on the + `source` provided. + """ + try: + source_pattern = PATTERN_BY_SOURCE[source.lower()] + except KeyError: + log.debug( + f"Unknown source '{source}' for ID extraction. Returning id/url as-is." + ) + return id_ + + if m := source_pattern.search(str(id_)): return m[1] + return None diff --git a/beetsplug/beatport.py b/beetsplug/beatport.py index 20147b5cc..16e0dc896 100644 --- a/beetsplug/beatport.py +++ b/beetsplug/beatport.py @@ -14,9 +14,19 @@ """Adds Beatport release and track search support to the autotagger""" +from __future__ import annotations + import json import re from datetime import datetime, timedelta +from typing import ( + TYPE_CHECKING, + Iterable, + Iterator, + Literal, + Sequence, + overload, +) import confuse from requests_oauthlib import OAuth1Session @@ -29,7 +39,13 @@ from requests_oauthlib.oauth1_session import ( import beets import beets.ui from beets.autotag.hooks import AlbumInfo, TrackInfo -from beets.plugins import BeetsPlugin, MetadataSourcePlugin, get_distance +from beets.metadata_plugins import MetadataSourcePlugin + +if TYPE_CHECKING: + from beets.importer import ImportSession + from beets.library import Item + + from ._typing import JSONDict AUTH_ERRORS = (TokenRequestDenied, TokenMissing, VerifierMissing) USER_AGENT = f"beets/{beets.__version__} +https://beets.io/" @@ -39,20 +55,6 @@ class BeatportAPIError(Exception): pass -class BeatportObject: - def __init__(self, data): - self.beatport_id = data["id"] - self.name = str(data["name"]) - if "releaseDate" in data: - self.release_date = datetime.strptime( - data["releaseDate"], "%Y-%m-%d" - ) - if "artists" in data: - self.artists = [(x["id"], str(x["name"])) for x in data["artists"]] - if "genres" in data: - self.genres = [str(x["name"]) for x in data["genres"]] - - class BeatportClient: _api_base = "https://oauth-api.beatport.com" @@ -77,7 +79,7 @@ class BeatportClient: ) self.api.headers = {"User-Agent": USER_AGENT} - def get_authorize_url(self): + def get_authorize_url(self) -> str: """Generate the URL for the user to authorize the application. Retrieves a request token from the Beatport API and returns the @@ -99,15 +101,13 @@ class BeatportClient: self._make_url("/identity/1/oauth/authorize") ) - def get_access_token(self, auth_data): + def get_access_token(self, auth_data: str) -> tuple[str, str]: """Obtain the final access token and secret for the API. :param auth_data: URL-encoded authorization data as displayed at the authorization url (obtained via :py:meth:`get_authorize_url`) after signing in - :type auth_data: unicode - :returns: OAuth resource owner key and secret - :rtype: (unicode, unicode) tuple + :returns: OAuth resource owner key and secret as unicode """ self.api.parse_authorization_response( "https://beets.io/auth?" + auth_data @@ -117,20 +117,37 @@ class BeatportClient: ) return access_data["oauth_token"], access_data["oauth_token_secret"] - def search(self, query, release_type="release", details=True): + @overload + def search( + self, + query: str, + release_type: Literal["release"], + details: bool = True, + ) -> Iterator[BeatportRelease]: ... + + @overload + def search( + self, + query: str, + release_type: Literal["track"], + details: bool = True, + ) -> Iterator[BeatportTrack]: ... + + def search( + self, + query: str, + release_type: Literal["release", "track"], + details=True, + ) -> Iterator[BeatportRelease | BeatportTrack]: """Perform a search of the Beatport catalogue. :param query: Query string - :param release_type: Type of releases to search for, can be - 'release' or 'track' + :param release_type: Type of releases to search for. :param details: Retrieve additional information about the search results. Currently this will fetch the tracklist for releases and do nothing for tracks :returns: Search results - :rtype: generator that yields - py:class:`BeatportRelease` or - :py:class:`BeatportTrack` """ response = self._get( "catalog/3/search", @@ -140,20 +157,18 @@ class BeatportClient: ) for item in response: if release_type == "release": + release = BeatportRelease(item) if details: - release = self.get_release(item["id"]) - else: - release = BeatportRelease(item) + release.tracks = self.get_release_tracks(item["id"]) yield release elif release_type == "track": yield BeatportTrack(item) - def get_release(self, beatport_id): + def get_release(self, beatport_id: str) -> BeatportRelease | None: """Get information about a single release. :param beatport_id: Beatport ID of the release :returns: The matching release - :rtype: :py:class:`BeatportRelease` """ response = self._get("/catalog/3/releases", id=beatport_id) if response: @@ -162,35 +177,33 @@ class BeatportClient: return release return None - def get_release_tracks(self, beatport_id): + def get_release_tracks(self, beatport_id: str) -> list[BeatportTrack]: """Get all tracks for a given release. :param beatport_id: Beatport ID of the release :returns: Tracks in the matching release - :rtype: list of :py:class:`BeatportTrack` """ response = self._get( "/catalog/3/tracks", releaseId=beatport_id, perPage=100 ) return [BeatportTrack(t) for t in response] - def get_track(self, beatport_id): + def get_track(self, beatport_id: str) -> BeatportTrack: """Get information about a single track. :param beatport_id: Beatport ID of the track :returns: The matching track - :rtype: :py:class:`BeatportTrack` """ response = self._get("/catalog/3/tracks", id=beatport_id) return BeatportTrack(response[0]) - def _make_url(self, endpoint): + def _make_url(self, endpoint: str) -> str: """Get complete URL for a given API endpoint.""" if not endpoint.startswith("/"): endpoint = "/" + endpoint return self._api_base + endpoint - def _get(self, endpoint, **kwargs): + def _get(self, endpoint: str, **kwargs) -> list[JSONDict]: """Perform a GET request on a given API endpoint. Automatically extracts result data from the response and converts HTTP @@ -211,48 +224,81 @@ class BeatportClient: return response.json()["results"] -class BeatportRelease(BeatportObject): - def __str__(self): - if len(self.artists) < 4: - artist_str = ", ".join(x[1] for x in self.artists) +class BeatportObject: + beatport_id: str + name: str + + release_date: datetime | None = None + + artists: list[tuple[str, str]] | None = None + # tuple of artist id and artist name + + def __init__(self, data: JSONDict): + self.beatport_id = str(data["id"]) # given as int in the response + self.name = str(data["name"]) + if "releaseDate" in data: + self.release_date = datetime.strptime( + data["releaseDate"], "%Y-%m-%d" + ) + if "artists" in data: + self.artists = [(x["id"], str(x["name"])) for x in data["artists"]] + if "genres" in data: + self.genres = [str(x["name"]) for x in data["genres"]] + + def artists_str(self) -> str | None: + if self.artists is not None: + if len(self.artists) < 4: + artist_str = ", ".join(x[1] for x in self.artists) + else: + artist_str = "Various Artists" else: - artist_str = "Various Artists" - return "".format( - artist_str, - self.name, - self.catalog_number, - ) + artist_str = None - def __repr__(self): - return str(self).encode("utf-8") + return artist_str + + +class BeatportRelease(BeatportObject): + catalog_number: str | None + label_name: str | None + category: str | None + url: str | None + genre: str | None + + tracks: list[BeatportTrack] | None = None + + def __init__(self, data: JSONDict): + super().__init__(data) + + self.catalog_number = data.get("catalogNumber") + self.label_name = data.get("label", {}).get("name") + self.category = data.get("category") + self.genre = data.get("genre") - def __init__(self, data): - BeatportObject.__init__(self, data) - if "catalogNumber" in data: - self.catalog_number = data["catalogNumber"] - if "label" in data: - self.label_name = data["label"]["name"] - if "category" in data: - self.category = data["category"] if "slug" in data: self.url = "https://beatport.com/release/{}/{}".format( data["slug"], data["id"] ) - self.genre = data.get("genre") + + def __str__(self) -> str: + return "".format( + self.artists_str(), + self.name, + self.catalog_number, + ) class BeatportTrack(BeatportObject): - def __str__(self): - artist_str = ", ".join(x[1] for x in self.artists) - return "".format( - artist_str, self.name, self.mix_name - ) + title: str | None + mix_name: str | None + length: timedelta + url: str | None + track_number: int | None + bpm: str | None + initial_key: str | None + genre: str | None - def __repr__(self): - return str(self).encode("utf-8") - - def __init__(self, data): - BeatportObject.__init__(self, data) + def __init__(self, data: JSONDict): + super().__init__(data) if "title" in data: self.title = str(data["title"]) if "mixName" in data: @@ -279,8 +325,8 @@ class BeatportTrack(BeatportObject): self.genre = str(data["genres"][0].get("name")) -class BeatportPlugin(BeetsPlugin): - data_source = "Beatport" +class BeatportPlugin(MetadataSourcePlugin): + _client: BeatportClient | None = None def __init__(self): super().__init__() @@ -294,12 +340,19 @@ class BeatportPlugin(BeetsPlugin): ) self.config["apikey"].redact = True self.config["apisecret"].redact = True - self.client = None self.register_listener("import_begin", self.setup) - def setup(self, session=None): - c_key = self.config["apikey"].as_str() - c_secret = self.config["apisecret"].as_str() + @property + def client(self) -> BeatportClient: + if self._client is None: + raise ValueError( + "Beatport client not initialized. Call setup() first." + ) + return self._client + + def setup(self, session: ImportSession): + c_key: str = self.config["apikey"].as_str() + c_secret: str = self.config["apisecret"].as_str() # Get the OAuth token from a file or log in. try: @@ -312,9 +365,9 @@ class BeatportPlugin(BeetsPlugin): token = tokendata["token"] secret = tokendata["secret"] - self.client = BeatportClient(c_key, c_secret, token, secret) + self._client = BeatportClient(c_key, c_secret, token, secret) - def authenticate(self, c_key, c_secret): + def authenticate(self, c_key: str, c_secret: str) -> tuple[str, str]: # Get the link for the OAuth page. auth_client = BeatportClient(c_key, c_secret) try: @@ -341,44 +394,30 @@ class BeatportPlugin(BeetsPlugin): return token, secret - def _tokenfile(self): + def _tokenfile(self) -> str: """Get the path to the JSON file for storing the OAuth token.""" return self.config["tokenfile"].get(confuse.Filename(in_app_dir=True)) - def album_distance(self, items, album_info, mapping): - """Returns the Beatport source weight and the maximum source weight - for albums. - """ - return get_distance( - data_source=self.data_source, info=album_info, config=self.config - ) - - def track_distance(self, item, track_info): - """Returns the Beatport source weight and the maximum source weight - for individual tracks. - """ - return get_distance( - data_source=self.data_source, info=track_info, config=self.config - ) - - def candidates(self, items, artist, release, va_likely): - """Returns a list of AlbumInfo objects for beatport search results - matching release and artist (if not various). - """ + def candidates( + self, + items: Sequence[Item], + artist: str, + album: str, + va_likely: bool, + ) -> Iterator[AlbumInfo]: if va_likely: - query = release + query = album else: - query = f"{artist} {release}" + query = f"{artist} {album}" try: - return self._get_releases(query) + yield from self._get_releases(query) except BeatportAPIError as e: self._log.debug("API Error: {0} (query: {1})", e, query) - return [] + return - def item_candidates(self, item, artist, title): - """Returns a list of TrackInfo objects for beatport search results - matching title and artist. - """ + def item_candidates( + self, item: Item, artist: str, title: str + ) -> Iterable[TrackInfo]: query = f"{artist} {title}" try: return self._get_tracks(query) @@ -386,13 +425,13 @@ class BeatportPlugin(BeetsPlugin): self._log.debug("API Error: {0} (query: {1})", e, query) return [] - def album_for_id(self, release_id): + def album_for_id(self, album_id: str): """Fetches a release by its Beatport ID and returns an AlbumInfo object or None if the query is not a valid ID or release is not found. """ - self._log.debug("Searching for release {0}", release_id) + self._log.debug("Searching for release {0}", album_id) - if not (release_id := self._get_id(release_id)): + if not (release_id := self._extract_id(album_id)): self._log.debug("Not a valid Beatport release ID.") return None @@ -401,11 +440,12 @@ class BeatportPlugin(BeetsPlugin): return self._get_album_info(release) return None - def track_for_id(self, track_id): + def track_for_id(self, track_id: str): """Fetches a track by its Beatport ID and returns a TrackInfo object or None if the track is not a valid Beatport ID or track is not found. """ self._log.debug("Searching for track {0}", track_id) + # TODO: move to extractor match = re.search(r"(^|beatport\.com/track/.+/)(\d+)$", track_id) if not match: self._log.debug("Not a valid Beatport track ID.") @@ -415,7 +455,7 @@ class BeatportPlugin(BeetsPlugin): return self._get_track_info(bp_track) return None - def _get_releases(self, query): + def _get_releases(self, query: str) -> Iterator[AlbumInfo]: """Returns a list of AlbumInfo objects for a beatport search query.""" # Strip non-word characters from query. Things like "!" and "-" can # cause a query to return no results, even if they match the artist or @@ -425,16 +465,22 @@ class BeatportPlugin(BeetsPlugin): # Strip medium information from query, Things like "CD1" and "disk 1" # can also negate an otherwise positive result. query = re.sub(r"\b(CD|disc)\s*\d+", "", query, flags=re.I) - albums = [self._get_album_info(x) for x in self.client.search(query)] - return albums + for beatport_release in self.client.search(query, "release"): + if beatport_release is None: + continue + yield self._get_album_info(beatport_release) - def _get_album_info(self, release): + def _get_album_info(self, release: BeatportRelease) -> AlbumInfo: """Returns an AlbumInfo object for a Beatport Release object.""" - va = len(release.artists) > 3 + va = release.artists is not None and len(release.artists) > 3 artist, artist_id = self._get_artist(release.artists) if va: artist = "Various Artists" - tracks = [self._get_track_info(x) for x in release.tracks] + tracks: list[TrackInfo] = [] + if release.tracks is not None: + tracks = [self._get_track_info(x) for x in release.tracks] + + release_date = release.release_date return AlbumInfo( album=release.name, @@ -445,18 +491,18 @@ class BeatportPlugin(BeetsPlugin): tracks=tracks, albumtype=release.category, va=va, - year=release.release_date.year, - month=release.release_date.month, - day=release.release_date.day, label=release.label_name, catalognum=release.catalog_number, media="Digital", data_source=self.data_source, data_url=release.url, genre=release.genre, + year=release_date.year if release_date else None, + month=release_date.month if release_date else None, + day=release_date.day if release_date else None, ) - def _get_track_info(self, track): + def _get_track_info(self, track: BeatportTrack) -> TrackInfo: """Returns a TrackInfo object for a Beatport Track object.""" title = track.name if track.mix_name != "Original Mix": @@ -482,9 +528,7 @@ class BeatportPlugin(BeetsPlugin): """Returns an artist string (all artists) and an artist_id (the main artist) for a list of Beatport release or track artists. """ - return MetadataSourcePlugin.get_artist( - artists=artists, id_key=0, name_key=1 - ) + return self.get_artist(artists=artists, id_key=0, name_key=1) def _get_tracks(self, query): """Returns a list of TrackInfo objects for a Beatport query.""" diff --git a/beetsplug/chroma.py b/beetsplug/chroma.py index de3ac525a..21098ea81 100644 --- a/beetsplug/chroma.py +++ b/beetsplug/chroma.py @@ -19,12 +19,14 @@ autotagger. Requires the pyacoustid library. import re from collections import defaultdict from functools import cached_property, partial +from typing import Iterable import acoustid import confuse -from beets import config, plugins, ui, util +from beets import config, ui, util from beets.autotag.distance import Distance +from beets.metadata_plugins import MetadataSourcePlugin, TrackInfo from beetsplug.musicbrainz import MusicBrainzPlugin API_KEY = "1vOwZtEn" @@ -168,10 +170,8 @@ def _all_releases(items): yield release_id -class AcoustidPlugin(plugins.BeetsPlugin): +class AcoustidPlugin(MetadataSourcePlugin): def __init__(self): - super().__init__() - self.config.add( { "auto": True, @@ -210,7 +210,7 @@ class AcoustidPlugin(plugins.BeetsPlugin): self._log.debug("acoustid album candidates: {0}", len(albums)) return albums - def item_candidates(self, item, artist, title): + def item_candidates(self, item, artist, title) -> Iterable[TrackInfo]: if item.path not in _matches: return [] @@ -223,6 +223,14 @@ class AcoustidPlugin(plugins.BeetsPlugin): self._log.debug("acoustid item candidates: {0}", len(tracks)) return tracks + def album_for_id(self, *args, **kwargs): + # Lookup by fingerprint ID does not make too much sense. + return None + + def track_for_id(self, *args, **kwargs): + # Lookup by fingerprint ID does not make too much sense. + return None + def commands(self): submit_cmd = ui.Subcommand( "submit", help="submit Acoustid fingerprints" diff --git a/beetsplug/deezer.py b/beetsplug/deezer.py index 7e4896437..8815e3d59 100644 --- a/beetsplug/deezer.py +++ b/beetsplug/deezer.py @@ -26,16 +26,19 @@ import unidecode from beets import ui from beets.autotag import AlbumInfo, TrackInfo from beets.dbcore import types -from beets.plugins import BeetsPlugin, MetadataSourcePlugin, Response +from beets.metadata_plugins import ( + IDResponse, + SearchApiMetadataSourcePlugin, + SearchFilter, +) if TYPE_CHECKING: from beets.library import Item, Library - from beetsplug._typing import JSONDict + + from ._typing import JSONDict -class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin): - data_source = "Deezer" - +class DeezerPlugin(SearchApiMetadataSourcePlugin[IDResponse]): item_types = { "deezer_track_rank": types.INTEGER, "deezer_track_id": types.INTEGER, @@ -63,7 +66,7 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin): def album_for_id(self, album_id: str) -> AlbumInfo | None: """Fetch an album by its Deezer ID or URL.""" - if not (deezer_id := self._get_id(album_id)): + if not (deezer_id := self._extract_id(album_id)): return None album_url = f"{self.album_url}{deezer_id}" @@ -145,11 +148,14 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin): ) def track_for_id(self, track_id: str) -> None | TrackInfo: - """Fetch a track by its Deezer ID or URL. + """Fetch a track by its Deezer ID or URL and return a + TrackInfo object or None if the track is not found. + + :param track_id: (Optional) Deezer ID or URL for the track. Either + ``track_id`` or ``track_data`` must be provided. - Returns a TrackInfo object or None if the track is not found. """ - if not (deezer_id := self._get_id(track_id)): + if not (deezer_id := self._extract_id(track_id)): self._log.debug("Invalid Deezer track_id: {}", track_id) return None @@ -162,11 +168,13 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin): # Get album's tracks to set `track.index` (position on the entire # release) and `track.medium_total` (total number of tracks on # the track's disc). - album_tracks_obj = self.fetch_data( - self.album_url + str(track_data["album"]["id"]) + "/tracks" - ) - if album_tracks_obj is None: + if not ( + album_tracks_obj := self.fetch_data( + self.album_url + str(track_data["album"]["id"]) + "/tracks" + ) + ): return None + try: album_tracks_data = album_tracks_obj["data"] except KeyError: @@ -187,7 +195,6 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin): """Convert a Deezer track object dict to a TrackInfo object. :param track_data: Deezer Track object dict - :return: TrackInfo object for track """ artist, artist_id = self.get_artist( track_data.get("contributors", [track_data["artist"]]) @@ -211,7 +218,7 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin): @staticmethod def _construct_search_query( - filters: dict[str, str], keywords: str = "" + filters: SearchFilter, keywords: str = "" ) -> str: """Construct a query string with the specified filters and keywords to be provided to the Deezer Search API @@ -242,14 +249,14 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin): "radio", "user", ], - filters: dict[str, str], + filters: SearchFilter, keywords="", - ) -> Sequence[Response]: + ) -> Sequence[IDResponse]: """Query the Deezer Search API for the specified ``keywords``, applying the provided ``filters``. - :param query_type: The Deezer Search API method to use. - :param keywords: (Optional) Query keywords to use. + :param filters: Field filters to apply. + :param keywords: Query keywords to use. :return: JSON data for the class:`Response ` object or None if no search results are returned. """ @@ -269,7 +276,7 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin): e, ) return () - response_data = response.json().get("data", []) + response_data: Sequence[IDResponse] = response.json().get("data", []) self._log.debug( "Found {} result(s) from {} for '{}'", len(response_data), diff --git a/beetsplug/discogs.py b/beetsplug/discogs.py index 2408f3498..9765f317f 100644 --- a/beetsplug/discogs.py +++ b/beetsplug/discogs.py @@ -27,7 +27,7 @@ import time import traceback from functools import cache from string import ascii_lowercase -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Sequence import confuse from discogs_client import Client, Master, Release @@ -40,8 +40,7 @@ import beets.ui from beets import config from beets.autotag.distance import string_dist from beets.autotag.hooks import AlbumInfo, TrackInfo -from beets.plugins import BeetsPlugin, MetadataSourcePlugin, get_distance -from beets.util.id_extractors import extract_release_id +from beets.metadata_plugins import MetadataSourcePlugin if TYPE_CHECKING: from collections.abc import Callable, Iterable @@ -84,7 +83,7 @@ class ReleaseFormat(TypedDict): descriptions: list[str] | None -class DiscogsPlugin(BeetsPlugin): +class DiscogsPlugin(MetadataSourcePlugin): def __init__(self): super().__init__() self.config.add( @@ -169,20 +168,8 @@ class DiscogsPlugin(BeetsPlugin): return token, secret - def album_distance(self, items, album_info, mapping): - """Returns the album distance.""" - return get_distance( - data_source="Discogs", info=album_info, config=self.config - ) - - def track_distance(self, item, track_info): - """Returns the track distance.""" - return get_distance( - data_source="Discogs", info=track_info, config=self.config - ) - def candidates( - self, items: list[Item], artist: str, album: str, va_likely: bool + self, items: Sequence[Item], artist: str, album: str, va_likely: bool ) -> Iterable[AlbumInfo]: return self.get_albums(f"{artist} {album}" if va_likely else album) @@ -217,7 +204,7 @@ class DiscogsPlugin(BeetsPlugin): """ self._log.debug("Searching for release {0}", album_id) - discogs_id = extract_release_id("discogs", album_id) + discogs_id = self._extract_id(album_id) if not discogs_id: return None @@ -272,7 +259,7 @@ class DiscogsPlugin(BeetsPlugin): exc_info=True, ) return [] - return map(self.get_album_info, releases) + return filter(None, map(self.get_album_info, releases)) @cache def get_master_year(self, master_id: str) -> int | None: @@ -334,7 +321,7 @@ class DiscogsPlugin(BeetsPlugin): self._log.warning("Release does not contain the required fields") return None - artist, artist_id = MetadataSourcePlugin.get_artist( + artist, artist_id = self.get_artist( [a.data for a in result.artists], join_key="join" ) album = re.sub(r" +", " ", result.title) @@ -359,7 +346,7 @@ class DiscogsPlugin(BeetsPlugin): else: genre = base_genre - discogs_albumid = extract_release_id("discogs", result.data.get("uri")) + discogs_albumid = self._extract_id(result.data.get("uri")) # Extract information for the optional AlbumInfo fields that are # contained on nested discogs fields. @@ -419,7 +406,7 @@ class DiscogsPlugin(BeetsPlugin): genre=genre, media=media, original_year=original_year, - data_source="Discogs", + data_source=self.data_source, data_url=data_url, discogs_albumid=discogs_albumid, discogs_labelid=labelid, @@ -638,7 +625,7 @@ class DiscogsPlugin(BeetsPlugin): title = f"{prefix}: {title}" track_id = None medium, medium_index, _ = self.get_track_index(track["position"]) - artist, artist_id = MetadataSourcePlugin.get_artist( + artist, artist_id = self.get_artist( track.get("artists", []), join_key="join" ) length = self.get_track_length(track["duration"]) diff --git a/beetsplug/mbsync.py b/beetsplug/mbsync.py index d38b25e9f..3f7daec6c 100644 --- a/beetsplug/mbsync.py +++ b/beetsplug/mbsync.py @@ -16,7 +16,7 @@ from collections import defaultdict -from beets import autotag, library, plugins, ui, util +from beets import autotag, library, metadata_plugins, ui, util from beets.plugins import BeetsPlugin, apply_item_changes @@ -78,7 +78,9 @@ class MBSyncPlugin(BeetsPlugin): ) continue - if not (track_info := plugins.track_for_id(item.mb_trackid)): + if not ( + track_info := metadata_plugins.track_for_id(item.mb_trackid) + ): self._log.info( "Recording ID not found: {0.mb_trackid} for track {0}", item ) @@ -99,7 +101,9 @@ class MBSyncPlugin(BeetsPlugin): self._log.info("Skipping album with no mb_albumid: {}", album) continue - if not (album_info := plugins.album_for_id(album.mb_albumid)): + if not ( + album_info := metadata_plugins.album_for_id(album.mb_albumid) + ): self._log.info( "Release ID {0.mb_albumid} not found for album {0}", album ) diff --git a/beetsplug/missing.py b/beetsplug/missing.py index 8c328e647..d0e956930 100644 --- a/beetsplug/missing.py +++ b/beetsplug/missing.py @@ -21,7 +21,7 @@ from collections.abc import Iterator import musicbrainzngs from musicbrainzngs.musicbrainz import MusicBrainzError -from beets import config, plugins +from beets import config, metadata_plugins from beets.dbcore import types from beets.library import Album, Item, Library from beets.plugins import BeetsPlugin @@ -222,7 +222,7 @@ class MissingPlugin(BeetsPlugin): item_mbids = {x.mb_trackid for x in album.items()} # fetch missing items # TODO: Implement caching that without breaking other stuff - if album_info := plugins.album_for_id(album.mb_albumid): + if album_info := metadata_plugins.album_for_id(album.mb_albumid): for track_info in album_info.tracks: if track_info.track_id not in item_mbids: self._log.debug( diff --git a/beetsplug/musicbrainz.py b/beetsplug/musicbrainz.py index e33cc4fce..b52e44b23 100644 --- a/beetsplug/musicbrainz.py +++ b/beetsplug/musicbrainz.py @@ -20,7 +20,7 @@ import traceback from collections import Counter from functools import cached_property from itertools import product -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Iterable, Sequence from urllib.parse import urljoin import musicbrainzngs @@ -28,11 +28,10 @@ import musicbrainzngs import beets import beets.autotag.hooks from beets import config, plugins, util -from beets.plugins import BeetsPlugin +from beets.metadata_plugins import MetadataSourcePlugin from beets.util.id_extractors import extract_release_id if TYPE_CHECKING: - from collections.abc import Iterator, Sequence from typing import Literal from beets.library import Item @@ -362,9 +361,7 @@ def _merge_pseudo_and_actual_album( return merged -class MusicBrainzPlugin(BeetsPlugin): - data_source = "Musicbrainz" - +class MusicBrainzPlugin(MetadataSourcePlugin): def __init__(self): """Set up the python-musicbrainz-ngs module according to settings from the beets configuration. This should be called at startup. @@ -421,7 +418,7 @@ class MusicBrainzPlugin(BeetsPlugin): medium=medium, medium_index=medium_index, medium_total=medium_total, - data_source="MusicBrainz", + data_source=self.data_source, data_url=track_url(recording["id"]), ) @@ -632,7 +629,7 @@ class MusicBrainzPlugin(BeetsPlugin): artists_sort=artists_sort_names, artist_credit=artist_credit_name, artists_credit=artists_credit_names, - data_source="MusicBrainz", + data_source=self.data_source, data_url=album_url(release["id"]), barcode=release.get("barcode"), ) @@ -767,7 +764,7 @@ class MusicBrainzPlugin(BeetsPlugin): return mb_field_by_tag def get_album_criteria( - self, items: list[Item], artist: str, album: str, va_likely: bool + self, items: Sequence[Item], artist: str, album: str, va_likely: bool ) -> dict[str, str]: criteria = { "release": album, @@ -813,12 +810,11 @@ class MusicBrainzPlugin(BeetsPlugin): def candidates( self, - items: list[Item], + items: Sequence[Item], artist: str, album: str, va_likely: bool, - extra_tags: dict[str, Any] | None = None, - ) -> Iterator[beets.autotag.hooks.AlbumInfo]: + ) -> Iterable[beets.autotag.hooks.AlbumInfo]: criteria = self.get_album_criteria(items, artist, album, va_likely) release_ids = (r["id"] for r in self._search_api("release", criteria)) @@ -826,7 +822,7 @@ class MusicBrainzPlugin(BeetsPlugin): def item_candidates( self, item: Item, artist: str, title: str - ) -> Iterator[beets.autotag.hooks.TrackInfo]: + ) -> Iterable[beets.autotag.hooks.TrackInfo]: criteria = {"artist": artist, "recording": title, "alias": title} yield from filter( @@ -841,7 +837,7 @@ class MusicBrainzPlugin(BeetsPlugin): MusicBrainzAPIError. """ self._log.debug("Requesting MusicBrainz release {}", album_id) - if not (albumid := extract_release_id("musicbrainz", album_id)): + if not (albumid := self._extract_id(album_id)): self._log.debug("Invalid MBID ({0}).", album_id) return None @@ -878,7 +874,7 @@ class MusicBrainzPlugin(BeetsPlugin): """Fetches a track by its MusicBrainz ID. Returns a TrackInfo object or None if no track is found. May raise a MusicBrainzAPIError. """ - if not (trackid := extract_release_id("musicbrainz", track_id)): + if not (trackid := self._extract_id(track_id)): self._log.debug("Invalid MBID ({0}).", track_id) return None diff --git a/beetsplug/spotify.py b/beetsplug/spotify.py index 36790b56b..7a4f4ec52 100644 --- a/beetsplug/spotify.py +++ b/beetsplug/spotify.py @@ -25,7 +25,7 @@ import json import re import time import webbrowser -from typing import TYPE_CHECKING, Any, Literal, Sequence +from typing import TYPE_CHECKING, Any, Literal, Sequence, Union import confuse import requests @@ -34,7 +34,12 @@ import unidecode from beets import ui from beets.autotag.hooks import AlbumInfo, TrackInfo from beets.dbcore import types -from beets.plugins import BeetsPlugin, MetadataSourcePlugin, Response +from beets.library import Library +from beets.metadata_plugins import ( + IDResponse, + SearchApiMetadataSourcePlugin, + SearchFilter, +) if TYPE_CHECKING: from beets.library import Library @@ -43,13 +48,41 @@ if TYPE_CHECKING: DEFAULT_WAITING_TIME = 5 -class SpotifyAPIError(Exception): +class SearchResponseAlbums(IDResponse): + """A response returned by the Spotify API. + + We only use items and disregard the pagination information. + i.e. res["albums"]["items"][0]. + + There are more fields in the response, but we only type + the ones we currently use. + + see https://developer.spotify.com/documentation/web-api/reference/search + """ + + album_type: str + available_markets: Sequence[str] + name: str + + +class SearchResponseTracks(IDResponse): + """A track response returned by the Spotify API.""" + + album: SearchResponseAlbums + available_markets: Sequence[str] + popularity: int + name: str + + +class APIError(Exception): pass -class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): - data_source = "Spotify" - +class SpotifyPlugin( + SearchApiMetadataSourcePlugin[ + Union[SearchResponseAlbums, SearchResponseTracks] + ] +): item_types = { "spotify_track_popularity": types.INTEGER, "spotify_acousticness": types.FLOAT, @@ -180,7 +213,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): """ if retry_count > max_retries: - raise SpotifyAPIError("Maximum retries reached.") + raise APIError("Maximum retries reached.") try: response = requests.request( @@ -194,14 +227,14 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): return response.json() except requests.exceptions.ReadTimeout: self._log.error("ReadTimeout.") - raise SpotifyAPIError("Request timed out.") + raise APIError("Request timed out.") except requests.exceptions.ConnectionError as e: self._log.error(f"Network error: {e}") - raise SpotifyAPIError("Network error.") + raise APIError("Network error.") except requests.exceptions.RequestException as e: if e.response is None: self._log.error(f"Request failed: {e}") - raise SpotifyAPIError("Request failed.") + raise APIError("Request failed.") if e.response.status_code == 401: self._log.debug( f"{self.data_source} access token has expired. " @@ -215,7 +248,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): retry_count=retry_count + 1, ) elif e.response.status_code == 404: - raise SpotifyAPIError( + raise APIError( f"API Error: {e.response.status_code}\n" f"URL: {url}\nparams: {params}" ) @@ -235,18 +268,18 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): ) elif e.response.status_code == 503: self._log.error("Service Unavailable.") - raise SpotifyAPIError("Service Unavailable.") + raise APIError("Service Unavailable.") elif e.response.status_code == 502: self._log.error("Bad Gateway.") - raise SpotifyAPIError("Bad Gateway.") + raise APIError("Bad Gateway.") elif e.response is not None: - raise SpotifyAPIError( + raise APIError( f"{self.data_source} API error:\n{e.response.text}\n" f"URL:\n{url}\nparams:\n{params}" ) else: self._log.error(f"Request failed. Error: {e}") - raise SpotifyAPIError("Request failed.") + raise APIError("Request failed.") def album_for_id(self, album_id: str) -> AlbumInfo | None: """Fetch an album by its Spotify ID or URL and return an @@ -257,7 +290,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): :return: AlbumInfo object for album :rtype: beets.autotag.hooks.AlbumInfo or None """ - if not (spotify_id := self._get_id(album_id)): + if not (spotify_id := self._extract_id(album_id)): return None album_data = self._handle_response("get", self.album_url + spotify_id) @@ -360,7 +393,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): Returns a TrackInfo object or None if the track is not found. """ - if not (spotify_id := self._get_id(track_id)): + if not (spotify_id := self._extract_id(track_id)): self._log.debug("Invalid Spotify ID: {}", track_id) return None @@ -390,7 +423,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): return track def _construct_search_query( - self, filters: dict[str, str], keywords: str = "" + self, filters: SearchFilter, keywords: str = "" ) -> str: """Construct a query string with the specified filters and keywords to be provided to the Spotify Search API @@ -400,9 +433,10 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): :param keywords: (Optional) Query keywords to use. :return: Query string to be provided to the Search API. """ + query_components = [ keywords, - " ".join(":".join((k, v)) for k, v in filters.items()), + " ".join(f"{k}:{v}" for k, v in filters.items()), ] query = " ".join([q for q in query_components if q]) if not isinstance(query, str): @@ -416,9 +450,9 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): def _search_api( self, query_type: Literal["album", "track"], - filters: dict[str, str], + filters: SearchFilter, keywords: str = "", - ) -> Sequence[Response]: + ) -> Sequence[SearchResponseAlbums | SearchResponseTracks]: """Query the Spotify Search API for the specified ``keywords``, applying the provided ``filters``. @@ -436,7 +470,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): self.search_url, params={"q": query, "type": query_type}, ) - except SpotifyAPIError as e: + except APIError as e: self._log.debug("Spotify API error: {}", e) return () response_data = response.get(query_type + "s", {}).get("items", []) @@ -557,7 +591,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): keywords = item[self.config["track_field"].get()] # Query the Web API for each track, look for the items' JSON data - query_filters = {"artist": artist, "album": album} + query_filters: SearchFilter = {"artist": artist, "album": album} response_data_tracks = self._search_api( query_type="track", keywords=keywords, filters=query_filters ) @@ -570,7 +604,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): continue # Apply market filter if requested - region_filter = self.config["region_filter"].get() + region_filter: str = self.config["region_filter"].get() if region_filter: response_data_tracks = [ track_data @@ -595,7 +629,11 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): len(response_data_tracks), ) chosen_result = max( - response_data_tracks, key=lambda x: x["popularity"] + response_data_tracks, + key=lambda x: x[ + # We are sure this is a track response! + "popularity" # type: ignore[typeddict-item] + ], ) results.append(chosen_result) @@ -691,16 +729,18 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): def track_info(self, track_id: str): """Fetch a track's popularity and external IDs using its Spotify ID.""" track_data = self._handle_response("get", self.track_url + track_id) + external_ids = track_data.get("external_ids", {}) + popularity = track_data.get("popularity") self._log.debug( "track_popularity: {} and track_isrc: {}", - track_data.get("popularity"), - track_data.get("external_ids").get("isrc"), + popularity, + external_ids.get("isrc"), ) return ( - track_data.get("popularity"), - track_data.get("external_ids").get("isrc"), - track_data.get("external_ids").get("ean"), - track_data.get("external_ids").get("upc"), + popularity, + external_ids.get("isrc"), + external_ids.get("ean"), + external_ids.get("upc"), ) def track_audio_features(self, track_id: str): @@ -709,6 +749,6 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): return self._handle_response( "get", self.audio_features_url + track_id ) - except SpotifyAPIError as e: + except APIError as e: self._log.debug("Spotify API error: {}", e) return None diff --git a/docs/changelog.rst b/docs/changelog.rst index f9dafa00c..7fb81237e 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -66,9 +66,20 @@ For plugin developers: * The `fetchart` plugins has seen a few changes to function signatures and source registration in the process of introducing typings to the code. Custom art sources might need to be adapted. - +* We split the responsibilities of plugins into two base classes + #. :class:`beets.plugins.BeetsPlugin` + is the base class for all plugins, any plugin needs to inherit from this class. + #. :class:`beets.metadata_plugin.MetadataSourcePlugin` + allows plugins to act like metadata sources. E.g. used by the MusicBrainz plugin. All plugins + in the beets repo are opted into this class where applicable. If you are maintaining a plugin + that acts like a metadata source, i.e. you expose any of ``track_for_id``, + ``album_for_id``, ``candidates``, ``item_candidates``, ``album_distance``, ``track_distance`` methods, + please update your plugin to inherit from the new baseclass, as otherwise your plugin will + stop working with the next major release. + Other changes: +* Refactor: Split responsibilities of Plugins into MetaDataPlugins and general Plugins. * Documentation structure for auto generated API references changed slightly. Autogenerated API references are now located in the `docs/api` subdirectory. * :doc:`/plugins/substitute`: Fix rST formatting for example cases so that each diff --git a/setup.cfg b/setup.cfg index e3472b04c..e999b55d3 100644 --- a/setup.cfg +++ b/setup.cfg @@ -49,3 +49,8 @@ disallow_untyped_decorators = true disallow_any_generics = true check_untyped_defs = true allow_redefinition = true + +[[mypy-beets.metadata_plugins]] +disallow_untyped_decorators = true +disallow_any_generics = true +check_untyped_defs = true diff --git a/test/plugins/test_mbsync.py b/test/plugins/test_mbsync.py index 088165ef5..bb88e5e63 100644 --- a/test/plugins/test_mbsync.py +++ b/test/plugins/test_mbsync.py @@ -23,7 +23,7 @@ class MbsyncCliTest(PluginTestCase): plugin = "mbsync" @patch( - "beets.plugins.album_for_id", + "beets.metadata_plugins.album_for_id", Mock( side_effect=lambda *_: AlbumInfo( album_id="album id", @@ -33,7 +33,7 @@ class MbsyncCliTest(PluginTestCase): ), ) @patch( - "beets.plugins.track_for_id", + "beets.metadata_plugins.track_for_id", Mock( side_effect=lambda *_: TrackInfo( track_id="singleton id", title="new title" diff --git a/test/test_importer.py b/test/test_importer.py index 9ec160568..14b163f73 100644 --- a/test/test_importer.py +++ b/test/test_importer.py @@ -913,7 +913,9 @@ def album_candidates_mock(*args, **kwargs): ) -@patch("beets.plugins.candidates", Mock(side_effect=album_candidates_mock)) +@patch( + "beets.metadata_plugins.candidates", Mock(side_effect=album_candidates_mock) +) class ImportDuplicateAlbumTest(PluginMixin, ImportTestCase): plugin = "musicbrainz" @@ -1031,7 +1033,10 @@ def item_candidates_mock(*args, **kwargs): ) -@patch("beets.plugins.item_candidates", Mock(side_effect=item_candidates_mock)) +@patch( + "beets.metadata_plugins.item_candidates", + Mock(side_effect=item_candidates_mock), +) class ImportDuplicateSingletonTest(ImportTestCase): def setUp(self): super().setUp() @@ -1567,8 +1572,14 @@ def mocked_get_track_by_id(id_): ) -@patch("beets.plugins.track_for_id", Mock(side_effect=mocked_get_track_by_id)) -@patch("beets.plugins.album_for_id", Mock(side_effect=mocked_get_album_by_id)) +@patch( + "beets.metadata_plugins.track_for_id", + Mock(side_effect=mocked_get_track_by_id), +) +@patch( + "beets.metadata_plugins.album_for_id", + Mock(side_effect=mocked_get_album_by_id), +) class ImportIdTest(ImportTestCase): ID_RELEASE_0 = "00000000-0000-0000-0000-000000000000" ID_RELEASE_1 = "11111111-1111-1111-1111-111111111111" diff --git a/test/test_ui.py b/test/test_ui.py index 713e69891..664323e2a 100644 --- a/test/test_ui.py +++ b/test/test_ui.py @@ -1024,7 +1024,9 @@ class ConfigTest(TestPluginTestCase): file.write("plugins: test") self.run_command("--config", self.cli_config_path, "plugin", lib=None) - assert plugins.find_plugins()[0].is_test_plugin + plugs = plugins.find_plugins() + assert len(plugs) == 1 + assert plugs[0].is_test_plugin self.unload_plugins() def test_beetsdir_config(self):