diff --git a/beetsplug/musicbrainz.py b/beetsplug/musicbrainz.py index 937f9a127..008c23e5a 100644 --- a/beetsplug/musicbrainz.py +++ b/beetsplug/musicbrainz.py @@ -379,517 +379,517 @@ class MusicBrainzPlugin(BeetsPlugin): data_source = "Musicbrainz" -def configure(): - """Set up the python-musicbrainz-ngs module according to settings - from the beets configuration. This should be called at startup. - """ - hostname = config["musicbrainz"]["host"].as_str() - https = config["musicbrainz"]["https"].get(bool) - # Only call set_hostname when a custom server is configured. Since - # musicbrainz-ngs connects to musicbrainz.org with HTTPS by default - if hostname != "musicbrainz.org": - musicbrainzngs.set_hostname(hostname, https) - musicbrainzngs.set_rate_limit( - config["musicbrainz"]["ratelimit_interval"].as_number(), - config["musicbrainz"]["ratelimit"].get(int), - ) - - -def track_info( - recording: dict, - index: int | None = None, - medium: int | None = None, - medium_index: int | None = None, - medium_total: int | None = None, -) -> beets.autotag.hooks.TrackInfo: - """Translates a MusicBrainz recording result dictionary into a beets - ``TrackInfo`` object. Three parameters are optional and are used - only for tracks that appear on releases (non-singletons): ``index``, - the overall track number; ``medium``, the disc number; - ``medium_index``, the track's index on its medium; ``medium_total``, - the number of tracks on the medium. Each number is a 1-based index. - """ - info = beets.autotag.hooks.TrackInfo( - title=recording["title"], - track_id=recording["id"], - index=index, - medium=medium, - medium_index=medium_index, - medium_total=medium_total, - data_source="MusicBrainz", - data_url=track_url(recording["id"]), - ) - - if recording.get("artist-credit"): - # Get the artist names. - ( - info.artist, - info.artist_sort, - info.artist_credit, - ) = _flatten_artist_credit(recording["artist-credit"]) - - ( - info.artists, - info.artists_sort, - info.artists_credit, - ) = _multi_artist_credit( - recording["artist-credit"], include_join_phrase=False + def configure(): + """Set up the python-musicbrainz-ngs module according to settings + from the beets configuration. This should be called at startup. + """ + hostname = config["musicbrainz"]["host"].as_str() + https = config["musicbrainz"]["https"].get(bool) + # Only call set_hostname when a custom server is configured. Since + # musicbrainz-ngs connects to musicbrainz.org with HTTPS by default + if hostname != "musicbrainz.org": + musicbrainzngs.set_hostname(hostname, https) + musicbrainzngs.set_rate_limit( + config["musicbrainz"]["ratelimit_interval"].as_number(), + config["musicbrainz"]["ratelimit"].get(int), ) - info.artists_ids = _artist_ids(recording["artist-credit"]) - info.artist_id = info.artists_ids[0] - if recording.get("artist-relation-list"): - info.remixer = _get_related_artist_names( - recording["artist-relation-list"], relation_type="remixer" + def track_info( + recording: dict, + index: int | None = None, + medium: int | None = None, + medium_index: int | None = None, + medium_total: int | None = None, + ) -> beets.autotag.hooks.TrackInfo: + """Translates a MusicBrainz recording result dictionary into a beets + ``TrackInfo`` object. Three parameters are optional and are used + only for tracks that appear on releases (non-singletons): ``index``, + the overall track number; ``medium``, the disc number; + ``medium_index``, the track's index on its medium; ``medium_total``, + the number of tracks on the medium. Each number is a 1-based index. + """ + info = beets.autotag.hooks.TrackInfo( + title=recording["title"], + track_id=recording["id"], + index=index, + medium=medium, + medium_index=medium_index, + medium_total=medium_total, + data_source="MusicBrainz", + data_url=track_url(recording["id"]), ) - if recording.get("length"): - info.length = int(recording["length"]) / 1000.0 + if recording.get("artist-credit"): + # Get the artist names. + ( + info.artist, + info.artist_sort, + info.artist_credit, + ) = _flatten_artist_credit(recording["artist-credit"]) - info.trackdisambig = recording.get("disambiguation") + ( + info.artists, + info.artists_sort, + info.artists_credit, + ) = _multi_artist_credit( + recording["artist-credit"], include_join_phrase=False + ) - if recording.get("isrc-list"): - info.isrc = ";".join(recording["isrc-list"]) + info.artists_ids = _artist_ids(recording["artist-credit"]) + info.artist_id = info.artists_ids[0] - lyricist = [] - composer = [] - composer_sort = [] - for work_relation in recording.get("work-relation-list", ()): - if work_relation["type"] != "performance": - continue - info.work = work_relation["work"]["title"] - info.mb_workid = work_relation["work"]["id"] - if "disambiguation" in work_relation["work"]: - info.work_disambig = work_relation["work"]["disambiguation"] + if recording.get("artist-relation-list"): + info.remixer = _get_related_artist_names( + recording["artist-relation-list"], relation_type="remixer" + ) - for artist_relation in work_relation["work"].get( - "artist-relation-list", () - ): + if recording.get("length"): + info.length = int(recording["length"]) / 1000.0 + + info.trackdisambig = recording.get("disambiguation") + + if recording.get("isrc-list"): + info.isrc = ";".join(recording["isrc-list"]) + + lyricist = [] + composer = [] + composer_sort = [] + for work_relation in recording.get("work-relation-list", ()): + if work_relation["type"] != "performance": + continue + info.work = work_relation["work"]["title"] + info.mb_workid = work_relation["work"]["id"] + if "disambiguation" in work_relation["work"]: + info.work_disambig = work_relation["work"]["disambiguation"] + + for artist_relation in work_relation["work"].get( + "artist-relation-list", () + ): + if "type" in artist_relation: + type = artist_relation["type"] + if type == "lyricist": + lyricist.append(artist_relation["artist"]["name"]) + elif type == "composer": + composer.append(artist_relation["artist"]["name"]) + composer_sort.append(artist_relation["artist"]["sort-name"]) + if lyricist: + info.lyricist = ", ".join(lyricist) + if composer: + info.composer = ", ".join(composer) + info.composer_sort = ", ".join(composer_sort) + + arranger = [] + for artist_relation in recording.get("artist-relation-list", ()): if "type" in artist_relation: type = artist_relation["type"] - if type == "lyricist": - lyricist.append(artist_relation["artist"]["name"]) - elif type == "composer": - composer.append(artist_relation["artist"]["name"]) - composer_sort.append(artist_relation["artist"]["sort-name"]) - if lyricist: - info.lyricist = ", ".join(lyricist) - if composer: - info.composer = ", ".join(composer) - info.composer_sort = ", ".join(composer_sort) + if type == "arranger": + arranger.append(artist_relation["artist"]["name"]) + if arranger: + info.arranger = ", ".join(arranger) - arranger = [] - for artist_relation in recording.get("artist-relation-list", ()): - if "type" in artist_relation: - type = artist_relation["type"] - if type == "arranger": - arranger.append(artist_relation["artist"]["name"]) - if arranger: - info.arranger = ", ".join(arranger) + # Supplementary fields provided by plugins + extra_trackdatas = plugins.send("mb_track_extract", data=recording) + for extra_trackdata in extra_trackdatas: + info.update(extra_trackdata) - # Supplementary fields provided by plugins - extra_trackdatas = plugins.send("mb_track_extract", data=recording) - for extra_trackdata in extra_trackdatas: - info.update(extra_trackdata) - - return info + return info -def album_info(release: dict) -> beets.autotag.hooks.AlbumInfo: - """Takes a MusicBrainz release result dictionary and returns a beets - AlbumInfo object containing the interesting data about that release. - """ - # Get artist name using join phrases. - artist_name, artist_sort_name, artist_credit_name = _flatten_artist_credit( - release["artist-credit"] - ) + def album_info(release: dict) -> beets.autotag.hooks.AlbumInfo: + """Takes a MusicBrainz release result dictionary and returns a beets + AlbumInfo object containing the interesting data about that release. + """ + # Get artist name using join phrases. + artist_name, artist_sort_name, artist_credit_name = _flatten_artist_credit( + release["artist-credit"] + ) - ( - artists_names, - artists_sort_names, - artists_credit_names, - ) = _multi_artist_credit( - release["artist-credit"], include_join_phrase=False - ) + ( + artists_names, + artists_sort_names, + artists_credit_names, + ) = _multi_artist_credit( + release["artist-credit"], include_join_phrase=False + ) - ntracks = sum(len(m["track-list"]) for m in release["medium-list"]) + ntracks = sum(len(m["track-list"]) for m in release["medium-list"]) - # The MusicBrainz API omits 'artist-relation-list' and 'work-relation-list' - # when the release has more than 500 tracks. So we use browse_recordings - # on chunks of tracks to recover the same information in this case. - if ntracks > BROWSE_MAXTRACKS: - log.debug("Album {} has too many tracks", release["id"]) - recording_list = [] - for i in range(0, ntracks, BROWSE_CHUNKSIZE): - log.debug("Retrieving tracks starting at {}", i) - recording_list.extend( - musicbrainzngs.browse_recordings( - release=release["id"], - limit=BROWSE_CHUNKSIZE, - includes=BROWSE_INCLUDES, - offset=i, - )["recording-list"] - ) - track_map = {r["id"]: r for r in recording_list} + # The MusicBrainz API omits 'artist-relation-list' and 'work-relation-list' + # when the release has more than 500 tracks. So we use browse_recordings + # on chunks of tracks to recover the same information in this case. + if ntracks > BROWSE_MAXTRACKS: + log.debug("Album {} has too many tracks", release["id"]) + recording_list = [] + for i in range(0, ntracks, BROWSE_CHUNKSIZE): + log.debug("Retrieving tracks starting at {}", i) + recording_list.extend( + musicbrainzngs.browse_recordings( + release=release["id"], + limit=BROWSE_CHUNKSIZE, + includes=BROWSE_INCLUDES, + offset=i, + )["recording-list"] + ) + track_map = {r["id"]: r for r in recording_list} + for medium in release["medium-list"]: + for recording in medium["track-list"]: + recording_info = track_map[recording["recording"]["id"]] + recording["recording"] = recording_info + + # Basic info. + track_infos = [] + index = 0 for medium in release["medium-list"]: - for recording in medium["track-list"]: - recording_info = track_map[recording["recording"]["id"]] - recording["recording"] = recording_info + disctitle = medium.get("title") + format = medium.get("format") - # Basic info. - track_infos = [] - index = 0 - for medium in release["medium-list"]: - disctitle = medium.get("title") - format = medium.get("format") - - if format in config["match"]["ignored_media"].as_str_seq(): - continue - - all_tracks = medium["track-list"] - if ( - "data-track-list" in medium - and not config["match"]["ignore_data_tracks"] - ): - all_tracks += medium["data-track-list"] - track_count = len(all_tracks) - - if "pregap" in medium: - all_tracks.insert(0, medium["pregap"]) - - for track in all_tracks: - if ( - "title" in track["recording"] - and track["recording"]["title"] in SKIPPED_TRACKS - ): + if format in config["match"]["ignored_media"].as_str_seq(): continue + all_tracks = medium["track-list"] if ( - "video" in track["recording"] - and track["recording"]["video"] == "true" - and config["match"]["ignore_video_tracks"] + "data-track-list" in medium + and not config["match"]["ignore_data_tracks"] ): - continue + all_tracks += medium["data-track-list"] + track_count = len(all_tracks) - # Basic information from the recording. - index += 1 - ti = track_info( - track["recording"], - index, - int(medium["position"]), - int(track["position"]), - track_count, - ) - ti.release_track_id = track["id"] - ti.disctitle = disctitle - ti.media = format - ti.track_alt = track["number"] + if "pregap" in medium: + all_tracks.insert(0, medium["pregap"]) - # Prefer track data, where present, over recording data. - if track.get("title"): - ti.title = track["title"] - if track.get("artist-credit"): - # Get the artist names. - ( - ti.artist, - ti.artist_sort, - ti.artist_credit, - ) = _flatten_artist_credit(track["artist-credit"]) + for track in all_tracks: + if ( + "title" in track["recording"] + and track["recording"]["title"] in SKIPPED_TRACKS + ): + continue - ( - ti.artists, - ti.artists_sort, - ti.artists_credit, - ) = _multi_artist_credit( - track["artist-credit"], include_join_phrase=False + if ( + "video" in track["recording"] + and track["recording"]["video"] == "true" + and config["match"]["ignore_video_tracks"] + ): + continue + + # Basic information from the recording. + index += 1 + ti = track_info( + track["recording"], + index, + int(medium["position"]), + int(track["position"]), + track_count, ) + ti.release_track_id = track["id"] + ti.disctitle = disctitle + ti.media = format + ti.track_alt = track["number"] - ti.artists_ids = _artist_ids(track["artist-credit"]) - ti.artist_id = ti.artists_ids[0] - if track.get("length"): - ti.length = int(track["length"]) / (1000.0) + # Prefer track data, where present, over recording data. + if track.get("title"): + ti.title = track["title"] + if track.get("artist-credit"): + # Get the artist names. + ( + ti.artist, + ti.artist_sort, + ti.artist_credit, + ) = _flatten_artist_credit(track["artist-credit"]) - track_infos.append(ti) + ( + ti.artists, + ti.artists_sort, + ti.artists_credit, + ) = _multi_artist_credit( + track["artist-credit"], include_join_phrase=False + ) - album_artist_ids = _artist_ids(release["artist-credit"]) - info = beets.autotag.hooks.AlbumInfo( - album=release["title"], - album_id=release["id"], - artist=artist_name, - artist_id=album_artist_ids[0], - artists=artists_names, - artists_ids=album_artist_ids, - tracks=track_infos, - mediums=len(release["medium-list"]), - artist_sort=artist_sort_name, - artists_sort=artists_sort_names, - artist_credit=artist_credit_name, - artists_credit=artists_credit_names, - data_source="MusicBrainz", - data_url=album_url(release["id"]), - barcode=release.get("barcode"), - ) - info.va = info.artist_id == VARIOUS_ARTISTS_ID - if info.va: - info.artist = config["va_name"].as_str() - info.asin = release.get("asin") - info.releasegroup_id = release["release-group"]["id"] - info.albumstatus = release.get("status") + ti.artists_ids = _artist_ids(track["artist-credit"]) + ti.artist_id = ti.artists_ids[0] + if track.get("length"): + ti.length = int(track["length"]) / (1000.0) - if release["release-group"].get("title"): - info.release_group_title = release["release-group"].get("title") + track_infos.append(ti) - # Get the disambiguation strings at the release and release group level. - if release["release-group"].get("disambiguation"): - info.releasegroupdisambig = release["release-group"].get( - "disambiguation" + album_artist_ids = _artist_ids(release["artist-credit"]) + info = beets.autotag.hooks.AlbumInfo( + album=release["title"], + album_id=release["id"], + artist=artist_name, + artist_id=album_artist_ids[0], + artists=artists_names, + artists_ids=album_artist_ids, + tracks=track_infos, + mediums=len(release["medium-list"]), + artist_sort=artist_sort_name, + artists_sort=artists_sort_names, + artist_credit=artist_credit_name, + artists_credit=artists_credit_names, + data_source="MusicBrainz", + data_url=album_url(release["id"]), + barcode=release.get("barcode"), ) - if release.get("disambiguation"): - info.albumdisambig = release.get("disambiguation") + info.va = info.artist_id == VARIOUS_ARTISTS_ID + if info.va: + info.artist = config["va_name"].as_str() + info.asin = release.get("asin") + info.releasegroup_id = release["release-group"]["id"] + info.albumstatus = release.get("status") - # Get the "classic" Release type. This data comes from a legacy API - # feature before MusicBrainz supported multiple release types. - if "type" in release["release-group"]: - reltype = release["release-group"]["type"] - if reltype: - info.albumtype = reltype.lower() + if release["release-group"].get("title"): + info.release_group_title = release["release-group"].get("title") - # Set the new-style "primary" and "secondary" release types. - albumtypes = [] - if "primary-type" in release["release-group"]: - rel_primarytype = release["release-group"]["primary-type"] - if rel_primarytype: - albumtypes.append(rel_primarytype.lower()) - if "secondary-type-list" in release["release-group"]: - if release["release-group"]["secondary-type-list"]: - for sec_type in release["release-group"]["secondary-type-list"]: - albumtypes.append(sec_type.lower()) - info.albumtypes = albumtypes + # Get the disambiguation strings at the release and release group level. + if release["release-group"].get("disambiguation"): + info.releasegroupdisambig = release["release-group"].get( + "disambiguation" + ) + if release.get("disambiguation"): + info.albumdisambig = release.get("disambiguation") - # Release events. - info.country, release_date = _preferred_release_event(release) - release_group_date = release["release-group"].get("first-release-date") - if not release_date: - # Fall back if release-specific date is not available. - release_date = release_group_date + # Get the "classic" Release type. This data comes from a legacy API + # feature before MusicBrainz supported multiple release types. + if "type" in release["release-group"]: + reltype = release["release-group"]["type"] + if reltype: + info.albumtype = reltype.lower() - if release_date: - _set_date_str(info, release_date, False) - _set_date_str(info, release_group_date, True) + # Set the new-style "primary" and "secondary" release types. + albumtypes = [] + if "primary-type" in release["release-group"]: + rel_primarytype = release["release-group"]["primary-type"] + if rel_primarytype: + albumtypes.append(rel_primarytype.lower()) + if "secondary-type-list" in release["release-group"]: + if release["release-group"]["secondary-type-list"]: + for sec_type in release["release-group"]["secondary-type-list"]: + albumtypes.append(sec_type.lower()) + info.albumtypes = albumtypes - # Label name. - if release.get("label-info-list"): - label_info = release["label-info-list"][0] - if label_info.get("label"): - label = label_info["label"]["name"] - if label != "[no label]": - info.label = label - info.catalognum = label_info.get("catalog-number") + # Release events. + info.country, release_date = _preferred_release_event(release) + release_group_date = release["release-group"].get("first-release-date") + if not release_date: + # Fall back if release-specific date is not available. + release_date = release_group_date - # Text representation data. - if release.get("text-representation"): - rep = release["text-representation"] - info.script = rep.get("script") - info.language = rep.get("language") + if release_date: + _set_date_str(info, release_date, False) + _set_date_str(info, release_group_date, True) - # Media (format). - if release["medium-list"]: - # If all media are the same, use that medium name - if len({m.get("format") for m in release["medium-list"]}) == 1: - info.media = release["medium-list"][0].get("format") - # Otherwise, let's just call it "Media" + # Label name. + if release.get("label-info-list"): + label_info = release["label-info-list"][0] + if label_info.get("label"): + label = label_info["label"]["name"] + if label != "[no label]": + info.label = label + info.catalognum = label_info.get("catalog-number") + + # Text representation data. + if release.get("text-representation"): + rep = release["text-representation"] + info.script = rep.get("script") + info.language = rep.get("language") + + # Media (format). + if release["medium-list"]: + # If all media are the same, use that medium name + if len({m.get("format") for m in release["medium-list"]}) == 1: + info.media = release["medium-list"][0].get("format") + # Otherwise, let's just call it "Media" + else: + info.media = "Media" + + if config["musicbrainz"]["genres"]: + sources = [ + release["release-group"].get("tag-list", []), + release.get("tag-list", []), + ] + genres: Counter[str] = Counter() + for source in sources: + for genreitem in source: + genres[genreitem["name"]] += int(genreitem["count"]) + info.genre = "; ".join( + genre + for genre, _count in sorted(genres.items(), key=lambda g: -g[1]) + ) + + # We might find links to external sources (Discogs, Bandcamp, ...) + external_ids = config["musicbrainz"]["external_ids"].get() + wanted_sources = {site for site, wanted in external_ids.items() if wanted} + if wanted_sources and (url_rels := release.get("url-relation-list")): + urls = {} + + for source, url in product(wanted_sources, url_rels): + if f"{source}.com" in (target := url["target"]): + urls[source] = target + log.debug( + "Found link to {} release via MusicBrainz", + source.capitalize(), + ) + + if "discogs" in urls: + info.discogs_albumid = extract_discogs_id_regex(urls["discogs"]) + if "bandcamp" in urls: + info.bandcamp_album_id = urls["bandcamp"] + if "spotify" in urls: + info.spotify_album_id = MetadataSourcePlugin._get_id( + "album", urls["spotify"], spotify_id_regex + ) + if "deezer" in urls: + info.deezer_album_id = MetadataSourcePlugin._get_id( + "album", urls["deezer"], deezer_id_regex + ) + if "beatport" in urls: + info.beatport_album_id = MetadataSourcePlugin._get_id( + "album", urls["beatport"], beatport_id_regex + ) + if "tidal" in urls: + info.tidal_album_id = urls["tidal"].split("/")[-1] + + extra_albumdatas = plugins.send("mb_album_extract", data=release) + for extra_albumdata in extra_albumdatas: + info.update(extra_albumdata) + + return info + + + def match_album( + artist: str, + album: str, + tracks: int | None = None, + extra_tags: dict[str, Any] | None = None, + ) -> Iterator[beets.autotag.hooks.AlbumInfo]: + """Searches for a single album ("release" in MusicBrainz parlance) + and returns an iterator over AlbumInfo objects. May raise a + MusicBrainzAPIError. + + The query consists of an artist name, an album name, and, + optionally, a number of tracks on the album and any other extra tags. + """ + # Build search criteria. + criteria = {"release": album.lower().strip()} + if artist is not None: + criteria["artist"] = artist.lower().strip() else: - info.media = "Media" + # Various Artists search. + criteria["arid"] = VARIOUS_ARTISTS_ID + if tracks is not None: + criteria["tracks"] = str(tracks) - if config["musicbrainz"]["genres"]: - sources = [ - release["release-group"].get("tag-list", []), - release.get("tag-list", []), - ] - genres: Counter[str] = Counter() - for source in sources: - for genreitem in source: - genres[genreitem["name"]] += int(genreitem["count"]) - info.genre = "; ".join( - genre - for genre, _count in sorted(genres.items(), key=lambda g: -g[1]) - ) + # Additional search cues from existing metadata. + if extra_tags: + for tag, value in extra_tags.items(): + key = FIELDS_TO_MB_KEYS[tag] + value = str(value).lower().strip() + if key == "catno": + value = value.replace(" ", "") + if value: + criteria[key] = value - # We might find links to external sources (Discogs, Bandcamp, ...) - external_ids = config["musicbrainz"]["external_ids"].get() - wanted_sources = {site for site, wanted in external_ids.items() if wanted} - if wanted_sources and (url_rels := release.get("url-relation-list")): - urls = {} + # Abort if we have no search terms. + if not any(criteria.values()): + return - for source, url in product(wanted_sources, url_rels): - if f"{source}.com" in (target := url["target"]): - urls[source] = target - log.debug( - "Found link to {} release via MusicBrainz", - source.capitalize(), - ) - - if "discogs" in urls: - info.discogs_albumid = extract_discogs_id_regex(urls["discogs"]) - if "bandcamp" in urls: - info.bandcamp_album_id = urls["bandcamp"] - if "spotify" in urls: - info.spotify_album_id = MetadataSourcePlugin._get_id( - "album", urls["spotify"], spotify_id_regex + try: + log.debug("Searching for MusicBrainz releases with: {!r}", criteria) + res = musicbrainzngs.search_releases( + limit=config["musicbrainz"]["searchlimit"].get(int), **criteria ) - if "deezer" in urls: - info.deezer_album_id = MetadataSourcePlugin._get_id( - "album", urls["deezer"], deezer_id_regex + except musicbrainzngs.MusicBrainzError as exc: + raise MusicBrainzAPIError( + exc, "release search", criteria, traceback.format_exc() ) - if "beatport" in urls: - info.beatport_album_id = MetadataSourcePlugin._get_id( - "album", urls["beatport"], beatport_id_regex + for release in res["release-list"]: + # The search result is missing some data (namely, the tracks), + # so we just use the ID and fetch the rest of the information. + albuminfo = album_for_id(release["id"]) + if albuminfo is not None: + yield albuminfo + + + def match_track( + artist: str, + title: str, + ) -> Iterator[beets.autotag.hooks.TrackInfo]: + """Searches for a single track and returns an iterable of TrackInfo + objects. May raise a MusicBrainzAPIError. + """ + criteria = { + "artist": artist.lower().strip(), + "recording": title.lower().strip(), + } + + if not any(criteria.values()): + return + + try: + res = musicbrainzngs.search_recordings( + limit=config["musicbrainz"]["searchlimit"].get(int), **criteria ) - if "tidal" in urls: - info.tidal_album_id = urls["tidal"].split("/")[-1] - - extra_albumdatas = plugins.send("mb_album_extract", data=release) - for extra_albumdata in extra_albumdatas: - info.update(extra_albumdata) - - return info + except musicbrainzngs.MusicBrainzError as exc: + raise MusicBrainzAPIError( + exc, "recording search", criteria, traceback.format_exc() + ) + for recording in res["recording-list"]: + yield track_info(recording) -def match_album( - artist: str, - album: str, - tracks: int | None = None, - extra_tags: dict[str, Any] | None = None, -) -> Iterator[beets.autotag.hooks.AlbumInfo]: - """Searches for a single album ("release" in MusicBrainz parlance) - and returns an iterator over AlbumInfo objects. May raise a - MusicBrainzAPIError. + def album_for_id(releaseid: str) -> beets.autotag.hooks.AlbumInfo | None: + """Fetches an album by its MusicBrainz ID and returns an AlbumInfo + object or None if the album is not found. May raise a + MusicBrainzAPIError. + """ + log.debug("Requesting MusicBrainz release {}", releaseid) + albumid = _parse_id(releaseid) + if not albumid: + log.debug("Invalid MBID ({0}).", releaseid) + return None + try: + res = musicbrainzngs.get_release_by_id(albumid, RELEASE_INCLUDES) - The query consists of an artist name, an album name, and, - optionally, a number of tracks on the album and any other extra tags. - """ - # Build search criteria. - criteria = {"release": album.lower().strip()} - if artist is not None: - criteria["artist"] = artist.lower().strip() - else: - # Various Artists search. - criteria["arid"] = VARIOUS_ARTISTS_ID - if tracks is not None: - criteria["tracks"] = str(tracks) + # resolve linked release relations + actual_res = None - # Additional search cues from existing metadata. - if extra_tags: - for tag, value in extra_tags.items(): - key = FIELDS_TO_MB_KEYS[tag] - value = str(value).lower().strip() - if key == "catno": - value = value.replace(" ", "") - if value: - criteria[key] = value + if res["release"].get("status") == "Pseudo-Release": + actual_res = _find_actual_release_from_pseudo_release(res) - # Abort if we have no search terms. - if not any(criteria.values()): - return + except musicbrainzngs.ResponseError: + log.debug("Album ID match failed.") + return None + except musicbrainzngs.MusicBrainzError as exc: + raise MusicBrainzAPIError( + exc, "get release by ID", albumid, traceback.format_exc() + ) - try: - log.debug("Searching for MusicBrainz releases with: {!r}", criteria) - res = musicbrainzngs.search_releases( - limit=config["musicbrainz"]["searchlimit"].get(int), **criteria - ) - except musicbrainzngs.MusicBrainzError as exc: - raise MusicBrainzAPIError( - exc, "release search", criteria, traceback.format_exc() - ) - for release in res["release-list"]: - # The search result is missing some data (namely, the tracks), - # so we just use the ID and fetch the rest of the information. - albuminfo = album_for_id(release["id"]) - if albuminfo is not None: - yield albuminfo + # release is potentially a pseudo release + release = album_info(res["release"]) + + # should be None unless we're dealing with a pseudo release + if actual_res is not None: + actual_release = album_info(actual_res["release"]) + return _merge_pseudo_and_actual_album(release, actual_release) + else: + return release -def match_track( - artist: str, - title: str, -) -> Iterator[beets.autotag.hooks.TrackInfo]: - """Searches for a single track and returns an iterable of TrackInfo - objects. May raise a MusicBrainzAPIError. - """ - criteria = { - "artist": artist.lower().strip(), - "recording": title.lower().strip(), - } - - if not any(criteria.values()): - return - - try: - res = musicbrainzngs.search_recordings( - limit=config["musicbrainz"]["searchlimit"].get(int), **criteria - ) - except musicbrainzngs.MusicBrainzError as exc: - raise MusicBrainzAPIError( - exc, "recording search", criteria, traceback.format_exc() - ) - for recording in res["recording-list"]: - yield track_info(recording) - - -def album_for_id(releaseid: str) -> beets.autotag.hooks.AlbumInfo | None: - """Fetches an album by its MusicBrainz ID and returns an AlbumInfo - object or None if the album is not found. May raise a - MusicBrainzAPIError. - """ - log.debug("Requesting MusicBrainz release {}", releaseid) - albumid = _parse_id(releaseid) - if not albumid: - log.debug("Invalid MBID ({0}).", releaseid) - return None - try: - res = musicbrainzngs.get_release_by_id(albumid, RELEASE_INCLUDES) - - # resolve linked release relations - actual_res = None - - if res["release"].get("status") == "Pseudo-Release": - actual_res = _find_actual_release_from_pseudo_release(res) - - except musicbrainzngs.ResponseError: - log.debug("Album ID match failed.") - return None - except musicbrainzngs.MusicBrainzError as exc: - raise MusicBrainzAPIError( - exc, "get release by ID", albumid, traceback.format_exc() - ) - - # release is potentially a pseudo release - release = album_info(res["release"]) - - # should be None unless we're dealing with a pseudo release - if actual_res is not None: - actual_release = album_info(actual_res["release"]) - return _merge_pseudo_and_actual_album(release, actual_release) - else: - return release - - -def track_for_id(releaseid: str) -> beets.autotag.hooks.TrackInfo | None: - """Fetches a track by its MusicBrainz ID. Returns a TrackInfo object - or None if no track is found. May raise a MusicBrainzAPIError. - """ - trackid = _parse_id(releaseid) - if not trackid: - log.debug("Invalid MBID ({0}).", releaseid) - return None - try: - res = musicbrainzngs.get_recording_by_id(trackid, TRACK_INCLUDES) - except musicbrainzngs.ResponseError: - log.debug("Track ID match failed.") - return None - except musicbrainzngs.MusicBrainzError as exc: - raise MusicBrainzAPIError( - exc, "get recording by ID", trackid, traceback.format_exc() - ) - return track_info(res["recording"]) + def track_for_id(releaseid: str) -> beets.autotag.hooks.TrackInfo | None: + """Fetches a track by its MusicBrainz ID. Returns a TrackInfo object + or None if no track is found. May raise a MusicBrainzAPIError. + """ + trackid = _parse_id(releaseid) + if not trackid: + log.debug("Invalid MBID ({0}).", releaseid) + return None + try: + res = musicbrainzngs.get_recording_by_id(trackid, TRACK_INCLUDES) + except musicbrainzngs.ResponseError: + log.debug("Track ID match failed.") + return None + except musicbrainzngs.MusicBrainzError as exc: + raise MusicBrainzAPIError( + exc, "get recording by ID", trackid, traceback.format_exc() + ) + return track_info(res["recording"])