Reimplement mbpseudo plugin inheriting from MusicBrainzPlugin

This commit is contained in:
asardaes 2025-10-05 22:00:46 -06:00
parent 79f691832c
commit ab5705f444
2 changed files with 99 additions and 274 deletions

View file

@ -14,82 +14,108 @@
"""Adds pseudo-releases from MusicBrainz as candidates during import."""
import itertools
from copy import deepcopy
from typing import Any, Iterable, Optional, Sequence
import musicbrainzngs
from typing_extensions import override
import beetsplug.musicbrainz as mbplugin # avoid implicit loading of main plugin
from beets.autotag import AlbumInfo
from beets.autotag.distance import Distance, distance
from beets.autotag.hooks import TrackInfo
from beets.autotag.hooks import AlbumInfo, TrackInfo
from beets.autotag.match import assign_items
from beets.library import Item
from beets.metadata_plugins import MetadataSourcePlugin
from beets.plugins import find_plugins
from beets.util.id_extractors import extract_release_id
from beetsplug._typing import JSONDict
from beetsplug.musicbrainz import (
RELEASE_INCLUDES,
MusicBrainzPlugin,
_merge_pseudo_and_actual_album,
)
_STATUS_PSEUDO = "Pseudo-Release"
class MusicBrainzPseudoReleasePlugin(MetadataSourcePlugin):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.config.add({"scripts": [], "include_official_releases": False})
class MusicBrainzPseudoReleasePlugin(MusicBrainzPlugin):
def __init__(self) -> None:
super().__init__()
self.config.add({"scripts": []})
self._scripts = self.config["scripts"].as_str_seq()
self._mb = mbplugin.MusicBrainzPlugin()
self._pseudo_release_ids: dict[str, list[str]] = {}
self._intercepted_candidates: dict[str, AlbumInfo] = {}
self._mb_plugin_loaded_before = True
self.register_listener("pluginload", self._on_plugins_loaded)
self.register_listener("mb_album_extract", self._intercept_mb_releases)
self.register_listener(
"albuminfo_received", self._intercept_mb_candidates
)
self._log.debug("Desired scripts: {0}", self._scripts)
def _on_plugins_loaded(self):
mb_index = None
self_index = -1
for i, plugin in enumerate(find_plugins()):
if isinstance(plugin, mbplugin.MusicBrainzPlugin):
mb_index = i
elif isinstance(plugin, MusicBrainzPseudoReleasePlugin):
self_index = i
self.register_listener("pluginload", self._on_plugins_loaded)
if mb_index and self_index < mb_index:
self._mb_plugin_loaded_before = False
self._log.warning(
"The mbpseudo plugin was loaded before the musicbrainz plugin"
", this will result in redundant network calls"
# noinspection PyMethodMayBeStatic
def _on_plugins_loaded(self):
    """Refuse to run next to the stock ``musicbrainz`` plugin.

    Scans the plugins reported by ``find_plugins()`` and fails as soon as
    one of them is a ``MusicBrainzPlugin`` that is not an instance of this
    subclass.

    Raises:
        RuntimeError: if such a plain ``MusicBrainzPlugin`` is loaded.
    """
    conflict_found = any(
        isinstance(loaded, MusicBrainzPlugin)
        and not isinstance(loaded, MusicBrainzPseudoReleasePlugin)
        for loaded in find_plugins()
    )
    if conflict_found:
        raise RuntimeError(
            "The musicbrainz plugin should not be enabled together with"
            " the mbpseudo plugin"
        )
def _intercept_mb_releases(self, data: JSONDict):
album_id = data["id"] if "id" in data else None
if (
self._has_desired_script(data)
or not isinstance(album_id, str)
or album_id in self._pseudo_release_ids
@override
def candidates(
self,
items: Sequence[Item],
artist: str,
album: str,
va_likely: bool,
) -> Iterable[AlbumInfo]:
if len(self._scripts) == 0:
yield from super().candidates(items, artist, album, va_likely)
else:
for album_info in super().candidates(
items, artist, album, va_likely
):
return None
if isinstance(album_info, PseudoAlbumInfo):
yield album_info.get_official_release()
self._log.debug(
"Using {0} release for distance calculations for album {1}",
album_info.determine_best_ref(items),
album_info.album_id,
)
pseudo_release_ids = [
yield album_info
@override
def album_info(self, release: JSONDict) -> AlbumInfo:
    """Build the ``AlbumInfo`` for ``release``, preferring a pseudo-release.

    The parent implementation is always used to parse ``release`` first and
    the result is labelled with the ``"MusicBrainz"`` data source.

    * If ``release`` itself has the pseudo-release status, that parsed
      result is returned unchanged.
    * If ``_intercept_mb_release`` reports at least one wanted
      pseudo-release, only the first one is fetched from MusicBrainz,
      parsed, merged with the official release, and wrapped in a
      ``PseudoAlbumInfo`` that keeps a reference to the official release.
    * Otherwise the parsed official release is returned.

    Args:
        release: Raw MusicBrainz release dictionary.

    Returns:
        Either the parsed official release or a ``PseudoAlbumInfo``.
    """
    official = super().album_info(release)
    official.data_source = "MusicBrainz"

    # Already a pseudo-release: nothing further to resolve.
    if release.get("status") == _STATUS_PSEUDO:
        return official

    wanted_ids = self._intercept_mb_release(release)
    if not wanted_ids:
        return official

    # Only the first wanted pseudo-release is looked up.
    pseudo_id = self._extract_id(wanted_ids[0])
    response = musicbrainzngs.get_release_by_id(pseudo_id, RELEASE_INCLUDES)
    pseudo = super().album_info(response["release"])
    merged = _merge_pseudo_and_actual_album(pseudo, official)
    return PseudoAlbumInfo(
        pseudo_release=merged,
        official_release=official,
        data_source=self.data_source,
    )
def _intercept_mb_release(self, data: JSONDict) -> list[str]:
album_id = data["id"] if "id" in data else None
if self._has_desired_script(data) or not isinstance(album_id, str):
return []
return [
pr_id
for rel in data.get("release-relation-list", [])
if (pr_id := self._wanted_pseudo_release_id(rel)) is not None
if (pr_id := self._wanted_pseudo_release_id(album_id, rel))
is not None
]
if len(pseudo_release_ids) > 0:
self._log.debug("Intercepted release with album id {0}", album_id)
self._pseudo_release_ids[album_id] = pseudo_release_ids
return None
def _has_desired_script(self, release: JSONDict) -> bool:
if len(self._scripts) == 0:
return False
@ -100,6 +126,7 @@ class MusicBrainzPseudoReleasePlugin(MetadataSourcePlugin):
def _wanted_pseudo_release_id(
self,
album_id: str,
relation: JSONDict,
) -> Optional[str]:
if (
@ -112,207 +139,15 @@ class MusicBrainzPseudoReleasePlugin(MetadataSourcePlugin):
release = relation["release"]
if "id" in release and self._has_desired_script(release):
self._log.debug(
"Adding pseudo-release {0} for main release {1}",
release["id"],
album_id,
)
return release["id"]
else:
return None
def _intercept_mb_candidates(self, info: AlbumInfo):
    """Track official candidates that have pending pseudo-releases.

    A copy of ``info`` is stored in ``self._intercepted_candidates`` when it
    is not a ``PseudoAlbumInfo``, its album id has pseudo-release ids
    recorded in ``self._pseudo_release_ids``, and it has not been stored
    before. Failing that, a candidate whose ``albumstatus`` is the
    pseudo-release status triggers a purge of the matching pending entries.

    Args:
        info: Candidate album received from the auto-tagger.
    """
    should_store = (
        not isinstance(info, PseudoAlbumInfo)
        and info.album_id in self._pseudo_release_ids
        and info.album_id not in self._intercepted_candidates
    )
    if should_store:
        self._log.debug(
            "Intercepted candidate with album id {0.album_id}", info
        )
        self._intercepted_candidates[info.album_id] = info.copy()
        return

    if info.get("albumstatus", "") == _STATUS_PSEUDO:
        self._purge_intercepted_pseudo_releases(info)
def candidates(
self,
items: Sequence[Item],
artist: str,
album: str,
va_likely: bool,
) -> Iterable[AlbumInfo]:
"""Return pseudo-release candidates for ``items``.

Even though a candidate might have extra and/or missing tracks, the set of
paths from the items that were actually matched (which are stored in the
corresponding ``mapping``) must be a subset of the set of paths from the input
items. This helps us figure out which intercepted candidate might be relevant
for the items we get in this call even if other candidates have been
concurrently intercepted as well.

Returns an empty list when no scripts are configured. Otherwise returns the
resolved pseudo-releases for the first matching intercepted candidate, or
falls back to querying ``self._mb`` directly.
"""
# Without configured scripts there is nothing to look for.
if len(self._scripts) == 0:
return []
# Step 1: find the first intercepted official release whose stored mapping
# only refers to paths that belong to ``items``.
# NOTE(review): an empty ``mapping`` satisfies ``all(...)`` vacuously, so such
# a candidate would match any ``items`` -- confirm this is intended.
try:
item_paths = {item.path for item in items}
official_release_id = next(
key
for key, info in self._intercepted_candidates.items()
if "mapping" in info
and all(
mapping_key.path in item_paths
for mapping_key in info.mapping.keys()
)
)
pseudo_release_ids = self._pseudo_release_ids[official_release_id]
self._log.debug(
"Processing pseudo-releases for {0}: {1}",
official_release_id,
pseudo_release_ids,
)
except StopIteration:
# ``next`` found no intercepted candidate for these items.
official_release_id = None
pseudo_release_ids = []
# Step 2: a match was found -- resolve its pseudo-releases and drop the
# bookkeeping for that official release before returning.
if official_release_id is not None:
pseudo_releases = self._get_pseudo_releases(
items, official_release_id, pseudo_release_ids
)
del self._pseudo_release_ids[official_release_id]
del self._intercepted_candidates[official_release_id]
return pseudo_releases
# Step 3: if a MusicBrainzPlugin instance is among the loaded plugins and
# ``_mb_plugin_loaded_before`` is set, give up without querying again.
if (
any(
isinstance(plugin, mbplugin.MusicBrainzPlugin)
for plugin in find_plugins()
)
and self._mb_plugin_loaded_before
):
self._log.debug(
"No releases found after main MusicBrainz plugin executed"
)
return []
# musicbrainz plugin isn't enabled
# Step 4: query through the private ``self._mb`` instance. The first
# non-empty ``mb_albumid`` among the items is looked up by id; if there is
# none, or the lookup does not return an ``AlbumInfo``, a regular candidate
# search is performed instead.
self._log.debug("Searching for official releases")
try:
existing_album_id = next(
item.mb_albumid for item in items if item.mb_albumid
)
existing_album_info = self._mb.album_for_id(existing_album_id)
if not isinstance(existing_album_info, AlbumInfo):
official_candidates = list(
self._mb.candidates(items, artist, album, va_likely)
)
else:
official_candidates = [existing_album_info]
except StopIteration:
# No item carries an ``mb_albumid``.
official_candidates = list(
self._mb.candidates(items, artist, album, va_likely)
)
# Step 5: replay the interception that the listeners would have performed.
recursion = self._mb_plugin_simulation_matched(
items, official_candidates
)
# When the simulation matched, official releases are only passed through if
# ``include_official_releases`` is truthy in the configuration.
if recursion and not self.config.get().get("include_official_releases"):
official_candidates = []
self._log.debug(
"Emitting {0} official match(es)", len(official_candidates)
)
if recursion:
self._log.debug("Matches found after search")
# Presumably the recursive call now finds the candidate stored by the
# simulation above and returns through step 2 -- TODO confirm.
return itertools.chain(
self.candidates(items, artist, album, va_likely),
iter(official_candidates),
)
else:
# No simulated match: the official candidates are returned as they are.
return iter(official_candidates)
def _get_pseudo_releases(
    self,
    items: Sequence[Item],
    official_release_id: str,
    pseudo_release_ids: list[str],
) -> list[AlbumInfo]:
    """Resolve pseudo-release ids into ``PseudoAlbumInfo`` candidates.

    Every id is looked up through ``self._mb.album_for_id``; ids without a
    truthy result are skipped. Each hit is paired with the intercepted
    official release stored under ``official_release_id``.

    Args:
        items: Library items being matched; used to pick the reference
            release for distance calculations.
        official_release_id: Key into ``self._intercepted_candidates``.
        pseudo_release_ids: MusicBrainz ids of the pseudo-releases to fetch.

    Returns:
        The successfully resolved candidates, in input order.
    """
    resolved: list[AlbumInfo] = []
    for pseudo_id in pseudo_release_ids:
        fetched = self._mb.album_for_id(pseudo_id)
        if not fetched:
            continue

        candidate = PseudoAlbumInfo(
            pseudo_release=fetched,
            official_release=self._intercepted_candidates[
                official_release_id
            ],
            data_source=self.data_source,
        )
        # ``determine_best_ref`` is called for every resolved candidate, even
        # though its result is only used for the debug message here.
        best_ref = candidate.determine_best_ref(items)
        self._log.debug(
            "Using {0} release for distance calculations for album {1}",
            best_ref,
            pseudo_id,
        )
        resolved.append(candidate)
    return resolved
def _mb_plugin_simulation_matched(
    self,
    items: Sequence[Item],
    official_candidates: list[AlbumInfo],
) -> bool:
    """Replay what would have happened had the MusicBrainz plugin executed.

    By the time this runs ``self._mb.candidates()`` has already been called,
    which emits the ``mb_album_extract`` events, so the remaining steps are
    simulated here:

    1. Intercept the ``AlbumInfo`` candidate that would have arrived through
       the ``albuminfo_received`` event.
    2. Intercept the distance calculation of that candidate in order to
       store its mapping.

    Internal state is cleaned up when the official candidate is already a
    pseudo-release. The MusicBrainz plugin emits official releases even when
    it receives a pseudo-release as input, so the chain would otherwise be::

        pseudo-release input ->
        official release with pseudo emitted ->
        intercepted ->
        pseudo-release resolved (again)

    Removing the pseudo-release's id avoids resolving it again in the last
    step.

    Returns:
        ``True`` if at least one candidate ended up intercepted with a
        mapping assigned, ``False`` otherwise.
    """
    found_match = False
    for candidate in official_candidates:
        # Step 1: feed candidates with pending pseudo-releases to the
        # listener by hand.
        if candidate.album_id in self._pseudo_release_ids:
            self._intercept_mb_candidates(candidate)

        # Step 2: compute the mapping the distance hook would have stored.
        if candidate.album_id in self._intercepted_candidates:
            stored = self._intercepted_candidates[candidate.album_id]
            assigned, _, _ = assign_items(items, stored.tracks)
            stored.mapping = assigned
            found_match = True

        # Pseudo-releases that came back directly need no second lookup.
        if candidate.get("albumstatus", "") == _STATUS_PSEUDO:
            self._purge_intercepted_pseudo_releases(candidate)
    return found_match
def _purge_intercepted_pseudo_releases(self, official_candidate: AlbumInfo):
    """Forget pending lookups that ``official_candidate`` already satisfies.

    Every entry of ``self._pseudo_release_ids`` whose list of pseudo-release
    ids contains ``official_candidate.album_id`` is deleted.

    Args:
        official_candidate: Candidate whose ``album_id`` is searched for
            among the recorded pseudo-release ids.
    """
    stale_ids = [
        official_id
        for official_id, linked_ids in self._pseudo_release_ids.items()
        if official_candidate.album_id in linked_ids
    ]
    if not stale_ids:
        return

    self._log.debug(
        "No need to resolve {0}, removing",
        stale_ids,
    )
    # Keys were collected first so the dict is not mutated while iterating it.
    for official_id in stale_ids:
        del self._pseudo_release_ids[official_id]
@override
def album_distance(
self,
@ -327,16 +162,6 @@ class MusicBrainzPseudoReleasePlugin(MetadataSourcePlugin):
instance of ``ImmutableMapping``, we know at this point that all penalties from
the normal auto-tagging flow have been applied, so we can switch to the metadata
from the pseudo-release for the final proposal.
Other instances of ``AlbumInfo`` must come from other plugins, so we just check
if we intercepted them as candidates with pseudo-releases and store their
``mapping``. This is needed because the real listeners we use never expose
information from the input ``Item``s, so we intercept that here.
The paths from the items are used to figure out which pseudo-releases should be
provided for them, which is specially important for concurrent stage execution
where we might have already intercepted releases from different import tasks
when we run.
"""
if isinstance(album_info, PseudoAlbumInfo):
@ -349,25 +174,11 @@ class MusicBrainzPseudoReleasePlugin(MetadataSourcePlugin):
new_mappings, _, _ = assign_items(items, album_info.tracks)
mapping.update(new_mappings)
elif album_info.album_id in self._intercepted_candidates:
self._log.debug("Storing mapping for {0.album_id}", album_info)
self._intercepted_candidates[album_info.album_id].mapping = mapping
return super().album_distance(items, album_info, mapping)
def album_for_id(self, album_id: str) -> Optional[AlbumInfo]:
    """Look up an album by id; this implementation never finds one.

    Returns:
        Always ``None``.
    """
    return None
def track_for_id(self, track_id: str) -> Optional[TrackInfo]:
    """Look up a track by id; this implementation never finds one.

    Returns:
        Always ``None``.
    """
    return None
def item_candidates(
    self,
    item: Item,
    artist: str,
    title: str,
) -> Iterable[TrackInfo]:
    """Return singleton-track candidates; this implementation offers none.

    Returns:
        Always a new empty list.
    """
    no_candidates: list = []
    return no_candidates
@override
def _extract_id(self, url: str) -> Optional[str]:
    """Extract a release id from ``url`` using the ``"MusicBrainz"`` source.

    Delegates to ``extract_release_id`` and returns its result unchanged.
    """
    release_id = extract_release_id("MusicBrainz", url)
    return release_id
class PseudoAlbumInfo(AlbumInfo):
@ -398,6 +209,9 @@ class PseudoAlbumInfo(AlbumInfo):
if k not in kwargs:
self[k] = v
def get_official_release(self) -> AlbumInfo:
    """Return the official release stored on this instance.

    The value is read straight from the instance ``__dict__`` under the
    ``"_official_release"`` key.

    Raises:
        KeyError: if no ``"_official_release"`` entry is present.
    """
    instance_state = vars(self)
    return instance_state["_official_release"]
def determine_best_ref(self, items: Sequence[Item]) -> str:
self.use_pseudo_as_ref()
pseudo_dist = self._compute_distance(items)
@ -429,6 +243,17 @@ class PseudoAlbumInfo(AlbumInfo):
else:
return self.__dict__["_official_release"].__getattr__(attr)
def __deepcopy__(self, memo):
    """Support ``copy.deepcopy`` without running ``__init__``.

    A blank instance of the same class is created and registered in
    ``memo`` before anything is copied. Instance attributes held in
    ``__dict__`` are carried over by reference, while every mapping value
    returned by ``self.items()`` is deep-copied.

    Args:
        memo: The memo dictionary supplied by ``copy.deepcopy``.

    Returns:
        The new instance.
    """
    klass = self.__class__
    clone = klass.__new__(klass)
    # Register the clone first so that recursive references resolve to it.
    memo[id(self)] = clone
    clone.__dict__.update(self.__dict__)
    for key, value in self.items():
        clone[key] = deepcopy(value, memo)
    return clone
class ImmutableMapping(dict[Item, TrackInfo]):
def __init__(self, *args, **kwargs):

View file

@ -323,7 +323,7 @@ def _find_actual_release_from_pseudo_release(
def _merge_pseudo_and_actual_album(
pseudo: beets.autotag.hooks.AlbumInfo, actual: beets.autotag.hooks.AlbumInfo
) -> beets.autotag.hooks.AlbumInfo | None:
) -> beets.autotag.hooks.AlbumInfo:
"""
Merges a pseudo release with its actual release.