Merge branch 'master' into feature/add-artist-to-item-entry-template

This commit is contained in:
Martin Atukunda 2025-07-18 00:25:42 +03:00 committed by GitHub
commit 4a7e474efc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
40 changed files with 975 additions and 752 deletions

View file

@ -33,7 +33,7 @@ jobs:
if: matrix.platform == 'ubuntu-latest'
run: |
sudo apt update
sudo apt install ffmpeg gobject-introspection libcairo2-dev libgirepository-2.0-dev pandoc imagemagick
sudo apt install --yes --no-install-recommends ffmpeg gobject-introspection gstreamer1.0-plugins-base python3-gst-1.0 libcairo2-dev libgirepository-2.0-dev pandoc imagemagick
- name: Get changed lyrics files
id: lyrics-update

View file

@ -105,7 +105,6 @@ jobs:
- name: Type check code
uses: liskin/gh-problem-matcher-wrap@v3
continue-on-error: true
with:
linters: mypy
run: poe check-types --show-column-numbers --no-error-summary ${{ needs.changed-files.outputs.changed_python_files }}

View file

@ -8,7 +8,7 @@ from typing import TYPE_CHECKING, Any
from jellyfish import levenshtein_distance
from unidecode import unidecode
from beets import config, plugins
from beets import config, metadata_plugins
from beets.util import as_string, cached_classproperty, get_most_common_tags
if TYPE_CHECKING:
@ -409,7 +409,7 @@ def track_distance(
dist.add_expr("medium", item.disc != track_info.medium)
# Plugins.
dist.update(plugins.track_distance(item, track_info))
dist.update(metadata_plugins.track_distance(item, track_info))
return dist
@ -526,6 +526,6 @@ def distance(
dist.add("unmatched_tracks", 1.0)
# Plugins.
dist.update(plugins.album_distance(items, album_info, mapping))
dist.update(metadata_plugins.album_distance(items, album_info, mapping))
return dist

View file

@ -24,7 +24,7 @@ from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar
import lap
import numpy as np
from beets import config, logging, plugins
from beets import config, logging, metadata_plugins
from beets.autotag import AlbumInfo, AlbumMatch, TrackInfo, TrackMatch, hooks
from beets.util import get_most_common_tags
@ -119,7 +119,7 @@ def match_by_id(items: Iterable[Item]) -> AlbumInfo | None:
return None
# If all album IDs are equal, look up the album.
log.debug("Searching for discovered album ID: {0}", first)
return plugins.album_for_id(first)
return metadata_plugins.album_for_id(first)
def _recommendation(
@ -274,7 +274,7 @@ def tag_album(
if search_ids:
for search_id in search_ids:
log.debug("Searching for album ID: {0}", search_id)
if info := plugins.album_for_id(search_id):
if info := metadata_plugins.album_for_id(search_id):
_add_candidate(items, candidates, info)
# Use existing metadata or text search.
@ -311,7 +311,7 @@ def tag_album(
log.debug("Album might be VA: {0}", va_likely)
# Get the results from the data sources.
for matched_candidate in plugins.candidates(
for matched_candidate in metadata_plugins.candidates(
items, search_artist, search_album, va_likely
):
_add_candidate(items, candidates, matched_candidate)
@ -346,7 +346,7 @@ def tag_item(
if trackids:
for trackid in trackids:
log.debug("Searching for track ID: {0}", trackid)
if info := plugins.track_for_id(trackid):
if info := metadata_plugins.track_for_id(trackid):
dist = track_distance(item, info, incl_artist=True)
candidates[info.track_id] = hooks.TrackMatch(dist, info)
# If this is a good match, then don't keep searching.
@ -372,7 +372,7 @@ def tag_item(
log.debug("Item search terms: {0} - {1}", search_artist, search_title)
# Get and evaluate candidate metadata.
for track_info in plugins.item_candidates(
for track_info in metadata_plugins.item_candidates(
item, search_artist, search_title
):
dist = track_distance(item, track_info, incl_artist=True)

View file

@ -289,19 +289,22 @@ class Model(ABC, Generic[D]):
terms.
"""
_types: dict[str, types.Type] = {}
"""Optional Types for non-fixed (i.e., flexible and computed) fields.
"""
@cached_classproperty
def _types(cls) -> dict[str, types.Type]:
"""Optional types for non-fixed (flexible and computed) fields."""
return {}
_sorts: dict[str, type[FieldSort]] = {}
"""Optional named sort criteria. The keys are strings and the values
are subclasses of `Sort`.
"""
_queries: dict[str, FieldQueryType] = {}
"""Named queries that use a field-like `name:value` syntax but which
do not relate to any specific field.
"""
@cached_classproperty
def _queries(cls) -> dict[str, FieldQueryType]:
"""Named queries that use a field-like `name:value` syntax but which
do not relate to any specific field.
"""
return {}
_always_dirty = False
"""By default, fields only become "dirty" when their value actually

View file

@ -28,6 +28,7 @@ from re import Pattern
from typing import TYPE_CHECKING, Any, Generic, TypeVar, Union
from beets import util
from beets.util.units import raw_seconds_short
if TYPE_CHECKING:
from beets.dbcore.db import AnyModel, Model
@ -892,7 +893,7 @@ class DurationQuery(NumericQuery):
if not s:
return None
try:
return util.raw_seconds_short(s)
return raw_seconds_short(s)
except ValueError:
try:
return float(s)

View file

@ -292,7 +292,7 @@ class DelimitedString(BaseString[list[str], list[str]]):
containing delimiter-separated values.
"""
model_type = list
model_type = list[str]
def __init__(self, delimiter: str):
self.delimiter = delimiter

View file

@ -70,6 +70,7 @@ def query_tasks(session: ImportSession):
Instead of finding files from the filesystem, a query is used to
match items from the library.
"""
task: ImportTask
if session.config["singletons"]:
# Search for items.
for item in session.lib.items(session.query):
@ -143,9 +144,7 @@ def lookup_candidates(session: ImportSession, task: ImportTask):
# Restrict the initial lookup to IDs specified by the user via the -m
# option. Currently all the IDs are passed onto the tasks directly.
task.search_ids = session.config["search_ids"].as_str_seq()
task.lookup_candidates()
task.lookup_candidates(session.config["search_ids"].as_str_seq())
@pipeline.stage

View file

@ -22,7 +22,7 @@ import time
from collections import defaultdict
from enum import Enum
from tempfile import mkdtemp
from typing import TYPE_CHECKING, Callable, Iterable, Sequence
from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence
import mediafile
@ -32,6 +32,8 @@ from beets.dbcore.query import PathQuery
from .state import ImportState
if TYPE_CHECKING:
from beets.autotag.match import Recommendation
from .session import ImportSession
# Global logger.
@ -159,6 +161,7 @@ class ImportTask(BaseImportTask):
cur_album: str | None = None
cur_artist: str | None = None
candidates: Sequence[autotag.AlbumMatch | autotag.TrackMatch] = []
rec: Recommendation | None = None
def __init__(
self,
@ -167,11 +170,9 @@ class ImportTask(BaseImportTask):
items: Iterable[library.Item] | None,
):
super().__init__(toppath, paths, items)
self.rec = None
self.should_remove_duplicates = False
self.should_merge_duplicates = False
self.is_album = True
self.search_ids = [] # user-supplied candidate IDs.
def set_choice(
self, choice: Action | autotag.AlbumMatch | autotag.TrackMatch
@ -356,20 +357,17 @@ class ImportTask(BaseImportTask):
tasks = [t for inner in tasks for t in inner]
return tasks
def lookup_candidates(self):
"""Retrieve and store candidates for this album. User-specified
candidate IDs are stored in self.search_ids: if present, the
initial lookup is restricted to only those IDs.
"""
artist, album, prop = autotag.tag_album(
self.items, search_ids=self.search_ids
)
self.cur_artist = artist
self.cur_album = album
self.candidates = prop.candidates
self.rec = prop.recommendation
def lookup_candidates(self, search_ids: list[str]) -> None:
"""Retrieve and store candidates for this album.
def find_duplicates(self, lib: library.Library):
If User-specified ``search_ids`` list is not empty, the lookup is
restricted to only those IDs.
"""
self.cur_artist, self.cur_album, (self.candidates, self.rec) = (
autotag.tag_album(self.items, search_ids=search_ids)
)
def find_duplicates(self, lib: library.Library) -> list[library.Album]:
"""Return a list of albums from `lib` with the same artist and
album name as the task.
"""
@ -695,12 +693,12 @@ class SingletonImportTask(ImportTask):
for item in self.imported_items():
plugins.send("item_imported", lib=lib, item=item)
def lookup_candidates(self):
prop = autotag.tag_item(self.item, search_ids=self.search_ids)
self.candidates = prop.candidates
self.rec = prop.recommendation
def lookup_candidates(self, search_ids: list[str]) -> None:
self.candidates, self.rec = autotag.tag_item(
self.item, search_ids=search_ids
)
def find_duplicates(self, lib):
def find_duplicates(self, lib: library.Library) -> list[library.Item]: # type: ignore[override] # Need splitting Singleton and Album tasks into separate classes
"""Return a list of items from `lib` that have the same artist
and title as the task.
"""
@ -802,6 +800,11 @@ class SentinelImportTask(ImportTask):
pass
ArchiveHandler = tuple[
Callable[[util.StrPath], bool], Callable[[util.StrPath], Any]
]
class ArchiveImportTask(SentinelImportTask):
"""An import task that represents the processing of an archive.
@ -827,13 +830,13 @@ class ArchiveImportTask(SentinelImportTask):
if not os.path.isfile(path):
return False
for path_test, _ in cls.handlers():
for path_test, _ in cls.handlers:
if path_test(os.fsdecode(path)):
return True
return False
@classmethod
def handlers(cls):
@util.cached_classproperty
def handlers(cls) -> list[ArchiveHandler]:
"""Returns a list of archive handlers.
Each handler is a `(path_test, ArchiveClass)` tuple. `path_test`
@ -841,28 +844,27 @@ class ArchiveImportTask(SentinelImportTask):
handled by `ArchiveClass`. `ArchiveClass` is a class that
implements the same interface as `tarfile.TarFile`.
"""
if not hasattr(cls, "_handlers"):
cls._handlers: list[tuple[Callable, ...]] = []
from zipfile import ZipFile, is_zipfile
_handlers: list[ArchiveHandler] = []
from zipfile import ZipFile, is_zipfile
cls._handlers.append((is_zipfile, ZipFile))
import tarfile
_handlers.append((is_zipfile, ZipFile))
import tarfile
cls._handlers.append((tarfile.is_tarfile, tarfile.open))
try:
from rarfile import RarFile, is_rarfile
except ImportError:
pass
else:
cls._handlers.append((is_rarfile, RarFile))
try:
from py7zr import SevenZipFile, is_7zfile
except ImportError:
pass
else:
cls._handlers.append((is_7zfile, SevenZipFile))
_handlers.append((tarfile.is_tarfile, tarfile.open))
try:
from rarfile import RarFile, is_rarfile
except ImportError:
pass
else:
_handlers.append((is_rarfile, RarFile))
try:
from py7zr import SevenZipFile, is_7zfile
except ImportError:
pass
else:
_handlers.append((is_7zfile, SevenZipFile))
return cls._handlers
return _handlers
def cleanup(self, copy=False, delete=False, move=False):
"""Removes the temporary directory the archive was extracted to."""
@ -879,7 +881,7 @@ class ArchiveImportTask(SentinelImportTask):
"""
assert self.toppath is not None, "toppath must be set"
for path_test, handler_class in self.handlers():
for path_test, handler_class in self.handlers:
if path_test(os.fsdecode(self.toppath)):
break
else:
@ -925,7 +927,7 @@ class ImportTaskFactory:
self.imported = 0 # "Real" tasks created.
self.is_archive = ArchiveImportTask.is_archive(util.syspath(toppath))
def tasks(self):
def tasks(self) -> Iterable[ImportTask]:
"""Yield all import tasks for music found in the user-specified
path `self.toppath`. Any necessary sentinel tasks are also
produced.
@ -1114,7 +1116,10 @@ def albums_in_dir(path: util.PathBytes):
a list of Items that is probably an album. Specifically, any folder
containing any media files is an album.
"""
collapse_pat = collapse_paths = collapse_items = None
collapse_paths: list[util.PathBytes] = []
collapse_items: list[util.PathBytes] = []
collapse_pat = None
ignore: list[str] = config["ignore"].as_str_seq()
ignore_hidden: bool = config["ignore_hidden"].get(bool)
@ -1139,7 +1144,7 @@ def albums_in_dir(path: util.PathBytes):
# proceed to process the current one.
if collapse_items:
yield collapse_paths, collapse_items
collapse_pat = collapse_paths = collapse_items = None
collapse_pat, collapse_paths, collapse_items = None, [], []
# Check whether this directory looks like the *first* directory
# in a multi-disc sequence. There are two indicators: the file

View file

@ -41,6 +41,18 @@ class LibModel(dbcore.Model["Library"]):
_format_config_key: str
path: bytes
@cached_classproperty
def _types(cls) -> dict[str, types.Type]:
"""Return the types of the fields in this model."""
return {
**plugins.types(cls), # type: ignore[arg-type]
"data_source": types.STRING,
}
@cached_classproperty
def _queries(cls) -> dict[str, FieldQueryType]:
return plugins.named_queries(cls) # type: ignore[arg-type]
@cached_classproperty
def writable_media_fields(cls) -> set[str]:
return set(MediaFile.fields()) & cls._fields.keys()
@ -265,10 +277,9 @@ class Album(LibModel):
_search_fields = ("album", "albumartist", "genre")
_types = {
"path": types.PathType(),
"data_source": types.STRING,
}
@cached_classproperty
def _types(cls) -> dict[str, types.Type]:
return {**super()._types, "path": types.PathType()}
_sorts = {
"albumartist": dbcore.query.SmartArtistSort,
@ -715,10 +726,6 @@ class Item(LibModel):
"genre",
)
_types = {
"data_source": types.STRING,
}
# Set of item fields that are backed by `MediaFile` fields.
# Any kind of field (fixed, flexible, and computed) may be a media
# field. Only these fields are read from disk in `read` and written in
@ -737,7 +744,9 @@ class Item(LibModel):
_sorts = {"artist": dbcore.query.SmartArtistSort}
_queries = {"singleton": dbcore.query.SingletonQuery}
@cached_classproperty
def _queries(cls) -> dict[str, FieldQueryType]:
return {**super()._queries, "singleton": dbcore.query.SingletonQuery}
_format_config_key = "format_item"

397
beets/metadata_plugins.py Normal file
View file

@ -0,0 +1,397 @@
"""Metadata source plugin interface.
This allows beets to lookup metadata from various sources. We define
a common interface for all metadata sources which need to be
implemented as plugins.
"""
from __future__ import annotations
import abc
import inspect
import re
import sys
import warnings
from typing import TYPE_CHECKING, Generic, Literal, Sequence, TypedDict, TypeVar
from beets.util import cached_classproperty
from beets.util.id_extractors import extract_release_id
from .plugins import BeetsPlugin, find_plugins, notify_info_yielded, send
if sys.version_info >= (3, 11):
from typing import NotRequired
else:
from typing_extensions import NotRequired
if TYPE_CHECKING:
from collections.abc import Iterable
from confuse import ConfigView
from .autotag import Distance
from .autotag.hooks import AlbumInfo, Item, TrackInfo
def find_metadata_source_plugins() -> list[MetadataSourcePlugin]:
"""Returns a list of MetadataSourcePlugin subclass instances
Resolved from all currently loaded beets plugins.
"""
all_plugins = find_plugins()
metadata_plugins: list[MetadataSourcePlugin | BeetsPlugin] = []
for plugin in all_plugins:
if isinstance(plugin, MetadataSourcePlugin):
metadata_plugins.append(plugin)
elif hasattr(plugin, "data_source"):
# TODO: Remove this in the future major release, v3.0.0
warnings.warn(
f"{plugin.__class__.__name__} is used as a legacy metadata source. "
"It should extend MetadataSourcePlugin instead of BeetsPlugin. "
"Support for this will be removed in the v3.0.0 release!",
DeprecationWarning,
stacklevel=2,
)
metadata_plugins.append(plugin)
# typeignore: BeetsPlugin is not a MetadataSourcePlugin (legacy support)
return metadata_plugins # type: ignore[return-value]
@notify_info_yielded("albuminfo_received")
def candidates(*args, **kwargs) -> Iterable[AlbumInfo]:
"""Return matching album candidates from all metadata source plugins."""
for plugin in find_metadata_source_plugins():
yield from plugin.candidates(*args, **kwargs)
@notify_info_yielded("trackinfo_received")
def item_candidates(*args, **kwargs) -> Iterable[TrackInfo]:
"""Return matching track candidates fromm all metadata source plugins."""
for plugin in find_metadata_source_plugins():
yield from plugin.item_candidates(*args, **kwargs)
def album_for_id(_id: str) -> AlbumInfo | None:
"""Get AlbumInfo object for the given ID string.
A single ID can yield just a single album, so we return the first match.
"""
for plugin in find_metadata_source_plugins():
if info := plugin.album_for_id(album_id=_id):
send("albuminfo_received", info=info)
return info
return None
def track_for_id(_id: str) -> TrackInfo | None:
"""Get TrackInfo object for the given ID string.
A single ID can yield just a single track, so we return the first match.
"""
for plugin in find_metadata_source_plugins():
if info := plugin.track_for_id(_id):
send("trackinfo_received", info=info)
return info
return None
def track_distance(item: Item, info: TrackInfo) -> Distance:
"""Returns the track distance for an item and trackinfo.
Returns a Distance object is populated by all metadata source plugins
that implement the :py:meth:`MetadataSourcePlugin.track_distance` method.
"""
from beets.autotag.distance import Distance
dist = Distance()
for plugin in find_metadata_source_plugins():
dist.update(plugin.track_distance(item, info))
return dist
def album_distance(
items: Sequence[Item],
album_info: AlbumInfo,
mapping: dict[Item, TrackInfo],
) -> Distance:
"""Returns the album distance calculated by plugins."""
from beets.autotag.distance import Distance
dist = Distance()
for plugin in find_metadata_source_plugins():
dist.update(plugin.album_distance(items, album_info, mapping))
return dist
def _get_distance(
config: ConfigView, data_source: str, info: AlbumInfo | TrackInfo
) -> Distance:
"""Returns the ``data_source`` weight and the maximum source weight
for albums or individual tracks.
"""
from beets.autotag.distance import Distance
dist = Distance()
if info.data_source == data_source:
dist.add("source", config["source_weight"].as_number())
return dist
class MetadataSourcePlugin(BeetsPlugin, metaclass=abc.ABCMeta):
"""A plugin that provides metadata from a specific source.
This base class implements a contract for plugins that provide metadata
from a specific source. The plugin must implement the methods to search for albums
and tracks, and to retrieve album and track information by ID.
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.config.add({"source_weight": 0.5})
@abc.abstractmethod
def album_for_id(self, album_id: str) -> AlbumInfo | None:
"""Return :py:class:`AlbumInfo` object or None if no matching release was
found."""
raise NotImplementedError
@abc.abstractmethod
def track_for_id(self, track_id: str) -> TrackInfo | None:
"""Return a :py:class:`TrackInfo` object or None if no matching release was
found.
"""
raise NotImplementedError
# ---------------------------------- search ---------------------------------- #
@abc.abstractmethod
def candidates(
self,
items: Sequence[Item],
artist: str,
album: str,
va_likely: bool,
) -> Iterable[AlbumInfo]:
"""Return :py:class:`AlbumInfo` candidates that match the given album.
Used in the autotag functionality to search for albums.
:param items: List of items in the album
:param artist: Album artist
:param album: Album name
:param va_likely: Whether the album is likely to be by various artists
"""
raise NotImplementedError
@abc.abstractmethod
def item_candidates(
self, item: Item, artist: str, title: str
) -> Iterable[TrackInfo]:
"""Return :py:class:`TrackInfo` candidates that match the given track.
Used in the autotag functionality to search for tracks.
:param item: Track item
:param artist: Track artist
:param title: Track title
"""
raise NotImplementedError
def albums_for_ids(self, ids: Sequence[str]) -> Iterable[AlbumInfo | None]:
"""Batch lookup of album metadata for a list of album IDs.
Given a list of album identifiers, yields corresponding AlbumInfo objects.
Missing albums result in None values in the output iterator.
Plugins may implement this for optimized batched lookups instead of
single calls to album_for_id.
"""
return (self.album_for_id(id) for id in ids)
def tracks_for_ids(self, ids: Sequence[str]) -> Iterable[TrackInfo | None]:
"""Batch lookup of track metadata for a list of track IDs.
Given a list of track identifiers, yields corresponding TrackInfo objects.
Missing tracks result in None values in the output iterator.
Plugins may implement this for optimized batched lookups instead of
single calls to track_for_id.
"""
return (self.track_for_id(id) for id in ids)
def album_distance(
self,
items: Sequence[Item],
album_info: AlbumInfo,
mapping: dict[Item, TrackInfo],
) -> Distance:
"""Calculate the distance for an album based on its items and album info."""
return _get_distance(
data_source=self.data_source, info=album_info, config=self.config
)
def track_distance(
self,
item: Item,
info: TrackInfo,
) -> Distance:
"""Calculate the distance for a track based on its item and track info."""
return _get_distance(
data_source=self.data_source, info=info, config=self.config
)
@cached_classproperty
def data_source(cls) -> str:
"""The data source name for this plugin.
This is inferred from the plugin name.
"""
return cls.__name__.replace("Plugin", "") # type: ignore[attr-defined]
def _extract_id(self, url: str) -> str | None:
"""Extract an ID from a URL for this metadata source plugin.
Uses the plugin's data source name to determine the ID format and
extracts the ID from a given URL.
"""
return extract_release_id(self.data_source, url)
@staticmethod
def get_artist(
artists: Iterable[dict[str | int, str]],
id_key: str | int = "id",
name_key: str | int = "name",
join_key: str | int | None = None,
) -> tuple[str, str | None]:
"""Returns an artist string (all artists) and an artist_id (the main
artist) for a list of artist object dicts.
For each artist, this function moves articles (such as 'a', 'an',
and 'the') to the front and strips trailing disambiguation numbers. It
returns a tuple containing the comma-separated string of all
normalized artists and the ``id`` of the main/first artist.
Alternatively a keyword can be used to combine artists together into a
single string by passing the join_key argument.
:param artists: Iterable of artist dicts or lists returned by API.
:param id_key: Key or index corresponding to the value of ``id`` for
the main/first artist. Defaults to 'id'.
:param name_key: Key or index corresponding to values of names
to concatenate for the artist string (containing all artists).
Defaults to 'name'.
:param join_key: Key or index corresponding to a field containing a
keyword to use for combining artists into a single string, for
example "Feat.", "Vs.", "And" or similar. The default is None
which keeps the default behaviour (comma-separated).
:return: Normalized artist string.
"""
artist_id = None
artist_string = ""
artists = list(artists) # In case a generator was passed.
total = len(artists)
for idx, artist in enumerate(artists):
if not artist_id:
artist_id = artist[id_key]
name = artist[name_key]
# Strip disambiguation number.
name = re.sub(r" \(\d+\)$", "", name)
# Move articles to the front.
name = re.sub(r"^(.*?), (a|an|the)$", r"\2 \1", name, flags=re.I)
# Use a join keyword if requested and available.
if idx < (total - 1): # Skip joining on last.
if join_key and artist.get(join_key, None):
name += f" {artist[join_key]} "
else:
name += ", "
artist_string += name
return artist_string, artist_id
class IDResponse(TypedDict):
"""Response from the API containing an ID."""
id: str
class SearchFilter(TypedDict):
artist: NotRequired[str]
album: NotRequired[str]
R = TypeVar("R", bound=IDResponse)
class SearchApiMetadataSourcePlugin(
Generic[R], MetadataSourcePlugin, metaclass=abc.ABCMeta
):
"""Helper class to implement a metadata source plugin with an API.
Plugins using this ABC must implement an API search method to
retrieve album and track information by ID,
i.e. `album_for_id` and `track_for_id`, and a search method to
perform a search on the API. The search method should return a list
of identifiers for the requested type (album or track).
"""
@abc.abstractmethod
def _search_api(
self,
query_type: Literal["album", "track"],
filters: SearchFilter,
keywords: str = "",
) -> Sequence[R]:
"""Perform a search on the API.
:param query_type: The type of query to perform.
:param filters: A dictionary of filters to apply to the search.
:param keywords: Additional keywords to include in the search.
Should return a list of identifiers for the requested type (album or track).
"""
raise NotImplementedError
def candidates(
self,
items: Sequence[Item],
artist: str,
album: str,
va_likely: bool,
) -> Iterable[AlbumInfo]:
query_filters: SearchFilter = {"album": album}
if not va_likely:
query_filters["artist"] = artist
results = self._search_api("album", query_filters)
if not results:
return []
return filter(
None, self.albums_for_ids([result["id"] for result in results])
)
def item_candidates(
self, item: Item, artist: str, title: str
) -> Iterable[TrackInfo]:
results = self._search_api("track", {"artist": artist}, keywords=title)
if not results:
return []
return filter(
None,
self.tracks_for_ids([result["id"] for result in results if result]),
)
# Dynamically copy methods to BeetsPlugin for legacy support
# TODO: Remove this in the future major release, v3.0.0
for name, method in inspect.getmembers(
MetadataSourcePlugin, predicate=inspect.isfunction
):
if not hasattr(BeetsPlugin, name):
setattr(BeetsPlugin, name, method)

View file

@ -23,22 +23,13 @@ import sys
import traceback
from collections import defaultdict
from functools import wraps
from typing import (
TYPE_CHECKING,
Any,
Callable,
Generic,
Literal,
Sequence,
TypedDict,
TypeVar,
)
from types import GenericAlias
from typing import TYPE_CHECKING, Any, Callable, Sequence, TypeVar
import mediafile
import beets
from beets import logging
from beets.util.id_extractors import extract_release_id
if TYPE_CHECKING:
from beets.event_types import EventType
@ -54,8 +45,6 @@ if TYPE_CHECKING:
from confuse import ConfigView
from beets.autotag import AlbumInfo, TrackInfo
from beets.autotag.distance import Distance
from beets.dbcore import Query
from beets.dbcore.db import FieldQueryType
from beets.dbcore.types import Type
@ -115,7 +104,7 @@ class PluginLogFilter(logging.Filter):
# Managing the plugins themselves.
class BeetsPlugin:
class BeetsPlugin(metaclass=abc.ABCMeta):
"""The base class for all beets plugins. Plugins provide
functionality by defining a subclass of BeetsPlugin and overriding
the abstract methods defined here.
@ -218,66 +207,6 @@ class BeetsPlugin:
"""Return a dict mapping prefixes to Query subclasses."""
return {}
def track_distance(
self,
item: Item,
info: TrackInfo,
) -> Distance:
"""Should return a Distance object to be added to the
distance for every track comparison.
"""
from beets.autotag.distance import Distance
return Distance()
def album_distance(
self,
items: Sequence[Item],
album_info: AlbumInfo,
mapping: dict[Item, TrackInfo],
) -> Distance:
"""Should return a Distance object to be added to the
distance for every album-level comparison.
"""
from beets.autotag.distance import Distance
return Distance()
def candidates(
self, items: list[Item], artist: str, album: str, va_likely: bool
) -> Iterable[AlbumInfo]:
"""Return :py:class:`AlbumInfo` candidates that match the given album.
:param items: List of items in the album
:param artist: Album artist
:param album: Album name
:param va_likely: Whether the album is likely to be by various artists
"""
yield from ()
def item_candidates(
self, item: Item, artist: str, title: str
) -> Iterable[TrackInfo]:
"""Return :py:class:`TrackInfo` candidates that match the given track.
:param item: Track item
:param artist: Track artist
:param title: Track title
"""
yield from ()
def album_for_id(self, album_id: str) -> AlbumInfo | None:
"""Return an AlbumInfo object or None if no matching release was
found.
"""
return None
def track_for_id(self, track_id: str) -> TrackInfo | None:
"""Return a TrackInfo object or None if no matching release was
found.
"""
return None
def add_media_field(
self, name: str, descriptor: mediafile.MediaField
) -> None:
@ -369,10 +298,13 @@ def load_plugins(names: Sequence[str] = ()) -> None:
else:
for obj in getattr(namespace, name).__dict__.values():
if (
isinstance(obj, type)
inspect.isclass(obj)
and not isinstance(
obj, GenericAlias
) # seems to be needed for python <= 3.9 only
and issubclass(obj, BeetsPlugin)
and obj != BeetsPlugin
and obj != MetadataSourcePlugin
and not inspect.isabstract(obj)
and obj not in _classes
):
_classes.add(obj)
@ -430,7 +362,7 @@ def queries() -> dict[str, type[Query]]:
def types(model_cls: type[AnyModel]) -> dict[str, Type]:
# Gives us `item_types` and `album_types`
"""Return mapping between flex field names and types for the given model."""
attr_name = f"{model_cls.__name__.lower()}_types"
types: dict[str, Type] = {}
for plugin in find_plugins():
@ -447,39 +379,13 @@ def types(model_cls: type[AnyModel]) -> dict[str, Type]:
def named_queries(model_cls: type[AnyModel]) -> dict[str, FieldQueryType]:
# Gather `item_queries` and `album_queries` from the plugins.
"""Return mapping between field names and queries for the given model."""
attr_name = f"{model_cls.__name__.lower()}_queries"
queries: dict[str, FieldQueryType] = {}
for plugin in find_plugins():
plugin_queries = getattr(plugin, attr_name, {})
queries.update(plugin_queries)
return queries
def track_distance(item: Item, info: TrackInfo) -> Distance:
"""Gets the track distance calculated by all loaded plugins.
Returns a Distance object.
"""
from beets.autotag.distance import Distance
dist = Distance()
for plugin in find_plugins():
dist.update(plugin.track_distance(item, info))
return dist
def album_distance(
items: Sequence[Item],
album_info: AlbumInfo,
mapping: dict[Item, TrackInfo],
) -> Distance:
"""Returns the album distance calculated by plugins."""
from beets.autotag.distance import Distance
dist = Distance()
for plugin in find_plugins():
dist.update(plugin.album_distance(items, album_info, mapping))
return dist
return {
field: query
for plugin in find_plugins()
for field, query in getattr(plugin, attr_name, {}).items()
}
def notify_info_yielded(event: str) -> Callable[[IterF[P, Ret]], IterF[P, Ret]]:
@ -502,46 +408,6 @@ def notify_info_yielded(event: str) -> Callable[[IterF[P, Ret]], IterF[P, Ret]]:
return decorator
@notify_info_yielded("albuminfo_received")
def candidates(*args, **kwargs) -> Iterable[AlbumInfo]:
"""Return matching album candidates from all plugins."""
for plugin in find_plugins():
yield from plugin.candidates(*args, **kwargs)
@notify_info_yielded("trackinfo_received")
def item_candidates(*args, **kwargs) -> Iterable[TrackInfo]:
"""Return matching track candidates from all plugins."""
for plugin in find_plugins():
yield from plugin.item_candidates(*args, **kwargs)
def album_for_id(_id: str) -> AlbumInfo | None:
"""Get AlbumInfo object for the given ID string.
A single ID can yield just a single album, so we return the first match.
"""
for plugin in find_plugins():
if info := plugin.album_for_id(_id):
send("albuminfo_received", info=info)
return info
return None
def track_for_id(_id: str) -> TrackInfo | None:
"""Get TrackInfo object for the given ID string.
A single ID can yield just a single track, so we return the first match.
"""
for plugin in find_plugins():
if info := plugin.track_for_id(_id):
send("trackinfo_received", info=info)
return info
return None
def template_funcs() -> TFuncMap[str]:
"""Get all the template functions declared by plugins as a
dictionary.
@ -656,20 +522,6 @@ def feat_tokens(for_artist: bool = True) -> str:
)
def get_distance(
config: ConfigView, data_source: str, info: AlbumInfo | TrackInfo
) -> Distance:
"""Returns the ``data_source`` weight and the maximum source weight
for albums or individual tracks.
"""
from beets.autotag.distance import Distance
dist = Distance()
if info.data_source == data_source:
dist.add("source", config["source_weight"].as_number())
return dist
def apply_item_changes(
lib: Library, item: Item, move: bool, pretend: bool, write: bool
) -> None:
@ -695,149 +547,3 @@ def apply_item_changes(
item.try_write()
item.store()
class Response(TypedDict):
"""A dictionary with the response of a plugin API call.
May be extended by plugins to include additional information, but `id`
is required.
"""
id: str
R = TypeVar("R", bound=Response)
class MetadataSourcePlugin(Generic[R], BeetsPlugin, metaclass=abc.ABCMeta):
def __init__(self):
super().__init__()
self.config.add({"source_weight": 0.5})
@property
@abc.abstractmethod
def data_source(self) -> str:
raise NotImplementedError
@property
@abc.abstractmethod
def search_url(self) -> str:
raise NotImplementedError
@property
@abc.abstractmethod
def album_url(self) -> str:
raise NotImplementedError
@property
@abc.abstractmethod
def track_url(self) -> str:
raise NotImplementedError
@abc.abstractmethod
def _search_api(
self,
query_type: Literal["album", "track"],
filters: dict[str, str],
keywords: str = "",
) -> Sequence[R]:
raise NotImplementedError
@abc.abstractmethod
def album_for_id(self, album_id: str) -> AlbumInfo | None:
raise NotImplementedError
@abc.abstractmethod
def track_for_id(self, track_id: str) -> TrackInfo | None:
raise NotImplementedError
@staticmethod
def get_artist(
artists,
id_key: str | int = "id",
name_key: str | int = "name",
join_key: str | int | None = None,
) -> tuple[str, str | None]:
"""Returns an artist string (all artists) and an artist_id (the main
artist) for a list of artist object dicts.
For each artist, this function moves articles (such as 'a', 'an',
and 'the') to the front and strips trailing disambiguation numbers. It
returns a tuple containing the comma-separated string of all
normalized artists and the ``id`` of the main/first artist.
Alternatively a keyword can be used to combine artists together into a
single string by passing the join_key argument.
:param artists: Iterable of artist dicts or lists returned by API.
:type artists: list[dict] or list[list]
:param id_key: Key or index corresponding to the value of ``id`` for
the main/first artist. Defaults to 'id'.
:param name_key: Key or index corresponding to values of names
to concatenate for the artist string (containing all artists).
Defaults to 'name'.
:param join_key: Key or index corresponding to a field containing a
keyword to use for combining artists into a single string, for
example "Feat.", "Vs.", "And" or similar. The default is None
which keeps the default behaviour (comma-separated).
:return: Normalized artist string.
"""
artist_id = None
artist_string = ""
artists = list(artists) # In case a generator was passed.
total = len(artists)
for idx, artist in enumerate(artists):
if not artist_id:
artist_id = artist[id_key]
name = artist[name_key]
# Strip disambiguation number.
name = re.sub(r" \(\d+\)$", "", name)
# Move articles to the front.
name = re.sub(r"^(.*?), (a|an|the)$", r"\2 \1", name, flags=re.I)
# Use a join keyword if requested and available.
if idx < (total - 1): # Skip joining on last.
if join_key and artist.get(join_key, None):
name += f" {artist[join_key]} "
else:
name += ", "
artist_string += name
return artist_string, artist_id
def _get_id(self, id_string: str) -> str | None:
"""Parse release ID from the given ID string."""
return extract_release_id(self.data_source.lower(), id_string)
def candidates(
self, items: list[Item], artist: str, album: str, va_likely: bool
) -> Iterable[AlbumInfo]:
query_filters = {"album": album}
if not va_likely:
query_filters["artist"] = artist
for result in self._search_api("album", query_filters):
if info := self.album_for_id(result["id"]):
yield info
def item_candidates(
self, item: Item, artist: str, title: str
) -> Iterable[TrackInfo]:
for result in self._search_api(
"track", {"artist": artist}, keywords=title
):
if info := self.track_for_id(result["id"]):
yield info
def album_distance(
self,
items: Sequence[Item],
album_info: AlbumInfo,
mapping: dict[Item, TrackInfo],
) -> Distance:
return get_distance(
data_source=self.data_source, info=album_info, config=self.config
)
def track_distance(self, item: Item, info: TrackInfo) -> Distance:
return get_distance(
data_source=self.data_source, info=info, config=self.config
)

View file

@ -52,12 +52,13 @@ import beets.plugins
from beets import importer, logging, util
from beets.autotag.hooks import AlbumInfo, TrackInfo
from beets.importer import ImportSession
from beets.library import Album, Item, Library
from beets.library import Item, Library
from beets.test import _common
from beets.ui.commands import TerminalImportSession
from beets.util import (
MoveOperation,
bytestring_path,
cached_classproperty,
clean_module_tempdir,
syspath,
)
@ -471,11 +472,6 @@ class PluginMixin(ConfigMixin):
plugin: ClassVar[str]
preload_plugin: ClassVar[bool] = True
original_item_types = dict(Item._types)
original_album_types = dict(Album._types)
original_item_queries = dict(Item._queries)
original_album_queries = dict(Album._queries)
def setup_beets(self):
super().setup_beets()
if self.preload_plugin:
@ -494,16 +490,11 @@ class PluginMixin(ConfigMixin):
# FIXME this should eventually be handled by a plugin manager
plugins = (self.plugin,) if hasattr(self, "plugin") else plugins
self.config["plugins"] = plugins
cached_classproperty.cache.clear()
beets.plugins.load_plugins(plugins)
beets.plugins.send("pluginload")
beets.plugins.find_plugins()
# Take a backup of the original _types and _queries to restore
# when unloading.
Item._types.update(beets.plugins.types(Item))
Album._types.update(beets.plugins.types(Album))
Item._queries.update(beets.plugins.named_queries(Item))
Album._queries.update(beets.plugins.named_queries(Album))
def unload_plugins(self) -> None:
"""Unload all plugins and remove them from the configuration."""
# FIXME this should eventually be handled by a plugin manager
@ -512,10 +503,6 @@ class PluginMixin(ConfigMixin):
self.config["plugins"] = []
beets.plugins._classes = set()
beets.plugins._instances = {}
Item._types = self.original_item_types
Album._types = self.original_album_types
Item._queries = self.original_item_queries
Album._queries = self.original_album_queries
@contextmanager
def configure_plugin(self, config: Any):
@ -799,10 +786,12 @@ class AutotagStub:
def install(self):
self.patchers = [
patch("beets.plugins.album_for_id", lambda *_: None),
patch("beets.plugins.track_for_id", lambda *_: None),
patch("beets.plugins.candidates", self.candidates),
patch("beets.plugins.item_candidates", self.item_candidates),
patch("beets.metadata_plugins.album_for_id", lambda *_: None),
patch("beets.metadata_plugins.track_for_id", lambda *_: None),
patch("beets.metadata_plugins.candidates", self.candidates),
patch(
"beets.metadata_plugins.item_candidates", self.item_candidates
),
]
for p in self.patchers:
p.start()

View file

@ -1609,17 +1609,6 @@ def _setup(options, lib=None):
plugins = _load_plugins(options, config)
# Add types and queries defined by plugins.
plugin_types_album = plugins.types(library.Album)
library.Album._types.update(plugin_types_album)
item_types = plugin_types_album.copy()
item_types.update(library.Item._types)
item_types.update(plugins.types(library.Item))
library.Item._types = item_types
library.Item._queries.update(plugins.named_queries(library.Item))
library.Album._queries.update(plugins.named_queries(library.Album))
plugins.send("pluginload")
# Get the default subcommands.

View file

@ -1343,7 +1343,7 @@ def import_func(lib, opts, args: list[str]):
if opts.library:
query = args
paths = []
byte_paths = []
else:
query = None
paths = args

View file

@ -41,6 +41,7 @@ from typing import (
Any,
AnyStr,
Callable,
ClassVar,
Generic,
NamedTuple,
TypeVar,
@ -63,6 +64,7 @@ MAX_FILENAME_LENGTH = 200
WINDOWS_MAGIC_PREFIX = "\\\\?\\"
T = TypeVar("T")
PathLike = Union[str, bytes, Path]
StrPath = Union[str, Path]
Replacements = Sequence[tuple[Pattern[str], str]]
# Here for now to allow for a easy replace later on
@ -1051,20 +1053,46 @@ def par_map(transform: Callable[[T], Any], items: Sequence[T]) -> None:
class cached_classproperty:
"""A decorator implementing a read-only property that is *lazy* in
the sense that the getter is only invoked once. Subsequent accesses
through *any* instance use the cached result.
"""Descriptor implementing cached class properties.
Provides class-level dynamic property behavior where the getter function is
called once per class and the result is cached for subsequent access. Unlike
instance properties, this operates on the class rather than instances.
"""
def __init__(self, getter):
cache: ClassVar[dict[tuple[Any, str], Any]] = {}
name: str
# Ideally, we would like to use `Callable[[type[T]], Any]` here,
# however, `mypy` is unable to see this as a **class** property, and thinks
# that this callable receives an **instance** of the object, failing the
# type check, for example:
# >>> class Album:
# >>> @cached_classproperty
# >>> def foo(cls):
# >>> reveal_type(cls) # mypy: revealed type is "Album"
# >>> return cls.bar
#
# Argument 1 to "cached_classproperty" has incompatible type
# "Callable[[Album], ...]"; expected "Callable[[type[Album]], ...]"
#
# Therefore, we just use `Any` here, which is not ideal, but works.
def __init__(self, getter: Callable[[Any], Any]) -> None:
"""Initialize the descriptor with the property getter function."""
self.getter = getter
self.cache = {}
def __get__(self, instance, owner):
if owner not in self.cache:
self.cache[owner] = self.getter(owner)
def __set_name__(self, owner: Any, name: str) -> None:
"""Capture the attribute name this descriptor is assigned to."""
self.name = name
return self.cache[owner]
def __get__(self, instance: Any, owner: type[Any]) -> Any:
"""Compute and cache if needed, and return the property value."""
key = owner, self.name
if key not in self.cache:
self.cache[key] = self.getter(owner)
return self.cache[key]
class LazySharedInstance(Generic[T]):

View file

@ -18,6 +18,11 @@ from __future__ import annotations
import re
from beets import logging
log = logging.getLogger("beets")
PATTERN_BY_SOURCE = {
"spotify": re.compile(r"(?:^|open\.spotify\.com/[^/]+/)([0-9A-Za-z]{22})"),
"deezer": re.compile(r"(?:^|deezer\.com/)(?:[a-z]*/)?(?:[^/]+/)?(\d+)"),
@ -43,6 +48,21 @@ PATTERN_BY_SOURCE = {
def extract_release_id(source: str, id_: str) -> str | None:
if m := PATTERN_BY_SOURCE[source].search(str(id_)):
"""Extract the release ID from a given source and ID.
Normally, the `id_` is a url string which contains the ID of the
release. This function extracts the ID from the URL based on the
`source` provided.
"""
try:
source_pattern = PATTERN_BY_SOURCE[source.lower()]
except KeyError:
log.debug(
f"Unknown source '{source}' for ID extraction. Returning id/url as-is."
)
return id_
if m := source_pattern.search(str(id_)):
return m[1]
return None

View file

@ -48,6 +48,8 @@ POISON = "__PIPELINE_POISON__"
DEFAULT_QUEUE_SIZE = 16
Tq = TypeVar("Tq")
def _invalidate_queue(q, val=None, sync=True):
"""Breaks a Queue such that it never blocks, always has size 1,
@ -91,7 +93,7 @@ def _invalidate_queue(q, val=None, sync=True):
q.mutex.release()
class CountedQueue(queue.Queue):
class CountedQueue(queue.Queue[Tq]):
"""A queue that keeps track of the number of threads that are
still feeding into it. The queue is poisoned when all threads are
finished with the queue.

View file

@ -58,7 +58,9 @@ class AdvancedRewritePlugin(BeetsPlugin):
def __init__(self):
"""Parse configuration and register template fields for rewriting."""
super().__init__()
self.register_listener("pluginload", self.loaded)
def loaded(self):
template = confuse.Sequence(
confuse.OneOf(
[

View file

@ -15,10 +15,10 @@
from __future__ import annotations
from collections.abc import Iterable
from typing import TYPE_CHECKING
import librosa
import numpy as np
from beets.plugins import BeetsPlugin
from beets.ui import Subcommand, should_write
@ -76,7 +76,10 @@ class AutoBPMPlugin(BeetsPlugin):
self._log.error("Failed to measure BPM for {}: {}", path, exc)
continue
bpm = round(tempo[0] if isinstance(tempo, Iterable) else tempo)
bpm = round(
float(tempo[0] if isinstance(tempo, np.ndarray) else tempo)
)
item["bpm"] = bpm
self._log.info("Computed BPM for {}: {}", path, bpm)

View file

@ -14,9 +14,19 @@
"""Adds Beatport release and track search support to the autotagger"""
from __future__ import annotations
import json
import re
from datetime import datetime, timedelta
from typing import (
TYPE_CHECKING,
Iterable,
Iterator,
Literal,
Sequence,
overload,
)
import confuse
from requests_oauthlib import OAuth1Session
@ -29,7 +39,13 @@ from requests_oauthlib.oauth1_session import (
import beets
import beets.ui
from beets.autotag.hooks import AlbumInfo, TrackInfo
from beets.plugins import BeetsPlugin, MetadataSourcePlugin, get_distance
from beets.metadata_plugins import MetadataSourcePlugin
if TYPE_CHECKING:
from beets.importer import ImportSession
from beets.library import Item
from ._typing import JSONDict
AUTH_ERRORS = (TokenRequestDenied, TokenMissing, VerifierMissing)
USER_AGENT = f"beets/{beets.__version__} +https://beets.io/"
@ -39,20 +55,6 @@ class BeatportAPIError(Exception):
pass
class BeatportObject:
def __init__(self, data):
self.beatport_id = data["id"]
self.name = str(data["name"])
if "releaseDate" in data:
self.release_date = datetime.strptime(
data["releaseDate"], "%Y-%m-%d"
)
if "artists" in data:
self.artists = [(x["id"], str(x["name"])) for x in data["artists"]]
if "genres" in data:
self.genres = [str(x["name"]) for x in data["genres"]]
class BeatportClient:
_api_base = "https://oauth-api.beatport.com"
@ -77,7 +79,7 @@ class BeatportClient:
)
self.api.headers = {"User-Agent": USER_AGENT}
def get_authorize_url(self):
def get_authorize_url(self) -> str:
"""Generate the URL for the user to authorize the application.
Retrieves a request token from the Beatport API and returns the
@ -99,15 +101,13 @@ class BeatportClient:
self._make_url("/identity/1/oauth/authorize")
)
def get_access_token(self, auth_data):
def get_access_token(self, auth_data: str) -> tuple[str, str]:
"""Obtain the final access token and secret for the API.
:param auth_data: URL-encoded authorization data as displayed at
the authorization url (obtained via
:py:meth:`get_authorize_url`) after signing in
:type auth_data: unicode
:returns: OAuth resource owner key and secret
:rtype: (unicode, unicode) tuple
:returns: OAuth resource owner key and secret as unicode
"""
self.api.parse_authorization_response(
"https://beets.io/auth?" + auth_data
@ -117,20 +117,37 @@ class BeatportClient:
)
return access_data["oauth_token"], access_data["oauth_token_secret"]
def search(self, query, release_type="release", details=True):
@overload
def search(
self,
query: str,
release_type: Literal["release"],
details: bool = True,
) -> Iterator[BeatportRelease]: ...
@overload
def search(
self,
query: str,
release_type: Literal["track"],
details: bool = True,
) -> Iterator[BeatportTrack]: ...
def search(
self,
query: str,
release_type: Literal["release", "track"],
details=True,
) -> Iterator[BeatportRelease | BeatportTrack]:
"""Perform a search of the Beatport catalogue.
:param query: Query string
:param release_type: Type of releases to search for, can be
'release' or 'track'
:param release_type: Type of releases to search for.
:param details: Retrieve additional information about the
search results. Currently this will fetch
the tracklist for releases and do nothing for
tracks
:returns: Search results
:rtype: generator that yields
py:class:`BeatportRelease` or
:py:class:`BeatportTrack`
"""
response = self._get(
"catalog/3/search",
@ -140,20 +157,18 @@ class BeatportClient:
)
for item in response:
if release_type == "release":
release = BeatportRelease(item)
if details:
release = self.get_release(item["id"])
else:
release = BeatportRelease(item)
release.tracks = self.get_release_tracks(item["id"])
yield release
elif release_type == "track":
yield BeatportTrack(item)
def get_release(self, beatport_id):
def get_release(self, beatport_id: str) -> BeatportRelease | None:
"""Get information about a single release.
:param beatport_id: Beatport ID of the release
:returns: The matching release
:rtype: :py:class:`BeatportRelease`
"""
response = self._get("/catalog/3/releases", id=beatport_id)
if response:
@ -162,35 +177,33 @@ class BeatportClient:
return release
return None
def get_release_tracks(self, beatport_id):
def get_release_tracks(self, beatport_id: str) -> list[BeatportTrack]:
"""Get all tracks for a given release.
:param beatport_id: Beatport ID of the release
:returns: Tracks in the matching release
:rtype: list of :py:class:`BeatportTrack`
"""
response = self._get(
"/catalog/3/tracks", releaseId=beatport_id, perPage=100
)
return [BeatportTrack(t) for t in response]
def get_track(self, beatport_id):
def get_track(self, beatport_id: str) -> BeatportTrack:
"""Get information about a single track.
:param beatport_id: Beatport ID of the track
:returns: The matching track
:rtype: :py:class:`BeatportTrack`
"""
response = self._get("/catalog/3/tracks", id=beatport_id)
return BeatportTrack(response[0])
def _make_url(self, endpoint):
def _make_url(self, endpoint: str) -> str:
"""Get complete URL for a given API endpoint."""
if not endpoint.startswith("/"):
endpoint = "/" + endpoint
return self._api_base + endpoint
def _get(self, endpoint, **kwargs):
def _get(self, endpoint: str, **kwargs) -> list[JSONDict]:
"""Perform a GET request on a given API endpoint.
Automatically extracts result data from the response and converts HTTP
@ -211,48 +224,81 @@ class BeatportClient:
return response.json()["results"]
class BeatportRelease(BeatportObject):
def __str__(self):
if len(self.artists) < 4:
artist_str = ", ".join(x[1] for x in self.artists)
class BeatportObject:
beatport_id: str
name: str
release_date: datetime | None = None
artists: list[tuple[str, str]] | None = None
# tuple of artist id and artist name
def __init__(self, data: JSONDict):
self.beatport_id = str(data["id"]) # given as int in the response
self.name = str(data["name"])
if "releaseDate" in data:
self.release_date = datetime.strptime(
data["releaseDate"], "%Y-%m-%d"
)
if "artists" in data:
self.artists = [(x["id"], str(x["name"])) for x in data["artists"]]
if "genres" in data:
self.genres = [str(x["name"]) for x in data["genres"]]
def artists_str(self) -> str | None:
if self.artists is not None:
if len(self.artists) < 4:
artist_str = ", ".join(x[1] for x in self.artists)
else:
artist_str = "Various Artists"
else:
artist_str = "Various Artists"
return "<BeatportRelease: {} - {} ({})>".format(
artist_str,
self.name,
self.catalog_number,
)
artist_str = None
def __repr__(self):
return str(self).encode("utf-8")
return artist_str
class BeatportRelease(BeatportObject):
catalog_number: str | None
label_name: str | None
category: str | None
url: str | None
genre: str | None
tracks: list[BeatportTrack] | None = None
def __init__(self, data: JSONDict):
super().__init__(data)
self.catalog_number = data.get("catalogNumber")
self.label_name = data.get("label", {}).get("name")
self.category = data.get("category")
self.genre = data.get("genre")
def __init__(self, data):
BeatportObject.__init__(self, data)
if "catalogNumber" in data:
self.catalog_number = data["catalogNumber"]
if "label" in data:
self.label_name = data["label"]["name"]
if "category" in data:
self.category = data["category"]
if "slug" in data:
self.url = "https://beatport.com/release/{}/{}".format(
data["slug"], data["id"]
)
self.genre = data.get("genre")
def __str__(self) -> str:
return "<BeatportRelease: {} - {} ({})>".format(
self.artists_str(),
self.name,
self.catalog_number,
)
class BeatportTrack(BeatportObject):
def __str__(self):
artist_str = ", ".join(x[1] for x in self.artists)
return "<BeatportTrack: {} - {} ({})>".format(
artist_str, self.name, self.mix_name
)
title: str | None
mix_name: str | None
length: timedelta
url: str | None
track_number: int | None
bpm: str | None
initial_key: str | None
genre: str | None
def __repr__(self):
return str(self).encode("utf-8")
def __init__(self, data):
BeatportObject.__init__(self, data)
def __init__(self, data: JSONDict):
super().__init__(data)
if "title" in data:
self.title = str(data["title"])
if "mixName" in data:
@ -279,8 +325,8 @@ class BeatportTrack(BeatportObject):
self.genre = str(data["genres"][0].get("name"))
class BeatportPlugin(BeetsPlugin):
data_source = "Beatport"
class BeatportPlugin(MetadataSourcePlugin):
_client: BeatportClient | None = None
def __init__(self):
super().__init__()
@ -294,12 +340,19 @@ class BeatportPlugin(BeetsPlugin):
)
self.config["apikey"].redact = True
self.config["apisecret"].redact = True
self.client = None
self.register_listener("import_begin", self.setup)
def setup(self, session=None):
c_key = self.config["apikey"].as_str()
c_secret = self.config["apisecret"].as_str()
@property
def client(self) -> BeatportClient:
if self._client is None:
raise ValueError(
"Beatport client not initialized. Call setup() first."
)
return self._client
def setup(self, session: ImportSession):
c_key: str = self.config["apikey"].as_str()
c_secret: str = self.config["apisecret"].as_str()
# Get the OAuth token from a file or log in.
try:
@ -312,9 +365,9 @@ class BeatportPlugin(BeetsPlugin):
token = tokendata["token"]
secret = tokendata["secret"]
self.client = BeatportClient(c_key, c_secret, token, secret)
self._client = BeatportClient(c_key, c_secret, token, secret)
def authenticate(self, c_key, c_secret):
def authenticate(self, c_key: str, c_secret: str) -> tuple[str, str]:
# Get the link for the OAuth page.
auth_client = BeatportClient(c_key, c_secret)
try:
@ -341,44 +394,30 @@ class BeatportPlugin(BeetsPlugin):
return token, secret
def _tokenfile(self):
def _tokenfile(self) -> str:
"""Get the path to the JSON file for storing the OAuth token."""
return self.config["tokenfile"].get(confuse.Filename(in_app_dir=True))
def album_distance(self, items, album_info, mapping):
"""Returns the Beatport source weight and the maximum source weight
for albums.
"""
return get_distance(
data_source=self.data_source, info=album_info, config=self.config
)
def track_distance(self, item, track_info):
"""Returns the Beatport source weight and the maximum source weight
for individual tracks.
"""
return get_distance(
data_source=self.data_source, info=track_info, config=self.config
)
def candidates(self, items, artist, release, va_likely):
"""Returns a list of AlbumInfo objects for beatport search results
matching release and artist (if not various).
"""
def candidates(
self,
items: Sequence[Item],
artist: str,
album: str,
va_likely: bool,
) -> Iterator[AlbumInfo]:
if va_likely:
query = release
query = album
else:
query = f"{artist} {release}"
query = f"{artist} {album}"
try:
return self._get_releases(query)
yield from self._get_releases(query)
except BeatportAPIError as e:
self._log.debug("API Error: {0} (query: {1})", e, query)
return []
return
def item_candidates(self, item, artist, title):
"""Returns a list of TrackInfo objects for beatport search results
matching title and artist.
"""
def item_candidates(
self, item: Item, artist: str, title: str
) -> Iterable[TrackInfo]:
query = f"{artist} {title}"
try:
return self._get_tracks(query)
@ -386,13 +425,13 @@ class BeatportPlugin(BeetsPlugin):
self._log.debug("API Error: {0} (query: {1})", e, query)
return []
def album_for_id(self, release_id):
def album_for_id(self, album_id: str):
"""Fetches a release by its Beatport ID and returns an AlbumInfo object
or None if the query is not a valid ID or release is not found.
"""
self._log.debug("Searching for release {0}", release_id)
self._log.debug("Searching for release {0}", album_id)
if not (release_id := self._get_id(release_id)):
if not (release_id := self._extract_id(album_id)):
self._log.debug("Not a valid Beatport release ID.")
return None
@ -401,11 +440,12 @@ class BeatportPlugin(BeetsPlugin):
return self._get_album_info(release)
return None
def track_for_id(self, track_id):
def track_for_id(self, track_id: str):
"""Fetches a track by its Beatport ID and returns a TrackInfo object
or None if the track is not a valid Beatport ID or track is not found.
"""
self._log.debug("Searching for track {0}", track_id)
# TODO: move to extractor
match = re.search(r"(^|beatport\.com/track/.+/)(\d+)$", track_id)
if not match:
self._log.debug("Not a valid Beatport track ID.")
@ -415,7 +455,7 @@ class BeatportPlugin(BeetsPlugin):
return self._get_track_info(bp_track)
return None
def _get_releases(self, query):
def _get_releases(self, query: str) -> Iterator[AlbumInfo]:
"""Returns a list of AlbumInfo objects for a beatport search query."""
# Strip non-word characters from query. Things like "!" and "-" can
# cause a query to return no results, even if they match the artist or
@ -425,16 +465,22 @@ class BeatportPlugin(BeetsPlugin):
# Strip medium information from query, Things like "CD1" and "disk 1"
# can also negate an otherwise positive result.
query = re.sub(r"\b(CD|disc)\s*\d+", "", query, flags=re.I)
albums = [self._get_album_info(x) for x in self.client.search(query)]
return albums
for beatport_release in self.client.search(query, "release"):
if beatport_release is None:
continue
yield self._get_album_info(beatport_release)
def _get_album_info(self, release):
def _get_album_info(self, release: BeatportRelease) -> AlbumInfo:
"""Returns an AlbumInfo object for a Beatport Release object."""
va = len(release.artists) > 3
va = release.artists is not None and len(release.artists) > 3
artist, artist_id = self._get_artist(release.artists)
if va:
artist = "Various Artists"
tracks = [self._get_track_info(x) for x in release.tracks]
tracks: list[TrackInfo] = []
if release.tracks is not None:
tracks = [self._get_track_info(x) for x in release.tracks]
release_date = release.release_date
return AlbumInfo(
album=release.name,
@ -445,18 +491,18 @@ class BeatportPlugin(BeetsPlugin):
tracks=tracks,
albumtype=release.category,
va=va,
year=release.release_date.year,
month=release.release_date.month,
day=release.release_date.day,
label=release.label_name,
catalognum=release.catalog_number,
media="Digital",
data_source=self.data_source,
data_url=release.url,
genre=release.genre,
year=release_date.year if release_date else None,
month=release_date.month if release_date else None,
day=release_date.day if release_date else None,
)
def _get_track_info(self, track):
def _get_track_info(self, track: BeatportTrack) -> TrackInfo:
"""Returns a TrackInfo object for a Beatport Track object."""
title = track.name
if track.mix_name != "Original Mix":
@ -482,9 +528,7 @@ class BeatportPlugin(BeetsPlugin):
"""Returns an artist string (all artists) and an artist_id (the main
artist) for a list of Beatport release or track artists.
"""
return MetadataSourcePlugin.get_artist(
artists=artists, id_key=0, name_key=1
)
return self.get_artist(artists=artists, id_key=0, name_key=1)
def _get_tracks(self, query):
"""Returns a list of TrackInfo objects for a Beatport query."""

View file

@ -30,7 +30,7 @@ from typing import TYPE_CHECKING
import beets
import beets.ui
from beets import dbcore, vfs
from beets import dbcore, logging, vfs
from beets.library import Item
from beets.plugins import BeetsPlugin
from beets.util import as_string, bluelet
@ -38,6 +38,17 @@ from beets.util import as_string, bluelet
if TYPE_CHECKING:
from beets.dbcore.query import Query
log = logging.getLogger(__name__)
try:
from . import gstplayer
except ImportError as e:
raise ImportError(
"Gstreamer Python bindings not found."
' Install "gstreamer1.0" and "python-gi" or similar package to use BPD.'
) from e
PROTOCOL_VERSION = "0.16.0"
BUFSIZE = 1024
@ -94,11 +105,6 @@ SUBSYSTEMS = [
]
# Gstreamer import error.
class NoGstreamerError(Exception):
pass
# Error-handling, exceptions, parameter parsing.
@ -1099,14 +1105,6 @@ class Server(BaseServer):
"""
def __init__(self, library, host, port, password, ctrl_port, log):
try:
from beetsplug.bpd import gstplayer
except ImportError as e:
# This is a little hacky, but it's the best I know for now.
if e.args[0].endswith(" gst"):
raise NoGstreamerError()
else:
raise
log.info("Starting server...")
super().__init__(host, port, password, ctrl_port, log)
self.lib = library
@ -1616,16 +1614,9 @@ class BPDPlugin(BeetsPlugin):
def start_bpd(self, lib, host, port, password, volume, ctrl_port):
"""Starts a BPD server."""
try:
server = Server(lib, host, port, password, ctrl_port, self._log)
server.cmd_setvol(None, volume)
server.run()
except NoGstreamerError:
self._log.error("Gstreamer Python bindings not found.")
self._log.error(
'Install "gstreamer1.0" and "python-gi"'
"or similar package to use BPD."
)
server = Server(lib, host, port, password, ctrl_port, self._log)
server.cmd_setvol(None, volume)
server.run()
def commands(self):
cmd = beets.ui.Subcommand(

View file

@ -19,12 +19,15 @@ autotagger. Requires the pyacoustid library.
import re
from collections import defaultdict
from functools import cached_property, partial
from typing import Iterable
import acoustid
import confuse
from beets import config, plugins, ui, util
from beets import config, ui, util
from beets.autotag.distance import Distance
from beets.autotag.hooks import TrackInfo
from beets.metadata_plugins import MetadataSourcePlugin
from beetsplug.musicbrainz import MusicBrainzPlugin
API_KEY = "1vOwZtEn"
@ -168,10 +171,9 @@ def _all_releases(items):
yield release_id
class AcoustidPlugin(plugins.BeetsPlugin):
class AcoustidPlugin(MetadataSourcePlugin):
def __init__(self):
super().__init__()
self.config.add(
{
"auto": True,
@ -210,7 +212,7 @@ class AcoustidPlugin(plugins.BeetsPlugin):
self._log.debug("acoustid album candidates: {0}", len(albums))
return albums
def item_candidates(self, item, artist, title):
def item_candidates(self, item, artist, title) -> Iterable[TrackInfo]:
if item.path not in _matches:
return []
@ -223,6 +225,14 @@ class AcoustidPlugin(plugins.BeetsPlugin):
self._log.debug("acoustid item candidates: {0}", len(tracks))
return tracks
def album_for_id(self, *args, **kwargs):
# Lookup by fingerprint ID does not make too much sense.
return None
def track_for_id(self, *args, **kwargs):
# Lookup by fingerprint ID does not make too much sense.
return None
def commands(self):
submit_cmd = ui.Subcommand(
"submit", help="submit Acoustid fingerprints"

View file

@ -26,16 +26,19 @@ import unidecode
from beets import ui
from beets.autotag import AlbumInfo, TrackInfo
from beets.dbcore import types
from beets.plugins import BeetsPlugin, MetadataSourcePlugin, Response
from beets.metadata_plugins import (
IDResponse,
SearchApiMetadataSourcePlugin,
SearchFilter,
)
if TYPE_CHECKING:
from beets.library import Item, Library
from beetsplug._typing import JSONDict
from ._typing import JSONDict
class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin):
data_source = "Deezer"
class DeezerPlugin(SearchApiMetadataSourcePlugin[IDResponse]):
item_types = {
"deezer_track_rank": types.INTEGER,
"deezer_track_id": types.INTEGER,
@ -63,7 +66,7 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin):
def album_for_id(self, album_id: str) -> AlbumInfo | None:
"""Fetch an album by its Deezer ID or URL."""
if not (deezer_id := self._get_id(album_id)):
if not (deezer_id := self._extract_id(album_id)):
return None
album_url = f"{self.album_url}{deezer_id}"
@ -145,11 +148,14 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin):
)
def track_for_id(self, track_id: str) -> None | TrackInfo:
"""Fetch a track by its Deezer ID or URL.
"""Fetch a track by its Deezer ID or URL and return a
TrackInfo object or None if the track is not found.
:param track_id: (Optional) Deezer ID or URL for the track. Either
``track_id`` or ``track_data`` must be provided.
Returns a TrackInfo object or None if the track is not found.
"""
if not (deezer_id := self._get_id(track_id)):
if not (deezer_id := self._extract_id(track_id)):
self._log.debug("Invalid Deezer track_id: {}", track_id)
return None
@ -162,11 +168,13 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin):
# Get album's tracks to set `track.index` (position on the entire
# release) and `track.medium_total` (total number of tracks on
# the track's disc).
album_tracks_obj = self.fetch_data(
self.album_url + str(track_data["album"]["id"]) + "/tracks"
)
if album_tracks_obj is None:
if not (
album_tracks_obj := self.fetch_data(
self.album_url + str(track_data["album"]["id"]) + "/tracks"
)
):
return None
try:
album_tracks_data = album_tracks_obj["data"]
except KeyError:
@ -187,7 +195,6 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin):
"""Convert a Deezer track object dict to a TrackInfo object.
:param track_data: Deezer Track object dict
:return: TrackInfo object for track
"""
artist, artist_id = self.get_artist(
track_data.get("contributors", [track_data["artist"]])
@ -211,7 +218,7 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin):
@staticmethod
def _construct_search_query(
filters: dict[str, str], keywords: str = ""
filters: SearchFilter, keywords: str = ""
) -> str:
"""Construct a query string with the specified filters and keywords to
be provided to the Deezer Search API
@ -242,14 +249,14 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin):
"radio",
"user",
],
filters: dict[str, str],
filters: SearchFilter,
keywords="",
) -> Sequence[Response]:
) -> Sequence[IDResponse]:
"""Query the Deezer Search API for the specified ``keywords``, applying
the provided ``filters``.
:param query_type: The Deezer Search API method to use.
:param keywords: (Optional) Query keywords to use.
:param filters: Field filters to apply.
:param keywords: Query keywords to use.
:return: JSON data for the class:`Response <Response>` object or None
if no search results are returned.
"""
@ -269,7 +276,7 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin):
e,
)
return ()
response_data = response.json().get("data", [])
response_data: Sequence[IDResponse] = response.json().get("data", [])
self._log.debug(
"Found {} result(s) from {} for '{}'",
len(response_data),

View file

@ -27,7 +27,7 @@ import time
import traceback
from functools import cache
from string import ascii_lowercase
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Sequence
import confuse
from discogs_client import Client, Master, Release
@ -40,8 +40,7 @@ import beets.ui
from beets import config
from beets.autotag.distance import string_dist
from beets.autotag.hooks import AlbumInfo, TrackInfo
from beets.plugins import BeetsPlugin, MetadataSourcePlugin, get_distance
from beets.util.id_extractors import extract_release_id
from beets.metadata_plugins import MetadataSourcePlugin
if TYPE_CHECKING:
from collections.abc import Callable, Iterable
@ -84,7 +83,7 @@ class ReleaseFormat(TypedDict):
descriptions: list[str] | None
class DiscogsPlugin(BeetsPlugin):
class DiscogsPlugin(MetadataSourcePlugin):
def __init__(self):
super().__init__()
self.config.add(
@ -169,20 +168,8 @@ class DiscogsPlugin(BeetsPlugin):
return token, secret
def album_distance(self, items, album_info, mapping):
"""Returns the album distance."""
return get_distance(
data_source="Discogs", info=album_info, config=self.config
)
def track_distance(self, item, track_info):
"""Returns the track distance."""
return get_distance(
data_source="Discogs", info=track_info, config=self.config
)
def candidates(
self, items: list[Item], artist: str, album: str, va_likely: bool
self, items: Sequence[Item], artist: str, album: str, va_likely: bool
) -> Iterable[AlbumInfo]:
return self.get_albums(f"{artist} {album}" if va_likely else album)
@ -217,7 +204,7 @@ class DiscogsPlugin(BeetsPlugin):
"""
self._log.debug("Searching for release {0}", album_id)
discogs_id = extract_release_id("discogs", album_id)
discogs_id = self._extract_id(album_id)
if not discogs_id:
return None
@ -272,7 +259,7 @@ class DiscogsPlugin(BeetsPlugin):
exc_info=True,
)
return []
return map(self.get_album_info, releases)
return filter(None, map(self.get_album_info, releases))
@cache
def get_master_year(self, master_id: str) -> int | None:
@ -334,7 +321,7 @@ class DiscogsPlugin(BeetsPlugin):
self._log.warning("Release does not contain the required fields")
return None
artist, artist_id = MetadataSourcePlugin.get_artist(
artist, artist_id = self.get_artist(
[a.data for a in result.artists], join_key="join"
)
album = re.sub(r" +", " ", result.title)
@ -359,7 +346,7 @@ class DiscogsPlugin(BeetsPlugin):
else:
genre = base_genre
discogs_albumid = extract_release_id("discogs", result.data.get("uri"))
discogs_albumid = self._extract_id(result.data.get("uri"))
# Extract information for the optional AlbumInfo fields that are
# contained on nested discogs fields.
@ -419,7 +406,7 @@ class DiscogsPlugin(BeetsPlugin):
genre=genre,
media=media,
original_year=original_year,
data_source="Discogs",
data_source=self.data_source,
data_url=data_url,
discogs_albumid=discogs_albumid,
discogs_labelid=labelid,
@ -638,7 +625,7 @@ class DiscogsPlugin(BeetsPlugin):
title = f"{prefix}: {title}"
track_id = None
medium, medium_index, _ = self.get_track_index(track["position"])
artist, artist_id = MetadataSourcePlugin.get_artist(
artist, artist_id = self.get_artist(
track.get("artists", []), join_key="join"
)
length = self.get_track_length(track["duration"])

View file

@ -401,7 +401,7 @@ class LastGenrePlugin(plugins.BeetsPlugin):
label = "album"
if not new_genres and "artist" in self.sources:
new_genres = None
new_genres = []
if isinstance(obj, library.Item):
new_genres = self.fetch_artist_genre(obj)
label = "artist"

View file

@ -16,7 +16,7 @@
from collections import defaultdict
from beets import autotag, library, plugins, ui, util
from beets import autotag, library, metadata_plugins, ui, util
from beets.plugins import BeetsPlugin, apply_item_changes
@ -78,7 +78,9 @@ class MBSyncPlugin(BeetsPlugin):
)
continue
if not (track_info := plugins.track_for_id(item.mb_trackid)):
if not (
track_info := metadata_plugins.track_for_id(item.mb_trackid)
):
self._log.info(
"Recording ID not found: {0.mb_trackid} for track {0}", item
)
@ -99,7 +101,9 @@ class MBSyncPlugin(BeetsPlugin):
self._log.info("Skipping album with no mb_albumid: {}", album)
continue
if not (album_info := plugins.album_for_id(album.mb_albumid)):
if not (
album_info := metadata_plugins.album_for_id(album.mb_albumid)
):
self._log.info(
"Release ID {0.mb_albumid} not found for album {0}", album
)

View file

@ -21,7 +21,7 @@ from collections.abc import Iterator
import musicbrainzngs
from musicbrainzngs.musicbrainz import MusicBrainzError
from beets import config, plugins
from beets import config, metadata_plugins
from beets.dbcore import types
from beets.library import Album, Item, Library
from beets.plugins import BeetsPlugin
@ -222,7 +222,7 @@ class MissingPlugin(BeetsPlugin):
item_mbids = {x.mb_trackid for x in album.items()}
# fetch missing items
# TODO: Implement caching that without breaking other stuff
if album_info := plugins.album_for_id(album.mb_albumid):
if album_info := metadata_plugins.album_for_id(album.mb_albumid):
for track_info in album_info.tracks:
if track_info.track_id not in item_mbids:
self._log.debug(

View file

@ -20,7 +20,7 @@ import traceback
from collections import Counter
from functools import cached_property
from itertools import product
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Iterable, Sequence
from urllib.parse import urljoin
import musicbrainzngs
@ -28,11 +28,10 @@ import musicbrainzngs
import beets
import beets.autotag.hooks
from beets import config, plugins, util
from beets.plugins import BeetsPlugin
from beets.metadata_plugins import MetadataSourcePlugin
from beets.util.id_extractors import extract_release_id
if TYPE_CHECKING:
from collections.abc import Iterator, Sequence
from typing import Literal
from beets.library import Item
@ -362,9 +361,7 @@ def _merge_pseudo_and_actual_album(
return merged
class MusicBrainzPlugin(BeetsPlugin):
data_source = "Musicbrainz"
class MusicBrainzPlugin(MetadataSourcePlugin):
def __init__(self):
"""Set up the python-musicbrainz-ngs module according to settings
from the beets configuration. This should be called at startup.
@ -421,7 +418,7 @@ class MusicBrainzPlugin(BeetsPlugin):
medium=medium,
medium_index=medium_index,
medium_total=medium_total,
data_source="MusicBrainz",
data_source=self.data_source,
data_url=track_url(recording["id"]),
)
@ -632,7 +629,7 @@ class MusicBrainzPlugin(BeetsPlugin):
artists_sort=artists_sort_names,
artist_credit=artist_credit_name,
artists_credit=artists_credit_names,
data_source="MusicBrainz",
data_source=self.data_source,
data_url=album_url(release["id"]),
barcode=release.get("barcode"),
)
@ -767,7 +764,7 @@ class MusicBrainzPlugin(BeetsPlugin):
return mb_field_by_tag
def get_album_criteria(
self, items: list[Item], artist: str, album: str, va_likely: bool
self, items: Sequence[Item], artist: str, album: str, va_likely: bool
) -> dict[str, str]:
criteria = {
"release": album,
@ -813,12 +810,11 @@ class MusicBrainzPlugin(BeetsPlugin):
def candidates(
self,
items: list[Item],
items: Sequence[Item],
artist: str,
album: str,
va_likely: bool,
extra_tags: dict[str, Any] | None = None,
) -> Iterator[beets.autotag.hooks.AlbumInfo]:
) -> Iterable[beets.autotag.hooks.AlbumInfo]:
criteria = self.get_album_criteria(items, artist, album, va_likely)
release_ids = (r["id"] for r in self._search_api("release", criteria))
@ -826,7 +822,7 @@ class MusicBrainzPlugin(BeetsPlugin):
def item_candidates(
self, item: Item, artist: str, title: str
) -> Iterator[beets.autotag.hooks.TrackInfo]:
) -> Iterable[beets.autotag.hooks.TrackInfo]:
criteria = {"artist": artist, "recording": title, "alias": title}
yield from filter(
@ -841,7 +837,7 @@ class MusicBrainzPlugin(BeetsPlugin):
MusicBrainzAPIError.
"""
self._log.debug("Requesting MusicBrainz release {}", album_id)
if not (albumid := extract_release_id("musicbrainz", album_id)):
if not (albumid := self._extract_id(album_id)):
self._log.debug("Invalid MBID ({0}).", album_id)
return None
@ -878,7 +874,7 @@ class MusicBrainzPlugin(BeetsPlugin):
"""Fetches a track by its MusicBrainz ID. Returns a TrackInfo object
or None if no track is found. May raise a MusicBrainzAPIError.
"""
if not (trackid := extract_release_id("musicbrainz", track_id)):
if not (trackid := self._extract_id(track_id)):
self._log.debug("Invalid MBID ({0}).", track_id)
return None

View file

@ -1161,7 +1161,9 @@ class ExceptionWatcher(Thread):
Once an exception occurs, raise it and execute a callback.
"""
def __init__(self, queue: queue.Queue, callback: Callable[[], None]):
def __init__(
self, queue: queue.Queue[Exception], callback: Callable[[], None]
):
self._queue = queue
self._callback = callback
self._stopevent = Event()
@ -1197,7 +1199,9 @@ BACKENDS: dict[str, type[Backend]] = {b.NAME: b for b in BACKEND_CLASSES}
class ReplayGainPlugin(BeetsPlugin):
"""Provides ReplayGain analysis."""
def __init__(self):
pool: ThreadPool | None = None
def __init__(self) -> None:
super().__init__()
# default backend is 'command' for backward-compatibility.
@ -1261,9 +1265,6 @@ class ReplayGainPlugin(BeetsPlugin):
except (ReplayGainError, FatalReplayGainError) as e:
raise ui.UserError(f"replaygain initialization failed: {e}")
# Start threadpool lazily.
self.pool = None
def should_use_r128(self, item: Item) -> bool:
"""Checks the plugin setting to decide whether the calculation
should be done using the EBU R128 standard and use R128_ tags instead.
@ -1420,7 +1421,7 @@ class ReplayGainPlugin(BeetsPlugin):
"""Open a `ThreadPool` instance in `self.pool`"""
if self.pool is None and self.backend_instance.do_parallel:
self.pool = ThreadPool(threads)
self.exc_queue: queue.Queue = queue.Queue()
self.exc_queue: queue.Queue[Exception] = queue.Queue()
signal.signal(signal.SIGINT, self._interrupt)

View file

@ -25,7 +25,7 @@ import json
import re
import time
import webbrowser
from typing import TYPE_CHECKING, Any, Literal, Sequence
from typing import TYPE_CHECKING, Any, Literal, Sequence, Union
import confuse
import requests
@ -34,7 +34,12 @@ import unidecode
from beets import ui
from beets.autotag.hooks import AlbumInfo, TrackInfo
from beets.dbcore import types
from beets.plugins import BeetsPlugin, MetadataSourcePlugin, Response
from beets.library import Library
from beets.metadata_plugins import (
IDResponse,
SearchApiMetadataSourcePlugin,
SearchFilter,
)
if TYPE_CHECKING:
from beets.library import Library
@ -43,13 +48,41 @@ if TYPE_CHECKING:
DEFAULT_WAITING_TIME = 5
class SpotifyAPIError(Exception):
class SearchResponseAlbums(IDResponse):
"""A response returned by the Spotify API.
We only use items and disregard the pagination information.
i.e. res["albums"]["items"][0].
There are more fields in the response, but we only type
the ones we currently use.
see https://developer.spotify.com/documentation/web-api/reference/search
"""
album_type: str
available_markets: Sequence[str]
name: str
class SearchResponseTracks(IDResponse):
"""A track response returned by the Spotify API."""
album: SearchResponseAlbums
available_markets: Sequence[str]
popularity: int
name: str
class APIError(Exception):
pass
class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
data_source = "Spotify"
class SpotifyPlugin(
SearchApiMetadataSourcePlugin[
Union[SearchResponseAlbums, SearchResponseTracks]
]
):
item_types = {
"spotify_track_popularity": types.INTEGER,
"spotify_acousticness": types.FLOAT,
@ -129,7 +162,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
"""Get the path to the JSON file for storing the OAuth token."""
return self.config["tokenfile"].get(confuse.Filename(in_app_dir=True))
def _authenticate(self):
def _authenticate(self) -> None:
"""Request an access token via the Client Credentials Flow:
https://developer.spotify.com/documentation/general/guides/authorization-guide/#client-credentials-flow
"""
@ -180,7 +213,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
"""
if retry_count > max_retries:
raise SpotifyAPIError("Maximum retries reached.")
raise APIError("Maximum retries reached.")
try:
response = requests.request(
@ -194,14 +227,14 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
return response.json()
except requests.exceptions.ReadTimeout:
self._log.error("ReadTimeout.")
raise SpotifyAPIError("Request timed out.")
raise APIError("Request timed out.")
except requests.exceptions.ConnectionError as e:
self._log.error(f"Network error: {e}")
raise SpotifyAPIError("Network error.")
raise APIError("Network error.")
except requests.exceptions.RequestException as e:
if e.response is None:
self._log.error(f"Request failed: {e}")
raise SpotifyAPIError("Request failed.")
raise APIError("Request failed.")
if e.response.status_code == 401:
self._log.debug(
f"{self.data_source} access token has expired. "
@ -215,7 +248,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
retry_count=retry_count + 1,
)
elif e.response.status_code == 404:
raise SpotifyAPIError(
raise APIError(
f"API Error: {e.response.status_code}\n"
f"URL: {url}\nparams: {params}"
)
@ -235,18 +268,18 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
)
elif e.response.status_code == 503:
self._log.error("Service Unavailable.")
raise SpotifyAPIError("Service Unavailable.")
raise APIError("Service Unavailable.")
elif e.response.status_code == 502:
self._log.error("Bad Gateway.")
raise SpotifyAPIError("Bad Gateway.")
raise APIError("Bad Gateway.")
elif e.response is not None:
raise SpotifyAPIError(
raise APIError(
f"{self.data_source} API error:\n{e.response.text}\n"
f"URL:\n{url}\nparams:\n{params}"
)
else:
self._log.error(f"Request failed. Error: {e}")
raise SpotifyAPIError("Request failed.")
raise APIError("Request failed.")
def album_for_id(self, album_id: str) -> AlbumInfo | None:
"""Fetch an album by its Spotify ID or URL and return an
@ -257,7 +290,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
:return: AlbumInfo object for album
:rtype: beets.autotag.hooks.AlbumInfo or None
"""
if not (spotify_id := self._get_id(album_id)):
if not (spotify_id := self._extract_id(album_id)):
return None
album_data = self._handle_response("get", self.album_url + spotify_id)
@ -360,7 +393,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
Returns a TrackInfo object or None if the track is not found.
"""
if not (spotify_id := self._get_id(track_id)):
if not (spotify_id := self._extract_id(track_id)):
self._log.debug("Invalid Spotify ID: {}", track_id)
return None
@ -390,7 +423,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
return track
def _construct_search_query(
self, filters: dict[str, str], keywords: str = ""
self, filters: SearchFilter, keywords: str = ""
) -> str:
"""Construct a query string with the specified filters and keywords to
be provided to the Spotify Search API
@ -400,9 +433,10 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
:param keywords: (Optional) Query keywords to use.
:return: Query string to be provided to the Search API.
"""
query_components = [
keywords,
" ".join(":".join((k, v)) for k, v in filters.items()),
" ".join(f"{k}:{v}" for k, v in filters.items()),
]
query = " ".join([q for q in query_components if q])
if not isinstance(query, str):
@ -416,9 +450,9 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
def _search_api(
self,
query_type: Literal["album", "track"],
filters: dict[str, str],
filters: SearchFilter,
keywords: str = "",
) -> Sequence[Response]:
) -> Sequence[SearchResponseAlbums | SearchResponseTracks]:
"""Query the Spotify Search API for the specified ``keywords``,
applying the provided ``filters``.
@ -436,7 +470,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
self.search_url,
params={"q": query, "type": query_type},
)
except SpotifyAPIError as e:
except APIError as e:
self._log.debug("Spotify API error: {}", e)
return ()
response_data = response.get(query_type + "s", {}).get("items", [])
@ -557,7 +591,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
keywords = item[self.config["track_field"].get()]
# Query the Web API for each track, look for the items' JSON data
query_filters = {"artist": artist, "album": album}
query_filters: SearchFilter = {"artist": artist, "album": album}
response_data_tracks = self._search_api(
query_type="track", keywords=keywords, filters=query_filters
)
@ -570,7 +604,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
continue
# Apply market filter if requested
region_filter = self.config["region_filter"].get()
region_filter: str = self.config["region_filter"].get()
if region_filter:
response_data_tracks = [
track_data
@ -595,7 +629,11 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
len(response_data_tracks),
)
chosen_result = max(
response_data_tracks, key=lambda x: x["popularity"]
response_data_tracks,
key=lambda x: x[
# We are sure this is a track response!
"popularity" # type: ignore[typeddict-item]
],
)
results.append(chosen_result)
@ -691,16 +729,18 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
def track_info(self, track_id: str):
"""Fetch a track's popularity and external IDs using its Spotify ID."""
track_data = self._handle_response("get", self.track_url + track_id)
external_ids = track_data.get("external_ids", {})
popularity = track_data.get("popularity")
self._log.debug(
"track_popularity: {} and track_isrc: {}",
track_data.get("popularity"),
track_data.get("external_ids").get("isrc"),
popularity,
external_ids.get("isrc"),
)
return (
track_data.get("popularity"),
track_data.get("external_ids").get("isrc"),
track_data.get("external_ids").get("ean"),
track_data.get("external_ids").get("upc"),
popularity,
external_ids.get("isrc"),
external_ids.get("ean"),
external_ids.get("upc"),
)
def track_audio_features(self, track_id: str):
@ -709,6 +749,6 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
return self._handle_response(
"get", self.audio_features_url + track_id
)
except SpotifyAPIError as e:
except APIError as e:
self._log.debug("Spotify API error: {}", e)
return None

View file

@ -1,5 +1,6 @@
# Don't post a comment on pull requests.
comment: off
comment:
layout: "condensed_header, condensed_files"
require_changes: true
# Sets non-blocking status checks
# https://docs.codecov.com/docs/commit-status#informational
@ -11,7 +12,7 @@ coverage:
patch:
default:
informational: true
changes: no
changes: false
github_checks:
annotations: false

View file

@ -67,9 +67,20 @@ For plugin developers:
* The `fetchart` plugins has seen a few changes to function signatures and
source registration in the process of introducing typings to the code.
Custom art sources might need to be adapted.
* We split the responsibilities of plugins into two base classes
#. :class:`beets.plugins.BeetsPlugin`
is the base class for all plugins, any plugin needs to inherit from this class.
#. :class:`beets.metadata_plugin.MetadataSourcePlugin`
allows plugins to act like metadata sources. E.g. used by the MusicBrainz plugin. All plugins
in the beets repo are opted into this class where applicable. If you are maintaining a plugin
that acts like a metadata source, i.e. you expose any of ``track_for_id``,
``album_for_id``, ``candidates``, ``item_candidates``, ``album_distance``, ``track_distance`` methods,
please update your plugin to inherit from the new baseclass, as otherwise your plugin will
stop working with the next major release.
Other changes:
* Refactor: Split responsibilities of Plugins into MetaDataPlugins and general Plugins.
* Documentation structure for auto generated API references changed slightly.
Autogenerated API references are now located in the `docs/api` subdirectory.
* :doc:`/plugins/substitute`: Fix rST formatting for example cases so that each

View file

@ -124,7 +124,7 @@ aura = ["flask", "flask-cors", "Pillow"]
autobpm = ["librosa", "resampy"]
# badfiles # mp3val and flac
beatport = ["requests-oauthlib"]
bpd = ["PyGObject"] # python-gi and GStreamer 1.0+
bpd = ["PyGObject"] # gobject-introspection, gstreamer1.0-plugins-base, python3-gst-1.0
chroma = ["pyacoustid"] # chromaprint or fpcalc
# convert # ffmpeg
docs = ["pydata-sphinx-theme", "sphinx"]

View file

@ -15,7 +15,7 @@ markers =
data_file = .reports/coverage/data
branch = true
relative_files = true
omit =
omit =
beets/test/*
beetsplug/_typing.py
@ -34,7 +34,6 @@ exclude_also =
show_contexts = true
[mypy]
files = beets,beetsplug,test,extra,docs
allow_any_generics = false
# FIXME: Would be better to actually type the libraries (if under our control),
# or write our own stubs. For now, silence errors
@ -46,6 +45,8 @@ explicit_package_bases = true
# config for all files.
[[mypy-beets.plugins]]
disallow_untyped_decorators = true
disallow_any_generics = true
check_untyped_defs = true
allow_redefinition = true
[[mypy-beets.metadata_plugins]]
disallow_untyped_decorators = true
check_untyped_defs = true

View file

@ -14,19 +14,15 @@
"""Tests for BPD's implementation of the MPD protocol."""
import importlib.util
import multiprocessing as mp
import os
import socket
import sys
import tempfile
import threading
import time
import unittest
from contextlib import contextmanager
# Mock GstPlayer so that the forked process doesn't attempt to import gi:
from unittest import mock
from unittest.mock import MagicMock, patch
import confuse
import pytest
@ -34,43 +30,8 @@ import yaml
from beets.test.helper import PluginTestCase
from beets.util import bluelet
from beetsplug import bpd
gstplayer = importlib.util.module_from_spec(
importlib.util.find_spec("beetsplug.bpd.gstplayer")
)
def _gstplayer_play(*_):
bpd.gstplayer._GstPlayer.playing = True
return mock.DEFAULT
gstplayer._GstPlayer = mock.MagicMock(
spec_set=[
"time",
"volume",
"playing",
"run",
"play_file",
"pause",
"stop",
"seek",
"play",
"get_decoders",
],
**{
"playing": False,
"volume": 0,
"time.return_value": (0, 0),
"play_file.side_effect": _gstplayer_play,
"play.side_effect": _gstplayer_play,
"get_decoders.return_value": {"default": ({"audio/mpeg"}, {"mp3"})},
},
)
gstplayer.GstPlayer = lambda _: gstplayer._GstPlayer
sys.modules["beetsplug.bpd.gstplayer"] = gstplayer
bpd.gstplayer = gstplayer
bpd = pytest.importorskip("beetsplug.bpd")
class CommandParseTest(unittest.TestCase):
@ -256,7 +217,7 @@ def implements(commands, fail=False):
bluelet_listener = bluelet.Listener
@mock.patch("beets.util.bluelet.Listener")
@patch("beets.util.bluelet.Listener")
def start_server(args, assigned_port, listener_patch):
"""Start the bpd server, writing the port to `assigned_port`."""
@ -938,7 +899,7 @@ class BPDPlaylistsTest(BPDTestHelper):
response = client.send_command("load", "anything")
self._assert_failed(response, bpd.ERROR_NO_EXIST)
@unittest.skip
@unittest.expectedFailure
def test_cmd_playlistadd(self):
with self.run_bpd() as client:
self._bpd_add(client, self.item1, playlist="anything")
@ -1128,7 +1089,7 @@ class BPDConnectionTest(BPDTestHelper):
self._assert_ok(response)
assert self.TAGTYPES == set(response.data["tagtype"])
@unittest.skip
@unittest.expectedFailure
def test_tagtypes_mask(self):
with self.run_bpd() as client:
response = client.send_command("tagtypes", "clear")
@ -1169,6 +1130,10 @@ class BPDReflectionTest(BPDTestHelper):
fail=True,
)
@patch(
"beetsplug.bpd.gstplayer.GstPlayer.get_decoders",
MagicMock(return_value={"default": ({"audio/mpeg"}, {"mp3"})}),
)
def test_cmd_decoders(self):
with self.run_bpd() as client:
response = client.send_command("decoders")

View file

@ -23,7 +23,7 @@ class MbsyncCliTest(PluginTestCase):
plugin = "mbsync"
@patch(
"beets.plugins.album_for_id",
"beets.metadata_plugins.album_for_id",
Mock(
side_effect=lambda *_: AlbumInfo(
album_id="album id",
@ -33,7 +33,7 @@ class MbsyncCliTest(PluginTestCase):
),
)
@patch(
"beets.plugins.track_for_id",
"beets.metadata_plugins.track_for_id",
Mock(
side_effect=lambda *_: TrackInfo(
track_id="singleton id", title="new title"

View file

@ -134,7 +134,7 @@ class TypesPluginTest(PluginTestCase):
def test_unknown_type_error(self):
self.config["types"] = {"flex": "unkown type"}
with pytest.raises(ConfigValueError):
self.run_command("ls")
self.add_item(flex="test")
def test_template_if_def(self):
# Tests for a subtle bug when using %ifdef in templates along with

View file

@ -913,7 +913,9 @@ def album_candidates_mock(*args, **kwargs):
)
@patch("beets.plugins.candidates", Mock(side_effect=album_candidates_mock))
@patch(
"beets.metadata_plugins.candidates", Mock(side_effect=album_candidates_mock)
)
class ImportDuplicateAlbumTest(PluginMixin, ImportTestCase):
plugin = "musicbrainz"
@ -1031,7 +1033,10 @@ def item_candidates_mock(*args, **kwargs):
)
@patch("beets.plugins.item_candidates", Mock(side_effect=item_candidates_mock))
@patch(
"beets.metadata_plugins.item_candidates",
Mock(side_effect=item_candidates_mock),
)
class ImportDuplicateSingletonTest(ImportTestCase):
def setUp(self):
super().setUp()
@ -1567,8 +1572,14 @@ def mocked_get_track_by_id(id_):
)
@patch("beets.plugins.track_for_id", Mock(side_effect=mocked_get_track_by_id))
@patch("beets.plugins.album_for_id", Mock(side_effect=mocked_get_album_by_id))
@patch(
"beets.metadata_plugins.track_for_id",
Mock(side_effect=mocked_get_track_by_id),
)
@patch(
"beets.metadata_plugins.album_for_id",
Mock(side_effect=mocked_get_album_by_id),
)
class ImportIdTest(ImportTestCase):
ID_RELEASE_0 = "00000000-0000-0000-0000-000000000000"
ID_RELEASE_1 = "11111111-1111-1111-1111-111111111111"
@ -1616,9 +1627,9 @@ class ImportIdTest(ImportTestCase):
task = importer.ImportTask(
paths=self.import_dir, toppath="top path", items=[_common.item()]
)
task.search_ids = [self.ID_RELEASE_0, self.ID_RELEASE_1]
task.lookup_candidates()
task.lookup_candidates([self.ID_RELEASE_0, self.ID_RELEASE_1])
assert {"VALID_RELEASE_0", "VALID_RELEASE_1"} == {
c.info.album for c in task.candidates
}
@ -1628,9 +1639,9 @@ class ImportIdTest(ImportTestCase):
task = importer.SingletonImportTask(
toppath="top path", item=_common.item()
)
task.search_ids = [self.ID_RECORDING_0, self.ID_RECORDING_1]
task.lookup_candidates()
task.lookup_candidates([self.ID_RECORDING_0, self.ID_RECORDING_1])
assert {"VALID_RECORDING_0", "VALID_RECORDING_1"} == {
c.info.title for c in task.candidates
}

View file

@ -1024,7 +1024,9 @@ class ConfigTest(TestPluginTestCase):
file.write("plugins: test")
self.run_command("--config", self.cli_config_path, "plugin", lib=None)
assert plugins.find_plugins()[0].is_test_plugin
plugs = plugins.find_plugins()
assert len(plugs) == 1
assert plugs[0].is_test_plugin
self.unload_plugins()
def test_beetsdir_config(self):