Merge remote-tracking branch 'upstream/master' into importer-restructure

This commit is contained in:
Sebastian Mohr 2025-05-17 10:32:50 +02:00
commit a2e316d444
35 changed files with 1710 additions and 2008 deletions

View file

@ -21,7 +21,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Install Python tools
uses: BrandonLWhite/pipx-install-action@v0.1.1
uses: BrandonLWhite/pipx-install-action@v1.0.1
- name: Setup Python with poetry caching
# poetry cache requires poetry to already be installed, weirdly
uses: actions/setup-python@v5

View file

@ -9,7 +9,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Install Python tools
uses: BrandonLWhite/pipx-install-action@v0.1.1
uses: BrandonLWhite/pipx-install-action@v1.0.1
- uses: actions/setup-python@v5
with:
python-version: 3.9

View file

@ -53,7 +53,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Install Python tools
uses: BrandonLWhite/pipx-install-action@v0.1.1
uses: BrandonLWhite/pipx-install-action@v1.0.1
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
@ -74,7 +74,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Install Python tools
uses: BrandonLWhite/pipx-install-action@v0.1.1
uses: BrandonLWhite/pipx-install-action@v1.0.1
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
@ -94,7 +94,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Install Python tools
uses: BrandonLWhite/pipx-install-action@v0.1.1
uses: BrandonLWhite/pipx-install-action@v1.0.1
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
@ -118,7 +118,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Install Python tools
uses: BrandonLWhite/pipx-install-action@v0.1.1
uses: BrandonLWhite/pipx-install-action@v1.0.1
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}

View file

@ -19,7 +19,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Install Python tools
uses: BrandonLWhite/pipx-install-action@v0.1.1
uses: BrandonLWhite/pipx-install-action@v1.0.1
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
@ -50,7 +50,7 @@ jobs:
ref: ${{ env.NEW_TAG }}
- name: Install Python tools
uses: BrandonLWhite/pipx-install-action@v0.1.1
uses: BrandonLWhite/pipx-install-action@v1.0.1
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}

View file

@ -87,6 +87,15 @@ Install `poetry`_ and `poethepoet`_ using `pipx`_::
$ pipx install poetry poethepoet
.. admonition:: Check ``tool.pipx-install`` section in ``pyproject.toml`` to
see supported versions
::
[tool.pipx-install]
poethepoet = ">=0.26"
poetry = "<2"
.. _pipx: https://pipx.pypa.io/stable
.. _pipx-installation-instructions: https://pipx.pypa.io/stable/installation/

View file

@ -17,7 +17,7 @@ from sys import stderr
import confuse
__version__ = "2.3.0"
__version__ = "2.3.1"
__author__ = "Adrian Sampson <adrian@radbox.org>"

View file

