mirror of
https://github.com/beetbox/beets.git
synced 2025-12-06 00:24:25 +01:00
365 lines
12 KiB
Python
365 lines
12 KiB
Python
# This file is part of beets.
|
|
# Copyright 2016, Adrian Sampson.
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining
|
|
# a copy of this software and associated documentation files (the
|
|
# "Software"), to deal in the Software without restriction, including
|
|
# without limitation the rights to use, copy, modify, merge, publish,
|
|
# distribute, sublicense, and/or sell copies of the Software, and to
|
|
# permit persons to whom the Software is furnished to do so, subject to
|
|
# the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be
|
|
# included in all copies or substantial portions of the Software.
|
|
|
|
"""Adds Chromaprint/Acoustid acoustic fingerprinting support to the
|
|
autotagger. Requires the pyacoustid library.
|
|
"""
|
|
|
|
import re
|
|
from collections import defaultdict
|
|
from collections.abc import Iterable
|
|
from functools import cached_property, partial
|
|
|
|
import acoustid
|
|
import confuse
|
|
|
|
from beets import config, ui, util
|
|
from beets.autotag.distance import Distance
|
|
from beets.autotag.hooks import TrackInfo
|
|
from beets.metadata_plugins import MetadataSourcePlugin
|
|
from beetsplug.musicbrainz import MusicBrainzPlugin
|
|
|
|
API_KEY = "1vOwZtEn"
|
|
SCORE_THRESH = 0.5
|
|
TRACK_ID_WEIGHT = 10.0
|
|
COMMON_REL_THRESH = 0.6 # How many tracks must have an album in common?
|
|
MAX_RECORDINGS = 5
|
|
MAX_RELEASES = 5
|
|
|
|
# Stores the Acoustid match information for each track. This is
|
|
# populated when an import task begins and then used when searching for
|
|
# candidates. It maps audio file paths to (recording_ids, release_ids)
|
|
# pairs. If a given path is not present in the mapping, then no match
|
|
# was found.
|
|
_matches = {}
|
|
|
|
# Stores the fingerprint and Acoustid ID for each track. This is stored
|
|
# as metadata for each track for later use but is not relevant for
|
|
# autotagging.
|
|
_fingerprints = {}
|
|
_acoustids = {}
|
|
|
|
|
|
def prefix(it, count):
|
|
"""Truncate an iterable to at most `count` items."""
|
|
for i, v in enumerate(it):
|
|
if i >= count:
|
|
break
|
|
yield v
|
|
|
|
|
|
def releases_key(release, countries, original_year):
|
|
"""Used as a key to sort releases by date then preferred country"""
|
|
date = release.get("date")
|
|
if date and original_year:
|
|
year = date.get("year", 9999)
|
|
month = date.get("month", 99)
|
|
day = date.get("day", 99)
|
|
else:
|
|
year = 9999
|
|
month = 99
|
|
day = 99
|
|
|
|
# Uses index of preferred countries to sort
|
|
country_key = 99
|
|
if release.get("country"):
|
|
for i, country in enumerate(countries):
|
|
if country.match(release["country"]):
|
|
country_key = i
|
|
break
|
|
|
|
return (year, month, day, country_key)
|
|
|
|
|
|
def acoustid_match(log, path):
|
|
"""Gets metadata for a file from Acoustid and populates the
|
|
_matches, _fingerprints, and _acoustids dictionaries accordingly.
|
|
"""
|
|
try:
|
|
duration, fp = acoustid.fingerprint_file(util.syspath(path))
|
|
except acoustid.FingerprintGenerationError as exc:
|
|
log.error(
|
|
"fingerprinting of {} failed: {}",
|
|
util.displayable_path(repr(path)),
|
|
exc,
|
|
)
|
|
return None
|
|
fp = fp.decode()
|
|
_fingerprints[path] = fp
|
|
try:
|
|
res = acoustid.lookup(
|
|
API_KEY, fp, duration, meta="recordings releases", timeout=10
|
|
)
|
|
except acoustid.AcoustidError as exc:
|
|
log.debug(
|
|
"fingerprint matching {} failed: {}",
|
|
util.displayable_path(repr(path)),
|
|
exc,
|
|
)
|
|
return None
|
|
log.debug("chroma: fingerprinted {}", util.displayable_path(repr(path)))
|
|
|
|
# Ensure the response is usable and parse it.
|
|
if res["status"] != "ok" or not res.get("results"):
|
|
log.debug("no match found")
|
|
return None
|
|
result = res["results"][0] # Best match.
|
|
if result["score"] < SCORE_THRESH:
|
|
log.debug("no results above threshold")
|
|
return None
|
|
_acoustids[path] = result["id"]
|
|
|
|
# Get recording and releases from the result
|
|
if not result.get("recordings"):
|
|
log.debug("no recordings found")
|
|
return None
|
|
recording_ids = []
|
|
releases = []
|
|
for recording in result["recordings"]:
|
|
recording_ids.append(recording["id"])
|
|
if "releases" in recording:
|
|
releases.extend(recording["releases"])
|
|
|
|
# The releases list is essentially in random order from the Acoustid lookup
|
|
# so we optionally sort it using the match.preferred configuration options.
|
|
# 'original_year' to sort the earliest first and
|
|
# 'countries' to then sort preferred countries first.
|
|
country_patterns = config["match"]["preferred"]["countries"].as_str_seq()
|
|
countries = [re.compile(pat, re.I) for pat in country_patterns]
|
|
original_year = config["match"]["preferred"]["original_year"]
|
|
releases.sort(
|
|
key=partial(
|
|
releases_key, countries=countries, original_year=original_year
|
|
)
|
|
)
|
|
release_ids = [rel["id"] for rel in releases]
|
|
|
|
log.debug(
|
|
"matched recordings {} on releases {}", recording_ids, release_ids
|
|
)
|
|
_matches[path] = recording_ids, release_ids
|
|
|
|
|
|
# Plugin structure and autotagging logic.
|
|
|
|
|
|
def _all_releases(items):
|
|
"""Given an iterable of Items, determines (according to Acoustid)
|
|
which releases the items have in common. Generates release IDs.
|
|
"""
|
|
# Count the number of "hits" for each release.
|
|
relcounts = defaultdict(int)
|
|
for item in items:
|
|
if item.path not in _matches:
|
|
continue
|
|
|
|
_, release_ids = _matches[item.path]
|
|
for release_id in release_ids:
|
|
relcounts[release_id] += 1
|
|
|
|
for release_id, count in relcounts.items():
|
|
if float(count) / len(items) > COMMON_REL_THRESH:
|
|
yield release_id
|
|
|
|
|
|
class AcoustidPlugin(MetadataSourcePlugin):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.config.add(
|
|
{
|
|
"auto": True,
|
|
}
|
|
)
|
|
config["acoustid"]["apikey"].redact = True
|
|
|
|
if self.config["auto"]:
|
|
self.register_listener("import_task_start", self.fingerprint_task)
|
|
self.register_listener("import_task_apply", apply_acoustid_metadata)
|
|
|
|
@cached_property
|
|
def mb(self) -> MusicBrainzPlugin:
|
|
return MusicBrainzPlugin()
|
|
|
|
def fingerprint_task(self, task, session):
|
|
return fingerprint_task(self._log, task, session)
|
|
|
|
def track_distance(self, item, info):
|
|
dist = Distance()
|
|
if item.path not in _matches or not info.track_id:
|
|
# Match failed or no track ID.
|
|
return dist
|
|
|
|
recording_ids, _ = _matches[item.path]
|
|
dist.add_expr("track_id", info.track_id not in recording_ids)
|
|
return dist
|
|
|
|
def candidates(self, items, artist, album, va_likely):
|
|
albums = []
|
|
for relid in prefix(_all_releases(items), MAX_RELEASES):
|
|
album = self.mb.album_for_id(relid)
|
|
if album:
|
|
albums.append(album)
|
|
|
|
self._log.debug("acoustid album candidates: {}", len(albums))
|
|
return albums
|
|
|
|
def item_candidates(self, item, artist, title) -> Iterable[TrackInfo]:
|
|
if item.path not in _matches:
|
|
return []
|
|
|
|
recording_ids, _ = _matches[item.path]
|
|
tracks = []
|
|
for recording_id in prefix(recording_ids, MAX_RECORDINGS):
|
|
track = self.mb.track_for_id(recording_id)
|
|
if track:
|
|
tracks.append(track)
|
|
self._log.debug("acoustid item candidates: {}", len(tracks))
|
|
return tracks
|
|
|
|
def album_for_id(self, *args, **kwargs):
|
|
# Lookup by fingerprint ID does not make too much sense.
|
|
return None
|
|
|
|
def track_for_id(self, *args, **kwargs):
|
|
# Lookup by fingerprint ID does not make too much sense.
|
|
return None
|
|
|
|
def commands(self):
|
|
submit_cmd = ui.Subcommand(
|
|
"submit", help="submit Acoustid fingerprints"
|
|
)
|
|
|
|
def submit_cmd_func(lib, opts, args):
|
|
try:
|
|
apikey = config["acoustid"]["apikey"].as_str()
|
|
except confuse.NotFoundError:
|
|
raise ui.UserError("no Acoustid user API key provided")
|
|
submit_items(self._log, apikey, lib.items(args))
|
|
|
|
submit_cmd.func = submit_cmd_func
|
|
|
|
fingerprint_cmd = ui.Subcommand(
|
|
"fingerprint", help="generate fingerprints for items without them"
|
|
)
|
|
|
|
def fingerprint_cmd_func(lib, opts, args):
|
|
for item in lib.items(args):
|
|
fingerprint_item(self._log, item, write=ui.should_write())
|
|
|
|
fingerprint_cmd.func = fingerprint_cmd_func
|
|
|
|
return [submit_cmd, fingerprint_cmd]
|
|
|
|
|
|
# Hooks into import process.
|
|
|
|
|
|
def fingerprint_task(log, task, session):
|
|
"""Fingerprint each item in the task for later use during the
|
|
autotagging candidate search.
|
|
"""
|
|
items = task.items if task.is_album else [task.item]
|
|
for item in items:
|
|
acoustid_match(log, item.path)
|
|
|
|
|
|
def apply_acoustid_metadata(task, session):
|
|
"""Apply Acoustid metadata (fingerprint and ID) to the task's items."""
|
|
for item in task.imported_items():
|
|
if item.path in _fingerprints:
|
|
item.acoustid_fingerprint = _fingerprints[item.path]
|
|
if item.path in _acoustids:
|
|
item.acoustid_id = _acoustids[item.path]
|
|
|
|
|
|
# UI commands.
|
|
|
|
|
|
def submit_items(log, userkey, items, chunksize=64):
|
|
"""Submit fingerprints for the items to the Acoustid server."""
|
|
data = [] # The running list of dictionaries to submit.
|
|
|
|
def submit_chunk():
|
|
"""Submit the current accumulated fingerprint data."""
|
|
log.info("submitting {} fingerprints", len(data))
|
|
try:
|
|
acoustid.submit(API_KEY, userkey, data, timeout=10)
|
|
except acoustid.AcoustidError as exc:
|
|
log.warning("acoustid submission error: {}", exc)
|
|
del data[:]
|
|
|
|
for item in items:
|
|
fp = fingerprint_item(log, item, write=ui.should_write())
|
|
|
|
# Construct a submission dictionary for this item.
|
|
item_data = {
|
|
"duration": int(item.length),
|
|
"fingerprint": fp,
|
|
}
|
|
if item.mb_trackid:
|
|
item_data["mbid"] = item.mb_trackid
|
|
log.debug("submitting MBID")
|
|
else:
|
|
item_data.update(
|
|
{
|
|
"track": item.title,
|
|
"artist": item.artist,
|
|
"album": item.album,
|
|
"albumartist": item.albumartist,
|
|
"year": item.year,
|
|
"trackno": item.track,
|
|
"discno": item.disc,
|
|
}
|
|
)
|
|
log.debug("submitting textual metadata")
|
|
data.append(item_data)
|
|
|
|
# If we have enough data, submit a chunk.
|
|
if len(data) >= chunksize:
|
|
submit_chunk()
|
|
|
|
# Submit remaining data in a final chunk.
|
|
if data:
|
|
submit_chunk()
|
|
|
|
|
|
def fingerprint_item(log, item, write=False):
|
|
"""Get the fingerprint for an Item. If the item already has a
|
|
fingerprint, it is not regenerated. If fingerprint generation fails,
|
|
return None. If the items are associated with a library, they are
|
|
saved to the database. If `write` is set, then the new fingerprints
|
|
are also written to files' metadata.
|
|
"""
|
|
# Get a fingerprint and length for this track.
|
|
if not item.length:
|
|
log.info("{.filepath}: no duration available", item)
|
|
elif item.acoustid_fingerprint:
|
|
if write:
|
|
log.info("{.filepath}: fingerprint exists, skipping", item)
|
|
else:
|
|
log.info("{.filepath}: using existing fingerprint", item)
|
|
return item.acoustid_fingerprint
|
|
else:
|
|
log.info("{.filepath}: fingerprinting", item)
|
|
try:
|
|
_, fp = acoustid.fingerprint_file(util.syspath(item.path))
|
|
item.acoustid_fingerprint = fp.decode()
|
|
if write:
|
|
log.info("{.filepath}: writing fingerprint", item)
|
|
item.try_write()
|
|
if item._db:
|
|
item.store()
|
|
return item.acoustid_fingerprint
|
|
except acoustid.FingerprintGenerationError as exc:
|
|
log.info("fingerprint generation failed: {}", exc)
|