mbcollection: slight refactor

This commit is contained in:
Šarūnas Nejus 2025-12-24 03:13:37 +00:00
parent 92352574aa
commit b49d71cb69
No known key found for this signature in database
2 changed files with 118 additions and 116 deletions

View file

@ -18,7 +18,7 @@ from __future__ import annotations
import re
from dataclasses import dataclass, field
from functools import cached_property
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, ClassVar
from requests.auth import HTTPDigestAuth
@ -29,15 +29,16 @@ from beets.ui import Subcommand
from ._utils.musicbrainz import MusicBrainzAPI
if TYPE_CHECKING:
from collections.abc import Iterator
from collections.abc import Iterable, Iterator
from requests import Response
from beets.importer import ImportSession, ImportTask
from beets.library import Album, Library
from ._typing import JSONDict
SUBMISSION_CHUNK_SIZE = 200
FETCH_CHUNK_SIZE = 100
UUID_REGEX = r"^[a-f0-9]{8}(-[a-f0-9]{4}){3}-[a-f0-9]{12}$"
UUID_PAT = re.compile(r"^[a-f0-9]{8}(-[a-f0-9]{4}){3}-[a-f0-9]{12}$")
@dataclass
@ -69,6 +70,9 @@ class MusicBrainzUserAPI(MusicBrainzAPI):
@dataclass
class MBCollection:
SUBMISSION_CHUNK_SIZE: ClassVar[int] = 200
FETCH_CHUNK_SIZE: ClassVar[int] = 100
data: JSONDict
mb_api: MusicBrainzUserAPI
@ -86,19 +90,19 @@ class MBCollection:
@property
def releases(self) -> list[JSONDict]:
offsets = list(range(0, self.release_count, FETCH_CHUNK_SIZE))
offsets = list(range(0, self.release_count, self.FETCH_CHUNK_SIZE))
return [r for offset in offsets for r in self.get_releases(offset)]
def get_releases(self, offset: int) -> list[JSONDict]:
return self.mb_api.get_json(
self.releases_url,
params={"limit": FETCH_CHUNK_SIZE, "offset": offset},
params={"limit": self.FETCH_CHUNK_SIZE, "offset": offset},
)["releases"]
@staticmethod
def get_id_chunks(id_list: list[str]) -> Iterator[list[str]]:
for i in range(0, len(id_list), SUBMISSION_CHUNK_SIZE):
yield id_list[i : i + SUBMISSION_CHUNK_SIZE]
@classmethod
def get_id_chunks(cls, id_list: list[str]) -> Iterator[list[str]]:
for i in range(0, len(id_list), cls.SUBMISSION_CHUNK_SIZE):
yield id_list[i : i + cls.SUBMISSION_CHUNK_SIZE]
def add_releases(self, releases: list[str]) -> None:
for chunk in self.get_id_chunks(releases):
@ -117,7 +121,7 @@ def submit_albums(collection: MBCollection, release_ids):
class MusicBrainzCollectionPlugin(BeetsPlugin):
def __init__(self):
def __init__(self) -> None:
super().__init__()
self.config.add(
{
@ -133,7 +137,8 @@ class MusicBrainzCollectionPlugin(BeetsPlugin):
def mb_api(self) -> MusicBrainzUserAPI:
return MusicBrainzUserAPI()
def _get_collection(self) -> MBCollection:
@cached_property
def collection(self) -> MBCollection:
if not (collections := self.mb_api.get_collections()):
raise ui.UserError("no collections exist for user")
@ -155,9 +160,6 @@ class MusicBrainzCollectionPlugin(BeetsPlugin):
return MBCollection(collection, self.mb_api)
def _get_albums_in_collection(self, collection: MBCollection) -> set[str]:
return {r["id"] for r in collection.releases}
def commands(self):
mbupdate = Subcommand("mbupdate", help="Update MusicBrainz collection")
mbupdate.parser.add_option(
@ -171,38 +173,33 @@ class MusicBrainzCollectionPlugin(BeetsPlugin):
mbupdate.func = self.update_collection
return [mbupdate]
def remove_missing(self, collection: MBCollection, lib_albums):
lib_ids = {x.mb_albumid for x in lib_albums}
albums_in_collection = self._get_albums_in_collection(collection)
collection.remove_releases(list(albums_in_collection - lib_ids))
def update_collection(self, lib, opts, args):
def update_collection(self, lib: Library, opts, args) -> None:
self.config.set_args(opts)
remove_missing = self.config["remove"].get(bool)
self.update_album_list(lib, lib.albums(), remove_missing)
def imported(self, session, task):
def imported(self, session: ImportSession, task: ImportTask) -> None:
"""Add each imported album to the collection."""
if task.is_album:
self.update_album_list(session.lib, [task.album])
self.update_album_list(
session.lib, [task.album], remove_missing=False
)
def update_album_list(self, lib, album_list, remove_missing=False):
def update_album_list(
self, lib: Library, albums: Iterable[Album], remove_missing: bool
) -> None:
"""Update the MusicBrainz collection from a list of Beets albums"""
collection = self._get_collection()
collection = self.collection
# Get a list of all the album IDs.
album_ids = []
for album in album_list:
aid = album.mb_albumid
if aid:
if re.match(UUID_REGEX, aid):
album_ids.append(aid)
else:
self._log.info("skipping invalid MBID: {}", aid)
album_ids = [id_ for a in albums if UUID_PAT.match(id_ := a.mb_albumid)]
# Submit to MusicBrainz.
self._log.info("Updating MusicBrainz collection {}...", collection.id)
submit_albums(collection, album_ids)
collection.add_releases(album_ids)
if remove_missing:
self.remove_missing(collection, lib.albums())
lib_ids = {x.mb_albumid for x in lib.albums()}
albums_in_collection = {r["id"] for r in collection.releases}
collection.remove_releases(list(albums_in_collection - lib_ids))
self._log.info("...MusicBrainz collection updated.")

View file

@ -5,47 +5,31 @@ from contextlib import nullcontext as does_not_raise
import pytest
from beets.library import Album
from beets.test.helper import ConfigMixin
from beets.test.helper import PluginMixin, TestHelper
from beets.ui import UserError
from beetsplug import mbcollection
@pytest.fixture
def collection():
return mbcollection.MBCollection(
{"id": str(uuid.uuid4()), "release-count": 150}
)
class TestMbCollectionAPI:
"""Tests for the low-level MusicBrainz API wrapper functions."""
def test_submit_albums_batches(self, collection, requests_mock):
# Chunk size is 200. Create 250 IDs.
ids = [f"id{i}" for i in range(250)]
requests_mock.put(
f"/ws/2/collection/{collection.id}/releases/{';'.join(ids[:200])}"
)
requests_mock.put(
f"/ws/2/collection/{collection.id}/releases/{';'.join(ids[200:])}"
)
mbcollection.submit_albums(collection, ids)
class TestMbCollectionPlugin(ConfigMixin):
class TestMbCollectionPlugin(PluginMixin, TestHelper):
"""Tests for the MusicBrainzCollectionPlugin class methods."""
plugin = "mbcollection"
COLLECTION_ID = str(uuid.uuid4())
@pytest.fixture
def plugin(self):
@pytest.fixture(autouse=True)
def setup_config(self):
self.config["musicbrainz"]["user"] = "testuser"
self.config["musicbrainz"]["pass"] = "testpass"
self.config["mbcollection"]["collection"] = self.COLLECTION_ID
plugin = mbcollection.MusicBrainzCollectionPlugin()
plugin.config["collection"] = self.COLLECTION_ID
return plugin
@pytest.fixture(autouse=True)
def helper(self):
self.setup_beets()
yield self
self.teardown_beets()
@pytest.mark.parametrize(
"user_collections,expectation",
@ -69,69 +53,90 @@ class TestMbCollectionPlugin(ConfigMixin):
does_not_raise(),
),
],
ids=["no collections", "no release collections", "invalid ID", "valid"],
)
def test_get_collection_validation(
self, plugin, requests_mock, user_collections, expectation
self, requests_mock, user_collections, expectation
):
requests_mock.get(
"/ws/2/collection", json={"collections": user_collections}
)
with expectation:
plugin._get_collection()
mbcollection.MusicBrainzCollectionPlugin().collection
def test_get_albums_in_collection_pagination(
self, plugin, requests_mock, collection
):
releases = [{"id": str(i)} for i in range(collection.release_count)]
def test_mbupdate(self, helper, requests_mock, monkeypatch):
"""Verify mbupdate sync of a MusicBrainz collection with the library.
This test ensures that the command:
- fetches collection releases using paginated requests,
- submits releases that exist locally but are missing from the remote
collection
- and removes releases from the remote collection that are not in the
local library. Small chunk sizes are forced to exercise pagination and
batching logic.
"""
for mb_albumid in [
# already present in remote collection
"in_collection1",
"in_collection2",
# two new albums not in remote collection
"00000000-0000-0000-0000-000000000001",
"00000000-0000-0000-0000-000000000002",
]:
helper.lib.add(Album(mb_albumid=mb_albumid))
# The relevant collection
requests_mock.get(
re.compile(
rf".*/ws/2/collection/{collection.id}/releases\b.*&offset=0.*"
),
json={"releases": releases[:100]},
)
requests_mock.get(
re.compile(
rf".*/ws/2/collection/{collection.id}/releases\b.*&offset=100.*"
),
json={"releases": releases[100:]},
"/ws/2/collection",
json={
"collections": [
{
"id": self.COLLECTION_ID,
"entity-type": "release",
"release-count": 3,
}
]
},
)
plugin._get_albums_in_collection(collection)
def test_update_album_list_filtering(self, plugin, collection, monkeypatch):
ids_submitted = []
def mock_submit(_, album_ids):
ids_submitted.extend(album_ids)
monkeypatch.setattr("beetsplug.mbcollection.submit_albums", mock_submit)
monkeypatch.setattr(plugin, "_get_collection", lambda: collection)
albums = [
Album(mb_albumid="invalid-id"),
Album(mb_albumid="00000000-0000-0000-0000-000000000001"),
]
plugin.update_album_list(None, albums)
# Behavior: only valid UUID was submitted
assert ids_submitted == ["00000000-0000-0000-0000-000000000001"]
def test_remove_missing(
self, plugin, monkeypatch, requests_mock, collection
):
removed_ids = []
def mock_remove(_, chunk):
removed_ids.extend(chunk)
requests_mock.delete(
re.compile(rf".*/ws/2/collection/{collection.id}/releases/r3")
)
collection_releases = f"/ws/2/collection/{self.COLLECTION_ID}/releases"
# Force small fetch chunk to require multiple paged requests.
monkeypatch.setattr(
plugin, "_get_albums_in_collection", lambda _: {"r1", "r2", "r3"}
"beetsplug.mbcollection.MBCollection.FETCH_CHUNK_SIZE", 2
)
# 3 releases are fetched in two pages.
requests_mock.get(
re.compile(rf".*{collection_releases}\b.*&offset=0.*"),
json={
"releases": [{"id": "in_collection1"}, {"id": "not_in_library"}]
},
)
requests_mock.get(
re.compile(rf".*{collection_releases}\b.*&offset=2.*"),
json={"releases": [{"id": "in_collection2"}]},
)
lib_albums = [Album(mb_albumid="r1"), Album(mb_albumid="r2")]
# Force small submission chunk
monkeypatch.setattr(
"beetsplug.mbcollection.MBCollection.SUBMISSION_CHUNK_SIZE", 1
)
# so that releases are added using two requests
requests_mock.put(
re.compile(
rf".*{collection_releases}/00000000-0000-0000-0000-000000000001"
)
)
requests_mock.put(
re.compile(
rf".*{collection_releases}/00000000-0000-0000-0000-000000000002"
)
)
# and finally, one release is removed
requests_mock.delete(
re.compile(rf".*{collection_releases}/not_in_library")
)
plugin.remove_missing(collection, lib_albums)
helper.run_command("mbupdate", "--remove")
assert requests_mock.call_count == 6