mirror of
https://github.com/beetbox/beets.git
synced 2025-12-24 01:25:47 +01:00
new proposal
This commit is contained in:
parent
00ec247515
commit
6f5d4d1328
2 changed files with 372 additions and 377 deletions
372
beetsplug/echonest.py
Normal file
372
beetsplug/echonest.py
Normal file
|
|
@ -0,0 +1,372 @@
|
|||
import time
|
||||
import logging
|
||||
import socket
|
||||
|
||||
from beets import util, config, plugins, ui, library
|
||||
import pyechonest
|
||||
import pyechonest.song
|
||||
import pyechonest.track
|
||||
|
||||
log = logging.getLogger('beets')
|
||||
|
||||
# If a request at the EchoNest fails, we want to retry the request RETRIES
|
||||
# times and wait between retries for RETRY_INTERVAL seconds.
|
||||
RETRIES = 10
|
||||
RETRY_INTERVAL = 10
|
||||
|
||||
# for converting files
|
||||
import os
|
||||
import tempfile
|
||||
from string import Template
|
||||
from subprocess import Popen
|
||||
DEVNULL = open(os.devnull, 'wb')
|
||||
|
||||
# The attributes we can import and where to store them
|
||||
# Note: We use echonest_id (song_id) and echonest_fingerprint to speed up
|
||||
# lookups. They are not listed as attributes here.
|
||||
ATTRIBUTES = {
|
||||
'energy' : 'echonest_energy',
|
||||
'liveness' : 'echonest_liveness',
|
||||
'speechiness' : 'echonest_speechiness',
|
||||
'acousticness' : 'echonest_acousticness',
|
||||
'danceability' : 'echonest_danceability',
|
||||
'valence' : 'echonest_valence',
|
||||
'tempo' : 'bpm',
|
||||
}
|
||||
|
||||
def _splitstrip(string):
|
||||
"""Split string at comma and return the stripped values as array."""
|
||||
return [ s.strip() for s in string.split(u',') ]
|
||||
|
||||
class EchonestMetadataPlugin(plugins.BeetsPlugin):
|
||||
_songs = {}
|
||||
_attributes = []
|
||||
_no_mapping = []
|
||||
|
||||
def __init__(self):
|
||||
super(EchonestMetadataPlugin, self).__init__()
|
||||
self.config.add({
|
||||
'auto' : True,
|
||||
'apikey' : u'NY2KTZHQ0QDSHBAP6',
|
||||
'codegen' : None,
|
||||
'upload' : True
|
||||
'convert' : True
|
||||
})
|
||||
for k, v in ATTRIBUTES.iteritems():
|
||||
self.config.add({k:v})
|
||||
|
||||
pyechonest.config.ECHO_NEST_API_KEY = \
|
||||
config['echonest']['apikey'].get(unicode)
|
||||
|
||||
if config['echonest']['codegen'].get() is not None:
|
||||
pyechonest.config.CODEGEN_BINARY_OVERRIDE = \
|
||||
config['echonest']['codegen'].get(unicode)
|
||||
|
||||
self.register_listener('import_task_start', self.fetch_song_task)
|
||||
self.register_listener('import_task_apply', self.apply_metadata_task)
|
||||
|
||||
def _echofun(self, func, **kwargs):
|
||||
"""Wrapper for requests to the EchoNest API. Will retry up to RETRIES
|
||||
times and wait between retries for RETRY_INTERVAL seconds.
|
||||
"""
|
||||
for i in range(RETRIES):
|
||||
try:
|
||||
result = func(**kwargs)
|
||||
except pyechonest.util.EchoNestAPIError as e:
|
||||
if e.code == 3:
|
||||
# reached access limit per minute
|
||||
time.sleep(RETRY_INTERVAL)
|
||||
elif e.code == 5:
|
||||
# specified identifier does not exist
|
||||
# no use in trying again.
|
||||
log.debug(u'echonest: {}'.format(e))
|
||||
return None
|
||||
else:
|
||||
log.error(u'echonest: {0}'.format(e.args[0][0]))
|
||||
return None
|
||||
except (pyechonest.util.EchoNestIOError, socket.error) as e:
|
||||
log.warn(u'echonest: IO error: {0}'.format(e))
|
||||
time.sleep(RETRY_INTERVAL)
|
||||
else:
|
||||
break
|
||||
else:
|
||||
# If we exited the loop without breaking, then we used up all
|
||||
# our allotted retries.
|
||||
raise Exception(u'exceeded retries')
|
||||
return None
|
||||
return result
|
||||
|
||||
def fingerprint(self, item):
|
||||
"""Get the fingerprint for this item from the EchoNest. If we already
|
||||
have a fingerprint, return it and don't calculate it again.
|
||||
"""
|
||||
if item.get('echonest_fingerprint', None) is not None:
|
||||
try:
|
||||
code = self._echofun(pyechonest.util.codegen, filename=item.path)
|
||||
item['echonest_fingerprint'] = code[0]['code']
|
||||
item.write()
|
||||
except Exception as exc:
|
||||
log.error(u'echonest: fingerprinting failed: {0}'
|
||||
.format(str(exc)))
|
||||
return None
|
||||
return item.get('echonest_fingerprint')
|
||||
|
||||
def convert(self, item):
|
||||
"""Converts an item in an unsupported media format to ogg. Config
|
||||
pending.
|
||||
This is stolen from Jakob Schnitzers convert plugin.
|
||||
"""
|
||||
fd, dest = tempfile.mkstemp(u'.ogg')
|
||||
os.close(fd)
|
||||
source = item.path
|
||||
# FIXME: use avconv?
|
||||
command = u'ffmpeg -i $source -y -acodec libvorbis -vn -aq 2 $dest'.split(u' ')
|
||||
log.info(u'echonest: encoding {0} to {1}'
|
||||
.format(util.displayable_path(source),
|
||||
util.displayable_path(dest)))
|
||||
opts = []
|
||||
for arg in command:
|
||||
arg = arg.encode('utf-8')
|
||||
opts.append(Template(arg).substitute({
|
||||
'source': source,
|
||||
'dest': dest
|
||||
}))
|
||||
|
||||
try:
|
||||
encode = Popen(opts, close_fds=True, stderr=DEVNULL)
|
||||
encode.wait()
|
||||
except Exception as exc:
|
||||
log.error(u'echonest: encode failed: {0}'.format(str(exc)))
|
||||
util.remove(dest)
|
||||
util.prune_dirs(os.path.dirname(dest))
|
||||
return None
|
||||
|
||||
if encode.returncode != 0:
|
||||
log.info(u'echonest: encoding {0} failed ({1}). Cleaning up...'
|
||||
.format(util.displayable_path(source), encode.returncode))
|
||||
util.remove(dest)
|
||||
util.prune_dirs(os.path.dirname(dest))
|
||||
return None
|
||||
log.info(u'Finished encoding {0}'.format(util.displayable_path(source)))
|
||||
return dest
|
||||
|
||||
def analyze(self, item):
|
||||
"""Upload the item to the EchoNest for analysis. May require to
|
||||
convert the item to a supported media format.
|
||||
"""
|
||||
try:
|
||||
source = item.path
|
||||
if item.format.lower() not in ['wav', 'mp3', 'au', 'ogg', 'mp4', 'm4a']:
|
||||
if not config['echonest']['convert'].get(bool):
|
||||
raise Exception(u'format {} not supported for upload'
|
||||
.format(item.format))
|
||||
else:
|
||||
source = self.convert(item)
|
||||
if source is None:
|
||||
raise Exception(u'failed to convert file'
|
||||
.format(item.format))
|
||||
log.info(u'echonest: uploading file, be patient')
|
||||
track = self._echofun(pyechonest.track.track_from_filename,
|
||||
filename=source)
|
||||
if track is None:
|
||||
raise Exception(u'failed to upload file')
|
||||
|
||||
# Sometimes we have a track but no song. I guess this happens for
|
||||
# new / unverified songs. We need to 'extract' the audio_summary
|
||||
# from the track object 'manually'. I don't know why the
|
||||
# pyechonest API handles tracks (merge audio_summary to __dict__)
|
||||
# and songs (keep audio_summary in an extra attribute)
|
||||
# differently.
|
||||
# Maybe a patch for pyechonest could help?
|
||||
ids = []
|
||||
try:
|
||||
ids = [track.song_id]
|
||||
except Exception:
|
||||
result = {}
|
||||
result['energy'] = track.energy
|
||||
result['liveness'] = track.liveness
|
||||
result['speechiness'] = track.speechiness
|
||||
result['acousticness'] = track.acousticness
|
||||
result['danceability'] = track.danceability
|
||||
result['valence'] = track.valence
|
||||
result['tempo'] = track.tempo
|
||||
return result
|
||||
songs = self._echofun(pyechonest.song.profile,
|
||||
ids=ids, track_ids=[track.id],
|
||||
buckets=['audio_summary'])
|
||||
if songs is None:
|
||||
raise Exception(u'failed to retrieve info from upload')
|
||||
return self._pick_song(songs, item)
|
||||
except Exception as exc:
|
||||
log.error(u'echonest: analysis failed: {0}'.format(str(exc)))
|
||||
return None
|
||||
|
||||
def identify(self, item):
|
||||
"""Try to identify the song at the EchoNest.
|
||||
"""
|
||||
try:
|
||||
code = self.fingerprint(item)
|
||||
if code is None:
|
||||
raise Exception(u'can not identify without a fingerprint')
|
||||
songs = self._echofun(pyechonest.song.identify, code=code)
|
||||
if not songs:
|
||||
raise Exception(u'no songs found')
|
||||
return max(songs, key=lambda s: s.score)
|
||||
except Exception as exc:
|
||||
log.error(u'echonest: identification failed: {0}'.format(str(exc)))
|
||||
return None
|
||||
|
||||
def _pick_song(self, songs, item):
|
||||
"""Helper method to pick the best matching song from a list of songs
|
||||
returned by the EchoNest. Compares artist, title and duration. If
|
||||
the artist and title match and the duration difference is <= 1.0
|
||||
seconds, it's considered a match.
|
||||
"""
|
||||
pick = None
|
||||
if songs:
|
||||
min_dist = item.length
|
||||
for song in songs:
|
||||
if song.artist_name.lower() == item.artist.lower() \
|
||||
and song.title.lower() == item.title.lower():
|
||||
dist = abs(item.length - song.audio_summary['duration'])
|
||||
if dist < min_dist:
|
||||
min_dist = dist
|
||||
pick = song
|
||||
if min_dist > 1.0:
|
||||
return None
|
||||
return pick
|
||||
|
||||
def search(self, item):
|
||||
"""Search the item at the EchoNest by artist and title.
|
||||
"""
|
||||
try:
|
||||
songs = self._echofun(pyechonest.song.search, title=item.title,
|
||||
results=100, artist=item.artist,
|
||||
buckets=['id:musicbrainz', 'tracks'])
|
||||
pick = self._pick_song(songs, item)
|
||||
if pick is None:
|
||||
raise Exception(u'no (matching) songs found')
|
||||
return pick
|
||||
except Exception as exc:
|
||||
log.error(u'echonest: search failed: {0}'.format(str(exc)))
|
||||
return None
|
||||
|
||||
def profile(self, item):
|
||||
"""Do a lookup on the EchoNest by MusicBrainz ID.
|
||||
"""
|
||||
try:
|
||||
if item.get('echonest_id', None) is None:
|
||||
if not item.mb_trackid:
|
||||
raise Exception(u'musicbrainz ID not available')
|
||||
mbid = 'musicbrainz:track:{0}'.format(item.mb_trackid)
|
||||
track = self._echofun(pyechonest.track.track_from_id, identifier=mbid)
|
||||
if not track:
|
||||
raise Exception(u'could not get track from ID')
|
||||
ids = track.song_id
|
||||
else:
|
||||
ids = item.get('echonest_id')
|
||||
songs = self._echofun(pyechonest.song.profile, ids=ids,
|
||||
buckets=['id:musicbrainz', 'audio_summary'])
|
||||
if not songs:
|
||||
raise Exception(u'could not get songs from track ID')
|
||||
return self._pick_song(songs, item)
|
||||
except Exception as exc:
|
||||
log.debug(u'echonest: profile failed: {0}'.format(str(exc)))
|
||||
return None
|
||||
|
||||
def fetch_song(self, item):
|
||||
"""Try all methods, to get a matching song object from the EchoNest.
|
||||
"""
|
||||
methods = [self.profile, self.search, self.identify]
|
||||
if config['echonest']['codegen'].get() is not None:
|
||||
methods.append(self.identify)
|
||||
if config['echonest']['upload'].get(bool):
|
||||
methods.append(self.analyze)
|
||||
for method in methods:
|
||||
try:
|
||||
song = method(item)
|
||||
if not song is None:
|
||||
if isinstance(song, pyechonest.song.Song):
|
||||
log.debug(u'echonest: got song through {0}: {1} - {2} [{3}]'
|
||||
.format(method.im_func.func_name,
|
||||
item.artist, item.title,
|
||||
song.audio_summary['duration']))
|
||||
else: # it's our dict filled from a track object
|
||||
log.debug(u'echonest: got song through {0}: {1} - {2} [{3}]'
|
||||
.format(method.im_func.func_name,
|
||||
item.artist, item.title,
|
||||
song['duration']))
|
||||
return song
|
||||
except Exception as exc:
|
||||
log.debug(u'echonest: profile failed: {0}'.format(str(exc)))
|
||||
return None
|
||||
|
||||
def apply_metadata(self, item):
|
||||
"""Copy the metadata from the EchoNest to the item.
|
||||
"""
|
||||
if item.path in self._songs:
|
||||
# song can be a dict
|
||||
if isinstance(self._songs[item.path], pyechonest.song.Song):
|
||||
item.echonest_id = self._songs[item.path].id
|
||||
values = self._songs[item.path].audio_summary
|
||||
else:
|
||||
values = self._songs[item.path]
|
||||
for k, v in values.iteritems():
|
||||
if ATTRIBUTES.has_key(k) and ATTRIBUTES[k] is not None:
|
||||
log.debug(u'echonest: metadata: {0} = {1}'
|
||||
.format(ATTRIBUTES[k], v))
|
||||
item[ATTRIBUTES[k]] = v
|
||||
if config['import']['write'].get(bool):
|
||||
log.info(u'echonest: writing metadata: {0}'
|
||||
.format(util.displayable_path(item.path)))
|
||||
item.write()
|
||||
if item._lib:
|
||||
item.store()
|
||||
else:
|
||||
log.warn(u'echonest: no metadata available')
|
||||
|
||||
def requires_update(self, item):
|
||||
"""Check if this item requires an update from the EchoNest aka data is
|
||||
missing.
|
||||
"""
|
||||
for k, v in ATTRIBUTES.iteritems():
|
||||
if v is None:
|
||||
continue
|
||||
if item.get(v, None) is None:
|
||||
return True
|
||||
log.info(u'echonest: no update required')
|
||||
return False
|
||||
|
||||
def fetch_song_task(self, task, session):
|
||||
items = task.items if task.is_album else [task.item]
|
||||
for item in items:
|
||||
song = self.fetch_song(item)
|
||||
if not song is None:
|
||||
self._songs[item.path] = song
|
||||
|
||||
def apply_metadata_task(self, task, session):
|
||||
for item in task.imported_items():
|
||||
self.apply_metadata(item)
|
||||
|
||||
def commands(self):
|
||||
cmd = ui.Subcommand('echonest',
|
||||
help='Fetch metadata from the EchoNest')
|
||||
cmd.parser.add_option('-f', '--force', dest='force',
|
||||
action='store_true', default=False,
|
||||
help='(re-)download information from the EchoNest')
|
||||
|
||||
def func(lib, opts, args):
|
||||
self.config.set_args(opts)
|
||||
for item in lib.items(ui.decargs(args)):
|
||||
log.info(u'echonest: {0} - {1} [{2}]'.format(item.artist,
|
||||
item.title, item.length))
|
||||
if self.config['force'] or self.requires_update(item):
|
||||
song = self.fetch_song(item)
|
||||
if not song is None:
|
||||
self._songs[item.path] = song
|
||||
self.apply_metadata(item)
|
||||
|
||||
cmd.func = func
|
||||
return [cmd]
|
||||
|
||||
# eof
|
||||
|
|
@ -1,377 +0,0 @@
|
|||
# This file is part of beets.
|
||||
# Copyright 2013, Peter Schnebel <pschnebel.a.gmail>
|
||||
#
|
||||
# Original 'echonest_tempo' plugin is copyright 2013, David Brenner
|
||||
# <david.a.brenner gmail>
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining
|
||||
# a copy of this software and associated documentation files (the
|
||||
# "Software"), to deal in the Software without restriction, including
|
||||
# without limitation the rights to use, copy, modify, merge, publish,
|
||||
# distribute, sublicense, and/or sell copies of the Software, and to
|
||||
# permit persons to whom the Software is furnished to do so, subject to
|
||||
# the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be
|
||||
# included in all copies or substantial portions of the Software.
|
||||
|
||||
"""Gets additional information for imported music from the EchoNest API. Requires
|
||||
version >= 8.0.1 of the pyechonest library (https://github.com/echonest/pyechonest).
|
||||
"""
|
||||
import time
|
||||
import logging
|
||||
from beets.plugins import BeetsPlugin
|
||||
from beets import ui
|
||||
from beets import config
|
||||
import pyechonest.config
|
||||
import pyechonest.song
|
||||
import pyechonest.track
|
||||
import socket
|
||||
import math
|
||||
|
||||
# Global logger.
|
||||
log = logging.getLogger('beets')
|
||||
|
||||
RETRY_INTERVAL = 10 # Seconds.
|
||||
RETRIES = 10
|
||||
ATTRIBUTES = ['energy', 'liveness', 'speechiness', 'acousticness',
|
||||
'danceability', 'valence', 'tempo', 'mood' ]
|
||||
MAPPED_ATTRIBUTES = ['energy', 'liveness', 'speechiness', 'acousticness',
|
||||
'danceability', 'valence', 'mood' ]
|
||||
|
||||
PI_2 = math.pi / 2.0
|
||||
MAX_LEN = math.sqrt(2.0 * 0.5 * 0.5)
|
||||
|
||||
def _picker(value, rang, mapping):
|
||||
inc = rang / len(mapping)
|
||||
i = 0.0
|
||||
for m in mapping:
|
||||
i += inc
|
||||
if value < i:
|
||||
return m
|
||||
return m # in case of floating point precision problems
|
||||
|
||||
def _mapping(mapstr):
|
||||
"""Split mapstr at comma and return the stripped values as array."""
|
||||
return [ m.strip() for m in mapstr.split(u',') ]
|
||||
|
||||
def _guess_mood(valence, energy):
|
||||
"""Based on the valence [0.0 .. 1.0]and energy [0.0 .. 1.0] of a song, we
|
||||
try to guess the mood.
|
||||
|
||||
For an explanation see:
|
||||
http://developer.echonest.com/forums/thread/1297
|
||||
|
||||
We use the Valence-Arousal space from here:
|
||||
http://mat.ucsb.edu/~ivana/200a/background.htm
|
||||
"""
|
||||
|
||||
# move center to 0.0/0.0
|
||||
valence -= 0.5
|
||||
energy -= 0.5
|
||||
|
||||
# we use the length of the valence / energy vector to determine the
|
||||
# strength of the emotion
|
||||
length = math.sqrt(valence * valence + energy * energy)
|
||||
|
||||
# FIXME: do we want the next 3 as config options?
|
||||
strength = [u'slightly', u'', u'very' ]
|
||||
# energy from -0.5 to 0.5, valence < 0.0
|
||||
low_valence = [
|
||||
u'fatigued', u'lethargic', u'depressed', u'sad',
|
||||
u'upset', u'stressed', u'nervous', u'tense' ]
|
||||
# energy from -0.5 to 0.5, valence >= 0.0
|
||||
high_valence = [
|
||||
u'calm', u'relaxed', u'serene', u'contented',
|
||||
u'happy', u'elated', u'excited', u'alert' ]
|
||||
if length == 0.0:
|
||||
# FIXME: what now? return a fallback? config?
|
||||
return u'neutral'
|
||||
|
||||
angle = math.asin(energy / length) + PI_2
|
||||
if valence < 0.0:
|
||||
moods = low_valence
|
||||
else:
|
||||
moods = high_valence
|
||||
mood = _picker(angle, math.pi, moods)
|
||||
strength = _picker(length, MAX_LEN, strength)
|
||||
if strength == u'':
|
||||
return mood
|
||||
return u'{} {}'.format(strength, mood)
|
||||
|
||||
def fetch_item_attributes(lib, item, write, force, reapply):
|
||||
"""Fetches audio_summary from the EchoNest and writes it to item.
|
||||
"""
|
||||
|
||||
log.debug(u'echoplus: {} - {} [{}] force:{} reapply:{}'.format(
|
||||
item.artist, item.title, item.length, force, reapply))
|
||||
# permanently store the raw values? not implemented yet
|
||||
store_raw = config['echoplus']['store_raw'].get(bool)
|
||||
|
||||
# if we want to set mood, we need to make sure, that valence and energy
|
||||
# are imported
|
||||
if config['echoplus']['mood'].get(str):
|
||||
if config['echoplus']['valence'].get(str) == '':
|
||||
log.warn(u'echoplus: "valence" is required to guess the mood')
|
||||
config['echoplus']['mood'].set('') # disable mood
|
||||
|
||||
if config['echoplus']['energy'].get(str) == '':
|
||||
log.warn(u'echoplus: "energy" is required to guess the mood')
|
||||
config['echoplus']['mood'].set('') # disable mood
|
||||
|
||||
# force implies reapply
|
||||
if force:
|
||||
reapply = True
|
||||
|
||||
allow_upload = config['echoplus']['upload'].get(bool)
|
||||
# the EchoNest only supports these file formats
|
||||
if allow_upload and \
|
||||
item.format.lower() not in ['wav', 'mp3', 'au', 'ogg', 'mp4', 'm4a']:
|
||||
log.warn(u'echoplus: format {} not supported for upload'.format(item.format))
|
||||
allow_upload = False
|
||||
|
||||
# Check if we need to update
|
||||
need_update = False
|
||||
if force:
|
||||
need_update = True
|
||||
else:
|
||||
need_update = False
|
||||
for attr in ATTRIBUTES:
|
||||
# do we want this attribute?
|
||||
target = config['echoplus'][attr].get(str)
|
||||
if target == '':
|
||||
continue
|
||||
|
||||
# check if the raw values are present. 'mood' has no direct raw
|
||||
# representation and 'tempo' is stored raw anyway
|
||||
if (store_raw or reapply) and not attr in ['mood', 'tempo']:
|
||||
target = '{}_raw'.format(target)
|
||||
|
||||
if item.get(target, None) is None:
|
||||
need_update = True
|
||||
break
|
||||
|
||||
if need_update:
|
||||
log.debug(u'echoplus: fetching data')
|
||||
reapply = True
|
||||
|
||||
# (re-)fetch audio_summary and store it to the raw values. if we do
|
||||
# not want to keep the raw values, we clean them up later
|
||||
|
||||
audio_summary = get_audio_summary(item.artist, item.title,
|
||||
item.length, allow_upload, item.path)
|
||||
changed = False
|
||||
if not audio_summary:
|
||||
return None
|
||||
else:
|
||||
for attr in ATTRIBUTES:
|
||||
if attr == 'mood': # no raw representation
|
||||
continue
|
||||
|
||||
# do we want this attribute?
|
||||
target = config['echoplus'][attr].get(str)
|
||||
if target == '':
|
||||
continue
|
||||
if attr != 'tempo':
|
||||
target = '{}_raw'.format(target)
|
||||
|
||||
if item.get(target, None) is not None and not force:
|
||||
log.info(u'{} already present: {} - {} = {:2.2f}'.format(
|
||||
attr, item.artist, item.title, item.get(target)))
|
||||
else:
|
||||
if not attr in audio_summary or audio_summary[attr] is None:
|
||||
log.info(u'{} not found: {} - {}'.format( attr,
|
||||
item.artist, item.title))
|
||||
else:
|
||||
value = float(audio_summary[attr])
|
||||
item[target] = float(audio_summary[attr])
|
||||
changed = True
|
||||
if reapply:
|
||||
log.debug(u'echoplus: (re-)applying data')
|
||||
global_mapping = _mapping(config['echoplus']['mapping'].get())
|
||||
for attr in ATTRIBUTES:
|
||||
# do we want this attribute?
|
||||
target = config['echoplus'][attr].get(str)
|
||||
if target == '':
|
||||
continue
|
||||
if attr == 'mood':
|
||||
# we validated above, that valence and energy are
|
||||
# included, so this should not fail
|
||||
valence = \
|
||||
float(item.get('{}_raw'.format(config['echoplus']['valence'].get(str))))
|
||||
energy = \
|
||||
float(item.get('{}_raw'.format(config['echoplus']['energy'].get(str))))
|
||||
item[target] = _guess_mood(valence, energy)
|
||||
log.debug(u'echoplus: mapped {}: {:2.2f}x{:2.2f} = {}'.format(
|
||||
attr, valence, energy, item[target]))
|
||||
changed = True
|
||||
elif attr in MAPPED_ATTRIBUTES:
|
||||
mapping = global_mapping
|
||||
map_str = config['echoplus']['{}_mapping'.format(attr)].get()
|
||||
if map_str is not None:
|
||||
mapping = _mapping(map_str)
|
||||
value = float(item.get('{}_raw'.format(target)))
|
||||
mapped_value = _picker(value, 1.0, mapping)
|
||||
log.debug(u'echoplus: mapped {}: {:2.2f} > {}'.format(
|
||||
attr, value, mapped_value))
|
||||
item[attr] = mapped_value
|
||||
changed = True
|
||||
|
||||
if changed:
|
||||
if write:
|
||||
item.write()
|
||||
item.store()
|
||||
|
||||
def _echonest_fun(function, **kwargs):
|
||||
for i in range(RETRIES):
|
||||
try:
|
||||
# Unfortunately, all we can do is search by artist and title.
|
||||
# EchoNest supports foreign ids from MusicBrainz, but currently
|
||||
# only for artists, not individual tracks/recordings.
|
||||
results = function(**kwargs)
|
||||
except pyechonest.util.EchoNestAPIError as e:
|
||||
if e.code == 3:
|
||||
# Wait and try again.
|
||||
time.sleep(RETRY_INTERVAL)
|
||||
else:
|
||||
log.warn(u'echoplus: {0}'.format(e.args[0][0]))
|
||||
return None
|
||||
except (pyechonest.util.EchoNestIOError, socket.error) as e:
|
||||
log.debug(u'echoplus: IO error: {0}'.format(e))
|
||||
time.sleep(RETRY_INTERVAL)
|
||||
else:
|
||||
break
|
||||
else:
|
||||
# If we exited the loop without breaking, then we used up all
|
||||
# our allotted retries.
|
||||
log.debug(u'echoplus: exceeded retries')
|
||||
return None
|
||||
return results
|
||||
|
||||
def get_audio_summary(artist, title, duration, upload, path):
|
||||
"""Get the attribute for a song."""
|
||||
# We must have sufficient metadata for the lookup. Otherwise the API
|
||||
# will just complain.
|
||||
artist = artist.replace(u'\n', u' ').strip().lower()
|
||||
title = title.replace(u'\n', u' ').strip().lower()
|
||||
if not artist or not title:
|
||||
return None
|
||||
|
||||
results = _echonest_fun(pyechonest.song.search,
|
||||
artist=artist, title=title, results=100,
|
||||
buckets=['audio_summary'])
|
||||
pick = None
|
||||
min_distance = duration
|
||||
if results:
|
||||
# The Echo Nest API can return songs that are not perfect matches.
|
||||
# So we look through the results for songs that have the right
|
||||
# artist and title. The API also doesn't have MusicBrainz track IDs;
|
||||
# otherwise we could use those for a more robust match.
|
||||
for result in results:
|
||||
if result.artist_name.lower() == artist \
|
||||
and result.title.lower() == title:
|
||||
distance = abs(duration - result.audio_summary['duration'])
|
||||
log.debug(
|
||||
u'echoplus: candidate {} - {} [dist({:2.2f})={:2.2f}]'.format(
|
||||
result.artist_name, result.title,
|
||||
result.audio_summary['duration'], distance))
|
||||
if distance < min_distance:
|
||||
min_distance = distance
|
||||
pick = result
|
||||
if pick:
|
||||
log.debug(
|
||||
u'echoplus: picked {} - {} [dist({:2.2f}-{:2.2f})={:2.2f}]'.format(
|
||||
pick.artist_name, pick.title,
|
||||
pick.audio_summary['duration'], duration, min_distance))
|
||||
|
||||
if (not pick or min_distance > 1.0) and upload:
|
||||
log.debug(u'echoplus: uploading file "{}" to EchoNest'.format(path))
|
||||
# FIXME: same loop as above... make this better
|
||||
t = _echonest_fun(pyechonest.track.track_from_filename, filename=path)
|
||||
if t:
|
||||
log.debug(u'echoplus: track {} - {} [{:2.2f}]'.format(t.artist, t.title,
|
||||
t.duration))
|
||||
# FIXME: maybe make pyechonest "nicer"?
|
||||
result = {}
|
||||
result['energy'] = t.energy
|
||||
result['liveness'] = t.liveness
|
||||
result['speechiness'] = t.speechiness
|
||||
result['acousticness'] = t.acousticness
|
||||
result['danceability'] = t.danceability
|
||||
result['valence'] = t.valence
|
||||
result['tempo'] = t.tempo
|
||||
return result
|
||||
else:
|
||||
return None
|
||||
elif not pick:
|
||||
return None
|
||||
return pick.audio_summary
|
||||
|
||||
|
||||
class EchoPlusPlugin(BeetsPlugin):
|
||||
def __init__(self):
|
||||
super(EchoPlusPlugin, self).__init__()
|
||||
self.import_stages = [self.imported]
|
||||
self.config.add({
|
||||
'apikey': u'NY2KTZHQ0QDSHBAP6',
|
||||
'auto': True,
|
||||
'mapping': 'very low,low,neutral,high,very high',
|
||||
'energy_mapping': None,
|
||||
'liveness_mapping': 'studio,probably studio,probably live,live',
|
||||
'speechiness_mapping': 'singing,unsure,talking',
|
||||
'acousticness_mapping': 'artificial,probably artifical,probably natural,natural',
|
||||
'danceability_mapping': 'bed,couch,unsure,party,disco',
|
||||
'valence_mapping': None,
|
||||
'store_raw': True,
|
||||
'upload': False,
|
||||
})
|
||||
for attr in ATTRIBUTES:
|
||||
if attr == 'tempo':
|
||||
target = '' # disabled to not conflict with echonest_tempo,
|
||||
# to enable, set it to 'bpm'
|
||||
self.config.add({attr:target})
|
||||
else:
|
||||
target = attr
|
||||
self.config.add({attr:target})
|
||||
|
||||
pyechonest.config.ECHO_NEST_API_KEY = \
|
||||
self.config['apikey'].get(unicode)
|
||||
|
||||
def commands(self):
|
||||
cmd = ui.Subcommand('echoplus',
|
||||
help='fetch additional song information from the echonest')
|
||||
cmd.parser.add_option('-f', '--force', dest='force',
|
||||
action='store_true', default=False,
|
||||
help='re-download information from the EchoNest')
|
||||
cmd.parser.add_option('-r', '--reapply', dest='reapply',
|
||||
action='store_true', default=False,
|
||||
help='reapply mappings')
|
||||
def func(lib, opts, args):
|
||||
# The "write to files" option corresponds to the
|
||||
# import_write config value.
|
||||
write = config['import']['write'].get(bool)
|
||||
self.config.set_args(opts)
|
||||
|
||||
for item in lib.items(ui.decargs(args)):
|
||||
log.debug(u'{} {}'.format(
|
||||
self.config['force'],
|
||||
self.config['reapply']))
|
||||
fetch_item_attributes(lib, item, write,
|
||||
self.config['force'],
|
||||
self.config['reapply'])
|
||||
cmd.func = func
|
||||
return [cmd]
|
||||
|
||||
# Auto-fetch info on import.
|
||||
def imported(self, session, task):
|
||||
if self.config['auto']:
|
||||
if task.is_album:
|
||||
album = session.lib.get_album(task.album_id)
|
||||
for item in album.items():
|
||||
fetch_item_attributes(session.lib, item, False, True,
|
||||
True)
|
||||
else:
|
||||
item = task.item
|
||||
fetch_item_attributes(session.lib, item, False, True, True)
|
||||
|
||||
# eof
|
||||
Loading…
Reference in a new issue