@ -18,17 +18,16 @@ from __future__ import annotations
import re
from functools import total_ordering
from typing import TYPE_CHECKING, Any, Callable, NamedTuple, TypeVar
from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar
from jellyfish import levenshtein_distance
from unidecode import unidecode
from beets import config, logging, plugins
from beets.autotag import mb
from beets import config, logging
from beets.util import as_string, cached_classproperty
if TYPE_CHECKING:
from collections.abc import Iterable, Iterator
from collections.abc import Iterator
from beets.library import Item
@ -56,7 +55,7 @@ class AttrDict(dict[str, V]):
return id(self)
class AlbumInfo(AttrDict):
class AlbumInfo(AttrDict[Any]):
"""Describes a canonical release that may be used to match a release
in the library. Consists of these data members:
@ -166,7 +165,7 @@ class AlbumInfo(AttrDict):
return dupe
class TrackInfo(AttrDict):
class TrackInfo(AttrDict[Any]):
"""Describes a canonical track present on a release. Appears as part
of an AlbumInfo's ``tracks`` list. Consists of these data members:
@ -357,8 +356,8 @@ class Distance:
for each individual penalty.
"""
def __init__(self):
self._penalties = {}
def __init__(self) -> None:
self._penalties: dict[str, list[float]] = {}
self.tracks: dict[TrackInfo, Distance] = {}
@cached_classproperty
@ -591,99 +590,3 @@ class AlbumMatch(NamedTuple):
class TrackMatch(NamedTuple):
distance: Distance
info: TrackInfo
# Aggregation of sources.
def album_for_mbid(release_id: str) -> AlbumInfo | None:
"""Get an AlbumInfo object for a MusicBrainz release ID. Return None
if the ID is not found.
"""
try:
if album := mb.album_for_id(release_id):
plugins.send("albuminfo_received", info=album)
return album
except mb.MusicBrainzAPIError as exc:
exc.log(log)
return None
def track_for_mbid(recording_id: str) -> TrackInfo | None:
"""Get a TrackInfo object for a MusicBrainz recording ID. Return None
if the ID is not found.
"""
try:
if track := mb.track_for_id(recording_id):
plugins.send("trackinfo_received", info=track)
return track
except mb.MusicBrainzAPIError as exc:
exc.log(log)
return None
def album_for_id(_id: str) -> AlbumInfo | None:
"""Get AlbumInfo object for the given ID string."""
return album_for_mbid(_id) or plugins.album_for_id(_id)
def track_for_id(_id: str) -> TrackInfo | None:
"""Get TrackInfo object for the given ID string."""
return track_for_mbid(_id) or plugins.track_for_id(_id)
def invoke_mb(call_func: Callable, *args):
try:
return call_func(*args)
except mb.MusicBrainzAPIError as exc:
exc.log(log)
return ()
@plugins.notify_info_yielded("albuminfo_received")
def album_candidates(
items: list[Item],
artist: str,
album: str,
va_likely: bool,
extra_tags: dict,
) -> Iterable[tuple]:
"""Search for album matches. ``items`` is a list of Item objects
that make up the album. ``artist`` and ``album`` are the respective
names (strings), which may be derived from the item list or may be
entered by the user. ``va_likely`` is a boolean indicating whether
the album is likely to be a "various artists" release. ``extra_tags``
is an optional dictionary of additional tags used to further
constrain the search.
"""
if config["musicbrainz"]["enabled"]:
# Base candidates if we have album and artist to match.
if artist and album:
yield from invoke_mb(
mb.match_album, artist, album, len(items), extra_tags
)
# Also add VA matches from MusicBrainz where appropriate.
if va_likely and album:
yield from invoke_mb(
mb.match_album, None, album, len(items), extra_tags
)
# Candidates from plugins.
yield from plugins.candidates(items, artist, album, va_likely, extra_tags)
@plugins.notify_info_yielded("trackinfo_received")
def item_candidates(item: Item, artist: str, title: str) -> Iterable[tuple]:
"""Search for item matches. ``item`` is the Item to be matched.
``artist`` and ``title`` are strings and either reflect the item or
are specified by the user.
"""
# MusicBrainz candidates.
if config["musicbrainz"]["enabled"] and artist and title:
yield from invoke_mb(mb.match_track, artist, title)
# Plugin candidates.
yield from plugins.item_candidates(item, artist, title)

View file

@ -335,8 +335,8 @@ def distance(
return dist
def match_by_id(items: Iterable[Item]):
"""If the items are tagged with a MusicBrainz album ID, returns an
def match_by_id(items: Iterable[Item]) -> AlbumInfo | None:
"""If the items are tagged with an external source ID, return an
AlbumInfo object for the corresponding album. Otherwise, returns
None.
"""
@ -356,7 +356,7 @@ def match_by_id(items: Iterable[Item]):
return None
# If all album IDs are equal, look up the album.
log.debug("Searching for discovered album ID: {0}", first)
return hooks.album_for_mbid(first)
return plugins.album_for_id(first)
def _recommendation(
@ -511,15 +511,14 @@ def tag_album(
if search_ids:
for search_id in search_ids:
log.debug("Searching for album ID: {0}", search_id)
if info := hooks.album_for_id(search_id):
if info := plugins.album_for_id(search_id):
_add_candidate(items, candidates, info)
# Use existing metadata or text search.
else:
# Try search based on current ID.
id_info = match_by_id(items)
if id_info:
_add_candidate(items, candidates, id_info)
if info := match_by_id(items):
_add_candidate(items, candidates, info)
rec = _recommendation(list(candidates.values()))
log.debug("Album ID match recommendation is {0}", rec)
if candidates and not config["import"]["timid"]:
@ -540,12 +539,6 @@ def tag_album(
search_artist, search_album = cur_artist, cur_album
log.debug("Search terms: {0} - {1}", search_artist, search_album)
extra_tags = None
if config["musicbrainz"]["extra_tags"]:
tag_list = config["musicbrainz"]["extra_tags"].get()
extra_tags = {k: v for (k, v) in likelies.items() if k in tag_list}
log.debug("Additional search terms: {0}", extra_tags)
# Is this album likely to be a "various artist" release?
va_likely = (
(not consensus["artist"])
@ -555,8 +548,8 @@ def tag_album(
log.debug("Album might be VA: {0}", va_likely)
# Get the results from the data sources.
for matched_candidate in hooks.album_candidates(
items, search_artist, search_album, va_likely, extra_tags
for matched_candidate in plugins.candidates(
items, search_artist, search_album, va_likely
):
_add_candidate(items, candidates, matched_candidate)
@ -576,22 +569,21 @@ def tag_item(
"""Find metadata for a single track. Return a `Proposal` consisting
of `TrackMatch` objects.
`search_artist` and `search_title` may be used
to override the current metadata for the purposes of the MusicBrainz
title. `search_ids` may be used for restricting the search to a list
of metadata backend IDs.
`search_artist` and `search_title` may be used to override the item
metadata in the search query. `search_ids` may be used for restricting the
search to a list of metadata backend IDs.
"""
# Holds candidates found so far: keys are MBIDs; values are
# (distance, TrackInfo) pairs.
candidates = {}
rec: Recommendation | None = None
# First, try matching by MusicBrainz ID.
# First, try matching by the external source ID.
trackids = search_ids or [t for t in [item.mb_trackid] if t]
if trackids:
for trackid in trackids:
log.debug("Searching for track ID: {0}", trackid)
if info := hooks.track_for_id(trackid):
if info := plugins.track_for_id(trackid):
dist = track_distance(item, info, incl_artist=True)
candidates[info.track_id] = hooks.TrackMatch(dist, info)
# If this is a good match, then don't keep searching.
@ -612,12 +604,14 @@ def tag_item(
return Proposal([], Recommendation.none)
# Search terms.
if not (search_artist and search_title):
search_artist, search_title = item.artist, item.title
search_artist = search_artist or item.artist
search_title = search_title or item.title
log.debug("Item search terms: {0} - {1}", search_artist, search_title)
# Get and evaluate candidate metadata.
for track_info in hooks.item_candidates(item, search_artist, search_title):
for track_info in plugins.item_candidates(
item, search_artist, search_title
):
dist = track_distance(item, track_info, incl_artist=True)
candidates[track_info.track_id] = hooks.TrackMatch(dist, track_info)

View file

@ -1,891 +0,0 @@
# This file is part of beets.
# Copyright 2016, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Searches for albums in the MusicBrainz database."""
from __future__ import annotations
import re
import traceback
from collections import Counter
from itertools import product
from typing import TYPE_CHECKING, Any
from urllib.parse import urljoin
import musicbrainzngs
import beets
import beets.autotag.hooks
from beets import config, logging, plugins, util
from beets.plugins import MetadataSourcePlugin
from beets.util.id_extractors import (
beatport_id_regex,
deezer_id_regex,
extract_discogs_id_regex,
spotify_id_regex,
)
if TYPE_CHECKING:
from collections.abc import Iterator, Sequence
VARIOUS_ARTISTS_ID = "89ad4ac3-39f7-470e-963a-56509c546377"
BASE_URL = "https://musicbrainz.org/"
SKIPPED_TRACKS = ["[data track]"]
FIELDS_TO_MB_KEYS = {
"catalognum": "catno",
"country": "country",
"label": "label",
"barcode": "barcode",
"media": "format",
"year": "date",
}
musicbrainzngs.set_useragent("beets", beets.__version__, "https://beets.io/")
class MusicBrainzAPIError(util.HumanReadableError):
"""An error while talking to MusicBrainz. The `query` field is the
parameter to the action and may have any type.
"""
def __init__(self, reason, verb, query, tb=None):
self.query = query
if isinstance(reason, musicbrainzngs.WebServiceError):
reason = "MusicBrainz not reachable"
super().__init__(reason, verb, tb)
def get_message(self):
return "{} in {} with query {}".format(
self._reasonstr(), self.verb, repr(self.query)
)
log = logging.getLogger("beets")
RELEASE_INCLUDES = list(
{
"artists",
"media",
"recordings",
"release-groups",
"labels",
"artist-credits",
"aliases",
"recording-level-rels",
"work-rels",
"work-level-rels",
"artist-rels",
"isrcs",
"url-rels",
"release-rels",
"tags",
}
& set(musicbrainzngs.VALID_INCLUDES["release"])
)
TRACK_INCLUDES = list(
{
"artists",
"aliases",
"isrcs",
"work-level-rels",
"artist-rels",
}
& set(musicbrainzngs.VALID_INCLUDES["recording"])
)
BROWSE_INCLUDES = [
"artist-credits",
"work-rels",
"artist-rels",
"recording-rels",
"release-rels",
]
if "work-level-rels" in musicbrainzngs.VALID_BROWSE_INCLUDES["recording"]:
BROWSE_INCLUDES.append("work-level-rels")
BROWSE_CHUNKSIZE = 100
BROWSE_MAXTRACKS = 500
def track_url(trackid: str) -> str:
return urljoin(BASE_URL, "recording/" + trackid)
def album_url(albumid: str) -> str:
return urljoin(BASE_URL, "release/" + albumid)
def configure():
"""Set up the python-musicbrainz-ngs module according to settings
from the beets configuration. This should be called at startup.
"""
hostname = config["musicbrainz"]["host"].as_str()
https = config["musicbrainz"]["https"].get(bool)
# Only call set_hostname when a custom server is configured. Since
# musicbrainz-ngs connects to musicbrainz.org with HTTPS by default
if hostname != "musicbrainz.org":
musicbrainzngs.set_hostname(hostname, https)
musicbrainzngs.set_rate_limit(
config["musicbrainz"]["ratelimit_interval"].as_number(),
config["musicbrainz"]["ratelimit"].get(int),
)
def _preferred_alias(aliases: list):
"""Given an list of alias structures for an artist credit, select
and return the user's preferred alias alias or None if no matching
alias is found.
"""
if not aliases:
return
# Only consider aliases that have locales set.
aliases = [a for a in aliases if "locale" in a]
# Get any ignored alias types and lower case them to prevent case issues
ignored_alias_types = config["import"]["ignored_alias_types"].as_str_seq()
ignored_alias_types = [a.lower() for a in ignored_alias_types]
# Search configured locales in order.
for locale in config["import"]["languages"].as_str_seq():
# Find matching primary aliases for this locale that are not
# being ignored
matches = []
for a in aliases:
if (
a["locale"] == locale
and "primary" in a
and a.get("type", "").lower() not in ignored_alias_types
):
matches.append(a)
# Skip to the next locale if we have no matches
if not matches:
continue
return matches[0]
def _preferred_release_event(
release: dict[str, Any],
) -> tuple[str | None, str | None]:
"""Given a release, select and return the user's preferred release
event as a tuple of (country, release_date). Fall back to the
default release event if a preferred event is not found.
"""
preferred_countries: Sequence[str] = config["match"]["preferred"][
"countries"
].as_str_seq()
for country in preferred_countries:
for event in release.get("release-event-list", {}):
try:
if country in event["area"]["iso-3166-1-code-list"]:
return country, event["date"]
except KeyError:
pass
return release.get("country"), release.get("date")
def _multi_artist_credit(
credit: list[dict], include_join_phrase: bool
) -> tuple[list[str], list[str], list[str]]:
"""Given a list representing an ``artist-credit`` block, accumulate
data into a triple of joined artist name lists: canonical, sort, and
credit.
"""
artist_parts = []
artist_sort_parts = []
artist_credit_parts = []
for el in credit:
if isinstance(el, str):
# Join phrase.
if include_join_phrase:
artist_parts.append(el)
artist_credit_parts.append(el)
artist_sort_parts.append(el)
else:
alias = _preferred_alias(el["artist"].get("alias-list", ()))
# An artist.
if alias:
cur_artist_name = alias["alias"]
else:
cur_artist_name = el["artist"]["name"]
artist_parts.append(cur_artist_name)
# Artist sort name.
if alias:
artist_sort_parts.append(alias["sort-name"])
elif "sort-name" in el["artist"]:
artist_sort_parts.append(el["artist"]["sort-name"])
else:
artist_sort_parts.append(cur_artist_name)
# Artist credit.
if "name" in el:
artist_credit_parts.append(el["name"])
else:
artist_credit_parts.append(cur_artist_name)
return (
artist_parts,
artist_sort_parts,
artist_credit_parts,
)
def _flatten_artist_credit(credit: list[dict]) -> tuple[str, str, str]:
"""Given a list representing an ``artist-credit`` block, flatten the
data into a triple of joined artist name strings: canonical, sort, and
credit.
"""
artist_parts, artist_sort_parts, artist_credit_parts = _multi_artist_credit(
credit, include_join_phrase=True
)
return (
"".join(artist_parts),
"".join(artist_sort_parts),
"".join(artist_credit_parts),
)
def _artist_ids(credit: list[dict]) -> list[str]:
"""
Given a list representing an ``artist-credit``,
return a list of artist IDs
"""
artist_ids: list[str] = []
for el in credit:
if isinstance(el, dict):
artist_ids.append(el["artist"]["id"])
return artist_ids
def _get_related_artist_names(relations, relation_type):
"""Given a list representing the artist relationships extract the names of
the remixers and concatenate them.
"""
related_artists = []
for relation in relations:
if relation["type"] == relation_type:
related_artists.append(relation["artist"]["name"])
return ", ".join(related_artists)
def track_info(
recording: dict,
index: int | None = None,
medium: int | None = None,
medium_index: int | None = None,
medium_total: int | None = None,
) -> beets.autotag.hooks.TrackInfo:
"""Translates a MusicBrainz recording result dictionary into a beets
``TrackInfo`` object. Three parameters are optional and are used
only for tracks that appear on releases (non-singletons): ``index``,
the overall track number; ``medium``, the disc number;
``medium_index``, the track's index on its medium; ``medium_total``,
the number of tracks on the medium. Each number is a 1-based index.
"""
info = beets.autotag.hooks.TrackInfo(
title=recording["title"],
track_id=recording["id"],
index=index,
medium=medium,
medium_index=medium_index,
medium_total=medium_total,
data_source="MusicBrainz",
data_url=track_url(recording["id"]),
)
if recording.get("artist-credit"):
# Get the artist names.
(
info.artist,
info.artist_sort,
info.artist_credit,
) = _flatten_artist_credit(recording["artist-credit"])
(
info.artists,
info.artists_sort,
info.artists_credit,
) = _multi_artist_credit(
recording["artist-credit"], include_join_phrase=False
)
info.artists_ids = _artist_ids(recording["artist-credit"])
info.artist_id = info.artists_ids[0]
if recording.get("artist-relation-list"):
info.remixer = _get_related_artist_names(
recording["artist-relation-list"], relation_type="remixer"
)
if recording.get("length"):
info.length = int(recording["length"]) / 1000.0
info.trackdisambig = recording.get("disambiguation")
if recording.get("isrc-list"):
info.isrc = ";".join(recording["isrc-list"])
lyricist = []
composer = []
composer_sort = []
for work_relation in recording.get("work-relation-list", ()):
if work_relation["type"] != "performance":
continue
info.work = work_relation["work"]["title"]
info.mb_workid = work_relation["work"]["id"]
if "disambiguation" in work_relation["work"]:
info.work_disambig = work_relation["work"]["disambiguation"]
for artist_relation in work_relation["work"].get(
"artist-relation-list", ()
):
if "type" in artist_relation:
type = artist_relation["type"]
if type == "lyricist":
lyricist.append(artist_relation["artist"]["name"])
elif type == "composer":
composer.append(artist_relation["artist"]["name"])
composer_sort.append(artist_relation["artist"]["sort-name"])
if lyricist:
info.lyricist = ", ".join(lyricist)
if composer:
info.composer = ", ".join(composer)
info.composer_sort = ", ".join(composer_sort)
arranger = []
for artist_relation in recording.get("artist-relation-list", ()):
if "type" in artist_relation:
type = artist_relation["type"]
if type == "arranger":
arranger.append(artist_relation["artist"]["name"])
if arranger:
info.arranger = ", ".join(arranger)
# Supplementary fields provided by plugins
extra_trackdatas = plugins.send("mb_track_extract", data=recording)
for extra_trackdata in extra_trackdatas:
info.update(extra_trackdata)
return info
def _set_date_str(
info: beets.autotag.hooks.AlbumInfo,
date_str: str,
original: bool = False,
):
"""Given a (possibly partial) YYYY-MM-DD string and an AlbumInfo
object, set the object's release date fields appropriately. If
`original`, then set the original_year, etc., fields.
"""
if date_str:
date_parts = date_str.split("-")
for key in ("year", "month", "day"):
if date_parts:
date_part = date_parts.pop(0)
try:
date_num = int(date_part)
except ValueError:
continue
if original:
key = "original_" + key
setattr(info, key, date_num)
def album_info(release: dict) -> beets.autotag.hooks.AlbumInfo:
"""Takes a MusicBrainz release result dictionary and returns a beets
AlbumInfo object containing the interesting data about that release.
"""
# Get artist name using join phrases.
artist_name, artist_sort_name, artist_credit_name = _flatten_artist_credit(
release["artist-credit"]
)
(
artists_names,
artists_sort_names,
artists_credit_names,
) = _multi_artist_credit(
release["artist-credit"], include_join_phrase=False
)
ntracks = sum(len(m["track-list"]) for m in release["medium-list"])
# The MusicBrainz API omits 'artist-relation-list' and 'work-relation-list'
# when the release has more than 500 tracks. So we use browse_recordings
# on chunks of tracks to recover the same information in this case.
if ntracks > BROWSE_MAXTRACKS:
log.debug("Album {} has too many tracks", release["id"])
recording_list = []
for i in range(0, ntracks, BROWSE_CHUNKSIZE):
log.debug("Retrieving tracks starting at {}", i)
recording_list.extend(
musicbrainzngs.browse_recordings(
release=release["id"],
limit=BROWSE_CHUNKSIZE,
includes=BROWSE_INCLUDES,
offset=i,
)["recording-list"]
)
track_map = {r["id"]: r for r in recording_list}
for medium in release["medium-list"]:
for recording in medium["track-list"]:
recording_info = track_map[recording["recording"]["id"]]
recording["recording"] = recording_info
# Basic info.
track_infos = []
index = 0
for medium in release["medium-list"]:
disctitle = medium.get("title")
format = medium.get("format")
if format in config["match"]["ignored_media"].as_str_seq():
continue
all_tracks = medium["track-list"]
if (
"data-track-list" in medium
and not config["match"]["ignore_data_tracks"]
):
all_tracks += medium["data-track-list"]
track_count = len(all_tracks)
if "pregap" in medium:
all_tracks.insert(0, medium["pregap"])
for track in all_tracks:
if (
"title" in track["recording"]
and track["recording"]["title"] in SKIPPED_TRACKS
):
continue
if (
"video" in track["recording"]
and track["recording"]["video"] == "true"
and config["match"]["ignore_video_tracks"]
):
continue
# Basic information from the recording.
index += 1
ti = track_info(
track["recording"],
index,
int(medium["position"]),
int(track["position"]),
track_count,
)
ti.release_track_id = track["id"]
ti.disctitle = disctitle
ti.media = format
ti.track_alt = track["number"]
# Prefer track data, where present, over recording data.
if track.get("title"):
ti.title = track["title"]
if track.get("artist-credit"):
# Get the artist names.
(
ti.artist,
ti.artist_sort,
ti.artist_credit,
) = _flatten_artist_credit(track["artist-credit"])
(
ti.artists,
ti.artists_sort,
ti.artists_credit,
) = _multi_artist_credit(
track["artist-credit"], include_join_phrase=False
)
ti.artists_ids = _artist_ids(track["artist-credit"])
ti.artist_id = ti.artists_ids[0]
if track.get("length"):
ti.length = int(track["length"]) / (1000.0)
track_infos.append(ti)
album_artist_ids = _artist_ids(release["artist-credit"])
info = beets.autotag.hooks.AlbumInfo(
album=release["title"],
album_id=release["id"],
artist=artist_name,
artist_id=album_artist_ids[0],
artists=artists_names,
artists_ids=album_artist_ids,
tracks=track_infos,
mediums=len(release["medium-list"]),
artist_sort=artist_sort_name,
artists_sort=artists_sort_names,
artist_credit=artist_credit_name,
artists_credit=artists_credit_names,
data_source="MusicBrainz",
data_url=album_url(release["id"]),
barcode=release.get("barcode"),
)
info.va = info.artist_id == VARIOUS_ARTISTS_ID
if info.va:
info.artist = config["va_name"].as_str()
info.asin = release.get("asin")
info.releasegroup_id = release["release-group"]["id"]
info.albumstatus = release.get("status")
if release["release-group"].get("title"):
info.release_group_title = release["release-group"].get("title")
# Get the disambiguation strings at the release and release group level.
if release["release-group"].get("disambiguation"):
info.releasegroupdisambig = release["release-group"].get(
"disambiguation"
)
if release.get("disambiguation"):
info.albumdisambig = release.get("disambiguation")
# Get the "classic" Release type. This data comes from a legacy API
# feature before MusicBrainz supported multiple release types.
if "type" in release["release-group"]:
reltype = release["release-group"]["type"]
if reltype:
info.albumtype = reltype.lower()
# Set the new-style "primary" and "secondary" release types.
albumtypes = []
if "primary-type" in release["release-group"]:
rel_primarytype = release["release-group"]["primary-type"]
if rel_primarytype:
albumtypes.append(rel_primarytype.lower())
if "secondary-type-list" in release["release-group"]:
if release["release-group"]["secondary-type-list"]:
for sec_type in release["release-group"]["secondary-type-list"]:
albumtypes.append(sec_type.lower())
info.albumtypes = albumtypes
# Release events.
info.country, release_date = _preferred_release_event(release)
release_group_date = release["release-group"].get("first-release-date")
if not release_date:
# Fall back if release-specific date is not available.
release_date = release_group_date
if release_date:
_set_date_str(info, release_date, False)
_set_date_str(info, release_group_date, True)
# Label name.
if release.get("label-info-list"):
label_info = release["label-info-list"][0]
if label_info.get("label"):
label = label_info["label"]["name"]
if label != "[no label]":
info.label = label
info.catalognum = label_info.get("catalog-number")
# Text representation data.
if release.get("text-representation"):
rep = release["text-representation"]
info.script = rep.get("script")
info.language = rep.get("language")
# Media (format).
if release["medium-list"]:
# If all media are the same, use that medium name
if len({m.get("format") for m in release["medium-list"]}) == 1:
info.media = release["medium-list"][0].get("format")
# Otherwise, let's just call it "Media"
else:
info.media = "Media"
if config["musicbrainz"]["genres"]:
sources = [
release["release-group"].get("tag-list", []),
release.get("tag-list", []),
]
genres: Counter[str] = Counter()
for source in sources:
for genreitem in source:
genres[genreitem["name"]] += int(genreitem["count"])
info.genre = "; ".join(
genre
for genre, _count in sorted(genres.items(), key=lambda g: -g[1])
)
# We might find links to external sources (Discogs, Bandcamp, ...)
external_ids = config["musicbrainz"]["external_ids"].get()
wanted_sources = {site for site, wanted in external_ids.items() if wanted}
if wanted_sources and (url_rels := release.get("url-relation-list")):
urls = {}
for source, url in product(wanted_sources, url_rels):
if f"{source}.com" in (target := url["target"]):
urls[source] = target
log.debug(
"Found link to {} release via MusicBrainz",
source.capitalize(),
)
if "discogs" in urls:
info.discogs_albumid = extract_discogs_id_regex(urls["discogs"])
if "bandcamp" in urls:
info.bandcamp_album_id = urls["bandcamp"]
if "spotify" in urls:
info.spotify_album_id = MetadataSourcePlugin._get_id(
"album", urls["spotify"], spotify_id_regex
)
if "deezer" in urls:
info.deezer_album_id = MetadataSourcePlugin._get_id(
"album", urls["deezer"], deezer_id_regex
)
if "beatport" in urls:
info.beatport_album_id = MetadataSourcePlugin._get_id(
"album", urls["beatport"], beatport_id_regex
)
if "tidal" in urls:
info.tidal_album_id = urls["tidal"].split("/")[-1]
extra_albumdatas = plugins.send("mb_album_extract", data=release)
for extra_albumdata in extra_albumdatas:
info.update(extra_albumdata)
return info
def match_album(
artist: str,
album: str,
tracks: int | None = None,
extra_tags: dict[str, Any] | None = None,
) -> Iterator[beets.autotag.hooks.AlbumInfo]:
"""Searches for a single album ("release" in MusicBrainz parlance)
and returns an iterator over AlbumInfo objects. May raise a
MusicBrainzAPIError.
The query consists of an artist name, an album name, and,
optionally, a number of tracks on the album and any other extra tags.
"""
# Build search criteria.
criteria = {"release": album.lower().strip()}
if artist is not None:
criteria["artist"] = artist.lower().strip()
else:
# Various Artists search.
criteria["arid"] = VARIOUS_ARTISTS_ID
if tracks is not None:
criteria["tracks"] = str(tracks)
# Additional search cues from existing metadata.
if extra_tags:
for tag, value in extra_tags.items():
key = FIELDS_TO_MB_KEYS[tag]
value = str(value).lower().strip()
if key == "catno":
value = value.replace(" ", "")
if value:
criteria[key] = value
# Abort if we have no search terms.
if not any(criteria.values()):
return
try:
log.debug("Searching for MusicBrainz releases with: {!r}", criteria)
res = musicbrainzngs.search_releases(
limit=config["musicbrainz"]["searchlimit"].get(int), **criteria
)
except musicbrainzngs.MusicBrainzError as exc:
raise MusicBrainzAPIError(
exc, "release search", criteria, traceback.format_exc()
)
for release in res["release-list"]:
# The search result is missing some data (namely, the tracks),
# so we just use the ID and fetch the rest of the information.
albuminfo = album_for_id(release["id"])
if albuminfo is not None:
yield albuminfo
def match_track(
artist: str,
title: str,
) -> Iterator[beets.autotag.hooks.TrackInfo]:
"""Searches for a single track and returns an iterable of TrackInfo
objects. May raise a MusicBrainzAPIError.
"""
criteria = {
"artist": artist.lower().strip(),
"recording": title.lower().strip(),
}
if not any(criteria.values()):
return
try:
res = musicbrainzngs.search_recordings(
limit=config["musicbrainz"]["searchlimit"].get(int), **criteria
)
except musicbrainzngs.MusicBrainzError as exc:
raise MusicBrainzAPIError(
exc, "recording search", criteria, traceback.format_exc()
)
for recording in res["recording-list"]:
yield track_info(recording)
def _parse_id(s: str) -> str | None:
"""Search for a MusicBrainz ID in the given string and return it. If
no ID can be found, return None.
"""
# Find the first thing that looks like a UUID/MBID.
match = re.search("[a-f0-9]{8}(-[a-f0-9]{4}){3}-[a-f0-9]{12}", s)
if match is not None:
return match.group() if match else None
return None
def _is_translation(r):
_trans_key = "transl-tracklisting"
return r["type"] == _trans_key and r["direction"] == "backward"
def _find_actual_release_from_pseudo_release(
pseudo_rel: dict,
) -> dict | None:
try:
relations = pseudo_rel["release"]["release-relation-list"]
except KeyError:
return None
# currently we only support trans(liter)ation's
translations = [r for r in relations if _is_translation(r)]
if not translations:
return None
actual_id = translations[0]["target"]
return musicbrainzngs.get_release_by_id(actual_id, RELEASE_INCLUDES)
def _merge_pseudo_and_actual_album(
pseudo: beets.autotag.hooks.AlbumInfo, actual: beets.autotag.hooks.AlbumInfo
) -> beets.autotag.hooks.AlbumInfo | None:
"""
Merges a pseudo release with its actual release.
This implementation is naive, it doesn't overwrite fields,
like status or ids.
According to the ticket PICARD-145, the main release id should be used.
But the ticket has been in limbo since over a decade now.
It also suggests the introduction of the tag `musicbrainz_pseudoreleaseid`,
but as of this field can't be found in any official Picard docs,
hence why we did not implement that for now.
"""
merged = pseudo.copy()
from_actual = {
k: actual[k]
for k in [
"media",
"mediums",
"country",
"catalognum",
"year",
"month",
"day",
"original_year",
"original_month",
"original_day",
"label",
"barcode",
"asin",
"style",
"genre",
]
}
merged.update(from_actual)
return merged
def album_for_id(releaseid: str) -> beets.autotag.hooks.AlbumInfo | None:
"""Fetches an album by its MusicBrainz ID and returns an AlbumInfo
object or None if the album is not found. May raise a
MusicBrainzAPIError.
"""
log.debug("Requesting MusicBrainz release {}", releaseid)
albumid = _parse_id(releaseid)
if not albumid:
log.debug("Invalid MBID ({0}).", releaseid)
return None
try:
res = musicbrainzngs.get_release_by_id(albumid, RELEASE_INCLUDES)
# resolve linked release relations
actual_res = None
if res["release"].get("status") == "Pseudo-Release":
actual_res = _find_actual_release_from_pseudo_release(res)
except musicbrainzngs.ResponseError:
log.debug("Album ID match failed.")
return None
except musicbrainzngs.MusicBrainzError as exc:
raise MusicBrainzAPIError(
exc, "get release by ID", albumid, traceback.format_exc()
)
# release is potentially a pseudo release
release = album_info(res["release"])
# should be None unless we're dealing with a pseudo release
if actual_res is not None:
actual_release = album_info(actual_res["release"])
return _merge_pseudo_and_actual_album(release, actual_release)
else:
return release
def track_for_id(releaseid: str) -> beets.autotag.hooks.TrackInfo | None:
"""Fetches a track by its MusicBrainz ID. Returns a TrackInfo object
or None if no track is found. May raise a MusicBrainzAPIError.
"""
trackid = _parse_id(releaseid)
if not trackid:
log.debug("Invalid MBID ({0}).", releaseid)
return None
try:
res = musicbrainzngs.get_recording_by_id(trackid, TRACK_INCLUDES)
except musicbrainzngs.ResponseError:
log.debug("Track ID match failed.")
return None
except musicbrainzngs.MusicBrainzError as exc:
raise MusicBrainzAPIError(
exc, "get recording by ID", trackid, traceback.format_exc()
)
return track_info(res["recording"])

View file

@ -6,7 +6,8 @@ statefile: state.pickle
# --------------- Plugins ---------------
plugins: []
plugins: [musicbrainz]
pluginpath: []
# --------------- Import ---------------
@ -163,22 +164,6 @@ sort_case_insensitive: yes
overwrite_null:
album: []
track: []
musicbrainz:
enabled: yes
host: musicbrainz.org
https: no
ratelimit: 1
ratelimit_interval: 1.0
searchlimit: 5
extra_tags: []
genres: no
external_ids:
discogs: no
bandcamp: no
spotify: no
deezer: no
beatport: no
tidal: no
match:
strong_rec_thresh: 0.04

View file

@ -22,7 +22,6 @@ import re
import sys
import traceback
from collections import defaultdict
from collections.abc import Iterable
from functools import wraps
from typing import (
TYPE_CHECKING,
@ -46,14 +45,18 @@ else:
if TYPE_CHECKING:
from collections.abc import Iterator
from confuse import ConfigView
from beets.autotag import AlbumInfo, Distance, TrackInfo
from beets.dbcore import Query
from beets.dbcore.db import FieldQueryType, SQLiteType
from beets.dbcore.db import FieldQueryType
from beets.dbcore.types import Type
from beets.importer import ImportSession, ImportTask
from beets.library import Album, Item, Library
from beets.ui import Subcommand
from beets.util.id_extractors import RegexDict
# TYPE_CHECKING guard is needed for any derived type
# which uses an import from `beets.library` and `beets.imported`
@ -64,6 +67,11 @@ if TYPE_CHECKING:
AnyModel = TypeVar("AnyModel", Album, Item)
P = ParamSpec("P")
Ret = TypeVar("Ret", bound=Any)
Listener = Callable[..., None]
IterF = Callable[P, Iterator[Ret]]
PLUGIN_NAMESPACE = "beetsplug"
@ -74,11 +82,6 @@ LASTFM_KEY = "2dc3914abf35f0d9c92d97d8f8e42b43"
log = logging.getLogger("beets")
P = ParamSpec("P")
Ret = TypeVar("Ret", bound=Any)
Listener = Callable[..., None]
class PluginConflictError(Exception):
"""Indicates that the services provided by one plugin conflict with
those of another.
@ -224,7 +227,7 @@ class BeetsPlugin:
def album_distance(
self,
items: list[Item],
items: Sequence[Item],
album_info: AlbumInfo,
mapping: dict[Item, TrackInfo],
) -> Distance:
@ -242,22 +245,29 @@ class BeetsPlugin:
album: str,
va_likely: bool,
extra_tags: dict[str, Any] | None = None,
) -> Sequence[AlbumInfo]:
"""Should return a sequence of AlbumInfo objects that match the
album whose items are provided.
) -> Iterator[AlbumInfo]:
"""Return :py:class:`AlbumInfo` candidates that match the given album.
:param items: List of items in the album
:param artist: Album artist
:param album: Album name
:param va_likely: Whether the album is likely to be by various artists
:param extra_tags: is a an optional dictionary of extra tags to search.
Only relevant to :py:class:`MusicBrainzPlugin` autotagger and can be
ignored by other plugins
"""
return ()
yield from ()
def item_candidates(
self,
item: Item,
artist: str,
title: str,
) -> Sequence[TrackInfo]:
"""Should return a sequence of TrackInfo objects that match the
item provided.
self, item: Item, artist: str, title: str
) -> Iterator[TrackInfo]:
"""Return :py:class:`TrackInfo` candidates that match the given track.
:param item: Track item
:param artist: Track artist
:param title: Track title
"""
return ()
yield from ()
def album_for_id(self, album_id: str) -> AlbumInfo | None:
"""Return an AlbumInfo object or None if no matching release was
@ -422,10 +432,10 @@ def queries() -> dict[str, type[Query]]:
return out
def types(model_cls: type[AnyModel]) -> dict[str, type[SQLiteType]]:
def types(model_cls: type[AnyModel]) -> dict[str, Type]:
# Gives us `item_types` and `album_types`
attr_name = f"{model_cls.__name__.lower()}_types"
types: dict[str, type[SQLiteType]] = {}
types: dict[str, Type] = {}
for plugin in find_plugins():
plugin_types = getattr(plugin, attr_name, {})
for field in plugin_types:
@ -462,7 +472,7 @@ def track_distance(item: Item, info: TrackInfo) -> Distance:
def album_distance(
items: list[Item],
items: Sequence[Item],
album_info: AlbumInfo,
mapping: dict[Item, TrackInfo],
) -> Distance:
@ -475,24 +485,38 @@ def album_distance(
return dist
def candidates(
items: list[Item],
artist: str,
album: str,
va_likely: bool,
extra_tags: dict[str, Any] | None = None,
) -> Iterable[AlbumInfo]:
"""Gets MusicBrainz candidates for an album from each plugin."""
for plugin in find_plugins():
yield from plugin.candidates(
items, artist, album, va_likely, extra_tags
)
def notify_info_yielded(event: str) -> Callable[[IterF[P, Ret]], IterF[P, Ret]]:
"""Makes a generator send the event 'event' every time it yields.
This decorator is supposed to decorate a generator, but any function
returning an iterable should work.
Each yielded value is passed to plugins using the 'info' parameter of
'send'.
"""
def decorator(func: IterF[P, Ret]) -> IterF[P, Ret]:
@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> Iterator[Ret]:
for v in func(*args, **kwargs):
send(event, info=v)
yield v
return wrapper
return decorator
def item_candidates(item: Item, artist: str, title: str) -> Iterable[TrackInfo]:
"""Gets MusicBrainz candidates for an item from the plugins."""
@notify_info_yielded("albuminfo_received")
def candidates(*args, **kwargs) -> Iterator[AlbumInfo]:
"""Return matching album candidates from all plugins."""
for plugin in find_plugins():
yield from plugin.item_candidates(item, artist, title)
yield from plugin.candidates(*args, **kwargs)
@notify_info_yielded("trackinfo_received")
def item_candidates(*args, **kwargs) -> Iterator[TrackInfo]:
"""Return matching track candidates from all plugins."""
for plugin in find_plugins():
yield from plugin.item_candidates(*args, **kwargs)
def album_for_id(_id: str) -> AlbumInfo | None:
@ -673,7 +697,7 @@ def sanitize_pairs(
... )
[('foo', 'baz'), ('foo', 'bar'), ('key', 'value'), ('foo', 'foobar')]
"""
pairs_all: list[tuple[str, str]] = list(pairs_all)
pairs_all = list(pairs_all)
seen: set[tuple[str, str]] = set()
others = [x for x in pairs_all if x not in pairs]
res: list[tuple[str, str]] = []
@ -695,32 +719,6 @@ def sanitize_pairs(
return res
IterF = Callable[P, Iterable[Ret]]
def notify_info_yielded(
event: str,
) -> Callable[[IterF[P, Ret]], IterF[P, Ret]]:
"""Makes a generator send the event 'event' every time it yields.
This decorator is supposed to decorate a generator, but any function
returning an iterable should work.
Each yielded value is passed to plugins using the 'info' parameter of
'send'.
"""
def decorator(
generator: IterF[P, Ret],
) -> IterF[P, Ret]:
def decorated(*args: P.args, **kwargs: P.kwargs) -> Iterable[Ret]:
for v in generator(*args, **kwargs):
send(event, info=v)
yield v
return decorated
return decorator
def get_distance(
config: ConfigView, data_source: str, info: AlbumInfo | TrackInfo
) -> Distance:
@ -772,15 +770,6 @@ class Response(TypedDict):
id: str
class RegexDict(TypedDict):
"""A dictionary containing a regex pattern and the number of the
match group.
"""
pattern: str
match_group: int
R = TypeVar("R", bound=Response)
@ -828,9 +817,7 @@ class MetadataSourcePlugin(Generic[R], BeetsPlugin, metaclass=abc.ABCMeta):
raise NotImplementedError
@abc.abstractmethod
def track_for_id(
self, track_id: str | None = None, track_data: R | None = None
) -> TrackInfo | None:
def track_for_id(self, track_id: str) -> TrackInfo | None:
raise NotImplementedError
@staticmethod
@ -911,44 +898,26 @@ class MetadataSourcePlugin(Generic[R], BeetsPlugin, metaclass=abc.ABCMeta):
album: str,
va_likely: bool,
extra_tags: dict[str, Any] | None = None,
) -> Sequence[AlbumInfo]:
"""Returns a list of AlbumInfo objects for Search API results
matching an ``album`` and ``artist`` (if not various).
:param items: List of items comprised by an album to be matched.
:param artist: The artist of the album to be matched.
:param album: The name of the album to be matched.
:param va_likely: True if the album to be matched likely has
Various Artists.
"""
) -> Iterator[AlbumInfo]:
query_filters = {"album": album}
if not va_likely:
query_filters["artist"] = artist
results = self._search_api(query_type="album", filters=query_filters)
albums = [self.album_for_id(album_id=r["id"]) for r in results]
return [a for a in albums if a is not None]
for result in self._search_api("album", query_filters):
if info := self.album_for_id(result["id"]):
yield info
def item_candidates(
self, item: Item, artist: str, title: str
) -> Sequence[TrackInfo]:
"""Returns a list of TrackInfo objects for Search API results
matching ``title`` and ``artist``.
:param item: Singleton item to be matched.
:param artist: The artist of the track to be matched.
:param title: The title of the track to be matched.
"""
track_responses = self._search_api(
query_type="track", keywords=title, filters={"artist": artist}
)
tracks = [self.track_for_id(track_data=r) for r in track_responses]
return [t for t in tracks if t is not None]
) -> Iterator[TrackInfo]:
for result in self._search_api(
"track", {"artist": artist}, keywords=title
):
if info := self.track_for_id(result["id"]):
yield info
def album_distance(
self,
items: list[Item],
items: Sequence[Item],
album_info: AlbumInfo,
mapping: dict[Item, TrackInfo],
) -> Distance:

View file

@ -35,6 +35,7 @@ import subprocess
import sys
import unittest
from contextlib import contextmanager
from dataclasses import dataclass
from enum import Enum
from functools import cached_property
from io import StringIO
@ -48,7 +49,7 @@ from mediafile import Image, MediaFile
import beets
import beets.plugins
from beets import autotag, importer, logging, util
from beets import importer, logging, util
from beets.autotag.hooks import AlbumInfo, TrackInfo
from beets.importer import ImportSession
from beets.library import Album, Item, Library
@ -447,6 +448,11 @@ class PluginMixin(ConfigMixin):
plugin: ClassVar[str]
preload_plugin: ClassVar[bool] = True
original_item_types = dict(Item._types)
original_album_types = dict(Album._types)
original_item_queries = dict(Item._queries)
original_album_queries = dict(Album._queries)
def setup_beets(self):
super().setup_beets()
if self.preload_plugin:
@ -470,13 +476,8 @@ class PluginMixin(ConfigMixin):
# Take a backup of the original _types and _queries to restore
# when unloading.
Item._original_types = dict(Item._types)
Album._original_types = dict(Album._types)
Item._types.update(beets.plugins.types(Item))
Album._types.update(beets.plugins.types(Album))
Item._original_queries = dict(Item._queries)
Album._original_queries = dict(Album._queries)
Item._queries.update(beets.plugins.named_queries(Item))
Album._queries.update(beets.plugins.named_queries(Album))
@ -488,10 +489,10 @@ class PluginMixin(ConfigMixin):
self.config["plugins"] = []
beets.plugins._classes = set()
beets.plugins._instances = {}
Item._types = getattr(Item, "_original_types", {})
Album._types = getattr(Album, "_original_types", {})
Item._queries = getattr(Item, "_original_queries", {})
Album._queries = getattr(Album, "_original_queries", {})
Item._types = self.original_item_types
Album._types = self.original_album_types
Item._queries = self.original_item_queries
Album._queries = self.original_album_queries
@contextmanager
def configure_plugin(self, config: Any):
@ -774,6 +775,7 @@ class TerminalImportMixin(ImportHelper):
)
@dataclass
class AutotagStub:
"""Stub out MusicBrainz album and track matcher and control what the
autotagger returns.
@ -784,47 +786,42 @@ class AutotagStub:
GOOD = "GOOD"
BAD = "BAD"
MISSING = "MISSING"
"""Generate an album match for all but one track
"""
matching: str
length = 2
matching = IDENT
def install(self):
self.mb_match_album = autotag.mb.match_album
self.mb_match_track = autotag.mb.match_track
self.mb_album_for_id = autotag.mb.album_for_id
self.mb_track_for_id = autotag.mb.track_for_id
autotag.mb.match_album = self.match_album
autotag.mb.match_track = self.match_track
autotag.mb.album_for_id = self.album_for_id
autotag.mb.track_for_id = self.track_for_id
self.patchers = [
patch("beets.plugins.album_for_id", lambda *_: None),
patch("beets.plugins.track_for_id", lambda *_: None),
patch("beets.plugins.candidates", self.candidates),
patch("beets.plugins.item_candidates", self.item_candidates),
]
for p in self.patchers:
p.start()
return self
def restore(self):
autotag.mb.match_album = self.mb_match_album
autotag.mb.match_track = self.mb_match_track
autotag.mb.album_for_id = self.mb_album_for_id
autotag.mb.track_for_id = self.mb_track_for_id
for p in self.patchers:
p.stop()
def match_album(self, albumartist, album, tracks, extra_tags):
def candidates(self, items, artist, album, va_likely, extra_tags=None):
if self.matching == self.IDENT:
yield self._make_album_match(albumartist, album, tracks)
yield self._make_album_match(artist, album, len(items))
elif self.matching == self.GOOD:
for i in range(self.length):
yield self._make_album_match(albumartist, album, tracks, i)
yield self._make_album_match(artist, album, len(items), i)
elif self.matching == self.BAD:
for i in range(self.length):
yield self._make_album_match(albumartist, album, tracks, i + 1)
yield self._make_album_match(artist, album, len(items), i + 1)
elif self.matching == self.MISSING:
yield self._make_album_match(albumartist, album, tracks, missing=1)
yield self._make_album_match(artist, album, len(items), missing=1)
def match_track(self, artist, title):
def item_candidates(self, item, artist, title):
yield TrackInfo(
title=title.replace("Tag", "Applied"),
track_id="trackid",
@ -834,12 +831,6 @@ class AutotagStub:
index=0,
)
def album_for_id(self, mbid):
return None
def track_for_id(self, mbid):
return None
def _make_track_match(self, artist, album, number):
return TrackInfo(
title="Applied Track %d" % number,
@ -877,6 +868,15 @@ class AutotagStub:
)
class AutotagImportTestCase(ImportTestCase):
matching = AutotagStub.IDENT
def setUp(self):
super().setUp()
self.matcher = AutotagStub(self.matching).install()
self.addCleanup(self.matcher.restore)
class FetchImageHelper:
"""Helper mixin for mocking requests when fetching images
with remote art sources.

View file

@ -17,6 +17,8 @@ interface. To invoke the CLI, just call beets.ui.main(). The actual
CLI commands are implemented in the ui.commands module.
"""
from __future__ import annotations
import errno
import optparse
import os.path
@ -27,17 +29,19 @@ import sys
import textwrap
import traceback
from difflib import SequenceMatcher
from typing import Any, Callable
from typing import TYPE_CHECKING, Any, Callable
import confuse
from beets import config, library, logging, plugins, util
from beets.autotag import mb
from beets.dbcore import db
from beets.dbcore import query as db_query
from beets.util import as_string
from beets.util.functemplate import template
if TYPE_CHECKING:
from types import ModuleType
# On Windows platforms, use colorama to support "ANSI" terminal colors.
if sys.platform == "win32":
try:
@ -570,7 +574,7 @@ COLOR_NAMES = [
"text_diff_removed",
"text_diff_changed",
]
COLORS = None
COLORS: dict[str, list[str]] | None = None
def _colorize(color, text):
@ -1623,7 +1627,9 @@ optparse.Option.ALWAYS_TYPED_ACTIONS += ("callback",)
# The main entry point and bootstrapping.
def _load_plugins(options, config):
def _load_plugins(
options: optparse.Values, config: confuse.LazyConfig
) -> ModuleType:
"""Load the plugins specified on the command line or in the configuration."""
paths = config["pluginpath"].as_str_seq(split=False)
paths = [util.normpath(p) for p in paths]
@ -1648,6 +1654,11 @@ def _load_plugins(options, config):
)
else:
plugin_list = config["plugins"].as_str_seq()
# TODO: Remove in v2.4 or v3
if "musicbrainz" in config and config["musicbrainz"].get().get(
"enabled"
):
plugin_list.append("musicbrainz")
# Exclude any plugins that were specified on the command line
if options.exclude is not None:
@ -1664,9 +1675,6 @@ def _setup(options, lib=None):
Returns a list of subcommands, a list of plugins, and a library instance.
"""
# Configure the MusicBrainz API.
mb.configure()
config = _configure(options)
plugins = _load_plugins(options, config)

View file

@ -363,7 +363,7 @@ class ChangeRepresentation:
self.indent_header + f"Match ({dist_string(self.match.distance)}):"
)
if self.match.info.get("album"):
if isinstance(self.match.info, autotag.hooks.AlbumInfo):
# Matching an album - print that
artist_album_str = (
f"{self.match.info.artist}" + f" - {self.match.info.album}"

View file

@ -15,20 +15,31 @@
"""Helpers around the extraction of album/track ID's from metadata sources."""
import re
from typing import TypedDict
class RegexDict(TypedDict):
"""A dictionary containing a regex pattern and the number of the
match group.
"""
pattern: str
match_group: int
# Spotify IDs consist of 22 alphanumeric characters
# (zero-left-padded base62 representation of randomly generated UUID4)
spotify_id_regex = {
spotify_id_regex: RegexDict = {
"pattern": r"(^|open\.spotify\.com/{}/)([0-9A-Za-z]{{22}})",
"match_group": 2,
}
deezer_id_regex = {
deezer_id_regex: RegexDict = {
"pattern": r"(^|deezer\.com/)([a-z]*/)?({}/)?(\d+)",
"match_group": 4,
}
beatport_id_regex = {
beatport_id_regex: RegexDict = {
"pattern": r"(^|beatport\.com/release/.+/)(\d+)$",
"match_group": 2,
}

View file

@ -14,10 +14,11 @@
"""Adds an album template field for formatted album types."""
from beets.autotag.mb import VARIOUS_ARTISTS_ID
from beets.library import Album
from beets.plugins import BeetsPlugin
from .musicbrainz import VARIOUS_ARTISTS_ID
class AlbumTypesPlugin(BeetsPlugin):
"""Adds an album template field for formatted album types."""

View file

@ -18,13 +18,14 @@ autotagger. Requires the pyacoustid library.
import re
from collections import defaultdict
from functools import partial
from functools import cached_property, partial
import acoustid
import confuse
from beets import config, plugins, ui, util
from beets.autotag import hooks
from beets.autotag.hooks import Distance
from beetsplug.musicbrainz import MusicBrainzPlugin
API_KEY = "1vOwZtEn"
SCORE_THRESH = 0.5
@ -182,11 +183,15 @@ class AcoustidPlugin(plugins.BeetsPlugin):
self.register_listener("import_task_start", self.fingerprint_task)
self.register_listener("import_task_apply", apply_acoustid_metadata)
@cached_property
def mb(self) -> MusicBrainzPlugin:
return MusicBrainzPlugin()
def fingerprint_task(self, task, session):
return fingerprint_task(self._log, task, session)
def track_distance(self, item, info):
dist = hooks.Distance()
dist = Distance()
if item.path not in _matches or not info.track_id:
# Match failed or no track ID.
return dist
@ -198,7 +203,7 @@ class AcoustidPlugin(plugins.BeetsPlugin):
def candidates(self, items, artist, album, va_likely, extra_tags=None):
albums = []
for relid in prefix(_all_releases(items), MAX_RELEASES):
album = hooks.album_for_mbid(relid)
album = self.mb.album_for_id(relid)
if album:
albums.append(album)
@ -212,7 +217,7 @@ class AcoustidPlugin(plugins.BeetsPlugin):
recording_ids, _ = _matches[item.path]
tracks = []
for recording_id in prefix(recording_ids, MAX_RECORDINGS):
track = hooks.track_for_mbid(recording_id)
track = self.mb.track_for_id(recording_id)
if track:
tracks.append(track)
self._log.debug("acoustid item candidates: {0}", len(tracks))

View file

@ -16,8 +16,7 @@
from collections import defaultdict
from beets import autotag, library, ui, util
from beets.autotag import hooks
from beets import autotag, library, plugins, ui, util
from beets.plugins import BeetsPlugin, apply_item_changes
@ -80,7 +79,7 @@ class MBSyncPlugin(BeetsPlugin):
)
continue
if not (track_info := hooks.track_for_id(item.mb_trackid)):
if not (track_info := plugins.track_for_id(item.mb_trackid)):
self._log.info(
"Recording ID not found: {0.mb_trackid} for track {0}", item
)
@ -101,7 +100,7 @@ class MBSyncPlugin(BeetsPlugin):
self._log.info("Skipping album with no mb_albumid: {}", album)
continue
if not (album_info := hooks.album_for_id(album.mb_albumid)):
if not (album_info := plugins.album_for_id(album.mb_albumid)):
self._log.info(
"Release ID {0.mb_albumid} not found for album {0}", album
)

View file

@ -21,8 +21,7 @@ from collections.abc import Iterator
import musicbrainzngs
from musicbrainzngs.musicbrainz import MusicBrainzError
from beets import config
from beets.autotag import hooks
from beets import config, plugins
from beets.dbcore import types
from beets.library import Album, Item, Library
from beets.plugins import BeetsPlugin
@ -223,7 +222,7 @@ class MissingPlugin(BeetsPlugin):
item_mbids = {x.mb_trackid for x in album.items()}
# fetch missing items
# TODO: Implement caching that without breaking other stuff
if album_info := hooks.album_for_id(album.mb_albumid):
if album_info := plugins.album_for_id(album.mb_albumid):
for track_info in album_info.tracks:
if track_info.track_id not in item_mbids:
self._log.debug(

922
beetsplug/musicbrainz.py Normal file
View file

@ -0,0 +1,922 @@
# This file is part of beets.
# Copyright 2016, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Searches for albums in the MusicBrainz database."""
from __future__ import annotations
import re
import traceback
from collections import Counter
from itertools import product
from typing import TYPE_CHECKING, Any
from urllib.parse import urljoin
import musicbrainzngs
import beets
import beets.autotag.hooks
from beets import config, plugins, util
from beets.plugins import BeetsPlugin, MetadataSourcePlugin
from beets.util.id_extractors import (
beatport_id_regex,
deezer_id_regex,
extract_discogs_id_regex,
spotify_id_regex,
)
if TYPE_CHECKING:
from collections.abc import Iterator, Sequence
from beets.library import Item
from ._typing import JSONDict
VARIOUS_ARTISTS_ID = "89ad4ac3-39f7-470e-963a-56509c546377"
BASE_URL = "https://musicbrainz.org/"
SKIPPED_TRACKS = ["[data track]"]
FIELDS_TO_MB_KEYS = {
"catalognum": "catno",
"country": "country",
"label": "label",
"barcode": "barcode",
"media": "format",
"year": "date",
}
musicbrainzngs.set_useragent("beets", beets.__version__, "https://beets.io/")
class MusicBrainzAPIError(util.HumanReadableError):
"""An error while talking to MusicBrainz. The `query` field is the
parameter to the action and may have any type.
"""
def __init__(self, reason, verb, query, tb=None):
self.query = query
if isinstance(reason, musicbrainzngs.WebServiceError):
reason = "MusicBrainz not reachable"
super().__init__(reason, verb, tb)
def get_message(self):
return "{} in {} with query {}".format(
self._reasonstr(), self.verb, repr(self.query)
)
RELEASE_INCLUDES = list(
{
"artists",
"media",
"recordings",
"release-groups",
"labels",
"artist-credits",
"aliases",
"recording-level-rels",
"work-rels",
"work-level-rels",
"artist-rels",
"isrcs",
"url-rels",
"release-rels",
"tags",
}
& set(musicbrainzngs.VALID_INCLUDES["release"])
)
TRACK_INCLUDES = list(
{
"artists",
"aliases",
"isrcs",
"work-level-rels",
"artist-rels",
}
& set(musicbrainzngs.VALID_INCLUDES["recording"])
)
BROWSE_INCLUDES = [
"artist-credits",
"work-rels",
"artist-rels",
"recording-rels",
"release-rels",
]
if "work-level-rels" in musicbrainzngs.VALID_BROWSE_INCLUDES["recording"]:
BROWSE_INCLUDES.append("work-level-rels")
BROWSE_CHUNKSIZE = 100
BROWSE_MAXTRACKS = 500
def _preferred_alias(aliases: list[JSONDict]):
"""Given an list of alias structures for an artist credit, select
and return the user's preferred alias alias or None if no matching
alias is found.
"""
if not aliases:
return
# Only consider aliases that have locales set.
valid_aliases = [a for a in aliases if "locale" in a]
# Get any ignored alias types and lower case them to prevent case issues
ignored_alias_types = config["import"]["ignored_alias_types"].as_str_seq()
ignored_alias_types = [a.lower() for a in ignored_alias_types]
# Search configured locales in order.
for locale in config["import"]["languages"].as_str_seq():
# Find matching primary aliases for this locale that are not
# being ignored
matches = []
for alias in valid_aliases:
if (
alias["locale"] == locale
and "primary" in alias
and alias.get("type", "").lower() not in ignored_alias_types
):
matches.append(alias)
# Skip to the next locale if we have no matches
if not matches:
continue
return matches[0]
def _multi_artist_credit(
credit: list[JSONDict], include_join_phrase: bool
) -> tuple[list[str], list[str], list[str]]:
"""Given a list representing an ``artist-credit`` block, accumulate
data into a triple of joined artist name lists: canonical, sort, and
credit.
"""
artist_parts = []
artist_sort_parts = []
artist_credit_parts = []
for el in credit:
if isinstance(el, str):
# Join phrase.
if include_join_phrase:
artist_parts.append(el)
artist_credit_parts.append(el)
artist_sort_parts.append(el)
else:
alias = _preferred_alias(el["artist"].get("alias-list", ()))
# An artist.
if alias:
cur_artist_name = alias["alias"]
else:
cur_artist_name = el["artist"]["name"]
artist_parts.append(cur_artist_name)
# Artist sort name.
if alias:
artist_sort_parts.append(alias["sort-name"])
elif "sort-name" in el["artist"]:
artist_sort_parts.append(el["artist"]["sort-name"])
else:
artist_sort_parts.append(cur_artist_name)
# Artist credit.
if "name" in el:
artist_credit_parts.append(el["name"])
else:
artist_credit_parts.append(cur_artist_name)
return (
artist_parts,
artist_sort_parts,
artist_credit_parts,
)
def track_url(trackid: str) -> str:
return urljoin(BASE_URL, "recording/" + trackid)
def _flatten_artist_credit(credit: list[JSONDict]) -> tuple[str, str, str]:
"""Given a list representing an ``artist-credit`` block, flatten the
data into a triple of joined artist name strings: canonical, sort, and
credit.
"""
artist_parts, artist_sort_parts, artist_credit_parts = _multi_artist_credit(
credit, include_join_phrase=True
)
return (
"".join(artist_parts),
"".join(artist_sort_parts),
"".join(artist_credit_parts),
)
def _artist_ids(credit: list[JSONDict]) -> list[str]:
"""
Given a list representing an ``artist-credit``,
return a list of artist IDs
"""
artist_ids: list[str] = []
for el in credit:
if isinstance(el, dict):
artist_ids.append(el["artist"]["id"])
return artist_ids
def _get_related_artist_names(relations, relation_type):
"""Given a list representing the artist relationships extract the names of
the remixers and concatenate them.
"""
related_artists = []
for relation in relations:
if relation["type"] == relation_type:
related_artists.append(relation["artist"]["name"])
return ", ".join(related_artists)
def album_url(albumid: str) -> str:
return urljoin(BASE_URL, "release/" + albumid)
def _preferred_release_event(
release: dict[str, Any],
) -> tuple[str | None, str | None]:
"""Given a release, select and return the user's preferred release
event as a tuple of (country, release_date). Fall back to the
default release event if a preferred event is not found.
"""
preferred_countries: Sequence[str] = config["match"]["preferred"][
"countries"
].as_str_seq()
for country in preferred_countries:
for event in release.get("release-event-list", {}):
try:
if country in event["area"]["iso-3166-1-code-list"]:
return country, event["date"]
except KeyError:
pass
return release.get("country"), release.get("date")
def _set_date_str(
info: beets.autotag.hooks.AlbumInfo,
date_str: str,
original: bool = False,
):
"""Given a (possibly partial) YYYY-MM-DD string and an AlbumInfo
object, set the object's release date fields appropriately. If
`original`, then set the original_year, etc., fields.
"""
if date_str:
date_parts = date_str.split("-")
for key in ("year", "month", "day"):
if date_parts:
date_part = date_parts.pop(0)
try:
date_num = int(date_part)
except ValueError:
continue
if original:
key = "original_" + key
setattr(info, key, date_num)
def _parse_id(s: str) -> str | None:
"""Search for a MusicBrainz ID in the given string and return it. If
no ID can be found, return None.
"""
# Find the first thing that looks like a UUID/MBID.
match = re.search("[a-f0-9]{8}(-[a-f0-9]{4}){3}-[a-f0-9]{12}", s)
if match is not None:
return match.group() if match else None
return None
def _is_translation(r):
_trans_key = "transl-tracklisting"
return r["type"] == _trans_key and r["direction"] == "backward"
def _find_actual_release_from_pseudo_release(
pseudo_rel: JSONDict,
) -> JSONDict | None:
try:
relations = pseudo_rel["release"]["release-relation-list"]
except KeyError:
return None
# currently we only support trans(liter)ation's
translations = [r for r in relations if _is_translation(r)]
if not translations:
return None
actual_id = translations[0]["target"]
return musicbrainzngs.get_release_by_id(actual_id, RELEASE_INCLUDES)
def _merge_pseudo_and_actual_album(
pseudo: beets.autotag.hooks.AlbumInfo, actual: beets.autotag.hooks.AlbumInfo
) -> beets.autotag.hooks.AlbumInfo | None:
"""
Merges a pseudo release with its actual release.
This implementation is naive, it doesn't overwrite fields,
like status or ids.
According to the ticket PICARD-145, the main release id should be used.
But the ticket has been in limbo since over a decade now.
It also suggests the introduction of the tag `musicbrainz_pseudoreleaseid`,
but as of this field can't be found in any official Picard docs,
hence why we did not implement that for now.
"""
merged = pseudo.copy()
from_actual = {
k: actual[k]
for k in [
"media",
"mediums",
"country",
"catalognum",
"year",
"month",
"day",
"original_year",
"original_month",
"original_day",
"label",
"barcode",
"asin",
"style",
"genre",
]
}
merged.update(from_actual)
return merged
class MusicBrainzPlugin(BeetsPlugin):
data_source = "Musicbrainz"
def __init__(self):
"""Set up the python-musicbrainz-ngs module according to settings
from the beets configuration. This should be called at startup.
"""
super().__init__()
self.config.add(
{
"host": "musicbrainz.org",
"https": False,
"ratelimit": 1,
"ratelimit_interval": 1,
"searchlimit": 5,
"genres": False,
"external_ids": {
"discogs": False,
"bandcamp": False,
"spotify": False,
"deezer": False,
"tidal": False,
},
"extra_tags": {},
},
)
hostname = self.config["host"].as_str()
https = self.config["https"].get(bool)
# Only call set_hostname when a custom server is configured. Since
# musicbrainz-ngs connects to musicbrainz.org with HTTPS by default
if hostname != "musicbrainz.org":
musicbrainzngs.set_hostname(hostname, https)
musicbrainzngs.set_rate_limit(
self.config["ratelimit_interval"].as_number(),
self.config["ratelimit"].get(int),
)
def track_info(
self,
recording: JSONDict,
index: int | None = None,
medium: int | None = None,
medium_index: int | None = None,
medium_total: int | None = None,
) -> beets.autotag.hooks.TrackInfo:
"""Translates a MusicBrainz recording result dictionary into a beets
``TrackInfo`` object. Three parameters are optional and are used
only for tracks that appear on releases (non-singletons): ``index``,
the overall track number; ``medium``, the disc number;
``medium_index``, the track's index on its medium; ``medium_total``,
the number of tracks on the medium. Each number is a 1-based index.
"""
info = beets.autotag.hooks.TrackInfo(
title=recording["title"],
track_id=recording["id"],
index=index,
medium=medium,
medium_index=medium_index,
medium_total=medium_total,
data_source="MusicBrainz",
data_url=track_url(recording["id"]),
)
if recording.get("artist-credit"):
# Get the artist names.
(
info.artist,
info.artist_sort,
info.artist_credit,
) = _flatten_artist_credit(recording["artist-credit"])
(
info.artists,
info.artists_sort,
info.artists_credit,
) = _multi_artist_credit(
recording["artist-credit"], include_join_phrase=False
)
info.artists_ids = _artist_ids(recording["artist-credit"])
info.artist_id = info.artists_ids[0]
if recording.get("artist-relation-list"):
info.remixer = _get_related_artist_names(
recording["artist-relation-list"], relation_type="remixer"
)
if recording.get("length"):
info.length = int(recording["length"]) / 1000.0
info.trackdisambig = recording.get("disambiguation")
if recording.get("isrc-list"):
info.isrc = ";".join(recording["isrc-list"])
lyricist = []
composer = []
composer_sort = []
for work_relation in recording.get("work-relation-list", ()):
if work_relation["type"] != "performance":
continue
info.work = work_relation["work"]["title"]
info.mb_workid = work_relation["work"]["id"]
if "disambiguation" in work_relation["work"]:
info.work_disambig = work_relation["work"]["disambiguation"]
for artist_relation in work_relation["work"].get(
"artist-relation-list", ()
):
if "type" in artist_relation:
type = artist_relation["type"]
if type == "lyricist":
lyricist.append(artist_relation["artist"]["name"])
elif type == "composer":
composer.append(artist_relation["artist"]["name"])
composer_sort.append(
artist_relation["artist"]["sort-name"]
)
if lyricist:
info.lyricist = ", ".join(lyricist)
if composer:
info.composer = ", ".join(composer)
info.composer_sort = ", ".join(composer_sort)
arranger = []
for artist_relation in recording.get("artist-relation-list", ()):
if "type" in artist_relation:
type = artist_relation["type"]
if type == "arranger":
arranger.append(artist_relation["artist"]["name"])
if arranger:
info.arranger = ", ".join(arranger)
# Supplementary fields provided by plugins
extra_trackdatas = plugins.send("mb_track_extract", data=recording)
for extra_trackdata in extra_trackdatas:
info.update(extra_trackdata)
return info
def album_info(self, release: JSONDict) -> beets.autotag.hooks.AlbumInfo:
"""Takes a MusicBrainz release result dictionary and returns a beets
AlbumInfo object containing the interesting data about that release.
"""
# Get artist name using join phrases.
artist_name, artist_sort_name, artist_credit_name = (
_flatten_artist_credit(release["artist-credit"])
)
(
artists_names,
artists_sort_names,
artists_credit_names,
) = _multi_artist_credit(
release["artist-credit"], include_join_phrase=False
)
ntracks = sum(len(m["track-list"]) for m in release["medium-list"])
# The MusicBrainz API omits 'artist-relation-list' and 'work-relation-list'
# when the release has more than 500 tracks. So we use browse_recordings
# on chunks of tracks to recover the same information in this case.
if ntracks > BROWSE_MAXTRACKS:
self._log.debug("Album {} has too many tracks", release["id"])
recording_list = []
for i in range(0, ntracks, BROWSE_CHUNKSIZE):
self._log.debug("Retrieving tracks starting at {}", i)
recording_list.extend(
musicbrainzngs.browse_recordings(
release=release["id"],
limit=BROWSE_CHUNKSIZE,
includes=BROWSE_INCLUDES,
offset=i,
)["recording-list"]
)
track_map = {r["id"]: r for r in recording_list}
for medium in release["medium-list"]:
for recording in medium["track-list"]:
recording_info = track_map[recording["recording"]["id"]]
recording["recording"] = recording_info
# Basic info.
track_infos = []
index = 0
for medium in release["medium-list"]:
disctitle = medium.get("title")
format = medium.get("format")
if format in config["match"]["ignored_media"].as_str_seq():
continue
all_tracks = medium["track-list"]
if (
"data-track-list" in medium
and not config["match"]["ignore_data_tracks"]
):
all_tracks += medium["data-track-list"]
track_count = len(all_tracks)
if "pregap" in medium:
all_tracks.insert(0, medium["pregap"])
for track in all_tracks:
if (
"title" in track["recording"]
and track["recording"]["title"] in SKIPPED_TRACKS
):
continue
if (
"video" in track["recording"]
and track["recording"]["video"] == "true"
and config["match"]["ignore_video_tracks"]
):
continue
# Basic information from the recording.
index += 1
ti = self.track_info(
track["recording"],
index,
int(medium["position"]),
int(track["position"]),
track_count,
)
ti.release_track_id = track["id"]
ti.disctitle = disctitle
ti.media = format
ti.track_alt = track["number"]
# Prefer track data, where present, over recording data.
if track.get("title"):
ti.title = track["title"]
if track.get("artist-credit"):
# Get the artist names.
(
ti.artist,
ti.artist_sort,
ti.artist_credit,
) = _flatten_artist_credit(track["artist-credit"])
(
ti.artists,
ti.artists_sort,
ti.artists_credit,
) = _multi_artist_credit(
track["artist-credit"], include_join_phrase=False
)
ti.artists_ids = _artist_ids(track["artist-credit"])
ti.artist_id = ti.artists_ids[0]
if track.get("length"):
ti.length = int(track["length"]) / (1000.0)
track_infos.append(ti)
album_artist_ids = _artist_ids(release["artist-credit"])
info = beets.autotag.hooks.AlbumInfo(
album=release["title"],
album_id=release["id"],
artist=artist_name,
artist_id=album_artist_ids[0],
artists=artists_names,
artists_ids=album_artist_ids,
tracks=track_infos,
mediums=len(release["medium-list"]),
artist_sort=artist_sort_name,
artists_sort=artists_sort_names,
artist_credit=artist_credit_name,
artists_credit=artists_credit_names,
data_source="MusicBrainz",
data_url=album_url(release["id"]),
barcode=release.get("barcode"),
)
info.va = info.artist_id == VARIOUS_ARTISTS_ID
if info.va:
info.artist = config["va_name"].as_str()
info.asin = release.get("asin")
info.releasegroup_id = release["release-group"]["id"]
info.albumstatus = release.get("status")
if release["release-group"].get("title"):
info.release_group_title = release["release-group"].get("title")
# Get the disambiguation strings at the release and release group level.
if release["release-group"].get("disambiguation"):
info.releasegroupdisambig = release["release-group"].get(
"disambiguation"
)
if release.get("disambiguation"):
info.albumdisambig = release.get("disambiguation")
# Get the "classic" Release type. This data comes from a legacy API
# feature before MusicBrainz supported multiple release types.
if "type" in release["release-group"]:
reltype = release["release-group"]["type"]
if reltype:
info.albumtype = reltype.lower()
# Set the new-style "primary" and "secondary" release types.
albumtypes = []
if "primary-type" in release["release-group"]:
rel_primarytype = release["release-group"]["primary-type"]
if rel_primarytype:
albumtypes.append(rel_primarytype.lower())
if "secondary-type-list" in release["release-group"]:
if release["release-group"]["secondary-type-list"]:
for sec_type in release["release-group"]["secondary-type-list"]:
albumtypes.append(sec_type.lower())
info.albumtypes = albumtypes
# Release events.
info.country, release_date = _preferred_release_event(release)
release_group_date = release["release-group"].get("first-release-date")
if not release_date:
# Fall back if release-specific date is not available.
release_date = release_group_date
if release_date:
_set_date_str(info, release_date, False)
_set_date_str(info, release_group_date, True)
# Label name.
if release.get("label-info-list"):
label_info = release["label-info-list"][0]
if label_info.get("label"):
label = label_info["label"]["name"]
if label != "[no label]":
info.label = label
info.catalognum = label_info.get("catalog-number")
# Text representation data.
if release.get("text-representation"):
rep = release["text-representation"]
info.script = rep.get("script")
info.language = rep.get("language")
# Media (format).
if release["medium-list"]:
# If all media are the same, use that medium name
if len({m.get("format") for m in release["medium-list"]}) == 1:
info.media = release["medium-list"][0].get("format")
# Otherwise, let's just call it "Media"
else:
info.media = "Media"
if self.config["genres"]:
sources = [
release["release-group"].get("tag-list", []),
release.get("tag-list", []),
]
genres: Counter[str] = Counter()
for source in sources:
for genreitem in source:
genres[genreitem["name"]] += int(genreitem["count"])
info.genre = "; ".join(
genre
for genre, _count in sorted(genres.items(), key=lambda g: -g[1])
)
# We might find links to external sources (Discogs, Bandcamp, ...)
external_ids = self.config["external_ids"].get()
wanted_sources = {
site for site, wanted in external_ids.items() if wanted
}
if wanted_sources and (url_rels := release.get("url-relation-list")):
urls = {}
for source, url in product(wanted_sources, url_rels):
if f"{source}.com" in (target := url["target"]):
urls[source] = target
self._log.debug(
"Found link to {} release via MusicBrainz",
source.capitalize(),
)
if "discogs" in urls:
info.discogs_albumid = extract_discogs_id_regex(urls["discogs"])
if "bandcamp" in urls:
info.bandcamp_album_id = urls["bandcamp"]
if "spotify" in urls:
info.spotify_album_id = MetadataSourcePlugin._get_id(
"album", urls["spotify"], spotify_id_regex
)
if "deezer" in urls:
info.deezer_album_id = MetadataSourcePlugin._get_id(
"album", urls["deezer"], deezer_id_regex
)
if "beatport" in urls:
info.beatport_album_id = MetadataSourcePlugin._get_id(
"album", urls["beatport"], beatport_id_regex
)
if "tidal" in urls:
info.tidal_album_id = urls["tidal"].split("/")[-1]
extra_albumdatas = plugins.send("mb_album_extract", data=release)
for extra_albumdata in extra_albumdatas:
info.update(extra_albumdata)
return info
def candidates(
self,
items: list[Item],
artist: str,
album: str,
va_likely: bool,
extra_tags: dict[str, Any] | None = None,
) -> Iterator[beets.autotag.hooks.AlbumInfo]:
"""Searches for a single album ("release" in MusicBrainz parlance)
and returns an iterator over AlbumInfo objects. May raise a
MusicBrainzAPIError.
The query consists of an artist name, an album name, and,
optionally, a number of tracks on the album and any other extra tags.
"""
# Build search criteria.
criteria = {"release": album.lower().strip()}
if artist is not None:
criteria["artist"] = artist.lower().strip()
else:
# Various Artists search.
criteria["arid"] = VARIOUS_ARTISTS_ID
if track_count := len(items):
criteria["tracks"] = str(track_count)
if self.config["extra_tags"]:
tag_list = self.config["extra_tags"].get()
self._log.debug("Additional search terms: {0}", tag_list)
for tag, value in tag_list.items():
if key := FIELDS_TO_MB_KEYS.get(tag):
value = str(value).lower().strip()
if key == "catno":
value = value.replace(" ", "")
if value:
criteria[key] = value
# Abort if we have no search terms.
if not any(criteria.values()):
return
try:
self._log.debug(
"Searching for MusicBrainz releases with: {!r}", criteria
)
res = musicbrainzngs.search_releases(
limit=self.config["searchlimit"].get(int), **criteria
)
except musicbrainzngs.MusicBrainzError as exc:
raise MusicBrainzAPIError(
exc, "release search", criteria, traceback.format_exc()
)
for release in res["release-list"]:
# The search result is missing some data (namely, the tracks),
# so we just use the ID and fetch the rest of the information.
albuminfo = self.album_for_id(release["id"])
if albuminfo is not None:
yield albuminfo
def item_candidates(
self, item: Item, artist: str, title: str
) -> Iterator[beets.autotag.hooks.TrackInfo]:
"""Searches for a single track and returns an iterable of TrackInfo
objects. May raise a MusicBrainzAPIError.
"""
criteria = {
"artist": artist.lower().strip(),
"recording": title.lower().strip(),
}
if not any(criteria.values()):
return
try:
res = musicbrainzngs.search_recordings(
limit=self.config["searchlimit"].get(int), **criteria
)
except musicbrainzngs.MusicBrainzError as exc:
raise MusicBrainzAPIError(
exc, "recording search", criteria, traceback.format_exc()
)
for recording in res["recording-list"]:
yield self.track_info(recording)
def album_for_id(
self, album_id: str
) -> beets.autotag.hooks.AlbumInfo | None:
"""Fetches an album by its MusicBrainz ID and returns an AlbumInfo
object or None if the album is not found. May raise a
MusicBrainzAPIError.
"""
self._log.debug("Requesting MusicBrainz release {}", album_id)
albumid = _parse_id(album_id)
if not albumid:
self._log.debug("Invalid MBID ({0}).", album_id)
return None
try:
res = musicbrainzngs.get_release_by_id(albumid, RELEASE_INCLUDES)
# resolve linked release relations
actual_res = None
if res["release"].get("status") == "Pseudo-Release":
actual_res = _find_actual_release_from_pseudo_release(res)
except musicbrainzngs.ResponseError:
self._log.debug("Album ID match failed.")
return None
except musicbrainzngs.MusicBrainzError as exc:
raise MusicBrainzAPIError(
exc, "get release by ID", albumid, traceback.format_exc()
)
# release is potentially a pseudo release
release = self.album_info(res["release"])
# should be None unless we're dealing with a pseudo release
if actual_res is not None:
actual_release = self.album_info(actual_res["release"])
return _merge_pseudo_and_actual_album(release, actual_release)
else:
return release
def track_for_id(
self, track_id: str
) -> beets.autotag.hooks.TrackInfo | None:
"""Fetches a track by its MusicBrainz ID. Returns a TrackInfo object
or None if no track is found. May raise a MusicBrainzAPIError.
"""
trackid = _parse_id(track_id)
if not trackid:
self._log.debug("Invalid MBID ({0}).", track_id)
return None
try:
res = musicbrainzngs.get_recording_by_id(trackid, TRACK_INCLUDES)
except musicbrainzngs.ResponseError:
self._log.debug("Track ID match failed.")
return None
except musicbrainzngs.MusicBrainzError as exc:
raise MusicBrainzAPIError(
exc, "get recording by ID", trackid, traceback.format_exc()
)
return self.track_info(res["recording"])

View file

@ -10,14 +10,23 @@ New features:
Bug fixes:
For packagers:
Other changes:
2.3.1 (May 14, 2025)
--------------------
Bug fixes:
* :doc:`/reference/pathformat`: Fixed a regression where path legalization
incorrectly removed parts of user-configured path formats that followed a dot
(**.**).
:bug:`5771`
For packagers:
Other changes:
* Force ``poetry`` version below 2 to avoid it mangling file modification times
in ``sdist`` package.
:bug:`5770`
2.3.0 (May 07, 2025)
--------------------
@ -27,6 +36,13 @@ been dropped.
New features:
* :doc:`plugins/musicbrainz`: The MusicBrainz autotagger has been moved to
a separate plugin. The default :ref:`plugins-config` includes `musicbrainz`,
but if you've customized your `plugins` list in your configuration, you'll
need to explicitly add `musicbrainz` to continue using this functionality.
Configuration option `musicbrainz.enabled` has thus been deprecated.
:bug:`2686`
:bug:`4605`
* :doc:`plugins/lastgenre`: The new configuration option, ``keep_existing``,
provides more fine-grained control over how pre-populated genre tags are
handled. The ``force`` option now behaves in a more conventional manner.
@ -115,8 +131,8 @@ Other changes:
:bug:`5539`
* :doc:`/plugins/smartplaylist`: URL-encode additional item `fields` within generated
EXTM3U playlists instead of JSON-encoding them.
* typehints: `./beets/importer.py` file now has improved typehints.
* typehints: `./beets/plugins.py` file now includes typehints.
* typehints: `./beets/importer.py` file now has improved typehints.
* typehints: `./beets/plugins.py` file now includes typehints.
* :doc:`plugins/ftintitle`: Optimize the plugin by avoiding unnecessary writes
to the database.
* Database models are now serializable with pickle.

View file

@ -12,7 +12,7 @@ project = "beets"
copyright = "2016, Adrian Sampson"
version = "2.3"
release = "2.3.0"
release = "2.3.1"
pygments_style = "sphinx"

View file

@ -13,7 +13,9 @@ Using Plugins
-------------
To use one of the plugins included with beets (see the rest of this page for a
list), just use the ``plugins`` option in your :doc:`config.yaml </reference/config>` file, like so::
list), just use the ``plugins`` option in your :doc:`config.yaml </reference/config>` file:
.. code-block:: sh
plugins: inline convert web
@ -21,7 +23,9 @@ The value for ``plugins`` can be a space-separated list of plugin names or a
YAML list like ``[foo, bar]``. You can see which plugins are currently enabled
by typing ``beet version``.
Each plugin has its own set of options that can be defined in a section bearing its name::
Each plugin has its own set of options that can be defined in a section bearing its name:
.. code-block:: yaml
plugins: inline convert web
@ -30,10 +34,11 @@ Each plugin has its own set of options that can be defined in a section bearing
Some plugins have special dependencies that you'll need to install. The
documentation page for each plugin will list them in the setup instructions.
For some, you can use ``pip``'s "extras" feature to install the dependencies,
like this::
For some, you can use ``pip``'s "extras" feature to install the dependencies:
pip install beets[fetchart,lyrics,lastgenre]
.. code-block:: sh
pip install "beets[fetchart,lyrics,lastgenre]"
.. _metadata-source-plugin-configuration:
@ -48,7 +53,9 @@ plugins share the following configuration option:
Default: ``0.5``.
For example, to equally consider matches from Discogs and MusicBrainz add the
following to your configuration::
following to your configuration:
.. code-block:: yaml
plugins: discogs
@ -111,6 +118,7 @@ following to your configuration::
missing
mpdstats
mpdupdate
musicbrainz
parentwork
permissions
play
@ -142,21 +150,26 @@ Autotagger Extensions
Use acoustic fingerprinting to identify audio files with
missing or incorrect metadata.
:doc:`discogs <discogs>`
Search for releases in the `Discogs`_ database.
:doc:`spotify <spotify>`
Search for releases in the `Spotify`_ database.
:doc:`deezer <deezer>`
Search for releases in the `Deezer`_ database.
:doc:`discogs <discogs>`
Search for releases in the `Discogs`_ database.
:doc:`fromfilename <fromfilename>`
Guess metadata for untagged tracks from their filenames.
.. _Discogs: https://www.discogs.com/
:doc:`musicbrainz <musicbrainz>`
Search for releases in the `MusicBrainz`_ database.
:doc:`spotify <spotify>`
Search for releases in the `Spotify`_ database.
.. _Deezer: https://www.deezer.com
.. _Discogs: https://www.discogs.com
.. _MusicBrainz: https://www.musicbrainz.com
.. _Spotify: https://www.spotify.com
.. _Deezer: https://www.deezer.com/
Metadata
--------
@ -465,6 +478,10 @@ Here are a few of the plugins written by the beets community:
`dsedivec`_
Has two plugins: ``edit`` and ``moveall``.
`beets-filetote`_
Helps bring non-music extra files, attachments, and artifacts during
imports and CLI file manipulation actions (`beet move`, etc.).
`beets-follow`_
Lets you check for new albums from artists you like.
@ -560,6 +577,7 @@ Here are a few of the plugins written by the beets community:
.. _cmus: http://cmus.sourceforge.net/
.. _beet-amazon: https://github.com/jmwatte/beet-amazon
.. _beets-alternatives: https://github.com/geigerzaehler/beets-alternatives
.. _beets-filetote: https://github.com/gtronset/beets-filetote
.. _beets-follow: https://github.com/nolsto/beets-follow
.. _beets-ibroadcast: https://github.com/ctrueden/beets-ibroadcast
.. _iBroadcast: https://ibroadcast.com/

View file

@ -0,0 +1,153 @@
MusicBrainz Plugin
==================
The ``musicbrainz`` plugin extends the autotagger's search capabilities to
include matches from the `MusicBrainz`_ database.
.. _MusicBrainz: https://musicbrainz.org/
Installation
------------
To use the ``musicbrainz`` plugin, enable it in your configuration (see
:ref:`using-plugins`)
.. _musicbrainz-config:
Configuration
-------------
Default
^^^^^^^
.. code-block:: yaml
musicbrainz:
host: musicbrainz.org
https: no
ratelimit: 1
ratelimit_interval: 1.0
searchlimit: 5
extra_tags: []
genres: no
external_ids:
discogs: no
bandcamp: no
spotify: no
deezer: no
beatport: no
tidal: no
You can instruct beets to use `your own MusicBrainz database`_ instead of
the `main server`_. Use the ``host``, ``https`` and ``ratelimit`` options
under a ``musicbrainz:`` header, like so
.. code-block:: yaml
musicbrainz:
host: localhost:5000
https: no
ratelimit: 100
The ``host`` key, of course, controls the Web server hostname (and port,
optionally) that will be contacted by beets (default: musicbrainz.org).
The ``https`` key makes the client use HTTPS instead of HTTP. This setting applies
only to custom servers. The official MusicBrainz server always uses HTTPS. (Default: no.)
The server must have search indices enabled (see `Building search indexes`_).
The ``ratelimit`` option, an integer, controls the number of Web service requests
per second (default: 1). **Do not change the rate limit setting** if you're
using the main MusicBrainz server---on this public server, you're `limited`_
to one request per second.
.. _your own MusicBrainz database: https://musicbrainz.org/doc/MusicBrainz_Server/Setup
.. _main server: https://musicbrainz.org/
.. _limited: https://musicbrainz.org/doc/XML_Web_Service/Rate_Limiting
.. _Building search indexes: https://musicbrainz.org/doc/Development/Search_server_setup
.. _musicbrainz.enabled:
enabled
~~~~~~~
.. deprecated:: 2.3
Add `musicbrainz` to the `plugins` list instead.
This option allows you to disable using MusicBrainz as a metadata source. This applies
if you use plugins that fetch data from alternative sources and should make the import
process quicker.
Default: ``yes``.
.. _searchlimit:
searchlimit
~~~~~~~~~~~
The number of matches returned when sending search queries to the
MusicBrainz server.
Default: ``5``.
.. _extra_tags:
extra_tags
~~~~~~~~~~
By default, beets will use only the artist, album, and track count to query
MusicBrainz. Additional tags to be queried can be supplied with the
``extra_tags`` setting. For example
.. code-block:: yaml
musicbrainz:
extra_tags: [year, catalognum, country, media, label]
This setting should improve the autotagger results if the metadata with the
given tags match the metadata returned by MusicBrainz.
Note that the only tags supported by this setting are the ones listed in the
above example.
Default: ``[]``
.. _genres:
genres
~~~~~~
Use MusicBrainz genre tags to populate (and replace if it's already set) the
``genre`` tag. This will make it a list of all the genres tagged for the
release and the release-group on MusicBrainz, separated by "; " and sorted by
the total number of votes.
Default: ``no``
.. _musicbrainz.external_ids:
external_ids
~~~~~~~~~~~~
Set any of the ``external_ids`` options to ``yes`` to enable the MusicBrainz
importer to look for links to related metadata sources. If such a link is
available the release ID will be extracted from the URL provided and imported
to the beets library
.. code-block:: yaml
musicbrainz:
external_ids:
discogs: yes
spotify: yes
bandcamp: yes
beatport: yes
deezer: yes
tidal: yes
The library fields of the corresponding :ref:`autotagger_extensions` are used
to save the data (``discogs_albumid``, ``bandcamp_album_id``,
``spotify_album_id``, ``beatport_album_id``, ``deezer_album_id``,
``tidal_album_id``). On re-imports existing data will be overwritten.
The default of all options is ``no``.

View file

@ -58,6 +58,8 @@ directory
The directory to which files will be copied/moved when adding them to the
library. Defaults to a folder called ``Music`` in your home directory.
.. _plugins-config:
plugins
~~~~~~~
@ -874,115 +876,6 @@ This feature is currently supported by the :doc:`/plugins/discogs` and the
Default: ``yes``.
.. _musicbrainz-config:
MusicBrainz Options
-------------------
You can instruct beets to use `your own MusicBrainz database`_ instead of
the `main server`_. Use the ``host``, ``https`` and ``ratelimit`` options
under a ``musicbrainz:`` header, like so::
musicbrainz:
host: localhost:5000
https: no
ratelimit: 100
The ``host`` key, of course, controls the Web server hostname (and port,
optionally) that will be contacted by beets (default: musicbrainz.org).
The ``https`` key makes the client use HTTPS instead of HTTP. This setting applies
only to custom servers. The official MusicBrainz server always uses HTTPS. (Default: no.)
The server must have search indices enabled (see `Building search indexes`_).
The ``ratelimit`` option, an integer, controls the number of Web service requests
per second (default: 1). **Do not change the rate limit setting** if you're
using the main MusicBrainz server---on this public server, you're `limited`_
to one request per second.
.. _your own MusicBrainz database: https://musicbrainz.org/doc/MusicBrainz_Server/Setup
.. _main server: https://musicbrainz.org/
.. _limited: https://musicbrainz.org/doc/XML_Web_Service/Rate_Limiting
.. _Building search indexes: https://musicbrainz.org/doc/Development/Search_server_setup
.. _musicbrainz.enabled:
enabled
~~~~~~~
This option allows you to disable using MusicBrainz as a metadata source. This applies
if you use plugins that fetch data from alternative sources and should make the import
process quicker.
Default: ``yes``.
.. _searchlimit:
searchlimit
~~~~~~~~~~~
The number of matches returned when sending search queries to the
MusicBrainz server.
Default: ``5``.
.. _extra_tags:
extra_tags
~~~~~~~~~~
By default, beets will use only the artist, album, and track count to query
MusicBrainz. Additional tags to be queried can be supplied with the
``extra_tags`` setting. For example::
musicbrainz:
extra_tags: [year, catalognum, country, media, label]
This setting should improve the autotagger results if the metadata with the
given tags match the metadata returned by MusicBrainz.
Note that the only tags supported by this setting are the ones listed in the
above example.
Default: ``[]``
.. _genres:
genres
~~~~~~
Use MusicBrainz genre tags to populate (and replace if it's already set) the
``genre`` tag. This will make it a list of all the genres tagged for the
release and the release-group on MusicBrainz, separated by "; " and sorted by
the total number of votes.
Default: ``no``
.. _musicbrainz.external_ids:
external_ids
~~~~~~~~~~~~
Set any of the ``external_ids`` options to ``yes`` to enable the MusicBrainz
importer to look for links to related metadata sources. If such a link is
available the release ID will be extracted from the URL provided and imported
to the beets library::
musicbrainz:
external_ids:
discogs: yes
spotify: yes
bandcamp: yes
beatport: yes
deezer: yes
tidal: yes
The library fields of the corresponding :ref:`autotagger_extensions` are used
to save the data (``discogs_albumid``, ``bandcamp_album_id``,
``spotify_album_id``, ``beatport_album_id``, ``deezer_album_id``,
``tidal_album_id``). On re-imports existing data will be overwritten.
The default of all options is ``no``.
.. _match-config:
Autotagger Matching Options

471
poetry.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,6 @@
[tool.poetry]
name = "beets"
version = "2.3.0"
version = "2.3.1"
description = "music tagger and library organizer"
authors = ["Adrian Sampson <adrian@radbox.org>"]
maintainers = ["Serene-Arc"]
@ -159,7 +159,7 @@ build-backend = "poetry.core.masonry.api"
[tool.pipx-install]
poethepoet = ">=0.26"
poetry = ">=1.8"
poetry = ">=1.8,<2"
[tool.poe.tasks.build]
help = "Build the package"

View file

@ -16,9 +16,9 @@
from collections.abc import Sequence
from beets.autotag.mb import VARIOUS_ARTISTS_ID
from beets.test.helper import PluginTestCase
from beetsplug.albumtypes import AlbumTypesPlugin
from beetsplug.musicbrainz import VARIOUS_ARTISTS_ID
class AlbumTypesPluginTest(PluginTestCase):

View file

@ -19,9 +19,9 @@ from beets.dbcore.query import TrueQuery
from beets.library import Item
from beets.test import _common
from beets.test.helper import (
AutotagImportTestCase,
AutotagStub,
BeetsTestCase,
ImportTestCase,
PluginMixin,
TerminalImportMixin,
control_stdin,
@ -316,10 +316,12 @@ class EditCommandTest(EditMixin, BeetsTestCase):
@_common.slow_test()
class EditDuringImporterTestCase(
EditMixin, TerminalImportMixin, ImportTestCase
EditMixin, TerminalImportMixin, AutotagImportTestCase
):
"""TODO"""
matching = AutotagStub.GOOD
IGNORED = ["added", "album_id", "id", "mtime", "path"]
def setUp(self):
@ -327,12 +329,6 @@ class EditDuringImporterTestCase(
# Create some mediafiles, and store them for comparison.
self.prepare_album_for_import(1)
self.items_orig = [Item.from_path(f.path) for f in self.import_media]
self.matcher = AutotagStub().install()
self.matcher.matching = AutotagStub.GOOD
def tearDown(self):
super().tearDown()
self.matcher.restore()
@_common.slow_test()

View file

@ -20,7 +20,7 @@ import os
import pytest
from beets import importer
from beets.test.helper import AutotagStub, ImportTestCase, PluginMixin
from beets.test.helper import AutotagImportTestCase, PluginMixin
from beets.util import displayable_path, syspath
from beetsplug.importadded import ImportAddedPlugin
@ -41,7 +41,7 @@ def modify_mtimes(paths, offset=-60000):
os.utime(syspath(path), (mstat.st_atime, mstat.st_mtime + offset * i))
class ImportAddedTest(PluginMixin, ImportTestCase):
class ImportAddedTest(PluginMixin, AutotagImportTestCase):
# The minimum mtime of the files to be imported
plugin = "importadded"
min_mtime = None
@ -56,15 +56,9 @@ class ImportAddedTest(PluginMixin, ImportTestCase):
self.min_mtime = min(
os.path.getmtime(mfile.path) for mfile in self.import_media
)
self.matcher = AutotagStub().install()
self.matcher.matching = AutotagStub.IDENT
self.importer = self.setup_importer()
self.importer.add_choice(importer.Action.APPLY)
def tearDown(self):
super().tearDown()
self.matcher.restore()
def find_media_file(self, item):
"""Find the pre-import MediaFile for an Item"""
for m in self.import_media:

View file

@ -14,8 +14,7 @@
from beets.test.helper import (
AutotagStub,
ImportTestCase,
AutotagImportTestCase,
PluginMixin,
TerminalImportMixin,
capture_stdout,
@ -23,23 +22,18 @@ from beets.test.helper import (
)
class MBSubmitPluginTest(PluginMixin, TerminalImportMixin, ImportTestCase):
class MBSubmitPluginTest(
PluginMixin, TerminalImportMixin, AutotagImportTestCase
):
plugin = "mbsubmit"
def setUp(self):
super().setUp()
self.prepare_album_for_import(2)
self.setup_importer()
self.matcher = AutotagStub().install()
def tearDown(self):
super().tearDown()
self.matcher.restore()
def test_print_tracks_output(self):
"""Test the output of the "print tracks" choice."""
self.matcher.matching = AutotagStub.BAD
with capture_stdout() as output:
with control_stdin("\n".join(["p", "s"])):
# Print tracks; Skip
@ -55,8 +49,6 @@ class MBSubmitPluginTest(PluginMixin, TerminalImportMixin, ImportTestCase):
def test_print_tracks_output_as_tracks(self):
"""Test the output of the "print tracks" choice, as singletons."""
self.matcher.matching = AutotagStub.BAD
with capture_stdout() as output:
with control_stdin("\n".join(["t", "s", "p", "s"])):
# as Tracks; Skip; Print tracks; Skip

View file

@ -17,11 +17,17 @@
from unittest import mock
from beets import config
from beets.autotag import mb
from beets.test.helper import BeetsTestCase
from beetsplug import musicbrainz
class MBAlbumInfoTest(BeetsTestCase):
class MusicBrainzTestCase(BeetsTestCase):
def setUp(self):
super().setUp()
self.mb = musicbrainz.MusicBrainzPlugin()
class MBAlbumInfoTest(MusicBrainzTestCase):
def _make_release(
self,
date_str="2009",
@ -210,7 +216,7 @@ class MBAlbumInfoTest(BeetsTestCase):
def test_parse_release_with_year(self):
release = self._make_release("1984")
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.album == "ALBUM TITLE"
assert d.album_id == "ALBUM ID"
assert d.artist == "ARTIST NAME"
@ -221,12 +227,12 @@ class MBAlbumInfoTest(BeetsTestCase):
def test_parse_release_type(self):
release = self._make_release("1984")
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.albumtype == "album"
def test_parse_release_full_date(self):
release = self._make_release("1987-03-31")
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.original_year == 1987
assert d.original_month == 3
assert d.original_day == 31
@ -238,7 +244,7 @@ class MBAlbumInfoTest(BeetsTestCase):
]
release = self._make_release(tracks=tracks)
d = mb.album_info(release)
d = self.mb.album_info(release)
t = d.tracks
assert len(t) == 2
assert t[0].title == "TITLE ONE"
@ -255,7 +261,7 @@ class MBAlbumInfoTest(BeetsTestCase):
]
release = self._make_release(tracks=tracks)
d = mb.album_info(release)
d = self.mb.album_info(release)
t = d.tracks
assert t[0].medium_index == 1
assert t[0].index == 1
@ -269,7 +275,7 @@ class MBAlbumInfoTest(BeetsTestCase):
]
release = self._make_release(tracks=tracks)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.mediums == 1
t = d.tracks
assert t[0].medium == 1
@ -296,7 +302,7 @@ class MBAlbumInfoTest(BeetsTestCase):
}
)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.mediums == 2
t = d.tracks
assert t[0].medium == 1
@ -308,79 +314,81 @@ class MBAlbumInfoTest(BeetsTestCase):
def test_parse_release_year_month_only(self):
release = self._make_release("1987-03")
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.original_year == 1987
assert d.original_month == 3
def test_no_durations(self):
tracks = [self._make_track("TITLE", "ID", None)]
release = self._make_release(tracks=tracks)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.tracks[0].length is None
def test_track_length_overrides_recording_length(self):
tracks = [self._make_track("TITLE", "ID", 1.0 * 1000.0)]
release = self._make_release(tracks=tracks, track_length=2.0 * 1000.0)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.tracks[0].length == 2.0
def test_no_release_date(self):
release = self._make_release(None)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert not d.original_year
assert not d.original_month
assert not d.original_day
def test_various_artists_defaults_false(self):
release = self._make_release(None)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert not d.va
def test_detect_various_artists(self):
release = self._make_release(None)
release["artist-credit"][0]["artist"]["id"] = mb.VARIOUS_ARTISTS_ID
d = mb.album_info(release)
release["artist-credit"][0]["artist"]["id"] = (
musicbrainz.VARIOUS_ARTISTS_ID
)
d = self.mb.album_info(release)
assert d.va
def test_parse_artist_sort_name(self):
release = self._make_release(None)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.artist_sort == "ARTIST SORT NAME"
def test_parse_releasegroupid(self):
release = self._make_release(None)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.releasegroup_id == "RELEASE GROUP ID"
def test_parse_asin(self):
release = self._make_release(None)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.asin == "ALBUM ASIN"
def test_parse_catalognum(self):
release = self._make_release(None)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.catalognum == "CATALOG NUMBER"
def test_parse_textrepr(self):
release = self._make_release(None)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.script == "SCRIPT"
assert d.language == "LANGUAGE"
def test_parse_country(self):
release = self._make_release(None)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.country == "COUNTRY"
def test_parse_status(self):
release = self._make_release(None)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.albumstatus == "STATUS"
def test_parse_barcode(self):
release = self._make_release(None)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.barcode == "BARCODE"
def test_parse_media(self):
@ -389,12 +397,12 @@ class MBAlbumInfoTest(BeetsTestCase):
self._make_track("TITLE TWO", "ID TWO", 200.0 * 1000.0),
]
release = self._make_release(None, tracks=tracks)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.media == "FORMAT"
def test_parse_disambig(self):
release = self._make_release(None)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.albumdisambig == "R_DISAMBIGUATION"
assert d.releasegroupdisambig == "RG_DISAMBIGUATION"
@ -404,7 +412,7 @@ class MBAlbumInfoTest(BeetsTestCase):
self._make_track("TITLE TWO", "ID TWO", 200.0 * 1000.0),
]
release = self._make_release(None, tracks=tracks)
d = mb.album_info(release)
d = self.mb.album_info(release)
t = d.tracks
assert t[0].disctitle == "MEDIUM TITLE"
assert t[1].disctitle == "MEDIUM TITLE"
@ -412,13 +420,13 @@ class MBAlbumInfoTest(BeetsTestCase):
def test_missing_language(self):
release = self._make_release(None)
del release["text-representation"]["language"]
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.language is None
def test_parse_recording_artist(self):
tracks = [self._make_track("a", "b", 1, True)]
release = self._make_release(None, tracks=tracks)
track = mb.album_info(release).tracks[0]
track = self.mb.album_info(release).tracks[0]
assert track.artist == "RECORDING ARTIST NAME"
assert track.artist_id == "RECORDING ARTIST ID"
assert track.artist_sort == "RECORDING ARTIST SORT NAME"
@ -427,7 +435,7 @@ class MBAlbumInfoTest(BeetsTestCase):
def test_parse_recording_artist_multi(self):
tracks = [self._make_track("a", "b", 1, True, multi_artist_credit=True)]
release = self._make_release(None, tracks=tracks)
track = mb.album_info(release).tracks[0]
track = self.mb.album_info(release).tracks[0]
assert track.artist == "RECORDING ARTIST NAME & RECORDING ARTIST 2 NAME"
assert track.artist_id == "RECORDING ARTIST ID"
assert (
@ -459,7 +467,7 @@ class MBAlbumInfoTest(BeetsTestCase):
def test_track_artist_overrides_recording_artist(self):
tracks = [self._make_track("a", "b", 1, True)]
release = self._make_release(None, tracks=tracks, track_artist=True)
track = mb.album_info(release).tracks[0]
track = self.mb.album_info(release).tracks[0]
assert track.artist == "TRACK ARTIST NAME"
assert track.artist_id == "TRACK ARTIST ID"
assert track.artist_sort == "TRACK ARTIST SORT NAME"
@ -470,7 +478,7 @@ class MBAlbumInfoTest(BeetsTestCase):
release = self._make_release(
None, tracks=tracks, track_artist=True, multi_artist_credit=True
)
track = mb.album_info(release).tracks[0]
track = self.mb.album_info(release).tracks[0]
assert track.artist == "TRACK ARTIST NAME & TRACK ARTIST 2 NAME"
assert track.artist_id == "TRACK ARTIST ID"
assert (
@ -495,12 +503,12 @@ class MBAlbumInfoTest(BeetsTestCase):
def test_parse_recording_remixer(self):
tracks = [self._make_track("a", "b", 1, remixer=True)]
release = self._make_release(None, tracks=tracks)
track = mb.album_info(release).tracks[0]
track = self.mb.album_info(release).tracks[0]
assert track.remixer == "RECORDING REMIXER ARTIST NAME"
def test_data_source(self):
release = self._make_release()
d = mb.album_info(release)
d = self.mb.album_info(release)
assert d.data_source == "MusicBrainz"
def test_ignored_media(self):
@ -510,7 +518,7 @@ class MBAlbumInfoTest(BeetsTestCase):
self._make_track("TITLE TWO", "ID TWO", 200.0 * 1000.0),
]
release = self._make_release(tracks=tracks, medium_format="IGNORED1")
d = mb.album_info(release)
d = self.mb.album_info(release)
assert len(d.tracks) == 0
def test_no_ignored_media(self):
@ -520,7 +528,7 @@ class MBAlbumInfoTest(BeetsTestCase):
self._make_track("TITLE TWO", "ID TWO", 200.0 * 1000.0),
]
release = self._make_release(tracks=tracks, medium_format="NON-IGNORED")
d = mb.album_info(release)
d = self.mb.album_info(release)
assert len(d.tracks) == 2
def test_skip_data_track(self):
@ -530,7 +538,7 @@ class MBAlbumInfoTest(BeetsTestCase):
self._make_track("TITLE TWO", "ID TWO", 200.0 * 1000.0),
]
release = self._make_release(tracks=tracks)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert len(d.tracks) == 2
assert d.tracks[0].title == "TITLE ONE"
assert d.tracks[1].title == "TITLE TWO"
@ -546,7 +554,7 @@ class MBAlbumInfoTest(BeetsTestCase):
)
]
release = self._make_release(tracks=tracks, data_tracks=data_tracks)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert len(d.tracks) == 2
assert d.tracks[0].title == "TITLE ONE"
assert d.tracks[1].title == "TITLE TWO"
@ -563,7 +571,7 @@ class MBAlbumInfoTest(BeetsTestCase):
)
]
release = self._make_release(tracks=tracks, data_tracks=data_tracks)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert len(d.tracks) == 3
assert d.tracks[0].title == "TITLE ONE"
assert d.tracks[1].title == "TITLE TWO"
@ -578,7 +586,7 @@ class MBAlbumInfoTest(BeetsTestCase):
self._make_track("TITLE TWO", "ID TWO", 200.0 * 1000.0),
]
release = self._make_release(tracks=tracks)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert len(d.tracks) == 2
assert d.tracks[0].title == "TITLE ONE"
assert d.tracks[1].title == "TITLE TWO"
@ -594,7 +602,7 @@ class MBAlbumInfoTest(BeetsTestCase):
)
]
release = self._make_release(tracks=tracks, data_tracks=data_tracks)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert len(d.tracks) == 2
assert d.tracks[0].title == "TITLE ONE"
assert d.tracks[1].title == "TITLE TWO"
@ -610,7 +618,7 @@ class MBAlbumInfoTest(BeetsTestCase):
self._make_track("TITLE TWO", "ID TWO", 200.0 * 1000.0),
]
release = self._make_release(tracks=tracks)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert len(d.tracks) == 3
assert d.tracks[0].title == "TITLE ONE"
assert d.tracks[1].title == "TITLE VIDEO"
@ -629,7 +637,7 @@ class MBAlbumInfoTest(BeetsTestCase):
)
]
release = self._make_release(tracks=tracks, data_tracks=data_tracks)
d = mb.album_info(release)
d = self.mb.album_info(release)
assert len(d.tracks) == 3
assert d.tracks[0].title == "TITLE ONE"
assert d.tracks[1].title == "TITLE TWO"
@ -647,7 +655,7 @@ class MBAlbumInfoTest(BeetsTestCase):
]
release = self._make_release(tracks=tracks)
d = mb.album_info(release)
d = self.mb.album_info(release)
t = d.tracks
assert len(t) == 2
assert t[0].trackdisambig is None
@ -657,18 +665,18 @@ class MBAlbumInfoTest(BeetsTestCase):
class ParseIDTest(BeetsTestCase):
def test_parse_id_correct(self):
id_string = "28e32c71-1450-463e-92bf-e0a46446fc11"
out = mb._parse_id(id_string)
out = musicbrainz._parse_id(id_string)
assert out == id_string
def test_parse_id_non_id_returns_none(self):
id_string = "blah blah"
out = mb._parse_id(id_string)
out = musicbrainz._parse_id(id_string)
assert out is None
def test_parse_id_url_finds_id(self):
id_string = "28e32c71-1450-463e-92bf-e0a46446fc11"
id_url = "https://musicbrainz.org/entity/%s" % id_string
out = mb._parse_id(id_url)
out = musicbrainz._parse_id(id_url)
assert out == id_string
@ -696,24 +704,28 @@ class ArtistFlatteningTest(BeetsTestCase):
def test_single_artist(self):
credit = [self._credit_dict()]
a, s, c = mb._flatten_artist_credit(credit)
a, s, c = musicbrainz._flatten_artist_credit(credit)
assert a == "NAME"
assert s == "SORT"
assert c == "CREDIT"
a, s, c = mb._multi_artist_credit(credit, include_join_phrase=False)
a, s, c = musicbrainz._multi_artist_credit(
credit, include_join_phrase=False
)
assert a == ["NAME"]
assert s == ["SORT"]
assert c == ["CREDIT"]
def test_two_artists(self):
credit = [self._credit_dict("a"), " AND ", self._credit_dict("b")]
a, s, c = mb._flatten_artist_credit(credit)
a, s, c = musicbrainz._flatten_artist_credit(credit)
assert a == "NAMEa AND NAMEb"
assert s == "SORTa AND SORTb"
assert c == "CREDITa AND CREDITb"
a, s, c = mb._multi_artist_credit(credit, include_join_phrase=False)
a, s, c = musicbrainz._multi_artist_credit(
credit, include_join_phrase=False
)
assert a == ["NAMEa", "NAMEb"]
assert s == ["SORTa", "SORTb"]
assert c == ["CREDITa", "CREDITb"]
@ -730,36 +742,36 @@ class ArtistFlatteningTest(BeetsTestCase):
# test no alias
config["import"]["languages"] = [""]
flat = mb._flatten_artist_credit([credit_dict])
flat = musicbrainz._flatten_artist_credit([credit_dict])
assert flat == ("NAME", "SORT", "CREDIT")
# test en primary
config["import"]["languages"] = ["en"]
flat = mb._flatten_artist_credit([credit_dict])
flat = musicbrainz._flatten_artist_credit([credit_dict])
assert flat == ("ALIASen", "ALIASSORTen", "CREDIT")
# test en_GB en primary
config["import"]["languages"] = ["en_GB", "en"]
flat = mb._flatten_artist_credit([credit_dict])
flat = musicbrainz._flatten_artist_credit([credit_dict])
assert flat == ("ALIASen_GB", "ALIASSORTen_GB", "CREDIT")
# test en en_GB primary
config["import"]["languages"] = ["en", "en_GB"]
flat = mb._flatten_artist_credit([credit_dict])
flat = musicbrainz._flatten_artist_credit([credit_dict])
assert flat == ("ALIASen", "ALIASSORTen", "CREDIT")
# test fr primary
config["import"]["languages"] = ["fr"]
flat = mb._flatten_artist_credit([credit_dict])
flat = musicbrainz._flatten_artist_credit([credit_dict])
assert flat == ("ALIASfr_P", "ALIASSORTfr_P", "CREDIT")
# test for not matching non-primary
config["import"]["languages"] = ["pt_BR", "fr"]
flat = mb._flatten_artist_credit([credit_dict])
flat = musicbrainz._flatten_artist_credit([credit_dict])
assert flat == ("ALIASfr_P", "ALIASSORTfr_P", "CREDIT")
class MBLibraryTest(BeetsTestCase):
class MBLibraryTest(MusicBrainzTestCase):
def test_match_track(self):
with mock.patch("musicbrainzngs.search_recordings") as p:
p.return_value = {
@ -771,13 +783,13 @@ class MBLibraryTest(BeetsTestCase):
}
],
}
ti = list(mb.match_track("hello", "there"))[0]
ti = list(self.mb.item_candidates(None, "hello", "there"))[0]
p.assert_called_with(artist="hello", recording="there", limit=5)
assert ti.title == "foo"
assert ti.track_id == "bar"
def test_match_album(self):
def test_candidates(self):
mbid = "d2a6f856-b553-40a0-ac54-a321e8e2da99"
with mock.patch("musicbrainzngs.search_releases") as sp:
sp.return_value = {
@ -824,7 +836,7 @@ class MBLibraryTest(BeetsTestCase):
}
}
ai = list(mb.match_album("hello", "there"))[0]
ai = list(self.mb.candidates([], "hello", "there", False))[0]
sp.assert_called_with(artist="hello", release="there", limit=5)
gp.assert_called_with(mbid, mock.ANY)
@ -833,13 +845,13 @@ class MBLibraryTest(BeetsTestCase):
def test_match_track_empty(self):
with mock.patch("musicbrainzngs.search_recordings") as p:
til = list(mb.match_track(" ", " "))
til = list(self.mb.item_candidates(None, " ", " "))
assert not p.called
assert til == []
def test_match_album_empty(self):
def test_candidates_empty(self):
with mock.patch("musicbrainzngs.search_releases") as p:
ail = list(mb.match_album(" ", " "))
ail = list(self.mb.candidates([], " ", " ", False))
assert not p.called
assert ail == []
@ -927,7 +939,7 @@ class MBLibraryTest(BeetsTestCase):
with mock.patch("musicbrainzngs.get_release_by_id") as gp:
gp.side_effect = side_effect
album = mb.album_for_id("d2a6f856-b553-40a0-ac54-a321e8e2da02")
album = self.mb.album_for_id("d2a6f856-b553-40a0-ac54-a321e8e2da02")
assert album.country == "COUNTRY"
def test_pseudo_releases_with_empty_links(self):
@ -972,7 +984,7 @@ class MBLibraryTest(BeetsTestCase):
with mock.patch("musicbrainzngs.get_release_by_id") as gp:
gp.side_effect = side_effect
album = mb.album_for_id("d2a6f856-b553-40a0-ac54-a321e8e2da02")
album = self.mb.album_for_id("d2a6f856-b553-40a0-ac54-a321e8e2da02")
assert album.country is None
def test_pseudo_releases_without_links(self):
@ -1016,7 +1028,7 @@ class MBLibraryTest(BeetsTestCase):
with mock.patch("musicbrainzngs.get_release_by_id") as gp:
gp.side_effect = side_effect
album = mb.album_for_id("d2a6f856-b553-40a0-ac54-a321e8e2da02")
album = self.mb.album_for_id("d2a6f856-b553-40a0-ac54-a321e8e2da02")
assert album.country is None
def test_pseudo_releases_with_unsupported_links(self):
@ -1067,5 +1079,5 @@ class MBLibraryTest(BeetsTestCase):
with mock.patch("musicbrainzngs.get_release_by_id") as gp:
gp.side_effect = side_effect
album = mb.album_for_id("d2a6f856-b553-40a0-ac54-a321e8e2da02")
album = self.mb.album_for_id("d2a6f856-b553-40a0-ac54-a321e8e2da02")
assert album.country is None

