beets/beetsplug/chroma.py
2012-04-01 20:41:27 -07:00

248 lines
8.2 KiB
Python

# This file is part of beets.
# Copyright 2011, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Adds Chromaprint/Acoustid acoustic fingerprinting support to the
autotagger. Requires the pyacoustid library.
"""
from __future__ import with_statement
from beets import plugins
from beets import ui
from beets import util
from beets.autotag import hooks
import acoustid
import logging
from collections import defaultdict
API_KEY = '1vOwZtEn'
SCORE_THRESH = 0.5
TRACK_ID_WEIGHT = 10.0
COMMON_REL_THRESH = 0.6 # How many tracks must have an album in common?
log = logging.getLogger('beets')
# Stores the Acoustid match information for each track. This is
# populated when an import task begins and then used when searching for
# candidates. It maps audio file paths to (recording_id, release_ids)
# pairs. If a given path is not present in the mapping, then no match
# was found.
_matches = {}
# Stores the fingerprint and Acoustid ID for each track. This is stored
# as metadata for each track for later use but is not relevant for
# autotagging.
_fingerprints = {}
_acoustids = {}
# The user's Acoustid API key, if provided.
_userkey = None
def acoustid_match(path):
"""Gets metadata for a file from Acoustid and populates the
_matches, _fingerprints, and _acoustids dictionaries accordingly.
"""
try:
duration, fp = acoustid.fingerprint_file(path)
except acoustid.FingerprintGenerationError, exc:
log.error('fingerprinting of %s failed: %s' %
(repr(path), str(exc)))
return None
_fingerprints[path] = fp
try:
res = acoustid.lookup(API_KEY, fp, duration,
meta='recordings releases')
except acoustid.AcoustidError, exc:
log.debug('fingerprint matching %s failed: %s' %
(repr(path), str(exc)))
return None
log.debug('chroma: fingerprinted %s' % repr(path))
# Ensure the response is usable and parse it.
if res['status'] != 'ok' or not res.get('results'):
log.debug('chroma: no match found')
return None
result = res['results'][0]
if result['score'] < SCORE_THRESH:
log.debug('chroma: no results above threshold')
return None
_acoustids[path] = result['id']
# Get recordings from the result.
if not result.get('recordings'):
log.debug('chroma: no recordings found')
return None
recording = result['recordings'][0]
recording_id = recording['id']
if 'releases' in recording:
release_ids = [rel['id'] for rel in recording['releases']]
else:
release_ids = []
log.debug('chroma: matched recording {}'.format(recording_id))
_matches[path] = recording_id, release_ids
# Plugin structure and autotagging logic.
def _all_releases(items):
"""Given an iterable of Items, determines (according to Acoustid)
which releases the items have in common. Generates release IDs.
"""
# Count the number of "hits" for each release.
relcounts = defaultdict(int)
for item in items:
if item.path not in _matches:
continue
_, release_ids = _matches[item.path]
for release_id in release_ids:
relcounts[release_id] += 1
for release_id, count in relcounts.iteritems():
if float(count) / len(items) > COMMON_REL_THRESH:
yield release_id
class AcoustidPlugin(plugins.BeetsPlugin):
def track_distance(self, item, info):
if item.path not in _matches:
# Match failed.
return 0.0, 0.0
recording_id, _ = _matches[item.path]
if info.track_id == recording_id:
dist = 0.0
else:
dist = TRACK_ID_WEIGHT
return dist, TRACK_ID_WEIGHT
def candidates(self, items):
albums = []
for relid in _all_releases(items):
album = hooks._album_for_id(relid)
if album:
albums.append(album)
log.debug('acoustid album candidates: %i' % len(albums))
return albums
def item_candidates(self, item):
if item.path not in _matches:
return 0.0, 0.0
recording_id, _ = _matches[item.path]
track = hooks._track_for_id(recording_id)
if track:
log.debug('found acoustid item candidate')
return [track]
else:
log.debug('no acoustid item candidate found')
return []
def configure(self, config):
global _userkey
_userkey = ui.config_val(config, 'acoustid', 'apikey', None)
def commands(self):
submit_cmd = ui.Subcommand('submit',
help='submit Acoustid fingerprints')
def submit_cmd_func(lib, config, opts, args):
if not _userkey:
raise ui.UserError('no Acoustid user API key provided')
submit_items(_userkey, lib.items(ui.decargs(args)))
submit_cmd.func = submit_cmd_func
return [submit_cmd]
# Hooks into import process.
@AcoustidPlugin.listen('import_task_start')
def fingerprint_task(config=None, task=None):
"""Fingerprint each item in the task for later use during the
autotagging candidate search.
"""
for item in task.all_items():
acoustid_match(item.path)
@AcoustidPlugin.listen('import_task_apply')
def apply_acoustid_metadata(config=None, task=None):
"""Apply Acoustid metadata (fingerprint and ID) to the task's items.
"""
for item in task.all_items():
if item.path in _fingerprints:
item.acoustid_fingerprint = _fingerprints[item.path]
if item.path in _acoustids:
item.acoustid_id = _acoustids[item.path]
# UI commands.
def submit_items(userkey, items, chunksize=64):
"""Submit fingerprints for the items to the Acoustid server.
"""
data = [] # The running list of dictionaries to submit.
def submit_chunk():
"""Submit the current accumulated fingerprint data."""
log.info('submitting {} fingerprints'.format(len(data)))
acoustid.submit(API_KEY, userkey, data)
del data[:]
for item in items:
# Get a fingerprint and length for this track.
if not item.length:
log.info(u'{}: no duration available'.format(
util.displayable_path(item.path)
))
continue
elif item.acoustid_fingerprint:
log.info(u'{}: using existing fingerprint'.format(
util.displayable_path(item.path)
))
fp = item.acoustid_fingerprint
else:
log.info(u'{}: fingerprinting'.format(
util.displayable_path(item.path)
))
try:
_, fp = acoustid.fingerprint_file(item.path)
except acoustid.FingerprintGenerationError, exc:
log.info('fingerprint generation failed')
continue
# Construct a submission dictionary for this item.
item_data = {
'duration': int(item.length),
'fingerprint': fp,
}
if item.mb_trackid:
item_data['mbid'] = item.mb_trackid
log.debug('submitting MBID')
else:
item_data.update({
'track': item.title,
'artist': item.artist,
'album': item.album,
'albumartist': item.albumartist,
'year': item.year,
'trackno': item.track,
'discno': item.disc,
})
log.debug('submitting textual metadata')
data.append(item_data)
# If we have enough data, submit a chunk.
if len(data) > chunksize:
submit_chunk()
# Submit remaining data in a final chunk.
submit_chunk()