Deduplicate candidate methods using _search_api method

This commit is contained in:
Šarūnas Nejus 2025-05-18 14:22:19 +01:00
parent 0102f3ce7d
commit 2ec65ed8ca
No known key found for this signature in database
GPG key ID: DD28F6704DBE3435
2 changed files with 100 additions and 131 deletions

View file

@ -33,6 +33,7 @@ from beets.util.id_extractors import extract_release_id
if TYPE_CHECKING:
from collections.abc import Iterator, Sequence
from typing import Literal
from beets.library import Item
@ -769,24 +770,46 @@ class MusicBrainzPlugin(BeetsPlugin):
self, items: list[Item], artist: str, album: str, va_likely: bool
) -> dict[str, str]:
criteria = {
"release": album.lower().strip(),
"release": album,
"tracks": str(len(items)),
} | (
{"arid": VARIOUS_ARTISTS_ID}
if va_likely
else {"artist": artist.lower().strip()}
)
} | ({"arid": VARIOUS_ARTISTS_ID} if va_likely else {"artist": artist})
for tag, mb_field in self.extra_mb_field_by_tag.items():
most_common, _ = util.plurality(i.get(tag) for i in items)
value = str(most_common).lower().strip()
value = str(most_common)
if tag == "catalognum":
value = value.replace(" ", "")
if value:
criteria[mb_field] = value
criteria[mb_field] = value
return criteria
def _search_api(
self,
query_type: Literal["recording", "release"],
filters: dict[str, str],
) -> list[JSONDict]:
"""Perform MusicBrainz API search and return results.
Execute a search against the MusicBrainz API for recordings or releases
using the provided criteria. Handles API errors by converting them into
MusicBrainzAPIError exceptions with contextual information.
"""
filters = {
k: _v for k, v in filters.items() if (_v := v.lower().strip())
}
self._log.debug(
"Searching for MusicBrainz {}s with: {!r}", query_type, filters
)
try:
method = getattr(musicbrainzngs, f"search_{query_type}s")
res = method(limit=self.config["searchlimit"].get(int), **filters)
except musicbrainzngs.MusicBrainzError as exc:
raise MusicBrainzAPIError(
exc, f"{query_type} search", filters, traceback.format_exc()
)
return res[f"{query_type}-list"]
def candidates(
self,
items: list[Item],
@ -795,54 +818,19 @@ class MusicBrainzPlugin(BeetsPlugin):
va_likely: bool,
extra_tags: dict[str, Any] | None = None,
) -> Iterator[beets.autotag.hooks.AlbumInfo]:
"""Searches for a single album ("release" in MusicBrainz parlance)
and returns an iterator over AlbumInfo objects. May raise a
MusicBrainzAPIError.
The query consists of an artist name, an album name, and,
optionally, a number of tracks on the album and any other extra tags.
"""
criteria = self.get_album_criteria(items, artist, album, va_likely)
release_ids = (r["id"] for r in self._search_api("release", criteria))
try:
self._log.debug(
"Searching for MusicBrainz releases with: {!r}", criteria
)
res = musicbrainzngs.search_releases(
limit=self.config["searchlimit"].get(int), **criteria
)
except musicbrainzngs.MusicBrainzError as exc:
raise MusicBrainzAPIError(
exc, "release search", criteria, traceback.format_exc()
)
for release in res["release-list"]:
# The search result is missing some data (namely, the tracks),
# so we just use the ID and fetch the rest of the information.
albuminfo = self.album_for_id(release["id"])
if albuminfo is not None:
yield albuminfo
yield from filter(None, map(self.album_for_id, release_ids))
def item_candidates(
self, item: Item, artist: str, title: str
) -> Iterator[beets.autotag.hooks.TrackInfo]:
"""Searches for a single track and returns an iterable of TrackInfo
objects. May raise a MusicBrainzAPIError.
"""
criteria = {
"artist": artist.lower().strip(),
"recording": title.lower().strip(),
}
criteria = {"artist": artist, "recording": title}
try:
res = musicbrainzngs.search_recordings(
limit=self.config["searchlimit"].get(int), **criteria
)
except musicbrainzngs.MusicBrainzError as exc:
raise MusicBrainzAPIError(
exc, "recording search", criteria, traceback.format_exc()
)
for recording in res["recording-list"]:
yield self.track_info(recording)
yield from filter(
None, map(self.track_info, self._search_api("recording", criteria))
)
def album_for_id(
self, album_id: str

View file

@ -757,79 +757,6 @@ class ArtistFlatteningTest(BeetsTestCase):
class MBLibraryTest(MusicBrainzTestCase):
def test_match_track(self):
with mock.patch("musicbrainzngs.search_recordings") as p:
p.return_value = {
"recording-list": [
{
"title": "foo",
"id": "bar",
"length": 42,
}
],
}
ti = list(self.mb.item_candidates(None, "hello", "there"))[0]
p.assert_called_with(artist="hello", recording="there", limit=5)
assert ti.title == "foo"
assert ti.track_id == "bar"
def test_candidates(self):
mbid = "d2a6f856-b553-40a0-ac54-a321e8e2da99"
with mock.patch("musicbrainzngs.search_releases") as sp:
sp.return_value = {
"release-list": [
{
"id": mbid,
}
],
}
with mock.patch("musicbrainzngs.get_release_by_id") as gp:
gp.return_value = {
"release": {
"title": "hi",
"id": mbid,
"status": "status",
"medium-list": [
{
"track-list": [
{
"id": "baz",
"recording": {
"title": "foo",
"id": "bar",
"length": 42,
},
"position": 9,
"number": "A1",
}
],
"position": 5,
}
],
"artist-credit": [
{
"artist": {
"name": "some-artist",
"id": "some-id",
},
}
],
"release-group": {
"id": "another-id",
},
}
}
ai = list(self.mb.candidates([], "hello", "there", False))[0]
sp.assert_called_with(
artist="hello", release="there", tracks="0", limit=5
)
gp.assert_called_with(mbid, mock.ANY)
assert ai.tracks[0].title == "foo"
assert ai.album == "hi"
def test_follow_pseudo_releases(self):
side_effect = [
{
@ -1061,8 +988,15 @@ class MBLibraryTest(MusicBrainzTestCase):
class TestMusicBrainzPlugin(PluginMixin):
plugin = "musicbrainz"
mbid = "d2a6f856-b553-40a0-ac54-a321e8e2da99"
RECORDING = {"title": "foo", "id": "bar", "length": 42}
@pytest.fixture
def mb_plugin(self, plugin_config):
def plugin_config(self):
return {}
@pytest.fixture
def mb(self, plugin_config):
self.config[self.plugin].set(plugin_config)
return musicbrainz.MusicBrainzPlugin()
@ -1070,17 +1004,17 @@ class TestMusicBrainzPlugin(PluginMixin):
@pytest.mark.parametrize(
"plugin_config,va_likely,expected_additional_criteria",
[
({}, False, {"artist": "artist"}),
({}, False, {"artist": "Artist "}),
({}, True, {"arid": "89ad4ac3-39f7-470e-963a-56509c546377"}),
(
{"extra_tags": ["label", "catalognum"]},
False,
{"artist": "artist", "label": "abc", "catno": "abc123"},
{"artist": "Artist ", "label": "abc", "catno": "ABC123"},
),
],
)
def test_get_album_criteria(
self, mb_plugin, va_likely, expected_additional_criteria
self, mb, va_likely, expected_additional_criteria
):
items = [
Item(catalognum="ABC 123", label="abc"),
@ -1088,10 +1022,57 @@ class TestMusicBrainzPlugin(PluginMixin):
Item(catalognum="ABC 123", label="def"),
]
assert mb_plugin.get_album_criteria(
items, "Artist ", " Album", va_likely
) == {
"release": "album",
assert mb.get_album_criteria(items, "Artist ", " Album", va_likely) == {
"release": " Album",
"tracks": str(len(items)),
**expected_additional_criteria,
}
def test_item_candidates(self, monkeypatch, mb):
monkeypatch.setattr(
"musicbrainzngs.search_recordings",
lambda *_, **__: {"recording-list": [self.RECORDING]},
)
candidates = list(mb.item_candidates(Item(), "hello", "there"))
assert len(candidates) == 1
assert candidates[0].track_id == self.RECORDING["id"]
def test_candidates(self, monkeypatch, mb):
monkeypatch.setattr(
"musicbrainzngs.search_releases",
lambda *_, **__: {"release-list": [{"id": self.mbid}]},
)
monkeypatch.setattr(
"musicbrainzngs.get_release_by_id",
lambda *_, **__: {
"release": {
"title": "hi",
"id": self.mbid,
"status": "status",
"medium-list": [
{
"track-list": [
{
"id": "baz",
"recording": self.RECORDING,
"position": 9,
"number": "A1",
}
],
"position": 5,
}
],
"artist-credit": [
{"artist": {"name": "some-artist", "id": "some-id"}}
],
"release-group": {"id": "another-id"},
}
},
)
candidates = list(mb.candidates([], "hello", "there", False))
assert len(candidates) == 1
assert candidates[0].tracks[0].track_id == self.RECORDING["id"]
assert candidates[0].album == "hi"