View file

@ -0,0 +1,37 @@
import os
from mediafile import MediaFile
from beets.test.helper import AsIsImporterMixin, ImportTestCase, PluginMixin
class ScrubbedImportTest(AsIsImporterMixin, PluginMixin, ImportTestCase):
db_on_disk = True
plugin = "scrub"
def test_tags_not_scrubbed(self):
with self.configure_plugin({"auto": False}):
self.run_asis_importer(write=True)
for item in self.lib.items():
imported_file = MediaFile(os.path.join(item.path))
assert imported_file.artist == "Tag Artist"
assert imported_file.album == "Tag Album"
def test_tags_restored(self):
with self.configure_plugin({"auto": True}):
self.run_asis_importer(write=True)
for item in self.lib.items():
imported_file = MediaFile(os.path.join(item.path))
assert imported_file.artist == "Tag Artist"
assert imported_file.album == "Tag Album"
def test_tags_not_restored(self):
with self.configure_plugin({"auto": True}):
self.run_asis_importer(write=False)
for item in self.lib.items():
imported_file = MediaFile(os.path.join(item.path))
assert imported_file.artist is None
assert imported_file.album is None

View file

@ -39,6 +39,7 @@ from beets.test import _common
from beets.test.helper import (
NEEDS_REFLINK,
AsIsImporterMixin,
AutotagImportTestCase,
AutotagStub,
BeetsTestCase,
ImportTestCase,
@ -49,53 +50,6 @@ from beets.test.helper import (
from beets.util import bytestring_path, displayable_path, syspath
class ScrubbedImportTest(AsIsImporterMixin, PluginMixin, ImportTestCase):
db_on_disk = True
plugin = "scrub"
def test_tags_not_scrubbed(self):
config["plugins"] = ["scrub"]
config["scrub"]["auto"] = False
config["import"]["write"] = True
for mediafile in self.import_media:
assert mediafile.artist == "Tag Artist"
assert mediafile.album == "Tag Album"
self.run_asis_importer()
for item in self.lib.items():
imported_file = os.path.join(item.path)
imported_file = MediaFile(imported_file)
assert imported_file.artist == "Tag Artist"
assert imported_file.album == "Tag Album"
def test_tags_restored(self):
config["plugins"] = ["scrub"]
config["scrub"]["auto"] = True
config["import"]["write"] = True
for mediafile in self.import_media:
assert mediafile.artist == "Tag Artist"
assert mediafile.album == "Tag Album"
self.run_asis_importer()
for item in self.lib.items():
imported_file = os.path.join(item.path)
imported_file = MediaFile(imported_file)
assert imported_file.artist == "Tag Artist"
assert imported_file.album == "Tag Album"
def test_tags_not_restored(self):
config["plugins"] = ["scrub"]
config["scrub"]["auto"] = True
config["import"]["write"] = False
for mediafile in self.import_media:
assert mediafile.artist == "Tag Artist"
assert mediafile.album == "Tag Album"
self.run_asis_importer()
for item in self.lib.items():
imported_file = os.path.join(item.path)
imported_file = MediaFile(imported_file)
assert imported_file.artist is None
assert imported_file.album is None
@_common.slow_test()
class NonAutotaggedImportTest(AsIsImporterMixin, ImportTestCase):
db_on_disk = True
@ -306,7 +260,7 @@ class ImportPasswordRarTest(ImportZipTest):
return os.path.join(_common.RSRC, b"password.rar")
class ImportSingletonTest(ImportTestCase):
class ImportSingletonTest(AutotagImportTestCase):
"""Test ``APPLY`` and ``ASIS`` choices for an import session with
singletons config set to True.
"""
@ -315,11 +269,6 @@ class ImportSingletonTest(ImportTestCase):
super().setUp()
self.prepare_album_for_import(1)
self.importer = self.setup_singleton_importer()
self.matcher = AutotagStub().install()
def tearDown(self):
super().tearDown()
self.matcher.restore()
def test_apply_asis_adds_track(self):
assert self.lib.items().get() is None
@ -432,19 +381,13 @@ class ImportSingletonTest(ImportTestCase):
assert item.disc == disc
class ImportTest(ImportTestCase):
class ImportTest(AutotagImportTestCase):
"""Test APPLY, ASIS and SKIP choices."""
def setUp(self):
super().setUp()
self.prepare_album_for_import(1)
self.setup_importer()
self.matcher = AutotagStub().install()
self.matcher.matching = AutotagStub.IDENT
def tearDown(self):
super().tearDown()
self.matcher.restore()
def test_apply_asis_adds_album(self):
assert self.lib.albums().get() is None
@ -639,18 +582,13 @@ class ImportTest(ImportTestCase):
assert item.disc == disc
class ImportTracksTest(ImportTestCase):
class ImportTracksTest(AutotagImportTestCase):
"""Test TRACKS and APPLY choice."""
def setUp(self):
super().setUp()
self.prepare_album_for_import(1)
self.setup_importer()
self.matcher = AutotagStub().install()
def tearDown(self):
super().tearDown()
self.matcher.restore()
def test_apply_tracks_adds_singleton_track(self):
assert self.lib.items().get() is None
@ -673,18 +611,13 @@ class ImportTracksTest(ImportTestCase):
self.assert_file_in_lib(b"singletons", b"Applied Track 1.mp3")
class ImportCompilationTest(ImportTestCase):
class ImportCompilationTest(AutotagImportTestCase):
"""Test ASIS import of a folder containing tracks with different artists."""
def setUp(self):
super().setUp()
self.prepare_album_for_import(3)
self.setup_importer()
self.matcher = AutotagStub().install()
def tearDown(self):
super().tearDown()
self.matcher.restore()
def test_asis_homogenous_sets_albumartist(self):
self.importer.add_choice(importer.Action.ASIS)
@ -783,21 +716,16 @@ class ImportCompilationTest(ImportTestCase):
assert asserted_multi_artists_1
class ImportExistingTest(ImportTestCase):
class ImportExistingTest(AutotagImportTestCase):
"""Test importing files that are already in the library directory."""
def setUp(self):
super().setUp()
self.prepare_album_for_import(1)
self.matcher = AutotagStub().install()
self.reimporter = self.setup_importer(import_dir=self.libdir)
self.importer = self.setup_importer()
def tearDown(self):
super().tearDown()
self.matcher.restore()
def test_does_not_duplicate_item(self):
self.importer.run()
assert len(self.lib.items()) == 1
@ -904,12 +832,12 @@ class ImportExistingTest(ImportTestCase):
self.assertNotExists(self.import_media[0].path)
class GroupAlbumsImportTest(ImportTestCase):
class GroupAlbumsImportTest(AutotagImportTestCase):
matching = AutotagStub.NONE
def setUp(self):
super().setUp()
self.prepare_album_for_import(3)
self.matcher = AutotagStub().install()
self.matcher.matching = AutotagStub.NONE
self.setup_importer()
# Split tracks into two albums and use both as-is
@ -917,10 +845,6 @@ class GroupAlbumsImportTest(ImportTestCase):
self.importer.add_choice(importer.Action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
def tearDown(self):
super().tearDown()
self.matcher.restore()
def test_add_album_for_different_artist_and_different_album(self):
self.import_media[0].artist = "Artist B"
self.import_media[0].album = "Album B"
@ -976,17 +900,13 @@ class GlobalGroupAlbumsImportTest(GroupAlbumsImportTest):
config["import"]["group_albums"] = True
class ChooseCandidateTest(ImportTestCase):
class ChooseCandidateTest(AutotagImportTestCase):
matching = AutotagStub.BAD
def setUp(self):
super().setUp()
self.prepare_album_for_import(1)
self.setup_importer()
self.matcher = AutotagStub().install()
self.matcher.matching = AutotagStub.BAD
def tearDown(self):
super().tearDown()
self.matcher.restore()
def test_choose_first_candidate(self):
self.importer.add_choice(1)
@ -1094,26 +1014,22 @@ class InferAlbumDataTest(BeetsTestCase):
assert not self.items[0].comp
def match_album_mock(*args, **kwargs):
def album_candidates_mock(*args, **kwargs):
"""Create an AlbumInfo object for testing."""
track_info = TrackInfo(
title="new title",
track_id="trackid",
index=0,
)
album_info = AlbumInfo(
yield AlbumInfo(
artist="artist",
album="album",
tracks=[track_info],
tracks=[TrackInfo(title="new title", track_id="trackid", index=0)],
album_id="albumid",
artist_id="artistid",
flex="flex",
)
return iter([album_info])
@patch("beets.autotag.mb.match_album", Mock(side_effect=match_album_mock))
class ImportDuplicateAlbumTest(ImportTestCase):
@patch("beets.plugins.candidates", Mock(side_effect=album_candidates_mock))
class ImportDuplicateAlbumTest(PluginMixin, ImportTestCase):
plugin = "musicbrainz"
def setUp(self):
super().setUp()
@ -1219,20 +1135,16 @@ class ImportDuplicateAlbumTest(ImportTestCase):
return album
def match_track_mock(*args, **kwargs):
return iter(
[
TrackInfo(
artist="artist",
title="title",
track_id="new trackid",
index=0,
)
]
def item_candidates_mock(*args, **kwargs):
yield TrackInfo(
artist="artist",
title="title",
track_id="new trackid",
index=0,
)
@patch("beets.autotag.mb.match_track", Mock(side_effect=match_track_mock))
@patch("beets.plugins.item_candidates", Mock(side_effect=item_candidates_mock))
class ImportDuplicateSingletonTest(ImportTestCase):
def setUp(self):
super().setUp()
@ -1566,7 +1478,7 @@ class MultiDiscAlbumsInDirTest(BeetsTestCase):
assert len(items) == 3
class ReimportTest(ImportTestCase):
class ReimportTest(AutotagImportTestCase):
"""Test "re-imports", in which the autotagging machinery is used for
music that's already in the library.
@ -1575,6 +1487,8 @@ class ReimportTest(ImportTestCase):
attributes and the added date.
"""
matching = AutotagStub.GOOD
def setUp(self):
super().setUp()
@ -1589,14 +1503,6 @@ class ReimportTest(ImportTestCase):
item.added = 4747.0
item.store()
# Set up an import pipeline with a "good" match.
self.matcher = AutotagStub().install()
self.matcher.matching = AutotagStub.GOOD
def tearDown(self):
super().tearDown()
self.matcher.restore()
def _setup_session(self, singletons=False):
self.setup_importer(import_dir=self.libdir, singletons=singletons)
self.importer.add_choice(importer.Action.APPLY)
@ -1672,27 +1578,22 @@ class ReimportTest(ImportTestCase):
def test_reimported_album_not_preserves_flexattr(self):
self._setup_session()
assert self._album().data_source == "original_source"
self.importer.run()
assert self._album().data_source == "match_source"
class ImportPretendTest(ImportTestCase):
class ImportPretendTest(AutotagImportTestCase):
"""Test the pretend commandline option"""
def setUp(self):
super().setUp()
self.matcher = AutotagStub().install()
self.io.install()
self.album_track_path = self.prepare_album_for_import(1)[0]
self.single_path = self.prepare_track_for_import(2, self.import_path)
self.album_path = self.album_track_path.parent
def tearDown(self):
super().tearDown()
self.matcher.restore()
def __run(self, importer):
with capture_log() as logs:
importer.run()
@ -1701,6 +1602,7 @@ class ImportPretendTest(ImportTestCase):
assert len(self.lib.albums()) == 0
return [line for line in logs if not line.startswith("Sending event:")]
assert self._album().data_source == "original_source"
def test_import_singletons_pretend(self):
assert self.__run(self.setup_singleton_importer(pretend=True)) == [
@ -1725,112 +1627,64 @@ class ImportPretendTest(ImportTestCase):
assert self.__run(importer) == [f"No files imported from {empty_path}"]
# Helpers for ImportMusicBrainzIdTest.
def mocked_get_album_by_id(id_):
"""Return album candidate for the given id.
def mocked_get_release_by_id(
id_, includes=[], release_status=[], release_type=[]
):
"""Mimic musicbrainzngs.get_release_by_id, accepting only a restricted list
of MB ids (ID_RELEASE_0, ID_RELEASE_1). The returned dict differs only in
the release title and artist name, so that ID_RELEASE_0 is a closer match
to the items created by ImportHelper.prepare_album_for_import()."""
The two albums differ only in the release title and artist name, so that
ID_RELEASE_0 is a closer match to the items created by
ImportHelper.prepare_album_for_import().
"""
# Map IDs to (release title, artist), so the distances are different.
releases = {
ImportMusicBrainzIdTest.ID_RELEASE_0: ("VALID_RELEASE_0", "TAG ARTIST"),
ImportMusicBrainzIdTest.ID_RELEASE_1: (
"VALID_RELEASE_1",
"DISTANT_MATCH",
),
}
album, artist = {
ImportIdTest.ID_RELEASE_0: ("VALID_RELEASE_0", "TAG ARTIST"),
ImportIdTest.ID_RELEASE_1: ("VALID_RELEASE_1", "DISTANT_MATCH"),
}[id_]
return {
"release": {
"title": releases[id_][0],
"id": id_,
"medium-list": [
{
"track-list": [
{
"id": "baz",
"recording": {
"title": "foo",
"id": "bar",
"length": 59,
},
"position": 9,
"number": "A2",
}
],
"position": 5,
}
],
"artist-credit": [
{
"artist": {
"name": releases[id_][1],
"id": "some-id",
},
}
],
"release-group": {
"id": "another-id",
},
"status": "Official",
}
}
return AlbumInfo(
album_id=id_,
album=album,
artist_id="some-id",
artist=artist,
albumstatus="Official",
tracks=[
TrackInfo(
track_id="bar",
title="foo",
artist_id="some-id",
artist=artist,
length=59,
index=9,
track_allt="A2",
)
],
)
def mocked_get_recording_by_id(
id_, includes=[], release_status=[], release_type=[]
):
"""Mimic musicbrainzngs.get_recording_by_id, accepting only a restricted
list of MB ids (ID_RECORDING_0, ID_RECORDING_1). The returned dict differs
only in the recording title and artist name, so that ID_RECORDING_0 is a
closer match to the items created by ImportHelper.prepare_album_for_import().
def mocked_get_track_by_id(id_):
"""Return track candidate for the given id.
The two tracks differ only in the release title and artist name, so that
ID_RELEASE_0 is a closer match to the items created by
ImportHelper.prepare_album_for_import().
"""
# Map IDs to (recording title, artist), so the distances are different.
releases = {
ImportMusicBrainzIdTest.ID_RECORDING_0: (
"VALID_RECORDING_0",
"TAG ARTIST",
),
ImportMusicBrainzIdTest.ID_RECORDING_1: (
"VALID_RECORDING_1",
"DISTANT_MATCH",
),
}
title, artist = {
ImportIdTest.ID_RECORDING_0: ("VALID_RECORDING_0", "TAG ARTIST"),
ImportIdTest.ID_RECORDING_1: ("VALID_RECORDING_1", "DISTANT_MATCH"),
}[id_]
return {
"recording": {
"title": releases[id_][0],
"id": id_,
"length": 59,
"artist-credit": [
{
"artist": {
"name": releases[id_][1],
"id": "some-id",
},
}
],
}
}
return TrackInfo(
track_id=id_,
title=title,
artist_id="some-id",
artist=artist,
length=59,
)
@patch(
"musicbrainzngs.get_recording_by_id",
Mock(side_effect=mocked_get_recording_by_id),
)
@patch(
"musicbrainzngs.get_release_by_id",
Mock(side_effect=mocked_get_release_by_id),
)
class ImportMusicBrainzIdTest(ImportTestCase):
"""Test the --musicbrainzid argument."""
MB_RELEASE_PREFIX = "https://musicbrainz.org/release/"
MB_RECORDING_PREFIX = "https://musicbrainz.org/recording/"
@patch("beets.plugins.track_for_id", Mock(side_effect=mocked_get_track_by_id))
@patch("beets.plugins.album_for_id", Mock(side_effect=mocked_get_album_by_id))
class ImportIdTest(ImportTestCase):
ID_RELEASE_0 = "00000000-0000-0000-0000-000000000000"
ID_RELEASE_1 = "11111111-1111-1111-1111-111111111111"
ID_RECORDING_0 = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
@ -1841,21 +1695,14 @@ class ImportMusicBrainzIdTest(ImportTestCase):
self.prepare_album_for_import(1)
def test_one_mbid_one_album(self):
self.setup_importer(
search_ids=[self.MB_RELEASE_PREFIX + self.ID_RELEASE_0]
)
self.setup_importer(search_ids=[self.ID_RELEASE_0])
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.albums().get().album == "VALID_RELEASE_0"
def test_several_mbid_one_album(self):
self.setup_importer(
search_ids=[
self.MB_RELEASE_PREFIX + self.ID_RELEASE_0,
self.MB_RELEASE_PREFIX + self.ID_RELEASE_1,
]
)
self.setup_importer(search_ids=[self.ID_RELEASE_0, self.ID_RELEASE_1])
self.importer.add_choice(2) # Pick the 2nd best match (release 1).
self.importer.add_choice(importer.Action.APPLY)
@ -1863,9 +1710,7 @@ class ImportMusicBrainzIdTest(ImportTestCase):
assert self.lib.albums().get().album == "VALID_RELEASE_1"
def test_one_mbid_one_singleton(self):
self.setup_singleton_importer(
search_ids=[self.MB_RECORDING_PREFIX + self.ID_RECORDING_0]
)
self.setup_singleton_importer(search_ids=[self.ID_RECORDING_0])
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
@ -1873,10 +1718,7 @@ class ImportMusicBrainzIdTest(ImportTestCase):
def test_several_mbid_one_singleton(self):
self.setup_singleton_importer(
search_ids=[
self.MB_RECORDING_PREFIX + self.ID_RECORDING_0,
self.MB_RECORDING_PREFIX + self.ID_RECORDING_1,
]
search_ids=[self.ID_RECORDING_0, self.ID_RECORDING_1]
)
self.importer.add_choice(2) # Pick the 2nd best match (recording 1).
@ -1889,11 +1731,7 @@ class ImportMusicBrainzIdTest(ImportTestCase):
task = importer.ImportTask(
paths=self.import_dir, toppath="top path", items=[_common.item()]
)
task.search_ids = [
self.MB_RELEASE_PREFIX + self.ID_RELEASE_0,
self.MB_RELEASE_PREFIX + self.ID_RELEASE_1,
"an invalid and discarded id",
]
task.search_ids = [self.ID_RELEASE_0, self.ID_RELEASE_1]
task.lookup_candidates()
assert {"VALID_RELEASE_0", "VALID_RELEASE_1"} == {
@ -1905,11 +1743,7 @@ class ImportMusicBrainzIdTest(ImportTestCase):
task = importer.SingletonImportTask(
toppath="top path", item=_common.item()
)
task.search_ids = [
self.MB_RECORDING_PREFIX + self.ID_RECORDING_0,
self.MB_RECORDING_PREFIX + self.ID_RECORDING_1,
"an invalid and discarded id",
]
task.search_ids = [self.ID_RECORDING_0, self.ID_RECORDING_1]
task.lookup_candidates()
assert {"VALID_RECORDING_0", "VALID_RECORDING_1"} == {

View file

@ -347,7 +347,8 @@ class PromptChoicesTest(TerminalImportMixin, PluginImportTestCase):
def setUp(self):
super().setUp()
self.setup_importer()
self.matcher = AutotagStub().install()
self.matcher = AutotagStub(AutotagStub.IDENT).install()
self.addCleanup(self.matcher.restore)
# keep track of ui.input_option() calls
self.input_options_patcher = patch(
"beets.ui.input_options", side_effect=ui.input_options
@ -357,7 +358,6 @@ class PromptChoicesTest(TerminalImportMixin, PluginImportTestCase):
def tearDown(self):
super().tearDown()
self.input_options_patcher.stop()
self.matcher.restore()
def test_plugin_choices_in_ui_input_options_album(self):
"""Test the presence of plugin choices on the prompt (album)."""