From ab5705f444a4be8f8bf0d4910dd52c7d6322f173 Mon Sep 17 00:00:00 2001 From: asardaes Date: Sun, 5 Oct 2025 22:00:46 -0600 Subject: [PATCH] Reimplement mbpseudo plugin inheriting from MusicBrainzPlugin --- beetsplug/mbpseudo.py | 371 +++++++++++---------------------------- beetsplug/musicbrainz.py | 2 +- 2 files changed, 99 insertions(+), 274 deletions(-) diff --git a/beetsplug/mbpseudo.py b/beetsplug/mbpseudo.py index c49e5e5b6..d544a5624 100644 --- a/beetsplug/mbpseudo.py +++ b/beetsplug/mbpseudo.py @@ -14,82 +14,108 @@ """Adds pseudo-releases from MusicBrainz as candidates during import.""" -import itertools +from copy import deepcopy from typing import Any, Iterable, Optional, Sequence +import musicbrainzngs from typing_extensions import override -import beetsplug.musicbrainz as mbplugin # avoid implicit loading of main plugin -from beets.autotag import AlbumInfo from beets.autotag.distance import Distance, distance -from beets.autotag.hooks import TrackInfo +from beets.autotag.hooks import AlbumInfo, TrackInfo from beets.autotag.match import assign_items from beets.library import Item -from beets.metadata_plugins import MetadataSourcePlugin from beets.plugins import find_plugins +from beets.util.id_extractors import extract_release_id from beetsplug._typing import JSONDict +from beetsplug.musicbrainz import ( + RELEASE_INCLUDES, + MusicBrainzPlugin, + _merge_pseudo_and_actual_album, +) _STATUS_PSEUDO = "Pseudo-Release" -class MusicBrainzPseudoReleasePlugin(MetadataSourcePlugin): - def __init__(self, *args, **kwargs) -> None: - super().__init__(*args, **kwargs) - - self.config.add({"scripts": [], "include_official_releases": False}) +class MusicBrainzPseudoReleasePlugin(MusicBrainzPlugin): + def __init__(self) -> None: + super().__init__() + self.config.add({"scripts": []}) self._scripts = self.config["scripts"].as_str_seq() - self._mb = mbplugin.MusicBrainzPlugin() - - self._pseudo_release_ids: dict[str, list[str]] = {} - self._intercepted_candidates: dict[str, AlbumInfo] = {} - self._mb_plugin_loaded_before = True - - self.register_listener("pluginload", self._on_plugins_loaded) - self.register_listener("mb_album_extract", self._intercept_mb_releases) - self.register_listener( - "albuminfo_received", self._intercept_mb_candidates - ) - self._log.debug("Desired scripts: {0}", self._scripts) + self.register_listener("pluginload", self._on_plugins_loaded) + + # noinspection PyMethodMayBeStatic def _on_plugins_loaded(self): - mb_index = None - self_index = -1 - for i, plugin in enumerate(find_plugins()): - if isinstance(plugin, mbplugin.MusicBrainzPlugin): - mb_index = i - elif isinstance(plugin, MusicBrainzPseudoReleasePlugin): - self_index = i + for plugin in find_plugins(): + if isinstance(plugin, MusicBrainzPlugin) and not isinstance( + plugin, MusicBrainzPseudoReleasePlugin + ): + raise RuntimeError( + "The musicbrainz plugin should not be enabled together with" + " the mbpseudo plugin" + ) - if mb_index and self_index < mb_index: - self._mb_plugin_loaded_before = False - self._log.warning( - "The mbpseudo plugin was loaded before the musicbrainz plugin" - ", this will result in redundant network calls" + @override + def candidates( + self, + items: Sequence[Item], + artist: str, + album: str, + va_likely: bool, + ) -> Iterable[AlbumInfo]: + if len(self._scripts) == 0: + yield from super().candidates(items, artist, album, va_likely) + else: + for album_info in super().candidates( + items, artist, album, va_likely + ): + if isinstance(album_info, PseudoAlbumInfo): + yield album_info.get_official_release() + self._log.debug( + "Using {0} release for distance calculations for album {1}", + album_info.determine_best_ref(items), + album_info.album_id, + ) + + yield album_info + + @override + def album_info(self, release: JSONDict) -> AlbumInfo: + official_release = super().album_info(release) + official_release.data_source = "MusicBrainz" + + if release.get("status") == _STATUS_PSEUDO: + return official_release + elif pseudo_release_ids := self._intercept_mb_release(release): + album_id = self._extract_id(pseudo_release_ids[0]) + raw_pseudo_release = musicbrainzngs.get_release_by_id( + album_id, RELEASE_INCLUDES ) + pseudo_release = super().album_info(raw_pseudo_release["release"]) + return PseudoAlbumInfo( + pseudo_release=_merge_pseudo_and_actual_album( + pseudo_release, official_release + ), + official_release=official_release, + data_source=self.data_source, + ) + else: + return official_release - def _intercept_mb_releases(self, data: JSONDict): + def _intercept_mb_release(self, data: JSONDict) -> list[str]: album_id = data["id"] if "id" in data else None - if ( - self._has_desired_script(data) - or not isinstance(album_id, str) - or album_id in self._pseudo_release_ids - ): - return None + if self._has_desired_script(data) or not isinstance(album_id, str): + return [] - pseudo_release_ids = [ + return [ pr_id for rel in data.get("release-relation-list", []) - if (pr_id := self._wanted_pseudo_release_id(rel)) is not None + if (pr_id := self._wanted_pseudo_release_id(album_id, rel)) + is not None ] - if len(pseudo_release_ids) > 0: - self._log.debug("Intercepted release with album id {0}", album_id) - self._pseudo_release_ids[album_id] = pseudo_release_ids - - return None - def _has_desired_script(self, release: JSONDict) -> bool: if len(self._scripts) == 0: return False @@ -100,6 +126,7 @@ class MusicBrainzPseudoReleasePlugin(MetadataSourcePlugin): def _wanted_pseudo_release_id( self, + album_id: str, relation: JSONDict, ) -> Optional[str]: if ( @@ -112,207 +139,15 @@ class MusicBrainzPseudoReleasePlugin(MetadataSourcePlugin): release = relation["release"] if "id" in release and self._has_desired_script(release): + self._log.debug( + "Adding pseudo-release {0} for main release {1}", + release["id"], + album_id, + ) return release["id"] else: return None - def _intercept_mb_candidates(self, info: AlbumInfo): - if ( - not isinstance(info, PseudoAlbumInfo) - and info.album_id in self._pseudo_release_ids - and info.album_id not in self._intercepted_candidates - ): - self._log.debug( - "Intercepted candidate with album id {0.album_id}", info - ) - self._intercepted_candidates[info.album_id] = info.copy() - - elif info.get("albumstatus", "") == _STATUS_PSEUDO: - self._purge_intercepted_pseudo_releases(info) - - def candidates( - self, - items: Sequence[Item], - artist: str, - album: str, - va_likely: bool, - ) -> Iterable[AlbumInfo]: - """Even though a candidate might have extra and/or missing tracks, the set of - paths from the items that were actually matched (which are stored in the - corresponding ``mapping``) must be a subset of the set of paths from the input - items. This helps us figure out which intercepted candidate might be relevant - for the items we get in this call even if other candidates have been - concurrently intercepted as well. - """ - - if len(self._scripts) == 0: - return [] - - try: - item_paths = {item.path for item in items} - official_release_id = next( - key - for key, info in self._intercepted_candidates.items() - if "mapping" in info - and all( - mapping_key.path in item_paths - for mapping_key in info.mapping.keys() - ) - ) - pseudo_release_ids = self._pseudo_release_ids[official_release_id] - self._log.debug( - "Processing pseudo-releases for {0}: {1}", - official_release_id, - pseudo_release_ids, - ) - except StopIteration: - official_release_id = None - pseudo_release_ids = [] - - if official_release_id is not None: - pseudo_releases = self._get_pseudo_releases( - items, official_release_id, pseudo_release_ids - ) - del self._pseudo_release_ids[official_release_id] - del self._intercepted_candidates[official_release_id] - return pseudo_releases - - if ( - any( - isinstance(plugin, mbplugin.MusicBrainzPlugin) - for plugin in find_plugins() - ) - and self._mb_plugin_loaded_before - ): - self._log.debug( - "No releases found after main MusicBrainz plugin executed" - ) - return [] - - # musicbrainz plugin isn't enabled - self._log.debug("Searching for official releases") - - try: - existing_album_id = next( - item.mb_albumid for item in items if item.mb_albumid - ) - existing_album_info = self._mb.album_for_id(existing_album_id) - if not isinstance(existing_album_info, AlbumInfo): - official_candidates = list( - self._mb.candidates(items, artist, album, va_likely) - ) - else: - official_candidates = [existing_album_info] - except StopIteration: - official_candidates = list( - self._mb.candidates(items, artist, album, va_likely) - ) - - recursion = self._mb_plugin_simulation_matched( - items, official_candidates - ) - - if recursion and not self.config.get().get("include_official_releases"): - official_candidates = [] - - self._log.debug( - "Emitting {0} official match(es)", len(official_candidates) - ) - if recursion: - self._log.debug("Matches found after search") - return itertools.chain( - self.candidates(items, artist, album, va_likely), - iter(official_candidates), - ) - else: - return iter(official_candidates) - - def _get_pseudo_releases( - self, - items: Sequence[Item], - official_release_id: str, - pseudo_release_ids: list[str], - ) -> list[AlbumInfo]: - pseudo_releases: list[AlbumInfo] = [] - for pr_id in pseudo_release_ids: - if match := self._mb.album_for_id(pr_id): - pseudo_album_info = PseudoAlbumInfo( - pseudo_release=match, - official_release=self._intercepted_candidates[ - official_release_id - ], - data_source=self.data_source, - ) - self._log.debug( - "Using {0} release for distance calculations for album {1}", - pseudo_album_info.determine_best_ref(items), - pr_id, - ) - pseudo_releases.append(pseudo_album_info) - return pseudo_releases - - def _mb_plugin_simulation_matched( - self, - items: Sequence[Item], - official_candidates: list[AlbumInfo], - ) -> bool: - """Simulate how we would have been called if the MusicBrainz plugin had actually - executed. - - At this point we already called ``self._mb.candidates()``, - which emits the ``mb_album_extract`` events, - so now we simulate: - - 1. Intercepting the ``AlbumInfo`` candidate that would have come in the - ``albuminfo_received`` event. - 2. Intercepting the distance calculation of the aforementioned candidate to - store its mapping. - - If the official candidate is already a pseudo-release, we clean up internal - state. This is needed because the MusicBrainz plugin emits official releases - even if it receives a pseudo-release as input, so the chain would actually be: - - pseudo-release input -> - official release with pseudo emitted -> - intercepted -> - pseudo-release resolved (again) - - To avoid resolving again in the last step, we remove the pseudo-release's id. - """ - - matched = False - for official_candidate in official_candidates: - if official_candidate.album_id in self._pseudo_release_ids: - self._intercept_mb_candidates(official_candidate) - - if official_candidate.album_id in self._intercepted_candidates: - intercepted = self._intercepted_candidates[ - official_candidate.album_id - ] - intercepted.mapping, _, _ = assign_items( - items, intercepted.tracks - ) - matched = True - - if official_candidate.get("albumstatus", "") == _STATUS_PSEUDO: - self._purge_intercepted_pseudo_releases(official_candidate) - - return matched - - def _purge_intercepted_pseudo_releases(self, official_candidate: AlbumInfo): - rm_keys = [ - album_id - for album_id, pseudo_album_ids in self._pseudo_release_ids.items() - if official_candidate.album_id in pseudo_album_ids - ] - if rm_keys: - self._log.debug( - "No need to resolve {0}, removing", - rm_keys, - ) - for k in rm_keys: - del self._pseudo_release_ids[k] - @override def album_distance( self, @@ -327,16 +162,6 @@ class MusicBrainzPseudoReleasePlugin(MetadataSourcePlugin): instance of ``ImmutableMapping``, we know at this point that all penalties from the normal auto-tagging flow have been applied, so we can switch to the metadata from the pseudo-release for the final proposal. - - Other instances of ``AlbumInfo`` must come from other plugins, so we just check - if we intercepted them as candidates with pseudo-releases and store their - ``mapping``. This is needed because the real listeners we use never expose - information from the input ``Item``s, so we intercept that here. - - The paths from the items are used to figure out which pseudo-releases should be - provided for them, which is specially important for concurrent stage execution - where we might have already intercepted releases from different import tasks - when we run. """ if isinstance(album_info, PseudoAlbumInfo): @@ -349,25 +174,11 @@ class MusicBrainzPseudoReleasePlugin(MetadataSourcePlugin): new_mappings, _, _ = assign_items(items, album_info.tracks) mapping.update(new_mappings) - elif album_info.album_id in self._intercepted_candidates: - self._log.debug("Storing mapping for {0.album_id}", album_info) - self._intercepted_candidates[album_info.album_id].mapping = mapping - return super().album_distance(items, album_info, mapping) - def album_for_id(self, album_id: str) -> Optional[AlbumInfo]: - pass - - def track_for_id(self, track_id: str) -> Optional[TrackInfo]: - pass - - def item_candidates( - self, - item: Item, - artist: str, - title: str, - ) -> Iterable[TrackInfo]: - return [] + @override + def _extract_id(self, url: str) -> Optional[str]: + return extract_release_id("MusicBrainz", url) class PseudoAlbumInfo(AlbumInfo): @@ -398,6 +209,9 @@ class PseudoAlbumInfo(AlbumInfo): if k not in kwargs: self[k] = v + def get_official_release(self) -> AlbumInfo: + return self.__dict__["_official_release"] + def determine_best_ref(self, items: Sequence[Item]) -> str: self.use_pseudo_as_ref() pseudo_dist = self._compute_distance(items) @@ -429,6 +243,17 @@ class PseudoAlbumInfo(AlbumInfo): else: return self.__dict__["_official_release"].__getattr__(attr) + def __deepcopy__(self, memo): + cls = self.__class__ + result = cls.__new__(cls) + + memo[id(self)] = result + result.__dict__.update(self.__dict__) + for k, v in self.items(): + result[k] = deepcopy(v, memo) + + return result + class ImmutableMapping(dict[Item, TrackInfo]): def __init__(self, *args, **kwargs): diff --git a/beetsplug/musicbrainz.py b/beetsplug/musicbrainz.py index 8e259e94b..cd53c3156 100644 --- a/beetsplug/musicbrainz.py +++ b/beetsplug/musicbrainz.py @@ -323,7 +323,7 @@ def _find_actual_release_from_pseudo_release( def _merge_pseudo_and_actual_album( pseudo: beets.autotag.hooks.AlbumInfo, actual: beets.autotag.hooks.AlbumInfo -) -> beets.autotag.hooks.AlbumInfo | None: +) -> beets.autotag.hooks.AlbumInfo: """ Merges a pseudo release with its actual release.