diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 390878372..baeb52f18 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -33,7 +33,7 @@ jobs: if: matrix.platform == 'ubuntu-latest' run: | sudo apt update - sudo apt install ffmpeg gobject-introspection libcairo2-dev libgirepository-2.0-dev pandoc imagemagick + sudo apt install --yes --no-install-recommends ffmpeg gobject-introspection gstreamer1.0-plugins-base python3-gst-1.0 libcairo2-dev libgirepository-2.0-dev pandoc imagemagick - name: Get changed lyrics files id: lyrics-update diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index c9b66f402..7900d247d 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -105,7 +105,6 @@ jobs: - name: Type check code uses: liskin/gh-problem-matcher-wrap@v3 - continue-on-error: true with: linters: mypy run: poe check-types --show-column-numbers --no-error-summary ${{ needs.changed-files.outputs.changed_python_files }} diff --git a/beets/autotag/distance.py b/beets/autotag/distance.py index d146c27f0..39d16858f 100644 --- a/beets/autotag/distance.py +++ b/beets/autotag/distance.py @@ -8,7 +8,7 @@ from typing import TYPE_CHECKING, Any from jellyfish import levenshtein_distance from unidecode import unidecode -from beets import config, plugins +from beets import config, metadata_plugins from beets.util import as_string, cached_classproperty, get_most_common_tags if TYPE_CHECKING: @@ -409,7 +409,7 @@ def track_distance( dist.add_expr("medium", item.disc != track_info.medium) # Plugins. - dist.update(plugins.track_distance(item, track_info)) + dist.update(metadata_plugins.track_distance(item, track_info)) return dist @@ -526,6 +526,6 @@ def distance( dist.add("unmatched_tracks", 1.0) # Plugins. - dist.update(plugins.album_distance(items, album_info, mapping)) + dist.update(metadata_plugins.album_distance(items, album_info, mapping)) return dist diff --git a/beets/autotag/match.py b/beets/autotag/match.py index 64572cf3b..e74d21755 100644 --- a/beets/autotag/match.py +++ b/beets/autotag/match.py @@ -24,7 +24,7 @@ from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar import lap import numpy as np -from beets import config, logging, plugins +from beets import config, logging, metadata_plugins from beets.autotag import AlbumInfo, AlbumMatch, TrackInfo, TrackMatch, hooks from beets.util import get_most_common_tags @@ -119,7 +119,7 @@ def match_by_id(items: Iterable[Item]) -> AlbumInfo | None: return None # If all album IDs are equal, look up the album. log.debug("Searching for discovered album ID: {0}", first) - return plugins.album_for_id(first) + return metadata_plugins.album_for_id(first) def _recommendation( @@ -274,7 +274,7 @@ def tag_album( if search_ids: for search_id in search_ids: log.debug("Searching for album ID: {0}", search_id) - if info := plugins.album_for_id(search_id): + if info := metadata_plugins.album_for_id(search_id): _add_candidate(items, candidates, info) # Use existing metadata or text search. @@ -311,7 +311,7 @@ def tag_album( log.debug("Album might be VA: {0}", va_likely) # Get the results from the data sources. - for matched_candidate in plugins.candidates( + for matched_candidate in metadata_plugins.candidates( items, search_artist, search_album, va_likely ): _add_candidate(items, candidates, matched_candidate) @@ -346,7 +346,7 @@ def tag_item( if trackids: for trackid in trackids: log.debug("Searching for track ID: {0}", trackid) - if info := plugins.track_for_id(trackid): + if info := metadata_plugins.track_for_id(trackid): dist = track_distance(item, info, incl_artist=True) candidates[info.track_id] = hooks.TrackMatch(dist, info) # If this is a good match, then don't keep searching. @@ -372,7 +372,7 @@ def tag_item( log.debug("Item search terms: {0} - {1}", search_artist, search_title) # Get and evaluate candidate metadata. - for track_info in plugins.item_candidates( + for track_info in metadata_plugins.item_candidates( item, search_artist, search_title ): dist = track_distance(item, track_info, incl_artist=True) diff --git a/beets/dbcore/db.py b/beets/dbcore/db.py index 16ca54995..b780c5756 100755 --- a/beets/dbcore/db.py +++ b/beets/dbcore/db.py @@ -289,19 +289,22 @@ class Model(ABC, Generic[D]): terms. """ - _types: dict[str, types.Type] = {} - """Optional Types for non-fixed (i.e., flexible and computed) fields. - """ + @cached_classproperty + def _types(cls) -> dict[str, types.Type]: + """Optional types for non-fixed (flexible and computed) fields.""" + return {} _sorts: dict[str, type[FieldSort]] = {} """Optional named sort criteria. The keys are strings and the values are subclasses of `Sort`. """ - _queries: dict[str, FieldQueryType] = {} - """Named queries that use a field-like `name:value` syntax but which - do not relate to any specific field. - """ + @cached_classproperty + def _queries(cls) -> dict[str, FieldQueryType]: + """Named queries that use a field-like `name:value` syntax but which + do not relate to any specific field. + """ + return {} _always_dirty = False """By default, fields only become "dirty" when their value actually diff --git a/beets/dbcore/query.py b/beets/dbcore/query.py index ae8e0ddf6..49d7f6428 100644 --- a/beets/dbcore/query.py +++ b/beets/dbcore/query.py @@ -28,6 +28,7 @@ from re import Pattern from typing import TYPE_CHECKING, Any, Generic, TypeVar, Union from beets import util +from beets.util.units import raw_seconds_short if TYPE_CHECKING: from beets.dbcore.db import AnyModel, Model @@ -892,7 +893,7 @@ class DurationQuery(NumericQuery): if not s: return None try: - return util.raw_seconds_short(s) + return raw_seconds_short(s) except ValueError: try: return float(s) diff --git a/beets/dbcore/types.py b/beets/dbcore/types.py index 30cabf42f..1b8434a0b 100644 --- a/beets/dbcore/types.py +++ b/beets/dbcore/types.py @@ -292,7 +292,7 @@ class DelimitedString(BaseString[list[str], list[str]]): containing delimiter-separated values. """ - model_type = list + model_type = list[str] def __init__(self, delimiter: str): self.delimiter = delimiter diff --git a/beets/importer/stages.py b/beets/importer/stages.py index 5b3540db4..24ff815f3 100644 --- a/beets/importer/stages.py +++ b/beets/importer/stages.py @@ -70,6 +70,7 @@ def query_tasks(session: ImportSession): Instead of finding files from the filesystem, a query is used to match items from the library. """ + task: ImportTask if session.config["singletons"]: # Search for items. for item in session.lib.items(session.query): @@ -143,9 +144,7 @@ def lookup_candidates(session: ImportSession, task: ImportTask): # Restrict the initial lookup to IDs specified by the user via the -m # option. Currently all the IDs are passed onto the tasks directly. - task.search_ids = session.config["search_ids"].as_str_seq() - - task.lookup_candidates() + task.lookup_candidates(session.config["search_ids"].as_str_seq()) @pipeline.stage diff --git a/beets/importer/tasks.py b/beets/importer/tasks.py index 441224b6b..abe2ca8a9 100644 --- a/beets/importer/tasks.py +++ b/beets/importer/tasks.py @@ -22,7 +22,7 @@ import time from collections import defaultdict from enum import Enum from tempfile import mkdtemp -from typing import TYPE_CHECKING, Callable, Iterable, Sequence +from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence import mediafile @@ -32,6 +32,8 @@ from beets.dbcore.query import PathQuery from .state import ImportState if TYPE_CHECKING: + from beets.autotag.match import Recommendation + from .session import ImportSession # Global logger. @@ -159,6 +161,7 @@ class ImportTask(BaseImportTask): cur_album: str | None = None cur_artist: str | None = None candidates: Sequence[autotag.AlbumMatch | autotag.TrackMatch] = [] + rec: Recommendation | None = None def __init__( self, @@ -167,11 +170,9 @@ class ImportTask(BaseImportTask): items: Iterable[library.Item] | None, ): super().__init__(toppath, paths, items) - self.rec = None self.should_remove_duplicates = False self.should_merge_duplicates = False self.is_album = True - self.search_ids = [] # user-supplied candidate IDs. def set_choice( self, choice: Action | autotag.AlbumMatch | autotag.TrackMatch @@ -356,20 +357,17 @@ class ImportTask(BaseImportTask): tasks = [t for inner in tasks for t in inner] return tasks - def lookup_candidates(self): - """Retrieve and store candidates for this album. User-specified - candidate IDs are stored in self.search_ids: if present, the - initial lookup is restricted to only those IDs. - """ - artist, album, prop = autotag.tag_album( - self.items, search_ids=self.search_ids - ) - self.cur_artist = artist - self.cur_album = album - self.candidates = prop.candidates - self.rec = prop.recommendation + def lookup_candidates(self, search_ids: list[str]) -> None: + """Retrieve and store candidates for this album. - def find_duplicates(self, lib: library.Library): + If User-specified ``search_ids`` list is not empty, the lookup is + restricted to only those IDs. + """ + self.cur_artist, self.cur_album, (self.candidates, self.rec) = ( + autotag.tag_album(self.items, search_ids=search_ids) + ) + + def find_duplicates(self, lib: library.Library) -> list[library.Album]: """Return a list of albums from `lib` with the same artist and album name as the task. """ @@ -695,12 +693,12 @@ class SingletonImportTask(ImportTask): for item in self.imported_items(): plugins.send("item_imported", lib=lib, item=item) - def lookup_candidates(self): - prop = autotag.tag_item(self.item, search_ids=self.search_ids) - self.candidates = prop.candidates - self.rec = prop.recommendation + def lookup_candidates(self, search_ids: list[str]) -> None: + self.candidates, self.rec = autotag.tag_item( + self.item, search_ids=search_ids + ) - def find_duplicates(self, lib): + def find_duplicates(self, lib: library.Library) -> list[library.Item]: # type: ignore[override] # Need splitting Singleton and Album tasks into separate classes """Return a list of items from `lib` that have the same artist and title as the task. """ @@ -802,6 +800,11 @@ class SentinelImportTask(ImportTask): pass +ArchiveHandler = tuple[ + Callable[[util.StrPath], bool], Callable[[util.StrPath], Any] +] + + class ArchiveImportTask(SentinelImportTask): """An import task that represents the processing of an archive. @@ -827,13 +830,13 @@ class ArchiveImportTask(SentinelImportTask): if not os.path.isfile(path): return False - for path_test, _ in cls.handlers(): + for path_test, _ in cls.handlers: if path_test(os.fsdecode(path)): return True return False - @classmethod - def handlers(cls): + @util.cached_classproperty + def handlers(cls) -> list[ArchiveHandler]: """Returns a list of archive handlers. Each handler is a `(path_test, ArchiveClass)` tuple. `path_test` @@ -841,28 +844,27 @@ class ArchiveImportTask(SentinelImportTask): handled by `ArchiveClass`. `ArchiveClass` is a class that implements the same interface as `tarfile.TarFile`. """ - if not hasattr(cls, "_handlers"): - cls._handlers: list[tuple[Callable, ...]] = [] - from zipfile import ZipFile, is_zipfile + _handlers: list[ArchiveHandler] = [] + from zipfile import ZipFile, is_zipfile - cls._handlers.append((is_zipfile, ZipFile)) - import tarfile + _handlers.append((is_zipfile, ZipFile)) + import tarfile - cls._handlers.append((tarfile.is_tarfile, tarfile.open)) - try: - from rarfile import RarFile, is_rarfile - except ImportError: - pass - else: - cls._handlers.append((is_rarfile, RarFile)) - try: - from py7zr import SevenZipFile, is_7zfile - except ImportError: - pass - else: - cls._handlers.append((is_7zfile, SevenZipFile)) + _handlers.append((tarfile.is_tarfile, tarfile.open)) + try: + from rarfile import RarFile, is_rarfile + except ImportError: + pass + else: + _handlers.append((is_rarfile, RarFile)) + try: + from py7zr import SevenZipFile, is_7zfile + except ImportError: + pass + else: + _handlers.append((is_7zfile, SevenZipFile)) - return cls._handlers + return _handlers def cleanup(self, copy=False, delete=False, move=False): """Removes the temporary directory the archive was extracted to.""" @@ -879,7 +881,7 @@ class ArchiveImportTask(SentinelImportTask): """ assert self.toppath is not None, "toppath must be set" - for path_test, handler_class in self.handlers(): + for path_test, handler_class in self.handlers: if path_test(os.fsdecode(self.toppath)): break else: @@ -925,7 +927,7 @@ class ImportTaskFactory: self.imported = 0 # "Real" tasks created. self.is_archive = ArchiveImportTask.is_archive(util.syspath(toppath)) - def tasks(self): + def tasks(self) -> Iterable[ImportTask]: """Yield all import tasks for music found in the user-specified path `self.toppath`. Any necessary sentinel tasks are also produced. @@ -1114,7 +1116,10 @@ def albums_in_dir(path: util.PathBytes): a list of Items that is probably an album. Specifically, any folder containing any media files is an album. """ - collapse_pat = collapse_paths = collapse_items = None + collapse_paths: list[util.PathBytes] = [] + collapse_items: list[util.PathBytes] = [] + collapse_pat = None + ignore: list[str] = config["ignore"].as_str_seq() ignore_hidden: bool = config["ignore_hidden"].get(bool) @@ -1139,7 +1144,7 @@ def albums_in_dir(path: util.PathBytes): # proceed to process the current one. if collapse_items: yield collapse_paths, collapse_items - collapse_pat = collapse_paths = collapse_items = None + collapse_pat, collapse_paths, collapse_items = None, [], [] # Check whether this directory looks like the *first* directory # in a multi-disc sequence. There are two indicators: the file diff --git a/beets/library/models.py b/beets/library/models.py index 68c80b934..7501513a1 100644 --- a/beets/library/models.py +++ b/beets/library/models.py @@ -41,6 +41,18 @@ class LibModel(dbcore.Model["Library"]): _format_config_key: str path: bytes + @cached_classproperty + def _types(cls) -> dict[str, types.Type]: + """Return the types of the fields in this model.""" + return { + **plugins.types(cls), # type: ignore[arg-type] + "data_source": types.STRING, + } + + @cached_classproperty + def _queries(cls) -> dict[str, FieldQueryType]: + return plugins.named_queries(cls) # type: ignore[arg-type] + @cached_classproperty def writable_media_fields(cls) -> set[str]: return set(MediaFile.fields()) & cls._fields.keys() @@ -265,10 +277,9 @@ class Album(LibModel): _search_fields = ("album", "albumartist", "genre") - _types = { - "path": types.PathType(), - "data_source": types.STRING, - } + @cached_classproperty + def _types(cls) -> dict[str, types.Type]: + return {**super()._types, "path": types.PathType()} _sorts = { "albumartist": dbcore.query.SmartArtistSort, @@ -715,10 +726,6 @@ class Item(LibModel): "genre", ) - _types = { - "data_source": types.STRING, - } - # Set of item fields that are backed by `MediaFile` fields. # Any kind of field (fixed, flexible, and computed) may be a media # field. Only these fields are read from disk in `read` and written in @@ -737,7 +744,9 @@ class Item(LibModel): _sorts = {"artist": dbcore.query.SmartArtistSort} - _queries = {"singleton": dbcore.query.SingletonQuery} + @cached_classproperty + def _queries(cls) -> dict[str, FieldQueryType]: + return {**super()._queries, "singleton": dbcore.query.SingletonQuery} _format_config_key = "format_item" diff --git a/beets/metadata_plugins.py b/beets/metadata_plugins.py new file mode 100644 index 000000000..5b11dc4ec --- /dev/null +++ b/beets/metadata_plugins.py @@ -0,0 +1,397 @@ +"""Metadata source plugin interface. + +This allows beets to lookup metadata from various sources. We define +a common interface for all metadata sources which need to be +implemented as plugins. +""" + +from __future__ import annotations + +import abc +import inspect +import re +import sys +import warnings +from typing import TYPE_CHECKING, Generic, Literal, Sequence, TypedDict, TypeVar + +from beets.util import cached_classproperty +from beets.util.id_extractors import extract_release_id + +from .plugins import BeetsPlugin, find_plugins, notify_info_yielded, send + +if sys.version_info >= (3, 11): + from typing import NotRequired +else: + from typing_extensions import NotRequired + +if TYPE_CHECKING: + from collections.abc import Iterable + + from confuse import ConfigView + + from .autotag import Distance + from .autotag.hooks import AlbumInfo, Item, TrackInfo + + +def find_metadata_source_plugins() -> list[MetadataSourcePlugin]: + """Returns a list of MetadataSourcePlugin subclass instances + + Resolved from all currently loaded beets plugins. + """ + + all_plugins = find_plugins() + metadata_plugins: list[MetadataSourcePlugin | BeetsPlugin] = [] + for plugin in all_plugins: + if isinstance(plugin, MetadataSourcePlugin): + metadata_plugins.append(plugin) + elif hasattr(plugin, "data_source"): + # TODO: Remove this in the future major release, v3.0.0 + warnings.warn( + f"{plugin.__class__.__name__} is used as a legacy metadata source. " + "It should extend MetadataSourcePlugin instead of BeetsPlugin. " + "Support for this will be removed in the v3.0.0 release!", + DeprecationWarning, + stacklevel=2, + ) + metadata_plugins.append(plugin) + + # typeignore: BeetsPlugin is not a MetadataSourcePlugin (legacy support) + return metadata_plugins # type: ignore[return-value] + + +@notify_info_yielded("albuminfo_received") +def candidates(*args, **kwargs) -> Iterable[AlbumInfo]: + """Return matching album candidates from all metadata source plugins.""" + for plugin in find_metadata_source_plugins(): + yield from plugin.candidates(*args, **kwargs) + + +@notify_info_yielded("trackinfo_received") +def item_candidates(*args, **kwargs) -> Iterable[TrackInfo]: + """Return matching track candidates fromm all metadata source plugins.""" + for plugin in find_metadata_source_plugins(): + yield from plugin.item_candidates(*args, **kwargs) + + +def album_for_id(_id: str) -> AlbumInfo | None: + """Get AlbumInfo object for the given ID string. + + A single ID can yield just a single album, so we return the first match. + """ + for plugin in find_metadata_source_plugins(): + if info := plugin.album_for_id(album_id=_id): + send("albuminfo_received", info=info) + return info + + return None + + +def track_for_id(_id: str) -> TrackInfo | None: + """Get TrackInfo object for the given ID string. + + A single ID can yield just a single track, so we return the first match. + """ + for plugin in find_metadata_source_plugins(): + if info := plugin.track_for_id(_id): + send("trackinfo_received", info=info) + return info + + return None + + +def track_distance(item: Item, info: TrackInfo) -> Distance: + """Returns the track distance for an item and trackinfo. + + Returns a Distance object is populated by all metadata source plugins + that implement the :py:meth:`MetadataSourcePlugin.track_distance` method. + """ + from beets.autotag.distance import Distance + + dist = Distance() + for plugin in find_metadata_source_plugins(): + dist.update(plugin.track_distance(item, info)) + return dist + + +def album_distance( + items: Sequence[Item], + album_info: AlbumInfo, + mapping: dict[Item, TrackInfo], +) -> Distance: + """Returns the album distance calculated by plugins.""" + from beets.autotag.distance import Distance + + dist = Distance() + for plugin in find_metadata_source_plugins(): + dist.update(plugin.album_distance(items, album_info, mapping)) + return dist + + +def _get_distance( + config: ConfigView, data_source: str, info: AlbumInfo | TrackInfo +) -> Distance: + """Returns the ``data_source`` weight and the maximum source weight + for albums or individual tracks. + """ + from beets.autotag.distance import Distance + + dist = Distance() + if info.data_source == data_source: + dist.add("source", config["source_weight"].as_number()) + return dist + + +class MetadataSourcePlugin(BeetsPlugin, metaclass=abc.ABCMeta): + """A plugin that provides metadata from a specific source. + + This base class implements a contract for plugins that provide metadata + from a specific source. The plugin must implement the methods to search for albums + and tracks, and to retrieve album and track information by ID. + """ + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.config.add({"source_weight": 0.5}) + + @abc.abstractmethod + def album_for_id(self, album_id: str) -> AlbumInfo | None: + """Return :py:class:`AlbumInfo` object or None if no matching release was + found.""" + raise NotImplementedError + + @abc.abstractmethod + def track_for_id(self, track_id: str) -> TrackInfo | None: + """Return a :py:class:`TrackInfo` object or None if no matching release was + found. + """ + raise NotImplementedError + + # ---------------------------------- search ---------------------------------- # + + @abc.abstractmethod + def candidates( + self, + items: Sequence[Item], + artist: str, + album: str, + va_likely: bool, + ) -> Iterable[AlbumInfo]: + """Return :py:class:`AlbumInfo` candidates that match the given album. + + Used in the autotag functionality to search for albums. + + :param items: List of items in the album + :param artist: Album artist + :param album: Album name + :param va_likely: Whether the album is likely to be by various artists + """ + raise NotImplementedError + + @abc.abstractmethod + def item_candidates( + self, item: Item, artist: str, title: str + ) -> Iterable[TrackInfo]: + """Return :py:class:`TrackInfo` candidates that match the given track. + + Used in the autotag functionality to search for tracks. + + :param item: Track item + :param artist: Track artist + :param title: Track title + """ + raise NotImplementedError + + def albums_for_ids(self, ids: Sequence[str]) -> Iterable[AlbumInfo | None]: + """Batch lookup of album metadata for a list of album IDs. + + Given a list of album identifiers, yields corresponding AlbumInfo objects. + Missing albums result in None values in the output iterator. + Plugins may implement this for optimized batched lookups instead of + single calls to album_for_id. + """ + + return (self.album_for_id(id) for id in ids) + + def tracks_for_ids(self, ids: Sequence[str]) -> Iterable[TrackInfo | None]: + """Batch lookup of track metadata for a list of track IDs. + + Given a list of track identifiers, yields corresponding TrackInfo objects. + Missing tracks result in None values in the output iterator. + Plugins may implement this for optimized batched lookups instead of + single calls to track_for_id. + """ + + return (self.track_for_id(id) for id in ids) + + def album_distance( + self, + items: Sequence[Item], + album_info: AlbumInfo, + mapping: dict[Item, TrackInfo], + ) -> Distance: + """Calculate the distance for an album based on its items and album info.""" + return _get_distance( + data_source=self.data_source, info=album_info, config=self.config + ) + + def track_distance( + self, + item: Item, + info: TrackInfo, + ) -> Distance: + """Calculate the distance for a track based on its item and track info.""" + return _get_distance( + data_source=self.data_source, info=info, config=self.config + ) + + @cached_classproperty + def data_source(cls) -> str: + """The data source name for this plugin. + + This is inferred from the plugin name. + """ + return cls.__name__.replace("Plugin", "") # type: ignore[attr-defined] + + def _extract_id(self, url: str) -> str | None: + """Extract an ID from a URL for this metadata source plugin. + + Uses the plugin's data source name to determine the ID format and + extracts the ID from a given URL. + """ + return extract_release_id(self.data_source, url) + + @staticmethod + def get_artist( + artists: Iterable[dict[str | int, str]], + id_key: str | int = "id", + name_key: str | int = "name", + join_key: str | int | None = None, + ) -> tuple[str, str | None]: + """Returns an artist string (all artists) and an artist_id (the main + artist) for a list of artist object dicts. + + For each artist, this function moves articles (such as 'a', 'an', + and 'the') to the front and strips trailing disambiguation numbers. It + returns a tuple containing the comma-separated string of all + normalized artists and the ``id`` of the main/first artist. + Alternatively a keyword can be used to combine artists together into a + single string by passing the join_key argument. + + :param artists: Iterable of artist dicts or lists returned by API. + :param id_key: Key or index corresponding to the value of ``id`` for + the main/first artist. Defaults to 'id'. + :param name_key: Key or index corresponding to values of names + to concatenate for the artist string (containing all artists). + Defaults to 'name'. + :param join_key: Key or index corresponding to a field containing a + keyword to use for combining artists into a single string, for + example "Feat.", "Vs.", "And" or similar. The default is None + which keeps the default behaviour (comma-separated). + :return: Normalized artist string. + """ + artist_id = None + artist_string = "" + artists = list(artists) # In case a generator was passed. + total = len(artists) + for idx, artist in enumerate(artists): + if not artist_id: + artist_id = artist[id_key] + name = artist[name_key] + # Strip disambiguation number. + name = re.sub(r" \(\d+\)$", "", name) + # Move articles to the front. + name = re.sub(r"^(.*?), (a|an|the)$", r"\2 \1", name, flags=re.I) + # Use a join keyword if requested and available. + if idx < (total - 1): # Skip joining on last. + if join_key and artist.get(join_key, None): + name += f" {artist[join_key]} " + else: + name += ", " + artist_string += name + + return artist_string, artist_id + + +class IDResponse(TypedDict): + """Response from the API containing an ID.""" + + id: str + + +class SearchFilter(TypedDict): + artist: NotRequired[str] + album: NotRequired[str] + + +R = TypeVar("R", bound=IDResponse) + + +class SearchApiMetadataSourcePlugin( + Generic[R], MetadataSourcePlugin, metaclass=abc.ABCMeta +): + """Helper class to implement a metadata source plugin with an API. + + Plugins using this ABC must implement an API search method to + retrieve album and track information by ID, + i.e. `album_for_id` and `track_for_id`, and a search method to + perform a search on the API. The search method should return a list + of identifiers for the requested type (album or track). + """ + + @abc.abstractmethod + def _search_api( + self, + query_type: Literal["album", "track"], + filters: SearchFilter, + keywords: str = "", + ) -> Sequence[R]: + """Perform a search on the API. + + :param query_type: The type of query to perform. + :param filters: A dictionary of filters to apply to the search. + :param keywords: Additional keywords to include in the search. + + Should return a list of identifiers for the requested type (album or track). + """ + raise NotImplementedError + + def candidates( + self, + items: Sequence[Item], + artist: str, + album: str, + va_likely: bool, + ) -> Iterable[AlbumInfo]: + query_filters: SearchFilter = {"album": album} + if not va_likely: + query_filters["artist"] = artist + + results = self._search_api("album", query_filters) + if not results: + return [] + + return filter( + None, self.albums_for_ids([result["id"] for result in results]) + ) + + def item_candidates( + self, item: Item, artist: str, title: str + ) -> Iterable[TrackInfo]: + results = self._search_api("track", {"artist": artist}, keywords=title) + if not results: + return [] + + return filter( + None, + self.tracks_for_ids([result["id"] for result in results if result]), + ) + + +# Dynamically copy methods to BeetsPlugin for legacy support +# TODO: Remove this in the future major release, v3.0.0 + +for name, method in inspect.getmembers( + MetadataSourcePlugin, predicate=inspect.isfunction +): + if not hasattr(BeetsPlugin, name): + setattr(BeetsPlugin, name, method) diff --git a/beets/plugins.py b/beets/plugins.py index 983d15402..81f423431 100644 --- a/beets/plugins.py +++ b/beets/plugins.py @@ -23,22 +23,13 @@ import sys import traceback from collections import defaultdict from functools import wraps -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Generic, - Literal, - Sequence, - TypedDict, - TypeVar, -) +from types import GenericAlias +from typing import TYPE_CHECKING, Any, Callable, Sequence, TypeVar import mediafile import beets from beets import logging -from beets.util.id_extractors import extract_release_id if TYPE_CHECKING: from beets.event_types import EventType @@ -54,8 +45,6 @@ if TYPE_CHECKING: from confuse import ConfigView - from beets.autotag import AlbumInfo, TrackInfo - from beets.autotag.distance import Distance from beets.dbcore import Query from beets.dbcore.db import FieldQueryType from beets.dbcore.types import Type @@ -115,7 +104,7 @@ class PluginLogFilter(logging.Filter): # Managing the plugins themselves. -class BeetsPlugin: +class BeetsPlugin(metaclass=abc.ABCMeta): """The base class for all beets plugins. Plugins provide functionality by defining a subclass of BeetsPlugin and overriding the abstract methods defined here. @@ -218,66 +207,6 @@ class BeetsPlugin: """Return a dict mapping prefixes to Query subclasses.""" return {} - def track_distance( - self, - item: Item, - info: TrackInfo, - ) -> Distance: - """Should return a Distance object to be added to the - distance for every track comparison. - """ - from beets.autotag.distance import Distance - - return Distance() - - def album_distance( - self, - items: Sequence[Item], - album_info: AlbumInfo, - mapping: dict[Item, TrackInfo], - ) -> Distance: - """Should return a Distance object to be added to the - distance for every album-level comparison. - """ - from beets.autotag.distance import Distance - - return Distance() - - def candidates( - self, items: list[Item], artist: str, album: str, va_likely: bool - ) -> Iterable[AlbumInfo]: - """Return :py:class:`AlbumInfo` candidates that match the given album. - - :param items: List of items in the album - :param artist: Album artist - :param album: Album name - :param va_likely: Whether the album is likely to be by various artists - """ - yield from () - - def item_candidates( - self, item: Item, artist: str, title: str - ) -> Iterable[TrackInfo]: - """Return :py:class:`TrackInfo` candidates that match the given track. - - :param item: Track item - :param artist: Track artist - :param title: Track title - """ - yield from () - - def album_for_id(self, album_id: str) -> AlbumInfo | None: - """Return an AlbumInfo object or None if no matching release was - found. - """ - return None - - def track_for_id(self, track_id: str) -> TrackInfo | None: - """Return a TrackInfo object or None if no matching release was - found. - """ - return None - def add_media_field( self, name: str, descriptor: mediafile.MediaField ) -> None: @@ -369,10 +298,13 @@ def load_plugins(names: Sequence[str] = ()) -> None: else: for obj in getattr(namespace, name).__dict__.values(): if ( - isinstance(obj, type) + inspect.isclass(obj) + and not isinstance( + obj, GenericAlias + ) # seems to be needed for python <= 3.9 only and issubclass(obj, BeetsPlugin) and obj != BeetsPlugin - and obj != MetadataSourcePlugin + and not inspect.isabstract(obj) and obj not in _classes ): _classes.add(obj) @@ -430,7 +362,7 @@ def queries() -> dict[str, type[Query]]: def types(model_cls: type[AnyModel]) -> dict[str, Type]: - # Gives us `item_types` and `album_types` + """Return mapping between flex field names and types for the given model.""" attr_name = f"{model_cls.__name__.lower()}_types" types: dict[str, Type] = {} for plugin in find_plugins(): @@ -447,39 +379,13 @@ def types(model_cls: type[AnyModel]) -> dict[str, Type]: def named_queries(model_cls: type[AnyModel]) -> dict[str, FieldQueryType]: - # Gather `item_queries` and `album_queries` from the plugins. + """Return mapping between field names and queries for the given model.""" attr_name = f"{model_cls.__name__.lower()}_queries" - queries: dict[str, FieldQueryType] = {} - for plugin in find_plugins(): - plugin_queries = getattr(plugin, attr_name, {}) - queries.update(plugin_queries) - return queries - - -def track_distance(item: Item, info: TrackInfo) -> Distance: - """Gets the track distance calculated by all loaded plugins. - Returns a Distance object. - """ - from beets.autotag.distance import Distance - - dist = Distance() - for plugin in find_plugins(): - dist.update(plugin.track_distance(item, info)) - return dist - - -def album_distance( - items: Sequence[Item], - album_info: AlbumInfo, - mapping: dict[Item, TrackInfo], -) -> Distance: - """Returns the album distance calculated by plugins.""" - from beets.autotag.distance import Distance - - dist = Distance() - for plugin in find_plugins(): - dist.update(plugin.album_distance(items, album_info, mapping)) - return dist + return { + field: query + for plugin in find_plugins() + for field, query in getattr(plugin, attr_name, {}).items() + } def notify_info_yielded(event: str) -> Callable[[IterF[P, Ret]], IterF[P, Ret]]: @@ -502,46 +408,6 @@ def notify_info_yielded(event: str) -> Callable[[IterF[P, Ret]], IterF[P, Ret]]: return decorator -@notify_info_yielded("albuminfo_received") -def candidates(*args, **kwargs) -> Iterable[AlbumInfo]: - """Return matching album candidates from all plugins.""" - for plugin in find_plugins(): - yield from plugin.candidates(*args, **kwargs) - - -@notify_info_yielded("trackinfo_received") -def item_candidates(*args, **kwargs) -> Iterable[TrackInfo]: - """Return matching track candidates from all plugins.""" - for plugin in find_plugins(): - yield from plugin.item_candidates(*args, **kwargs) - - -def album_for_id(_id: str) -> AlbumInfo | None: - """Get AlbumInfo object for the given ID string. - - A single ID can yield just a single album, so we return the first match. - """ - for plugin in find_plugins(): - if info := plugin.album_for_id(_id): - send("albuminfo_received", info=info) - return info - - return None - - -def track_for_id(_id: str) -> TrackInfo | None: - """Get TrackInfo object for the given ID string. - - A single ID can yield just a single track, so we return the first match. - """ - for plugin in find_plugins(): - if info := plugin.track_for_id(_id): - send("trackinfo_received", info=info) - return info - - return None - - def template_funcs() -> TFuncMap[str]: """Get all the template functions declared by plugins as a dictionary. @@ -656,20 +522,6 @@ def feat_tokens(for_artist: bool = True) -> str: ) -def get_distance( - config: ConfigView, data_source: str, info: AlbumInfo | TrackInfo -) -> Distance: - """Returns the ``data_source`` weight and the maximum source weight - for albums or individual tracks. - """ - from beets.autotag.distance import Distance - - dist = Distance() - if info.data_source == data_source: - dist.add("source", config["source_weight"].as_number()) - return dist - - def apply_item_changes( lib: Library, item: Item, move: bool, pretend: bool, write: bool ) -> None: @@ -695,149 +547,3 @@ def apply_item_changes( item.try_write() item.store() - - -class Response(TypedDict): - """A dictionary with the response of a plugin API call. - - May be extended by plugins to include additional information, but `id` - is required. - """ - - id: str - - -R = TypeVar("R", bound=Response) - - -class MetadataSourcePlugin(Generic[R], BeetsPlugin, metaclass=abc.ABCMeta): - def __init__(self): - super().__init__() - self.config.add({"source_weight": 0.5}) - - @property - @abc.abstractmethod - def data_source(self) -> str: - raise NotImplementedError - - @property - @abc.abstractmethod - def search_url(self) -> str: - raise NotImplementedError - - @property - @abc.abstractmethod - def album_url(self) -> str: - raise NotImplementedError - - @property - @abc.abstractmethod - def track_url(self) -> str: - raise NotImplementedError - - @abc.abstractmethod - def _search_api( - self, - query_type: Literal["album", "track"], - filters: dict[str, str], - keywords: str = "", - ) -> Sequence[R]: - raise NotImplementedError - - @abc.abstractmethod - def album_for_id(self, album_id: str) -> AlbumInfo | None: - raise NotImplementedError - - @abc.abstractmethod - def track_for_id(self, track_id: str) -> TrackInfo | None: - raise NotImplementedError - - @staticmethod - def get_artist( - artists, - id_key: str | int = "id", - name_key: str | int = "name", - join_key: str | int | None = None, - ) -> tuple[str, str | None]: - """Returns an artist string (all artists) and an artist_id (the main - artist) for a list of artist object dicts. - - For each artist, this function moves articles (such as 'a', 'an', - and 'the') to the front and strips trailing disambiguation numbers. It - returns a tuple containing the comma-separated string of all - normalized artists and the ``id`` of the main/first artist. - Alternatively a keyword can be used to combine artists together into a - single string by passing the join_key argument. - - :param artists: Iterable of artist dicts or lists returned by API. - :type artists: list[dict] or list[list] - :param id_key: Key or index corresponding to the value of ``id`` for - the main/first artist. Defaults to 'id'. - :param name_key: Key or index corresponding to values of names - to concatenate for the artist string (containing all artists). - Defaults to 'name'. - :param join_key: Key or index corresponding to a field containing a - keyword to use for combining artists into a single string, for - example "Feat.", "Vs.", "And" or similar. The default is None - which keeps the default behaviour (comma-separated). - :return: Normalized artist string. - """ - artist_id = None - artist_string = "" - artists = list(artists) # In case a generator was passed. - total = len(artists) - for idx, artist in enumerate(artists): - if not artist_id: - artist_id = artist[id_key] - name = artist[name_key] - # Strip disambiguation number. - name = re.sub(r" \(\d+\)$", "", name) - # Move articles to the front. - name = re.sub(r"^(.*?), (a|an|the)$", r"\2 \1", name, flags=re.I) - # Use a join keyword if requested and available. - if idx < (total - 1): # Skip joining on last. - if join_key and artist.get(join_key, None): - name += f" {artist[join_key]} " - else: - name += ", " - artist_string += name - - return artist_string, artist_id - - def _get_id(self, id_string: str) -> str | None: - """Parse release ID from the given ID string.""" - return extract_release_id(self.data_source.lower(), id_string) - - def candidates( - self, items: list[Item], artist: str, album: str, va_likely: bool - ) -> Iterable[AlbumInfo]: - query_filters = {"album": album} - if not va_likely: - query_filters["artist"] = artist - for result in self._search_api("album", query_filters): - if info := self.album_for_id(result["id"]): - yield info - - def item_candidates( - self, item: Item, artist: str, title: str - ) -> Iterable[TrackInfo]: - for result in self._search_api( - "track", {"artist": artist}, keywords=title - ): - if info := self.track_for_id(result["id"]): - yield info - - def album_distance( - self, - items: Sequence[Item], - album_info: AlbumInfo, - mapping: dict[Item, TrackInfo], - ) -> Distance: - return get_distance( - data_source=self.data_source, info=album_info, config=self.config - ) - - def track_distance(self, item: Item, info: TrackInfo) -> Distance: - return get_distance( - data_source=self.data_source, info=info, config=self.config - ) diff --git a/beets/test/helper.py b/beets/test/helper.py index db753a760..eb024a7aa 100644 --- a/beets/test/helper.py +++ b/beets/test/helper.py @@ -52,12 +52,13 @@ import beets.plugins from beets import importer, logging, util from beets.autotag.hooks import AlbumInfo, TrackInfo from beets.importer import ImportSession -from beets.library import Album, Item, Library +from beets.library import Item, Library from beets.test import _common from beets.ui.commands import TerminalImportSession from beets.util import ( MoveOperation, bytestring_path, + cached_classproperty, clean_module_tempdir, syspath, ) @@ -471,11 +472,6 @@ class PluginMixin(ConfigMixin): plugin: ClassVar[str] preload_plugin: ClassVar[bool] = True - original_item_types = dict(Item._types) - original_album_types = dict(Album._types) - original_item_queries = dict(Item._queries) - original_album_queries = dict(Album._queries) - def setup_beets(self): super().setup_beets() if self.preload_plugin: @@ -494,16 +490,11 @@ class PluginMixin(ConfigMixin): # FIXME this should eventually be handled by a plugin manager plugins = (self.plugin,) if hasattr(self, "plugin") else plugins self.config["plugins"] = plugins + cached_classproperty.cache.clear() beets.plugins.load_plugins(plugins) + beets.plugins.send("pluginload") beets.plugins.find_plugins() - # Take a backup of the original _types and _queries to restore - # when unloading. - Item._types.update(beets.plugins.types(Item)) - Album._types.update(beets.plugins.types(Album)) - Item._queries.update(beets.plugins.named_queries(Item)) - Album._queries.update(beets.plugins.named_queries(Album)) - def unload_plugins(self) -> None: """Unload all plugins and remove them from the configuration.""" # FIXME this should eventually be handled by a plugin manager @@ -512,10 +503,6 @@ class PluginMixin(ConfigMixin): self.config["plugins"] = [] beets.plugins._classes = set() beets.plugins._instances = {} - Item._types = self.original_item_types - Album._types = self.original_album_types - Item._queries = self.original_item_queries - Album._queries = self.original_album_queries @contextmanager def configure_plugin(self, config: Any): @@ -799,10 +786,12 @@ class AutotagStub: def install(self): self.patchers = [ - patch("beets.plugins.album_for_id", lambda *_: None), - patch("beets.plugins.track_for_id", lambda *_: None), - patch("beets.plugins.candidates", self.candidates), - patch("beets.plugins.item_candidates", self.item_candidates), + patch("beets.metadata_plugins.album_for_id", lambda *_: None), + patch("beets.metadata_plugins.track_for_id", lambda *_: None), + patch("beets.metadata_plugins.candidates", self.candidates), + patch( + "beets.metadata_plugins.item_candidates", self.item_candidates + ), ] for p in self.patchers: p.start() diff --git a/beets/ui/__init__.py b/beets/ui/__init__.py index 74dee550c..8b2419a07 100644 --- a/beets/ui/__init__.py +++ b/beets/ui/__init__.py @@ -1609,17 +1609,6 @@ def _setup(options, lib=None): plugins = _load_plugins(options, config) - # Add types and queries defined by plugins. - plugin_types_album = plugins.types(library.Album) - library.Album._types.update(plugin_types_album) - item_types = plugin_types_album.copy() - item_types.update(library.Item._types) - item_types.update(plugins.types(library.Item)) - library.Item._types = item_types - - library.Item._queries.update(plugins.named_queries(library.Item)) - library.Album._queries.update(plugins.named_queries(library.Album)) - plugins.send("pluginload") # Get the default subcommands. diff --git a/beets/ui/commands.py b/beets/ui/commands.py index 7b22c2462..12a8d6875 100755 --- a/beets/ui/commands.py +++ b/beets/ui/commands.py @@ -1343,7 +1343,7 @@ def import_func(lib, opts, args: list[str]): if opts.library: query = args - paths = [] + byte_paths = [] else: query = None paths = args diff --git a/beets/util/__init__.py b/beets/util/__init__.py index 00c9ce05d..58b08c844 100644 --- a/beets/util/__init__.py +++ b/beets/util/__init__.py @@ -41,6 +41,7 @@ from typing import ( Any, AnyStr, Callable, + ClassVar, Generic, NamedTuple, TypeVar, @@ -63,6 +64,7 @@ MAX_FILENAME_LENGTH = 200 WINDOWS_MAGIC_PREFIX = "\\\\?\\" T = TypeVar("T") PathLike = Union[str, bytes, Path] +StrPath = Union[str, Path] Replacements = Sequence[tuple[Pattern[str], str]] # Here for now to allow for a easy replace later on @@ -1051,20 +1053,46 @@ def par_map(transform: Callable[[T], Any], items: Sequence[T]) -> None: class cached_classproperty: - """A decorator implementing a read-only property that is *lazy* in - the sense that the getter is only invoked once. Subsequent accesses - through *any* instance use the cached result. + """Descriptor implementing cached class properties. + + Provides class-level dynamic property behavior where the getter function is + called once per class and the result is cached for subsequent access. Unlike + instance properties, this operates on the class rather than instances. """ - def __init__(self, getter): + cache: ClassVar[dict[tuple[Any, str], Any]] = {} + + name: str + + # Ideally, we would like to use `Callable[[type[T]], Any]` here, + # however, `mypy` is unable to see this as a **class** property, and thinks + # that this callable receives an **instance** of the object, failing the + # type check, for example: + # >>> class Album: + # >>> @cached_classproperty + # >>> def foo(cls): + # >>> reveal_type(cls) # mypy: revealed type is "Album" + # >>> return cls.bar + # + # Argument 1 to "cached_classproperty" has incompatible type + # "Callable[[Album], ...]"; expected "Callable[[type[Album]], ...]" + # + # Therefore, we just use `Any` here, which is not ideal, but works. + def __init__(self, getter: Callable[[Any], Any]) -> None: + """Initialize the descriptor with the property getter function.""" self.getter = getter - self.cache = {} - def __get__(self, instance, owner): - if owner not in self.cache: - self.cache[owner] = self.getter(owner) + def __set_name__(self, owner: Any, name: str) -> None: + """Capture the attribute name this descriptor is assigned to.""" + self.name = name - return self.cache[owner] + def __get__(self, instance: Any, owner: type[Any]) -> Any: + """Compute and cache if needed, and return the property value.""" + key = owner, self.name + if key not in self.cache: + self.cache[key] = self.getter(owner) + + return self.cache[key] class LazySharedInstance(Generic[T]): diff --git a/beets/util/id_extractors.py b/beets/util/id_extractors.py index bbe2c32a4..6cdb787d1 100644 --- a/beets/util/id_extractors.py +++ b/beets/util/id_extractors.py @@ -18,6 +18,11 @@ from __future__ import annotations import re +from beets import logging + +log = logging.getLogger("beets") + + PATTERN_BY_SOURCE = { "spotify": re.compile(r"(?:^|open\.spotify\.com/[^/]+/)([0-9A-Za-z]{22})"), "deezer": re.compile(r"(?:^|deezer\.com/)(?:[a-z]*/)?(?:[^/]+/)?(\d+)"), @@ -43,6 +48,21 @@ PATTERN_BY_SOURCE = { def extract_release_id(source: str, id_: str) -> str | None: - if m := PATTERN_BY_SOURCE[source].search(str(id_)): + """Extract the release ID from a given source and ID. + + Normally, the `id_` is a url string which contains the ID of the + release. This function extracts the ID from the URL based on the + `source` provided. + """ + try: + source_pattern = PATTERN_BY_SOURCE[source.lower()] + except KeyError: + log.debug( + f"Unknown source '{source}' for ID extraction. Returning id/url as-is." + ) + return id_ + + if m := source_pattern.search(str(id_)): return m[1] + return None diff --git a/beets/util/pipeline.py b/beets/util/pipeline.py index cebde0f23..140407f04 100644 --- a/beets/util/pipeline.py +++ b/beets/util/pipeline.py @@ -48,6 +48,8 @@ POISON = "__PIPELINE_POISON__" DEFAULT_QUEUE_SIZE = 16 +Tq = TypeVar("Tq") + def _invalidate_queue(q, val=None, sync=True): """Breaks a Queue such that it never blocks, always has size 1, @@ -91,7 +93,7 @@ def _invalidate_queue(q, val=None, sync=True): q.mutex.release() -class CountedQueue(queue.Queue): +class CountedQueue(queue.Queue[Tq]): """A queue that keeps track of the number of threads that are still feeding into it. The queue is poisoned when all threads are finished with the queue. diff --git a/beetsplug/advancedrewrite.py b/beetsplug/advancedrewrite.py index 9a5feaaff..8bc63c0cb 100644 --- a/beetsplug/advancedrewrite.py +++ b/beetsplug/advancedrewrite.py @@ -58,7 +58,9 @@ class AdvancedRewritePlugin(BeetsPlugin): def __init__(self): """Parse configuration and register template fields for rewriting.""" super().__init__() + self.register_listener("pluginload", self.loaded) + def loaded(self): template = confuse.Sequence( confuse.OneOf( [ diff --git a/beetsplug/autobpm.py b/beetsplug/autobpm.py index 9c953f711..46d7e672a 100644 --- a/beetsplug/autobpm.py +++ b/beetsplug/autobpm.py @@ -15,10 +15,10 @@ from __future__ import annotations -from collections.abc import Iterable from typing import TYPE_CHECKING import librosa +import numpy as np from beets.plugins import BeetsPlugin from beets.ui import Subcommand, should_write @@ -76,7 +76,10 @@ class AutoBPMPlugin(BeetsPlugin): self._log.error("Failed to measure BPM for {}: {}", path, exc) continue - bpm = round(tempo[0] if isinstance(tempo, Iterable) else tempo) + bpm = round( + float(tempo[0] if isinstance(tempo, np.ndarray) else tempo) + ) + item["bpm"] = bpm self._log.info("Computed BPM for {}: {}", path, bpm) diff --git a/beetsplug/beatport.py b/beetsplug/beatport.py index 20147b5cc..16e0dc896 100644 --- a/beetsplug/beatport.py +++ b/beetsplug/beatport.py @@ -14,9 +14,19 @@ """Adds Beatport release and track search support to the autotagger""" +from __future__ import annotations + import json import re from datetime import datetime, timedelta +from typing import ( + TYPE_CHECKING, + Iterable, + Iterator, + Literal, + Sequence, + overload, +) import confuse from requests_oauthlib import OAuth1Session @@ -29,7 +39,13 @@ from requests_oauthlib.oauth1_session import ( import beets import beets.ui from beets.autotag.hooks import AlbumInfo, TrackInfo -from beets.plugins import BeetsPlugin, MetadataSourcePlugin, get_distance +from beets.metadata_plugins import MetadataSourcePlugin + +if TYPE_CHECKING: + from beets.importer import ImportSession + from beets.library import Item + + from ._typing import JSONDict AUTH_ERRORS = (TokenRequestDenied, TokenMissing, VerifierMissing) USER_AGENT = f"beets/{beets.__version__} +https://beets.io/" @@ -39,20 +55,6 @@ class BeatportAPIError(Exception): pass -class BeatportObject: - def __init__(self, data): - self.beatport_id = data["id"] - self.name = str(data["name"]) - if "releaseDate" in data: - self.release_date = datetime.strptime( - data["releaseDate"], "%Y-%m-%d" - ) - if "artists" in data: - self.artists = [(x["id"], str(x["name"])) for x in data["artists"]] - if "genres" in data: - self.genres = [str(x["name"]) for x in data["genres"]] - - class BeatportClient: _api_base = "https://oauth-api.beatport.com" @@ -77,7 +79,7 @@ class BeatportClient: ) self.api.headers = {"User-Agent": USER_AGENT} - def get_authorize_url(self): + def get_authorize_url(self) -> str: """Generate the URL for the user to authorize the application. Retrieves a request token from the Beatport API and returns the @@ -99,15 +101,13 @@ class BeatportClient: self._make_url("/identity/1/oauth/authorize") ) - def get_access_token(self, auth_data): + def get_access_token(self, auth_data: str) -> tuple[str, str]: """Obtain the final access token and secret for the API. :param auth_data: URL-encoded authorization data as displayed at the authorization url (obtained via :py:meth:`get_authorize_url`) after signing in - :type auth_data: unicode - :returns: OAuth resource owner key and secret - :rtype: (unicode, unicode) tuple + :returns: OAuth resource owner key and secret as unicode """ self.api.parse_authorization_response( "https://beets.io/auth?" + auth_data @@ -117,20 +117,37 @@ class BeatportClient: ) return access_data["oauth_token"], access_data["oauth_token_secret"] - def search(self, query, release_type="release", details=True): + @overload + def search( + self, + query: str, + release_type: Literal["release"], + details: bool = True, + ) -> Iterator[BeatportRelease]: ... + + @overload + def search( + self, + query: str, + release_type: Literal["track"], + details: bool = True, + ) -> Iterator[BeatportTrack]: ... + + def search( + self, + query: str, + release_type: Literal["release", "track"], + details=True, + ) -> Iterator[BeatportRelease | BeatportTrack]: """Perform a search of the Beatport catalogue. :param query: Query string - :param release_type: Type of releases to search for, can be - 'release' or 'track' + :param release_type: Type of releases to search for. :param details: Retrieve additional information about the search results. Currently this will fetch the tracklist for releases and do nothing for tracks :returns: Search results - :rtype: generator that yields - py:class:`BeatportRelease` or - :py:class:`BeatportTrack` """ response = self._get( "catalog/3/search", @@ -140,20 +157,18 @@ class BeatportClient: ) for item in response: if release_type == "release": + release = BeatportRelease(item) if details: - release = self.get_release(item["id"]) - else: - release = BeatportRelease(item) + release.tracks = self.get_release_tracks(item["id"]) yield release elif release_type == "track": yield BeatportTrack(item) - def get_release(self, beatport_id): + def get_release(self, beatport_id: str) -> BeatportRelease | None: """Get information about a single release. :param beatport_id: Beatport ID of the release :returns: The matching release - :rtype: :py:class:`BeatportRelease` """ response = self._get("/catalog/3/releases", id=beatport_id) if response: @@ -162,35 +177,33 @@ class BeatportClient: return release return None - def get_release_tracks(self, beatport_id): + def get_release_tracks(self, beatport_id: str) -> list[BeatportTrack]: """Get all tracks for a given release. :param beatport_id: Beatport ID of the release :returns: Tracks in the matching release - :rtype: list of :py:class:`BeatportTrack` """ response = self._get( "/catalog/3/tracks", releaseId=beatport_id, perPage=100 ) return [BeatportTrack(t) for t in response] - def get_track(self, beatport_id): + def get_track(self, beatport_id: str) -> BeatportTrack: """Get information about a single track. :param beatport_id: Beatport ID of the track :returns: The matching track - :rtype: :py:class:`BeatportTrack` """ response = self._get("/catalog/3/tracks", id=beatport_id) return BeatportTrack(response[0]) - def _make_url(self, endpoint): + def _make_url(self, endpoint: str) -> str: """Get complete URL for a given API endpoint.""" if not endpoint.startswith("/"): endpoint = "/" + endpoint return self._api_base + endpoint - def _get(self, endpoint, **kwargs): + def _get(self, endpoint: str, **kwargs) -> list[JSONDict]: """Perform a GET request on a given API endpoint. Automatically extracts result data from the response and converts HTTP @@ -211,48 +224,81 @@ class BeatportClient: return response.json()["results"] -class BeatportRelease(BeatportObject): - def __str__(self): - if len(self.artists) < 4: - artist_str = ", ".join(x[1] for x in self.artists) +class BeatportObject: + beatport_id: str + name: str + + release_date: datetime | None = None + + artists: list[tuple[str, str]] | None = None + # tuple of artist id and artist name + + def __init__(self, data: JSONDict): + self.beatport_id = str(data["id"]) # given as int in the response + self.name = str(data["name"]) + if "releaseDate" in data: + self.release_date = datetime.strptime( + data["releaseDate"], "%Y-%m-%d" + ) + if "artists" in data: + self.artists = [(x["id"], str(x["name"])) for x in data["artists"]] + if "genres" in data: + self.genres = [str(x["name"]) for x in data["genres"]] + + def artists_str(self) -> str | None: + if self.artists is not None: + if len(self.artists) < 4: + artist_str = ", ".join(x[1] for x in self.artists) + else: + artist_str = "Various Artists" else: - artist_str = "Various Artists" - return "".format( - artist_str, - self.name, - self.catalog_number, - ) + artist_str = None - def __repr__(self): - return str(self).encode("utf-8") + return artist_str + + +class BeatportRelease(BeatportObject): + catalog_number: str | None + label_name: str | None + category: str | None + url: str | None + genre: str | None + + tracks: list[BeatportTrack] | None = None + + def __init__(self, data: JSONDict): + super().__init__(data) + + self.catalog_number = data.get("catalogNumber") + self.label_name = data.get("label", {}).get("name") + self.category = data.get("category") + self.genre = data.get("genre") - def __init__(self, data): - BeatportObject.__init__(self, data) - if "catalogNumber" in data: - self.catalog_number = data["catalogNumber"] - if "label" in data: - self.label_name = data["label"]["name"] - if "category" in data: - self.category = data["category"] if "slug" in data: self.url = "https://beatport.com/release/{}/{}".format( data["slug"], data["id"] ) - self.genre = data.get("genre") + + def __str__(self) -> str: + return "".format( + self.artists_str(), + self.name, + self.catalog_number, + ) class BeatportTrack(BeatportObject): - def __str__(self): - artist_str = ", ".join(x[1] for x in self.artists) - return "".format( - artist_str, self.name, self.mix_name - ) + title: str | None + mix_name: str | None + length: timedelta + url: str | None + track_number: int | None + bpm: str | None + initial_key: str | None + genre: str | None - def __repr__(self): - return str(self).encode("utf-8") - - def __init__(self, data): - BeatportObject.__init__(self, data) + def __init__(self, data: JSONDict): + super().__init__(data) if "title" in data: self.title = str(data["title"]) if "mixName" in data: @@ -279,8 +325,8 @@ class BeatportTrack(BeatportObject): self.genre = str(data["genres"][0].get("name")) -class BeatportPlugin(BeetsPlugin): - data_source = "Beatport" +class BeatportPlugin(MetadataSourcePlugin): + _client: BeatportClient | None = None def __init__(self): super().__init__() @@ -294,12 +340,19 @@ class BeatportPlugin(BeetsPlugin): ) self.config["apikey"].redact = True self.config["apisecret"].redact = True - self.client = None self.register_listener("import_begin", self.setup) - def setup(self, session=None): - c_key = self.config["apikey"].as_str() - c_secret = self.config["apisecret"].as_str() + @property + def client(self) -> BeatportClient: + if self._client is None: + raise ValueError( + "Beatport client not initialized. Call setup() first." + ) + return self._client + + def setup(self, session: ImportSession): + c_key: str = self.config["apikey"].as_str() + c_secret: str = self.config["apisecret"].as_str() # Get the OAuth token from a file or log in. try: @@ -312,9 +365,9 @@ class BeatportPlugin(BeetsPlugin): token = tokendata["token"] secret = tokendata["secret"] - self.client = BeatportClient(c_key, c_secret, token, secret) + self._client = BeatportClient(c_key, c_secret, token, secret) - def authenticate(self, c_key, c_secret): + def authenticate(self, c_key: str, c_secret: str) -> tuple[str, str]: # Get the link for the OAuth page. auth_client = BeatportClient(c_key, c_secret) try: @@ -341,44 +394,30 @@ class BeatportPlugin(BeetsPlugin): return token, secret - def _tokenfile(self): + def _tokenfile(self) -> str: """Get the path to the JSON file for storing the OAuth token.""" return self.config["tokenfile"].get(confuse.Filename(in_app_dir=True)) - def album_distance(self, items, album_info, mapping): - """Returns the Beatport source weight and the maximum source weight - for albums. - """ - return get_distance( - data_source=self.data_source, info=album_info, config=self.config - ) - - def track_distance(self, item, track_info): - """Returns the Beatport source weight and the maximum source weight - for individual tracks. - """ - return get_distance( - data_source=self.data_source, info=track_info, config=self.config - ) - - def candidates(self, items, artist, release, va_likely): - """Returns a list of AlbumInfo objects for beatport search results - matching release and artist (if not various). - """ + def candidates( + self, + items: Sequence[Item], + artist: str, + album: str, + va_likely: bool, + ) -> Iterator[AlbumInfo]: if va_likely: - query = release + query = album else: - query = f"{artist} {release}" + query = f"{artist} {album}" try: - return self._get_releases(query) + yield from self._get_releases(query) except BeatportAPIError as e: self._log.debug("API Error: {0} (query: {1})", e, query) - return [] + return - def item_candidates(self, item, artist, title): - """Returns a list of TrackInfo objects for beatport search results - matching title and artist. - """ + def item_candidates( + self, item: Item, artist: str, title: str + ) -> Iterable[TrackInfo]: query = f"{artist} {title}" try: return self._get_tracks(query) @@ -386,13 +425,13 @@ class BeatportPlugin(BeetsPlugin): self._log.debug("API Error: {0} (query: {1})", e, query) return [] - def album_for_id(self, release_id): + def album_for_id(self, album_id: str): """Fetches a release by its Beatport ID and returns an AlbumInfo object or None if the query is not a valid ID or release is not found. """ - self._log.debug("Searching for release {0}", release_id) + self._log.debug("Searching for release {0}", album_id) - if not (release_id := self._get_id(release_id)): + if not (release_id := self._extract_id(album_id)): self._log.debug("Not a valid Beatport release ID.") return None @@ -401,11 +440,12 @@ class BeatportPlugin(BeetsPlugin): return self._get_album_info(release) return None - def track_for_id(self, track_id): + def track_for_id(self, track_id: str): """Fetches a track by its Beatport ID and returns a TrackInfo object or None if the track is not a valid Beatport ID or track is not found. """ self._log.debug("Searching for track {0}", track_id) + # TODO: move to extractor match = re.search(r"(^|beatport\.com/track/.+/)(\d+)$", track_id) if not match: self._log.debug("Not a valid Beatport track ID.") @@ -415,7 +455,7 @@ class BeatportPlugin(BeetsPlugin): return self._get_track_info(bp_track) return None - def _get_releases(self, query): + def _get_releases(self, query: str) -> Iterator[AlbumInfo]: """Returns a list of AlbumInfo objects for a beatport search query.""" # Strip non-word characters from query. Things like "!" and "-" can # cause a query to return no results, even if they match the artist or @@ -425,16 +465,22 @@ class BeatportPlugin(BeetsPlugin): # Strip medium information from query, Things like "CD1" and "disk 1" # can also negate an otherwise positive result. query = re.sub(r"\b(CD|disc)\s*\d+", "", query, flags=re.I) - albums = [self._get_album_info(x) for x in self.client.search(query)] - return albums + for beatport_release in self.client.search(query, "release"): + if beatport_release is None: + continue + yield self._get_album_info(beatport_release) - def _get_album_info(self, release): + def _get_album_info(self, release: BeatportRelease) -> AlbumInfo: """Returns an AlbumInfo object for a Beatport Release object.""" - va = len(release.artists) > 3 + va = release.artists is not None and len(release.artists) > 3 artist, artist_id = self._get_artist(release.artists) if va: artist = "Various Artists" - tracks = [self._get_track_info(x) for x in release.tracks] + tracks: list[TrackInfo] = [] + if release.tracks is not None: + tracks = [self._get_track_info(x) for x in release.tracks] + + release_date = release.release_date return AlbumInfo( album=release.name, @@ -445,18 +491,18 @@ class BeatportPlugin(BeetsPlugin): tracks=tracks, albumtype=release.category, va=va, - year=release.release_date.year, - month=release.release_date.month, - day=release.release_date.day, label=release.label_name, catalognum=release.catalog_number, media="Digital", data_source=self.data_source, data_url=release.url, genre=release.genre, + year=release_date.year if release_date else None, + month=release_date.month if release_date else None, + day=release_date.day if release_date else None, ) - def _get_track_info(self, track): + def _get_track_info(self, track: BeatportTrack) -> TrackInfo: """Returns a TrackInfo object for a Beatport Track object.""" title = track.name if track.mix_name != "Original Mix": @@ -482,9 +528,7 @@ class BeatportPlugin(BeetsPlugin): """Returns an artist string (all artists) and an artist_id (the main artist) for a list of Beatport release or track artists. """ - return MetadataSourcePlugin.get_artist( - artists=artists, id_key=0, name_key=1 - ) + return self.get_artist(artists=artists, id_key=0, name_key=1) def _get_tracks(self, query): """Returns a list of TrackInfo objects for a Beatport query.""" diff --git a/beetsplug/bpd/__init__.py b/beetsplug/bpd/__init__.py index 435368e35..a2ad2835c 100644 --- a/beetsplug/bpd/__init__.py +++ b/beetsplug/bpd/__init__.py @@ -30,7 +30,7 @@ from typing import TYPE_CHECKING import beets import beets.ui -from beets import dbcore, vfs +from beets import dbcore, logging, vfs from beets.library import Item from beets.plugins import BeetsPlugin from beets.util import as_string, bluelet @@ -38,6 +38,17 @@ from beets.util import as_string, bluelet if TYPE_CHECKING: from beets.dbcore.query import Query +log = logging.getLogger(__name__) + + +try: + from . import gstplayer +except ImportError as e: + raise ImportError( + "Gstreamer Python bindings not found." + ' Install "gstreamer1.0" and "python-gi" or similar package to use BPD.' + ) from e + PROTOCOL_VERSION = "0.16.0" BUFSIZE = 1024 @@ -94,11 +105,6 @@ SUBSYSTEMS = [ ] -# Gstreamer import error. -class NoGstreamerError(Exception): - pass - - # Error-handling, exceptions, parameter parsing. @@ -1099,14 +1105,6 @@ class Server(BaseServer): """ def __init__(self, library, host, port, password, ctrl_port, log): - try: - from beetsplug.bpd import gstplayer - except ImportError as e: - # This is a little hacky, but it's the best I know for now. - if e.args[0].endswith(" gst"): - raise NoGstreamerError() - else: - raise log.info("Starting server...") super().__init__(host, port, password, ctrl_port, log) self.lib = library @@ -1616,16 +1614,9 @@ class BPDPlugin(BeetsPlugin): def start_bpd(self, lib, host, port, password, volume, ctrl_port): """Starts a BPD server.""" - try: - server = Server(lib, host, port, password, ctrl_port, self._log) - server.cmd_setvol(None, volume) - server.run() - except NoGstreamerError: - self._log.error("Gstreamer Python bindings not found.") - self._log.error( - 'Install "gstreamer1.0" and "python-gi"' - "or similar package to use BPD." - ) + server = Server(lib, host, port, password, ctrl_port, self._log) + server.cmd_setvol(None, volume) + server.run() def commands(self): cmd = beets.ui.Subcommand( diff --git a/beetsplug/chroma.py b/beetsplug/chroma.py index de3ac525a..f90877113 100644 --- a/beetsplug/chroma.py +++ b/beetsplug/chroma.py @@ -19,12 +19,15 @@ autotagger. Requires the pyacoustid library. import re from collections import defaultdict from functools import cached_property, partial +from typing import Iterable import acoustid import confuse -from beets import config, plugins, ui, util +from beets import config, ui, util from beets.autotag.distance import Distance +from beets.autotag.hooks import TrackInfo +from beets.metadata_plugins import MetadataSourcePlugin from beetsplug.musicbrainz import MusicBrainzPlugin API_KEY = "1vOwZtEn" @@ -168,10 +171,9 @@ def _all_releases(items): yield release_id -class AcoustidPlugin(plugins.BeetsPlugin): +class AcoustidPlugin(MetadataSourcePlugin): def __init__(self): super().__init__() - self.config.add( { "auto": True, @@ -210,7 +212,7 @@ class AcoustidPlugin(plugins.BeetsPlugin): self._log.debug("acoustid album candidates: {0}", len(albums)) return albums - def item_candidates(self, item, artist, title): + def item_candidates(self, item, artist, title) -> Iterable[TrackInfo]: if item.path not in _matches: return [] @@ -223,6 +225,14 @@ class AcoustidPlugin(plugins.BeetsPlugin): self._log.debug("acoustid item candidates: {0}", len(tracks)) return tracks + def album_for_id(self, *args, **kwargs): + # Lookup by fingerprint ID does not make too much sense. + return None + + def track_for_id(self, *args, **kwargs): + # Lookup by fingerprint ID does not make too much sense. + return None + def commands(self): submit_cmd = ui.Subcommand( "submit", help="submit Acoustid fingerprints" diff --git a/beetsplug/deezer.py b/beetsplug/deezer.py index 7e4896437..8815e3d59 100644 --- a/beetsplug/deezer.py +++ b/beetsplug/deezer.py @@ -26,16 +26,19 @@ import unidecode from beets import ui from beets.autotag import AlbumInfo, TrackInfo from beets.dbcore import types -from beets.plugins import BeetsPlugin, MetadataSourcePlugin, Response +from beets.metadata_plugins import ( + IDResponse, + SearchApiMetadataSourcePlugin, + SearchFilter, +) if TYPE_CHECKING: from beets.library import Item, Library - from beetsplug._typing import JSONDict + + from ._typing import JSONDict -class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin): - data_source = "Deezer" - +class DeezerPlugin(SearchApiMetadataSourcePlugin[IDResponse]): item_types = { "deezer_track_rank": types.INTEGER, "deezer_track_id": types.INTEGER, @@ -63,7 +66,7 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin): def album_for_id(self, album_id: str) -> AlbumInfo | None: """Fetch an album by its Deezer ID or URL.""" - if not (deezer_id := self._get_id(album_id)): + if not (deezer_id := self._extract_id(album_id)): return None album_url = f"{self.album_url}{deezer_id}" @@ -145,11 +148,14 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin): ) def track_for_id(self, track_id: str) -> None | TrackInfo: - """Fetch a track by its Deezer ID or URL. + """Fetch a track by its Deezer ID or URL and return a + TrackInfo object or None if the track is not found. + + :param track_id: (Optional) Deezer ID or URL for the track. Either + ``track_id`` or ``track_data`` must be provided. - Returns a TrackInfo object or None if the track is not found. """ - if not (deezer_id := self._get_id(track_id)): + if not (deezer_id := self._extract_id(track_id)): self._log.debug("Invalid Deezer track_id: {}", track_id) return None @@ -162,11 +168,13 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin): # Get album's tracks to set `track.index` (position on the entire # release) and `track.medium_total` (total number of tracks on # the track's disc). - album_tracks_obj = self.fetch_data( - self.album_url + str(track_data["album"]["id"]) + "/tracks" - ) - if album_tracks_obj is None: + if not ( + album_tracks_obj := self.fetch_data( + self.album_url + str(track_data["album"]["id"]) + "/tracks" + ) + ): return None + try: album_tracks_data = album_tracks_obj["data"] except KeyError: @@ -187,7 +195,6 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin): """Convert a Deezer track object dict to a TrackInfo object. :param track_data: Deezer Track object dict - :return: TrackInfo object for track """ artist, artist_id = self.get_artist( track_data.get("contributors", [track_data["artist"]]) @@ -211,7 +218,7 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin): @staticmethod def _construct_search_query( - filters: dict[str, str], keywords: str = "" + filters: SearchFilter, keywords: str = "" ) -> str: """Construct a query string with the specified filters and keywords to be provided to the Deezer Search API @@ -242,14 +249,14 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin): "radio", "user", ], - filters: dict[str, str], + filters: SearchFilter, keywords="", - ) -> Sequence[Response]: + ) -> Sequence[IDResponse]: """Query the Deezer Search API for the specified ``keywords``, applying the provided ``filters``. - :param query_type: The Deezer Search API method to use. - :param keywords: (Optional) Query keywords to use. + :param filters: Field filters to apply. + :param keywords: Query keywords to use. :return: JSON data for the class:`Response ` object or None if no search results are returned. """ @@ -269,7 +276,7 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin): e, ) return () - response_data = response.json().get("data", []) + response_data: Sequence[IDResponse] = response.json().get("data", []) self._log.debug( "Found {} result(s) from {} for '{}'", len(response_data), diff --git a/beetsplug/discogs.py b/beetsplug/discogs.py index 2408f3498..9765f317f 100644 --- a/beetsplug/discogs.py +++ b/beetsplug/discogs.py @@ -27,7 +27,7 @@ import time import traceback from functools import cache from string import ascii_lowercase -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Sequence import confuse from discogs_client import Client, Master, Release @@ -40,8 +40,7 @@ import beets.ui from beets import config from beets.autotag.distance import string_dist from beets.autotag.hooks import AlbumInfo, TrackInfo -from beets.plugins import BeetsPlugin, MetadataSourcePlugin, get_distance -from beets.util.id_extractors import extract_release_id +from beets.metadata_plugins import MetadataSourcePlugin if TYPE_CHECKING: from collections.abc import Callable, Iterable @@ -84,7 +83,7 @@ class ReleaseFormat(TypedDict): descriptions: list[str] | None -class DiscogsPlugin(BeetsPlugin): +class DiscogsPlugin(MetadataSourcePlugin): def __init__(self): super().__init__() self.config.add( @@ -169,20 +168,8 @@ class DiscogsPlugin(BeetsPlugin): return token, secret - def album_distance(self, items, album_info, mapping): - """Returns the album distance.""" - return get_distance( - data_source="Discogs", info=album_info, config=self.config - ) - - def track_distance(self, item, track_info): - """Returns the track distance.""" - return get_distance( - data_source="Discogs", info=track_info, config=self.config - ) - def candidates( - self, items: list[Item], artist: str, album: str, va_likely: bool + self, items: Sequence[Item], artist: str, album: str, va_likely: bool ) -> Iterable[AlbumInfo]: return self.get_albums(f"{artist} {album}" if va_likely else album) @@ -217,7 +204,7 @@ class DiscogsPlugin(BeetsPlugin): """ self._log.debug("Searching for release {0}", album_id) - discogs_id = extract_release_id("discogs", album_id) + discogs_id = self._extract_id(album_id) if not discogs_id: return None @@ -272,7 +259,7 @@ class DiscogsPlugin(BeetsPlugin): exc_info=True, ) return [] - return map(self.get_album_info, releases) + return filter(None, map(self.get_album_info, releases)) @cache def get_master_year(self, master_id: str) -> int | None: @@ -334,7 +321,7 @@ class DiscogsPlugin(BeetsPlugin): self._log.warning("Release does not contain the required fields") return None - artist, artist_id = MetadataSourcePlugin.get_artist( + artist, artist_id = self.get_artist( [a.data for a in result.artists], join_key="join" ) album = re.sub(r" +", " ", result.title) @@ -359,7 +346,7 @@ class DiscogsPlugin(BeetsPlugin): else: genre = base_genre - discogs_albumid = extract_release_id("discogs", result.data.get("uri")) + discogs_albumid = self._extract_id(result.data.get("uri")) # Extract information for the optional AlbumInfo fields that are # contained on nested discogs fields. @@ -419,7 +406,7 @@ class DiscogsPlugin(BeetsPlugin): genre=genre, media=media, original_year=original_year, - data_source="Discogs", + data_source=self.data_source, data_url=data_url, discogs_albumid=discogs_albumid, discogs_labelid=labelid, @@ -638,7 +625,7 @@ class DiscogsPlugin(BeetsPlugin): title = f"{prefix}: {title}" track_id = None medium, medium_index, _ = self.get_track_index(track["position"]) - artist, artist_id = MetadataSourcePlugin.get_artist( + artist, artist_id = self.get_artist( track.get("artists", []), join_key="join" ) length = self.get_track_length(track["duration"]) diff --git a/beetsplug/lastgenre/__init__.py b/beetsplug/lastgenre/__init__.py index b67f1fae2..dbab96cf8 100644 --- a/beetsplug/lastgenre/__init__.py +++ b/beetsplug/lastgenre/__init__.py @@ -401,7 +401,7 @@ class LastGenrePlugin(plugins.BeetsPlugin): label = "album" if not new_genres and "artist" in self.sources: - new_genres = None + new_genres = [] if isinstance(obj, library.Item): new_genres = self.fetch_artist_genre(obj) label = "artist" diff --git a/beetsplug/mbsync.py b/beetsplug/mbsync.py index d38b25e9f..3f7daec6c 100644 --- a/beetsplug/mbsync.py +++ b/beetsplug/mbsync.py @@ -16,7 +16,7 @@ from collections import defaultdict -from beets import autotag, library, plugins, ui, util +from beets import autotag, library, metadata_plugins, ui, util from beets.plugins import BeetsPlugin, apply_item_changes @@ -78,7 +78,9 @@ class MBSyncPlugin(BeetsPlugin): ) continue - if not (track_info := plugins.track_for_id(item.mb_trackid)): + if not ( + track_info := metadata_plugins.track_for_id(item.mb_trackid) + ): self._log.info( "Recording ID not found: {0.mb_trackid} for track {0}", item ) @@ -99,7 +101,9 @@ class MBSyncPlugin(BeetsPlugin): self._log.info("Skipping album with no mb_albumid: {}", album) continue - if not (album_info := plugins.album_for_id(album.mb_albumid)): + if not ( + album_info := metadata_plugins.album_for_id(album.mb_albumid) + ): self._log.info( "Release ID {0.mb_albumid} not found for album {0}", album ) diff --git a/beetsplug/missing.py b/beetsplug/missing.py index 8c328e647..d0e956930 100644 --- a/beetsplug/missing.py +++ b/beetsplug/missing.py @@ -21,7 +21,7 @@ from collections.abc import Iterator import musicbrainzngs from musicbrainzngs.musicbrainz import MusicBrainzError -from beets import config, plugins +from beets import config, metadata_plugins from beets.dbcore import types from beets.library import Album, Item, Library from beets.plugins import BeetsPlugin @@ -222,7 +222,7 @@ class MissingPlugin(BeetsPlugin): item_mbids = {x.mb_trackid for x in album.items()} # fetch missing items # TODO: Implement caching that without breaking other stuff - if album_info := plugins.album_for_id(album.mb_albumid): + if album_info := metadata_plugins.album_for_id(album.mb_albumid): for track_info in album_info.tracks: if track_info.track_id not in item_mbids: self._log.debug( diff --git a/beetsplug/musicbrainz.py b/beetsplug/musicbrainz.py index e33cc4fce..b52e44b23 100644 --- a/beetsplug/musicbrainz.py +++ b/beetsplug/musicbrainz.py @@ -20,7 +20,7 @@ import traceback from collections import Counter from functools import cached_property from itertools import product -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Iterable, Sequence from urllib.parse import urljoin import musicbrainzngs @@ -28,11 +28,10 @@ import musicbrainzngs import beets import beets.autotag.hooks from beets import config, plugins, util -from beets.plugins import BeetsPlugin +from beets.metadata_plugins import MetadataSourcePlugin from beets.util.id_extractors import extract_release_id if TYPE_CHECKING: - from collections.abc import Iterator, Sequence from typing import Literal from beets.library import Item @@ -362,9 +361,7 @@ def _merge_pseudo_and_actual_album( return merged -class MusicBrainzPlugin(BeetsPlugin): - data_source = "Musicbrainz" - +class MusicBrainzPlugin(MetadataSourcePlugin): def __init__(self): """Set up the python-musicbrainz-ngs module according to settings from the beets configuration. This should be called at startup. @@ -421,7 +418,7 @@ class MusicBrainzPlugin(BeetsPlugin): medium=medium, medium_index=medium_index, medium_total=medium_total, - data_source="MusicBrainz", + data_source=self.data_source, data_url=track_url(recording["id"]), ) @@ -632,7 +629,7 @@ class MusicBrainzPlugin(BeetsPlugin): artists_sort=artists_sort_names, artist_credit=artist_credit_name, artists_credit=artists_credit_names, - data_source="MusicBrainz", + data_source=self.data_source, data_url=album_url(release["id"]), barcode=release.get("barcode"), ) @@ -767,7 +764,7 @@ class MusicBrainzPlugin(BeetsPlugin): return mb_field_by_tag def get_album_criteria( - self, items: list[Item], artist: str, album: str, va_likely: bool + self, items: Sequence[Item], artist: str, album: str, va_likely: bool ) -> dict[str, str]: criteria = { "release": album, @@ -813,12 +810,11 @@ class MusicBrainzPlugin(BeetsPlugin): def candidates( self, - items: list[Item], + items: Sequence[Item], artist: str, album: str, va_likely: bool, - extra_tags: dict[str, Any] | None = None, - ) -> Iterator[beets.autotag.hooks.AlbumInfo]: + ) -> Iterable[beets.autotag.hooks.AlbumInfo]: criteria = self.get_album_criteria(items, artist, album, va_likely) release_ids = (r["id"] for r in self._search_api("release", criteria)) @@ -826,7 +822,7 @@ class MusicBrainzPlugin(BeetsPlugin): def item_candidates( self, item: Item, artist: str, title: str - ) -> Iterator[beets.autotag.hooks.TrackInfo]: + ) -> Iterable[beets.autotag.hooks.TrackInfo]: criteria = {"artist": artist, "recording": title, "alias": title} yield from filter( @@ -841,7 +837,7 @@ class MusicBrainzPlugin(BeetsPlugin): MusicBrainzAPIError. """ self._log.debug("Requesting MusicBrainz release {}", album_id) - if not (albumid := extract_release_id("musicbrainz", album_id)): + if not (albumid := self._extract_id(album_id)): self._log.debug("Invalid MBID ({0}).", album_id) return None @@ -878,7 +874,7 @@ class MusicBrainzPlugin(BeetsPlugin): """Fetches a track by its MusicBrainz ID. Returns a TrackInfo object or None if no track is found. May raise a MusicBrainzAPIError. """ - if not (trackid := extract_release_id("musicbrainz", track_id)): + if not (trackid := self._extract_id(track_id)): self._log.debug("Invalid MBID ({0}).", track_id) return None diff --git a/beetsplug/replaygain.py b/beetsplug/replaygain.py index 00b651d99..96c854314 100644 --- a/beetsplug/replaygain.py +++ b/beetsplug/replaygain.py @@ -1161,7 +1161,9 @@ class ExceptionWatcher(Thread): Once an exception occurs, raise it and execute a callback. """ - def __init__(self, queue: queue.Queue, callback: Callable[[], None]): + def __init__( + self, queue: queue.Queue[Exception], callback: Callable[[], None] + ): self._queue = queue self._callback = callback self._stopevent = Event() @@ -1197,7 +1199,9 @@ BACKENDS: dict[str, type[Backend]] = {b.NAME: b for b in BACKEND_CLASSES} class ReplayGainPlugin(BeetsPlugin): """Provides ReplayGain analysis.""" - def __init__(self): + pool: ThreadPool | None = None + + def __init__(self) -> None: super().__init__() # default backend is 'command' for backward-compatibility. @@ -1261,9 +1265,6 @@ class ReplayGainPlugin(BeetsPlugin): except (ReplayGainError, FatalReplayGainError) as e: raise ui.UserError(f"replaygain initialization failed: {e}") - # Start threadpool lazily. - self.pool = None - def should_use_r128(self, item: Item) -> bool: """Checks the plugin setting to decide whether the calculation should be done using the EBU R128 standard and use R128_ tags instead. @@ -1420,7 +1421,7 @@ class ReplayGainPlugin(BeetsPlugin): """Open a `ThreadPool` instance in `self.pool`""" if self.pool is None and self.backend_instance.do_parallel: self.pool = ThreadPool(threads) - self.exc_queue: queue.Queue = queue.Queue() + self.exc_queue: queue.Queue[Exception] = queue.Queue() signal.signal(signal.SIGINT, self._interrupt) diff --git a/beetsplug/spotify.py b/beetsplug/spotify.py index 36790b56b..fa5dc5c52 100644 --- a/beetsplug/spotify.py +++ b/beetsplug/spotify.py @@ -25,7 +25,7 @@ import json import re import time import webbrowser -from typing import TYPE_CHECKING, Any, Literal, Sequence +from typing import TYPE_CHECKING, Any, Literal, Sequence, Union import confuse import requests @@ -34,7 +34,12 @@ import unidecode from beets import ui from beets.autotag.hooks import AlbumInfo, TrackInfo from beets.dbcore import types -from beets.plugins import BeetsPlugin, MetadataSourcePlugin, Response +from beets.library import Library +from beets.metadata_plugins import ( + IDResponse, + SearchApiMetadataSourcePlugin, + SearchFilter, +) if TYPE_CHECKING: from beets.library import Library @@ -43,13 +48,41 @@ if TYPE_CHECKING: DEFAULT_WAITING_TIME = 5 -class SpotifyAPIError(Exception): +class SearchResponseAlbums(IDResponse): + """A response returned by the Spotify API. + + We only use items and disregard the pagination information. + i.e. res["albums"]["items"][0]. + + There are more fields in the response, but we only type + the ones we currently use. + + see https://developer.spotify.com/documentation/web-api/reference/search + """ + + album_type: str + available_markets: Sequence[str] + name: str + + +class SearchResponseTracks(IDResponse): + """A track response returned by the Spotify API.""" + + album: SearchResponseAlbums + available_markets: Sequence[str] + popularity: int + name: str + + +class APIError(Exception): pass -class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): - data_source = "Spotify" - +class SpotifyPlugin( + SearchApiMetadataSourcePlugin[ + Union[SearchResponseAlbums, SearchResponseTracks] + ] +): item_types = { "spotify_track_popularity": types.INTEGER, "spotify_acousticness": types.FLOAT, @@ -129,7 +162,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): """Get the path to the JSON file for storing the OAuth token.""" return self.config["tokenfile"].get(confuse.Filename(in_app_dir=True)) - def _authenticate(self): + def _authenticate(self) -> None: """Request an access token via the Client Credentials Flow: https://developer.spotify.com/documentation/general/guides/authorization-guide/#client-credentials-flow """ @@ -180,7 +213,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): """ if retry_count > max_retries: - raise SpotifyAPIError("Maximum retries reached.") + raise APIError("Maximum retries reached.") try: response = requests.request( @@ -194,14 +227,14 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): return response.json() except requests.exceptions.ReadTimeout: self._log.error("ReadTimeout.") - raise SpotifyAPIError("Request timed out.") + raise APIError("Request timed out.") except requests.exceptions.ConnectionError as e: self._log.error(f"Network error: {e}") - raise SpotifyAPIError("Network error.") + raise APIError("Network error.") except requests.exceptions.RequestException as e: if e.response is None: self._log.error(f"Request failed: {e}") - raise SpotifyAPIError("Request failed.") + raise APIError("Request failed.") if e.response.status_code == 401: self._log.debug( f"{self.data_source} access token has expired. " @@ -215,7 +248,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): retry_count=retry_count + 1, ) elif e.response.status_code == 404: - raise SpotifyAPIError( + raise APIError( f"API Error: {e.response.status_code}\n" f"URL: {url}\nparams: {params}" ) @@ -235,18 +268,18 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): ) elif e.response.status_code == 503: self._log.error("Service Unavailable.") - raise SpotifyAPIError("Service Unavailable.") + raise APIError("Service Unavailable.") elif e.response.status_code == 502: self._log.error("Bad Gateway.") - raise SpotifyAPIError("Bad Gateway.") + raise APIError("Bad Gateway.") elif e.response is not None: - raise SpotifyAPIError( + raise APIError( f"{self.data_source} API error:\n{e.response.text}\n" f"URL:\n{url}\nparams:\n{params}" ) else: self._log.error(f"Request failed. Error: {e}") - raise SpotifyAPIError("Request failed.") + raise APIError("Request failed.") def album_for_id(self, album_id: str) -> AlbumInfo | None: """Fetch an album by its Spotify ID or URL and return an @@ -257,7 +290,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): :return: AlbumInfo object for album :rtype: beets.autotag.hooks.AlbumInfo or None """ - if not (spotify_id := self._get_id(album_id)): + if not (spotify_id := self._extract_id(album_id)): return None album_data = self._handle_response("get", self.album_url + spotify_id) @@ -360,7 +393,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): Returns a TrackInfo object or None if the track is not found. """ - if not (spotify_id := self._get_id(track_id)): + if not (spotify_id := self._extract_id(track_id)): self._log.debug("Invalid Spotify ID: {}", track_id) return None @@ -390,7 +423,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): return track def _construct_search_query( - self, filters: dict[str, str], keywords: str = "" + self, filters: SearchFilter, keywords: str = "" ) -> str: """Construct a query string with the specified filters and keywords to be provided to the Spotify Search API @@ -400,9 +433,10 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): :param keywords: (Optional) Query keywords to use. :return: Query string to be provided to the Search API. """ + query_components = [ keywords, - " ".join(":".join((k, v)) for k, v in filters.items()), + " ".join(f"{k}:{v}" for k, v in filters.items()), ] query = " ".join([q for q in query_components if q]) if not isinstance(query, str): @@ -416,9 +450,9 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): def _search_api( self, query_type: Literal["album", "track"], - filters: dict[str, str], + filters: SearchFilter, keywords: str = "", - ) -> Sequence[Response]: + ) -> Sequence[SearchResponseAlbums | SearchResponseTracks]: """Query the Spotify Search API for the specified ``keywords``, applying the provided ``filters``. @@ -436,7 +470,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): self.search_url, params={"q": query, "type": query_type}, ) - except SpotifyAPIError as e: + except APIError as e: self._log.debug("Spotify API error: {}", e) return () response_data = response.get(query_type + "s", {}).get("items", []) @@ -557,7 +591,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): keywords = item[self.config["track_field"].get()] # Query the Web API for each track, look for the items' JSON data - query_filters = {"artist": artist, "album": album} + query_filters: SearchFilter = {"artist": artist, "album": album} response_data_tracks = self._search_api( query_type="track", keywords=keywords, filters=query_filters ) @@ -570,7 +604,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): continue # Apply market filter if requested - region_filter = self.config["region_filter"].get() + region_filter: str = self.config["region_filter"].get() if region_filter: response_data_tracks = [ track_data @@ -595,7 +629,11 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): len(response_data_tracks), ) chosen_result = max( - response_data_tracks, key=lambda x: x["popularity"] + response_data_tracks, + key=lambda x: x[ + # We are sure this is a track response! + "popularity" # type: ignore[typeddict-item] + ], ) results.append(chosen_result) @@ -691,16 +729,18 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): def track_info(self, track_id: str): """Fetch a track's popularity and external IDs using its Spotify ID.""" track_data = self._handle_response("get", self.track_url + track_id) + external_ids = track_data.get("external_ids", {}) + popularity = track_data.get("popularity") self._log.debug( "track_popularity: {} and track_isrc: {}", - track_data.get("popularity"), - track_data.get("external_ids").get("isrc"), + popularity, + external_ids.get("isrc"), ) return ( - track_data.get("popularity"), - track_data.get("external_ids").get("isrc"), - track_data.get("external_ids").get("ean"), - track_data.get("external_ids").get("upc"), + popularity, + external_ids.get("isrc"), + external_ids.get("ean"), + external_ids.get("upc"), ) def track_audio_features(self, track_id: str): @@ -709,6 +749,6 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): return self._handle_response( "get", self.audio_features_url + track_id ) - except SpotifyAPIError as e: + except APIError as e: self._log.debug("Spotify API error: {}", e) return None diff --git a/codecov.yml b/codecov.yml index c4b333ad3..c899db06a 100644 --- a/codecov.yml +++ b/codecov.yml @@ -1,5 +1,6 @@ -# Don't post a comment on pull requests. -comment: off +comment: + layout: "condensed_header, condensed_files" + require_changes: true # Sets non-blocking status checks # https://docs.codecov.com/docs/commit-status#informational @@ -11,7 +12,7 @@ coverage: patch: default: informational: true - changes: no + changes: false github_checks: annotations: false diff --git a/docs/changelog.rst b/docs/changelog.rst index f214a022f..b78aa1602 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -67,9 +67,20 @@ For plugin developers: * The `fetchart` plugins has seen a few changes to function signatures and source registration in the process of introducing typings to the code. Custom art sources might need to be adapted. - +* We split the responsibilities of plugins into two base classes + #. :class:`beets.plugins.BeetsPlugin` + is the base class for all plugins, any plugin needs to inherit from this class. + #. :class:`beets.metadata_plugin.MetadataSourcePlugin` + allows plugins to act like metadata sources. E.g. used by the MusicBrainz plugin. All plugins + in the beets repo are opted into this class where applicable. If you are maintaining a plugin + that acts like a metadata source, i.e. you expose any of ``track_for_id``, + ``album_for_id``, ``candidates``, ``item_candidates``, ``album_distance``, ``track_distance`` methods, + please update your plugin to inherit from the new baseclass, as otherwise your plugin will + stop working with the next major release. + Other changes: +* Refactor: Split responsibilities of Plugins into MetaDataPlugins and general Plugins. * Documentation structure for auto generated API references changed slightly. Autogenerated API references are now located in the `docs/api` subdirectory. * :doc:`/plugins/substitute`: Fix rST formatting for example cases so that each diff --git a/pyproject.toml b/pyproject.toml index ea69240d5..39c543307 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -124,7 +124,7 @@ aura = ["flask", "flask-cors", "Pillow"] autobpm = ["librosa", "resampy"] # badfiles # mp3val and flac beatport = ["requests-oauthlib"] -bpd = ["PyGObject"] # python-gi and GStreamer 1.0+ +bpd = ["PyGObject"] # gobject-introspection, gstreamer1.0-plugins-base, python3-gst-1.0 chroma = ["pyacoustid"] # chromaprint or fpcalc # convert # ffmpeg docs = ["pydata-sphinx-theme", "sphinx"] diff --git a/setup.cfg b/setup.cfg index e3472b04c..0b50485ea 100644 --- a/setup.cfg +++ b/setup.cfg @@ -15,7 +15,7 @@ markers = data_file = .reports/coverage/data branch = true relative_files = true -omit = +omit = beets/test/* beetsplug/_typing.py @@ -34,7 +34,6 @@ exclude_also = show_contexts = true [mypy] -files = beets,beetsplug,test,extra,docs allow_any_generics = false # FIXME: Would be better to actually type the libraries (if under our control), # or write our own stubs. For now, silence errors @@ -46,6 +45,8 @@ explicit_package_bases = true # config for all files. [[mypy-beets.plugins]] disallow_untyped_decorators = true -disallow_any_generics = true check_untyped_defs = true -allow_redefinition = true + +[[mypy-beets.metadata_plugins]] +disallow_untyped_decorators = true +check_untyped_defs = true diff --git a/test/plugins/test_player.py b/test/plugins/test_bpd.py similarity index 97% rename from test/plugins/test_player.py rename to test/plugins/test_bpd.py index a7c613d8f..16e424d7e 100644 --- a/test/plugins/test_player.py +++ b/test/plugins/test_bpd.py @@ -14,19 +14,15 @@ """Tests for BPD's implementation of the MPD protocol.""" -import importlib.util import multiprocessing as mp import os import socket -import sys import tempfile import threading import time import unittest from contextlib import contextmanager - -# Mock GstPlayer so that the forked process doesn't attempt to import gi: -from unittest import mock +from unittest.mock import MagicMock, patch import confuse import pytest @@ -34,43 +30,8 @@ import yaml from beets.test.helper import PluginTestCase from beets.util import bluelet -from beetsplug import bpd -gstplayer = importlib.util.module_from_spec( - importlib.util.find_spec("beetsplug.bpd.gstplayer") -) - - -def _gstplayer_play(*_): - bpd.gstplayer._GstPlayer.playing = True - return mock.DEFAULT - - -gstplayer._GstPlayer = mock.MagicMock( - spec_set=[ - "time", - "volume", - "playing", - "run", - "play_file", - "pause", - "stop", - "seek", - "play", - "get_decoders", - ], - **{ - "playing": False, - "volume": 0, - "time.return_value": (0, 0), - "play_file.side_effect": _gstplayer_play, - "play.side_effect": _gstplayer_play, - "get_decoders.return_value": {"default": ({"audio/mpeg"}, {"mp3"})}, - }, -) -gstplayer.GstPlayer = lambda _: gstplayer._GstPlayer -sys.modules["beetsplug.bpd.gstplayer"] = gstplayer -bpd.gstplayer = gstplayer +bpd = pytest.importorskip("beetsplug.bpd") class CommandParseTest(unittest.TestCase): @@ -256,7 +217,7 @@ def implements(commands, fail=False): bluelet_listener = bluelet.Listener -@mock.patch("beets.util.bluelet.Listener") +@patch("beets.util.bluelet.Listener") def start_server(args, assigned_port, listener_patch): """Start the bpd server, writing the port to `assigned_port`.""" @@ -938,7 +899,7 @@ class BPDPlaylistsTest(BPDTestHelper): response = client.send_command("load", "anything") self._assert_failed(response, bpd.ERROR_NO_EXIST) - @unittest.skip + @unittest.expectedFailure def test_cmd_playlistadd(self): with self.run_bpd() as client: self._bpd_add(client, self.item1, playlist="anything") @@ -1128,7 +1089,7 @@ class BPDConnectionTest(BPDTestHelper): self._assert_ok(response) assert self.TAGTYPES == set(response.data["tagtype"]) - @unittest.skip + @unittest.expectedFailure def test_tagtypes_mask(self): with self.run_bpd() as client: response = client.send_command("tagtypes", "clear") @@ -1169,6 +1130,10 @@ class BPDReflectionTest(BPDTestHelper): fail=True, ) + @patch( + "beetsplug.bpd.gstplayer.GstPlayer.get_decoders", + MagicMock(return_value={"default": ({"audio/mpeg"}, {"mp3"})}), + ) def test_cmd_decoders(self): with self.run_bpd() as client: response = client.send_command("decoders") diff --git a/test/plugins/test_mbsync.py b/test/plugins/test_mbsync.py index 088165ef5..bb88e5e63 100644 --- a/test/plugins/test_mbsync.py +++ b/test/plugins/test_mbsync.py @@ -23,7 +23,7 @@ class MbsyncCliTest(PluginTestCase): plugin = "mbsync" @patch( - "beets.plugins.album_for_id", + "beets.metadata_plugins.album_for_id", Mock( side_effect=lambda *_: AlbumInfo( album_id="album id", @@ -33,7 +33,7 @@ class MbsyncCliTest(PluginTestCase): ), ) @patch( - "beets.plugins.track_for_id", + "beets.metadata_plugins.track_for_id", Mock( side_effect=lambda *_: TrackInfo( track_id="singleton id", title="new title" diff --git a/test/plugins/test_types_plugin.py b/test/plugins/test_types_plugin.py index b41e9bb18..41807b80d 100644 --- a/test/plugins/test_types_plugin.py +++ b/test/plugins/test_types_plugin.py @@ -134,7 +134,7 @@ class TypesPluginTest(PluginTestCase): def test_unknown_type_error(self): self.config["types"] = {"flex": "unkown type"} with pytest.raises(ConfigValueError): - self.run_command("ls") + self.add_item(flex="test") def test_template_if_def(self): # Tests for a subtle bug when using %ifdef in templates along with diff --git a/test/test_importer.py b/test/test_importer.py index 9ec160568..c1768df3e 100644 --- a/test/test_importer.py +++ b/test/test_importer.py @@ -913,7 +913,9 @@ def album_candidates_mock(*args, **kwargs): ) -@patch("beets.plugins.candidates", Mock(side_effect=album_candidates_mock)) +@patch( + "beets.metadata_plugins.candidates", Mock(side_effect=album_candidates_mock) +) class ImportDuplicateAlbumTest(PluginMixin, ImportTestCase): plugin = "musicbrainz" @@ -1031,7 +1033,10 @@ def item_candidates_mock(*args, **kwargs): ) -@patch("beets.plugins.item_candidates", Mock(side_effect=item_candidates_mock)) +@patch( + "beets.metadata_plugins.item_candidates", + Mock(side_effect=item_candidates_mock), +) class ImportDuplicateSingletonTest(ImportTestCase): def setUp(self): super().setUp() @@ -1567,8 +1572,14 @@ def mocked_get_track_by_id(id_): ) -@patch("beets.plugins.track_for_id", Mock(side_effect=mocked_get_track_by_id)) -@patch("beets.plugins.album_for_id", Mock(side_effect=mocked_get_album_by_id)) +@patch( + "beets.metadata_plugins.track_for_id", + Mock(side_effect=mocked_get_track_by_id), +) +@patch( + "beets.metadata_plugins.album_for_id", + Mock(side_effect=mocked_get_album_by_id), +) class ImportIdTest(ImportTestCase): ID_RELEASE_0 = "00000000-0000-0000-0000-000000000000" ID_RELEASE_1 = "11111111-1111-1111-1111-111111111111" @@ -1616,9 +1627,9 @@ class ImportIdTest(ImportTestCase): task = importer.ImportTask( paths=self.import_dir, toppath="top path", items=[_common.item()] ) - task.search_ids = [self.ID_RELEASE_0, self.ID_RELEASE_1] - task.lookup_candidates() + task.lookup_candidates([self.ID_RELEASE_0, self.ID_RELEASE_1]) + assert {"VALID_RELEASE_0", "VALID_RELEASE_1"} == { c.info.album for c in task.candidates } @@ -1628,9 +1639,9 @@ class ImportIdTest(ImportTestCase): task = importer.SingletonImportTask( toppath="top path", item=_common.item() ) - task.search_ids = [self.ID_RECORDING_0, self.ID_RECORDING_1] - task.lookup_candidates() + task.lookup_candidates([self.ID_RECORDING_0, self.ID_RECORDING_1]) + assert {"VALID_RECORDING_0", "VALID_RECORDING_1"} == { c.info.title for c in task.candidates } diff --git a/test/test_ui.py b/test/test_ui.py index 713e69891..664323e2a 100644 --- a/test/test_ui.py +++ b/test/test_ui.py @@ -1024,7 +1024,9 @@ class ConfigTest(TestPluginTestCase): file.write("plugins: test") self.run_command("--config", self.cli_config_path, "plugin", lib=None) - assert plugins.find_plugins()[0].is_test_plugin + plugs = plugins.find_plugins() + assert len(plugs) == 1 + assert plugs[0].is_test_plugin self.unload_plugins() def test_beetsdir_config(self):