Migrate mbcollection to use MusicBrainzAPI

This commit is contained in:
Šarūnas Nejus 2025-12-24 03:12:09 +00:00
parent 143cd70e2f
commit 92352574aa
No known key found for this signature in database
9 changed files with 206 additions and 157 deletions

View file

@ -13,6 +13,8 @@ from beets import config, logging
from .requests import RequestHandler, TimeoutAndRetrySession
if TYPE_CHECKING:
from requests import Response
from .._typing import JSONDict
log = logging.getLogger(__name__)
@ -49,9 +51,19 @@ class MusicBrainzAPI(RequestHandler):
/ mb_config["ratelimit_interval"].as_number()
)
@cached_property
def api_root(self) -> str:
return f"{self.api_host}/ws/2"
def create_session(self) -> LimiterTimeoutSession:
return LimiterTimeoutSession(per_second=self.rate_limit)
def request(self, *args, **kwargs) -> Response:
"""Ensure all requests specify JSON response format by default."""
kwargs.setdefault("params", {})
kwargs["params"]["fmt"] = "json"
return super().request(*args, **kwargs)
def get_entity(
self, entity: str, includes: list[str] | None = None, **kwargs
) -> JSONDict:
@ -59,10 +71,7 @@ class MusicBrainzAPI(RequestHandler):
kwargs["inc"] = "+".join(includes)
return self._group_relations(
self.get_json(
f"{self.api_host}/ws/2/{entity}",
params={**kwargs, "fmt": "json"},
)
self.get_json(f"{self.api_root}/{entity}", params=kwargs)
)
def search_entity(

View file

@ -155,6 +155,7 @@ class RequestHandler:
except requests.exceptions.HTTPError as e:
if beets_error := self.status_to_error(e.response.status_code):
raise beets_error(response=e.response) from e
raise
def request(self, *args, **kwargs) -> requests.Response:
@ -170,6 +171,14 @@ class RequestHandler:
"""Perform HTTP GET request with automatic error handling."""
return self.request("get", *args, **kwargs)
def put(self, *args, **kwargs) -> requests.Response:
"""Perform HTTP PUT request with automatic error handling."""
return self.request("put", *args, **kwargs)
def delete(self, *args, **kwargs) -> requests.Response:
"""Perform HTTP DELETE request with automatic error handling."""
return self.request("delete", *args, **kwargs)
def get_json(self, *args, **kwargs):
"""Fetch and parse JSON data from an HTTP endpoint."""
return self.get(*args, **kwargs).json()

View file

@ -13,48 +13,112 @@
# included in all copies or substantial portions of the Software.
from __future__ import annotations
import re
from dataclasses import dataclass, field
from functools import cached_property
from typing import TYPE_CHECKING
import musicbrainzngs
from requests.auth import HTTPDigestAuth
from beets import config, ui
from beets import __version__, config, ui
from beets.plugins import BeetsPlugin
from beets.ui import Subcommand
from ._utils.musicbrainz import MusicBrainzAPI
if TYPE_CHECKING:
from collections.abc import Iterator
from requests import Response
from ._typing import JSONDict
SUBMISSION_CHUNK_SIZE = 200
FETCH_CHUNK_SIZE = 100
UUID_REGEX = r"^[a-f0-9]{8}(-[a-f0-9]{4}){3}-[a-f0-9]{12}$"
def mb_call(func, *args, **kwargs):
"""Call a MusicBrainz API function and catch exceptions."""
try:
return func(*args, **kwargs)
except musicbrainzngs.AuthenticationError:
raise ui.UserError("authentication with MusicBrainz failed")
except (musicbrainzngs.ResponseError, musicbrainzngs.NetworkError) as exc:
raise ui.UserError(f"MusicBrainz API error: {exc}")
except musicbrainzngs.UsageError:
raise ui.UserError("MusicBrainz credentials missing")
@dataclass
class MusicBrainzUserAPI(MusicBrainzAPI):
auth: HTTPDigestAuth = field(init=False)
@cached_property
def user(self) -> str:
return config["musicbrainz"]["user"].as_str()
def __post_init__(self) -> None:
super().__post_init__()
config["musicbrainz"]["pass"].redact = True
self.auth = HTTPDigestAuth(
self.user, config["musicbrainz"]["pass"].as_str()
)
def request(self, *args, **kwargs) -> Response:
kwargs.setdefault("params", {})
kwargs["params"]["client"] = f"beets-{__version__}"
kwargs["auth"] = self.auth
return super().request(*args, **kwargs)
def get_collections(self) -> list[JSONDict]:
return self.get_entity(
"collection", editor=self.user, includes=["user-collections"]
).get("collections", [])
def submit_albums(collection_id, release_ids):
@dataclass
class MBCollection:
data: JSONDict
mb_api: MusicBrainzUserAPI
@property
def id(self) -> str:
return self.data["id"]
@property
def release_count(self) -> int:
return self.data["release-count"]
@property
def releases_url(self) -> str:
return f"{self.mb_api.api_root}/collection/{self.id}/releases"
@property
def releases(self) -> list[JSONDict]:
offsets = list(range(0, self.release_count, FETCH_CHUNK_SIZE))
return [r for offset in offsets for r in self.get_releases(offset)]
def get_releases(self, offset: int) -> list[JSONDict]:
return self.mb_api.get_json(
self.releases_url,
params={"limit": FETCH_CHUNK_SIZE, "offset": offset},
)["releases"]
@staticmethod
def get_id_chunks(id_list: list[str]) -> Iterator[list[str]]:
for i in range(0, len(id_list), SUBMISSION_CHUNK_SIZE):
yield id_list[i : i + SUBMISSION_CHUNK_SIZE]
def add_releases(self, releases: list[str]) -> None:
for chunk in self.get_id_chunks(releases):
self.mb_api.put(f"{self.releases_url}/{'%3B'.join(chunk)}")
def remove_releases(self, releases: list[str]) -> None:
for chunk in self.get_id_chunks(releases):
self.mb_api.delete(f"{self.releases_url}/{'%3B'.join(chunk)}")
def submit_albums(collection: MBCollection, release_ids):
"""Add all of the release IDs to the indicated collection. Multiple
requests are made if there are many release IDs to submit.
"""
for i in range(0, len(release_ids), SUBMISSION_CHUNK_SIZE):
chunk = release_ids[i : i + SUBMISSION_CHUNK_SIZE]
mb_call(musicbrainzngs.add_releases_to_collection, collection_id, chunk)
collection.add_releases(release_ids)
class MusicBrainzCollectionPlugin(BeetsPlugin):
def __init__(self):
super().__init__()
config["musicbrainz"]["pass"].redact = True
musicbrainzngs.auth(
config["musicbrainz"]["user"].as_str(),
config["musicbrainz"]["pass"].as_str(),
)
self.config.add(
{
"auto": False,
@ -65,47 +129,34 @@ class MusicBrainzCollectionPlugin(BeetsPlugin):
if self.config["auto"]:
self.import_stages = [self.imported]
def _get_collection(self):
collections = mb_call(musicbrainzngs.get_collections)
if not collections["collection-list"]:
@cached_property
def mb_api(self) -> MusicBrainzUserAPI:
return MusicBrainzUserAPI()
def _get_collection(self) -> MBCollection:
if not (collections := self.mb_api.get_collections()):
raise ui.UserError("no collections exist for user")
# Get all release collection IDs, avoiding event collections
collection_ids = [
x["id"]
for x in collections["collection-list"]
if x["entity-type"] == "release"
]
if not collection_ids:
if not (
collection_by_id := {
c["id"]: c for c in collections if c["entity-type"] == "release"
}
):
raise ui.UserError("No release collection found.")
# Check that the collection exists so we can present a nice error
collection = self.config["collection"].as_str()
if collection:
if collection not in collection_ids:
raise ui.UserError(f"invalid collection ID: {collection}")
return collection
if collection_id := self.config["collection"].as_str():
if not (collection := collection_by_id.get(collection_id)):
raise ui.UserError(f"invalid collection ID: {collection_id}")
else:
# No specified collection. Just return the first collection ID
collection = next(iter(collection_by_id.values()))
# No specified collection. Just return the first collection ID
return collection_ids[0]
return MBCollection(collection, self.mb_api)
def _get_albums_in_collection(self, id):
def _fetch(offset):
res = mb_call(
musicbrainzngs.get_releases_in_collection,
id,
limit=FETCH_CHUNK_SIZE,
offset=offset,
)["collection"]
return [x["id"] for x in res["release-list"]], res["release-count"]
offset = 0
albums_in_collection, release_count = _fetch(offset)
for i in range(FETCH_CHUNK_SIZE, release_count, FETCH_CHUNK_SIZE):
offset += FETCH_CHUNK_SIZE
albums_in_collection += _fetch(offset)[0]
return albums_in_collection
def _get_albums_in_collection(self, collection: MBCollection) -> set[str]:
return {r["id"] for r in collection.releases}
def commands(self):
mbupdate = Subcommand("mbupdate", help="Update MusicBrainz collection")
@ -120,17 +171,10 @@ class MusicBrainzCollectionPlugin(BeetsPlugin):
mbupdate.func = self.update_collection
return [mbupdate]
def remove_missing(self, collection_id, lib_albums):
def remove_missing(self, collection: MBCollection, lib_albums):
lib_ids = {x.mb_albumid for x in lib_albums}
albums_in_collection = self._get_albums_in_collection(collection_id)
remove_me = list(set(albums_in_collection) - lib_ids)
for i in range(0, len(remove_me), FETCH_CHUNK_SIZE):
chunk = remove_me[i : i + FETCH_CHUNK_SIZE]
mb_call(
musicbrainzngs.remove_releases_from_collection,
collection_id,
chunk,
)
albums_in_collection = self._get_albums_in_collection(collection)
collection.remove_releases(list(albums_in_collection - lib_ids))
def update_collection(self, lib, opts, args):
self.config.set_args(opts)
@ -144,7 +188,7 @@ class MusicBrainzCollectionPlugin(BeetsPlugin):
def update_album_list(self, lib, album_list, remove_missing=False):
"""Update the MusicBrainz collection from a list of Beets albums"""
collection_id = self._get_collection()
collection = self._get_collection()
# Get a list of all the album IDs.
album_ids = []
@ -157,8 +201,8 @@ class MusicBrainzCollectionPlugin(BeetsPlugin):
self._log.info("skipping invalid MBID: {}", aid)
# Submit to MusicBrainz.
self._log.info("Updating MusicBrainz collection {}...", collection_id)
submit_albums(collection_id, album_ids)
self._log.info("Updating MusicBrainz collection {}...", collection.id)
submit_albums(collection, album_ids)
if remove_missing:
self.remove_missing(collection_id, lib.albums())
self.remove_missing(collection, lib.albums())
self._log.info("...MusicBrainz collection updated.")

View file

@ -6,18 +6,9 @@ maintain your `music collection`_ list there.
.. _music collection: https://musicbrainz.org/doc/Collections
Installation
------------
To use the ``mbcollection`` plugin, first enable it in your configuration (see
:ref:`using-plugins`). Then, install ``beets`` with ``mbcollection`` extra
.. code-block:: bash
pip install "beets[mbcollection]"
Then, add your MusicBrainz username and password to your :doc:`configuration
file </reference/config>` under a ``musicbrainz`` section:
To begin, just enable the ``mbcollection`` plugin in your configuration (see
:ref:`using-plugins`). Then, add your MusicBrainz username and password to your
:doc:`configuration file </reference/config>` under a ``musicbrainz`` section:
::

14
poetry.lock generated
View file

@ -1818,17 +1818,6 @@ check = ["check-manifest", "flake8", "flake8-black", "isort (>=5.0.3)", "pygment
test = ["coverage[toml] (>=5.2)", "coveralls (>=2.1.1)", "hypothesis", "pyannotate", "pytest", "pytest-cov"]
type = ["mypy", "mypy-extensions"]
[[package]]
name = "musicbrainzngs"
version = "0.7.1"
description = "Python bindings for the MusicBrainz NGS and the Cover Art Archive webservices"
optional = true
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
files = [
{file = "musicbrainzngs-0.7.1-py2.py3-none-any.whl", hash = "sha256:e841a8f975104c0a72290b09f59326050194081a5ae62ee512f41915090e1a10"},
{file = "musicbrainzngs-0.7.1.tar.gz", hash = "sha256:ab1c0100fd0b305852e65f2ed4113c6de12e68afd55186987b8ed97e0f98e627"},
]
[[package]]
name = "mutagen"
version = "1.47.0"
@ -4181,7 +4170,6 @@ kodiupdate = ["requests"]
lastgenre = ["pylast"]
lastimport = ["pylast"]
lyrics = ["beautifulsoup4", "langdetect", "requests"]
mbcollection = ["musicbrainzngs"]
metasync = ["dbus-python"]
mpdstats = ["python-mpd2"]
plexupdate = ["requests"]
@ -4196,4 +4184,4 @@ web = ["flask", "flask-cors"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.10,<4"
content-hash = "a18c3047f4f395841e785ed146af3505974839ab23eccdde34a7738e216f0277"
content-hash = "cd53b70a9cd746a88e80e04e67e0b010a0e5b87f745be94e901a9fd08619771a"

View file

@ -69,7 +69,6 @@ scipy = [ # for librosa
{ python = "<3.13", version = ">=1.13.1", optional = true },
{ python = ">=3.13", version = ">=1.16.1", optional = true },
]
musicbrainzngs = { version = ">=0.4", optional = true }
numba = [ # for librosa
{ python = "<3.13", version = ">=0.60", optional = true },
{ python = ">=3.13", version = ">=0.62.1", optional = true },
@ -164,7 +163,6 @@ kodiupdate = ["requests"]
lastgenre = ["pylast"]
lastimport = ["pylast"]
lyrics = ["beautifulsoup4", "langdetect", "requests"]
mbcollection = ["musicbrainzngs"]
metasync = ["dbus-python"]
mpdstats = ["python-mpd2"]
plexupdate = ["requests"]

22
test/plugins/conftest.py Normal file
View file

@ -0,0 +1,22 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import pytest
import requests
if TYPE_CHECKING:
from requests_mock import Mocker
@pytest.fixture
def requests_mock(requests_mock, monkeypatch) -> Mocker:
"""Use plain session wherever MB requests are mocked.
This avoids rate limiting requests to speed up tests.
"""
monkeypatch.setattr(
"beetsplug._utils.musicbrainz.MusicBrainzAPI.create_session",
lambda _: requests.Session(),
)
return requests_mock

View file

@ -1,3 +1,4 @@
import re
import uuid
from contextlib import nullcontext as does_not_raise
@ -9,27 +10,27 @@ from beets.ui import UserError
from beetsplug import mbcollection
@pytest.fixture
def collection():
return mbcollection.MBCollection(
{"id": str(uuid.uuid4()), "release-count": 150}
)
class TestMbCollectionAPI:
"""Tests for the low-level MusicBrainz API wrapper functions."""
def test_submit_albums_batches(self, monkeypatch):
chunks_received = []
def mock_add(collection_id, chunk):
chunks_received.append(chunk)
monkeypatch.setattr(
"musicbrainzngs.add_releases_to_collection", mock_add
)
def test_submit_albums_batches(self, collection, requests_mock):
# Chunk size is 200. Create 250 IDs.
ids = [f"id{i}" for i in range(250)]
mbcollection.submit_albums("coll_id", ids)
requests_mock.put(
f"/ws/2/collection/{collection.id}/releases/{';'.join(ids[:200])}"
)
requests_mock.put(
f"/ws/2/collection/{collection.id}/releases/{';'.join(ids[200:])}"
)
# Verify behavioral outcome: 2 batches were sent
assert len(chunks_received) == 2
assert len(chunks_received[0]) == 200
assert len(chunks_received[1]) == 50
mbcollection.submit_albums(collection, ids)
class TestMbCollectionPlugin(ConfigMixin):
@ -38,10 +39,7 @@ class TestMbCollectionPlugin(ConfigMixin):
COLLECTION_ID = str(uuid.uuid4())
@pytest.fixture
def plugin(self, monkeypatch):
# Prevent actual auth call during init
monkeypatch.setattr("musicbrainzngs.auth", lambda *a, **k: None)
def plugin(self):
self.config["musicbrainz"]["user"] = "testuser"
self.config["musicbrainz"]["pass"] = "testpass"
@ -73,50 +71,42 @@ class TestMbCollectionPlugin(ConfigMixin):
],
)
def test_get_collection_validation(
self, plugin, monkeypatch, user_collections, expectation
self, plugin, requests_mock, user_collections, expectation
):
mock_resp = {"collection-list": user_collections}
monkeypatch.setattr("musicbrainzngs.get_collections", lambda: mock_resp)
requests_mock.get(
"/ws/2/collection", json={"collections": user_collections}
)
with expectation:
plugin._get_collection()
def test_get_albums_in_collection_pagination(self, plugin, monkeypatch):
fetched_offsets = []
def mock_get_releases(collection_id, limit, offset):
fetched_offsets.append(offset)
count = 150
# Return IDs based on offset to verify order/content
start = offset
end = min(offset + limit, count)
return {
"collection": {
"release-count": count,
"release-list": [
{"id": f"r{i}"} for i in range(start, end)
],
}
}
monkeypatch.setattr(
"musicbrainzngs.get_releases_in_collection", mock_get_releases
def test_get_albums_in_collection_pagination(
self, plugin, requests_mock, collection
):
releases = [{"id": str(i)} for i in range(collection.release_count)]
requests_mock.get(
re.compile(
rf".*/ws/2/collection/{collection.id}/releases\b.*&offset=0.*"
),
json={"releases": releases[:100]},
)
requests_mock.get(
re.compile(
rf".*/ws/2/collection/{collection.id}/releases\b.*&offset=100.*"
),
json={"releases": releases[100:]},
)
albums = plugin._get_albums_in_collection("cid")
assert len(albums) == 150
assert fetched_offsets == [0, 100]
assert albums[0] == "r0"
assert albums[149] == "r149"
plugin._get_albums_in_collection(collection)
def test_update_album_list_filtering(self, plugin, monkeypatch):
def test_update_album_list_filtering(self, plugin, collection, monkeypatch):
ids_submitted = []
def mock_submit(_, album_ids):
ids_submitted.extend(album_ids)
monkeypatch.setattr("beetsplug.mbcollection.submit_albums", mock_submit)
monkeypatch.setattr(plugin, "_get_collection", lambda: "cid")
monkeypatch.setattr(plugin, "_get_collection", lambda: collection)
albums = [
Album(mb_albumid="invalid-id"),
@ -127,23 +117,21 @@ class TestMbCollectionPlugin(ConfigMixin):
# Behavior: only valid UUID was submitted
assert ids_submitted == ["00000000-0000-0000-0000-000000000001"]
def test_remove_missing(self, plugin, monkeypatch):
def test_remove_missing(
self, plugin, monkeypatch, requests_mock, collection
):
removed_ids = []
def mock_remove(_, chunk):
removed_ids.extend(chunk)
monkeypatch.setattr(
"musicbrainzngs.remove_releases_from_collection", mock_remove
requests_mock.delete(
re.compile(rf".*/ws/2/collection/{collection.id}/releases/r3")
)
monkeypatch.setattr(
plugin,
"_get_albums_in_collection",
lambda _: ["r1", "r2", "r3"],
plugin, "_get_albums_in_collection", lambda _: {"r1", "r2", "r3"}
)
lib_albums = [Album(mb_albumid="r1"), Album(mb_albumid="r2")]
plugin.remove_missing("cid", lib_albums)
# Behavior: only 'r3' (missing from library) was removed from collection
assert removed_ids == ["r3"]
plugin.remove_missing(collection, lib_albums)

View file