echo nest: top-level code review pass

General Python/beets style changes:
- "x is None" can generally be replaced with "not x"
- prefer iteration over repetitive lines
- config[foo].get(bool) can be replaced with config[foo] in conditions
- removed extraneous "return None", which is what Python functions do
  automatically if you don't return anything
- 79-column wrap
This commit is contained in:
Adrian Sampson 2013-11-26 15:20:18 -08:00
parent 8f54b2ef3d
commit 8fe254f086

View file

@ -1,10 +1,28 @@
# This file is part of beets.
# Copyright 2013, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Fetch a variety of acoustic metrics from The Echo Nest.
"""
import time
import logging
import socket
import math
import os
import tempfile
from string import Template
from subprocess import Popen
from beets import util, config, plugins, ui, library
from beets import util, config, plugins, ui
import pyechonest
import pyechonest.song
import pyechonest.track
@ -16,29 +34,27 @@ log = logging.getLogger('beets')
RETRIES = 10
RETRY_INTERVAL = 10
# for converting files
import os
import tempfile
from string import Template
from subprocess import Popen
DEVNULL = open(os.devnull, 'wb')
ALLOWED_FORMATS = ('MP3', 'OGG', 'AAC')
# The attributes we can import and where to store them
# Note: We use echonest_id (song_id) and echonest_fingerprint to speed up
# lookups. They are not listed as attributes here.
# The attributes we can import and where to store them in beets fields.
# Note: We also use echonest_id (song_id) and echonest_fingerprint to speed up
# lookups. They are not listed as attributes here.
ATTRIBUTES = {
'energy' : 'energy',
'liveness' : 'liveness',
'speechiness' : 'speechiness',
'acousticness' : 'acousticness',
'danceability' : 'danceability',
'valence' : 'valence',
'tempo' : 'bpm',
}
'energy': 'energy',
'liveness': 'liveness',
'speechiness': 'speechiness',
'acousticness': 'acousticness',
'danceability': 'danceability',
'valence': 'valence',
'tempo': 'bpm',
}
def _splitstrip(string):
"""Split string at comma and return the stripped values as array."""
return [ s.strip() for s in string.split(u',') ]
def _splitstrip(string, delim=u','):
"""Split string (at commas by default) and strip whitespace from the
pieces.
"""
return [s.strip() for s in string.split(delim)]
class EchonestMetadataPlugin(plugins.BeetsPlugin):
_songs = {}
@ -48,29 +64,30 @@ class EchonestMetadataPlugin(plugins.BeetsPlugin):
def __init__(self):
super(EchonestMetadataPlugin, self).__init__()
self.config.add({
'auto' : True,
'apikey' : u'NY2KTZHQ0QDSHBAP6',
'codegen' : None,
'upload' : True,
'convert' : True,
})
for k, v in ATTRIBUTES.iteritems():
self.config.add({k:v})
'auto': True,
'apikey': u'NY2KTZHQ0QDSHBAP6',
'codegen': None,
'upload': True,
'convert': True,
})
self.config.add(ATTRIBUTES)
pyechonest.config.ECHO_NEST_API_KEY = \
config['echonest']['apikey'].get(unicode)
if config['echonest']['codegen'].get() is not None:
if config['echonest']['codegen']:
pyechonest.config.CODEGEN_BINARY_OVERRIDE = \
config['echonest']['codegen'].get(unicode)
if config['echonest']['auto'].get(bool):
if self.config['auto']:
self.register_listener('import_task_start', self.fetch_song_task)
self.register_listener('import_task_apply', self.apply_metadata_task)
self.register_listener('import_task_apply',
self.apply_metadata_task)
def _echofun(self, func, **kwargs):
"""Wrapper for requests to the EchoNest API. Will retry up to RETRIES
times and wait between retries for RETRY_INTERVAL seconds.
"""Wrapper for requests to the EchoNest API. Will retry up to
RETRIES times and wait between retries for RETRY_INTERVAL
seconds.
"""
for i in range(RETRIES):
try:
@ -95,24 +112,24 @@ class EchonestMetadataPlugin(plugins.BeetsPlugin):
else:
# If we exited the loop without breaking, then we used up all
# our allotted retries.
raise Exception(u'exceeded retries')
raise ui.UserError(u'echonest request failed repeatedly')
return None
return result
def fingerprint(self, item):
"""Get the fingerprint for this item from the EchoNest. If we already
have a fingerprint, return it and don't calculate it again.
def fingerprint(self, item, key='echonest_fingerprint'):
"""Get the fingerprint for this item from the EchoNest. If we
already have a fingerprint, return it and don't calculate it
again.
"""
if item.get('echonest_fingerprint', None) is None:
try:
code = self._echofun(pyechonest.util.codegen, filename=item.path.decode('utf-8'))
item['echonest_fingerprint'] = code[0]['code']
item.write()
except Exception as exc:
log.error(u'echonest: fingerprinting failed: {0}'
.format(str(exc)))
return None
return item.get('echonest_fingerprint')
if key in item:
return item[key]
try:
code = self._echofun(pyechonest.util.codegen,
filename=item.path.decode('utf-8'))
item['echonest_fingerprint'] = code[0]['code']
item.write()
except Exception as exc:
log.error(u'echonest: fingerprinting failed: {0}'.format(exc))
def convert(self, item):
"""Converts an item in an unsupported media format to ogg. Config
@ -155,66 +172,59 @@ class EchonestMetadataPlugin(plugins.BeetsPlugin):
return dest
def analyze(self, item):
"""Upload the item to the EchoNest for analysis. May require to
"""Upload the item to the EchoNest for analysis. May require to
convert the item to a supported media format.
"""
try:
source = item.path
if item.format.lower() not in ['wav', 'mp3', 'au', 'ogg', 'mp4', 'm4a']:
if not config['echonest']['convert'].get(bool):
raise Exception(u'format {} not supported for upload'
.format(item.format))
else:
if item.format.lower() not in ALLOWED_FORMATS:
if config['echonest']['convert']:
source = self.convert(item)
if source is None:
raise Exception(u'failed to convert file'
.format(item.format))
else:
raise Exception(u'format {} not supported for upload'
.format(item.format))
log.info(u'echonest: uploading file, be patient')
track = self._echofun(pyechonest.track.track_from_filename,
filename=source)
if track is None:
filename=source)
if not track:
raise Exception(u'failed to upload file')
# Sometimes we have a track but no song. I guess this happens for
# new / unverified songs. We need to 'extract' the audio_summary
# from the track object 'manually'. I don't know why the
# Sometimes we have a track but no song. I guess this happens for
# new / unverified songs. We need to "extract" the audio_summary
# from the track object manually. I don't know why the
# pyechonest API handles tracks (merge audio_summary to __dict__)
# and songs (keep audio_summary in an extra attribute)
# differently.
# Maybe a patch for pyechonest could help?
from_track = {}
from_track['energy'] = track.energy
from_track['liveness'] = track.liveness
from_track['speechiness'] = track.speechiness
from_track['acousticness'] = track.acousticness
from_track['danceability'] = track.danceability
from_track['valence'] = track.valence
from_track['tempo'] = track.tempo
from_track['duration'] = track.duration
for key in ATTRIBUTES:
from_track[key] = getattr(track, key)
ids = []
try:
ids = [track.song_id]
except Exception:
return from_track
songs = self._echofun(pyechonest.song.profile,
ids=ids, track_ids=[track.id],
buckets=['audio_summary'])
if songs is None:
ids=ids, track_ids=[track.id],
buckets=['audio_summary'])
if not songs:
raise Exception(u'failed to retrieve info from upload')
pick = self._pick_song(songs, item)
if pick is None:
if not pick:
return from_track
return pick
except Exception as exc:
log.error(u'echonest: analysis failed: {0}'.format(str(exc)))
return None
def identify(self, item):
"""Try to identify the song at the EchoNest.
"""
try:
code = self.fingerprint(item)
if code is None:
if not code:
raise Exception(u'can not identify without a fingerprint')
songs = self._echofun(pyechonest.song.identify, code=code)
if not songs:
@ -222,7 +232,6 @@ class EchonestMetadataPlugin(plugins.BeetsPlugin):
return max(songs, key=lambda s: s.score)
except Exception as exc:
log.error(u'echonest: identification failed: {0}'.format(str(exc)))
return None
def _pick_song(self, songs, item):
"""Helper method to pick the best matching song from a list of songs
@ -249,15 +258,14 @@ class EchonestMetadataPlugin(plugins.BeetsPlugin):
"""
try:
songs = self._echofun(pyechonest.song.search, title=item.title,
results=100, artist=item.artist,
buckets=['id:musicbrainz', 'tracks'])
results=100, artist=item.artist,
buckets=['id:musicbrainz', 'tracks'])
pick = self._pick_song(songs, item)
if pick is None:
raise Exception(u'no (matching) songs found')
return pick
except Exception as exc:
log.error(u'echonest: search failed: {0}'.format(str(exc)))
return None
def profile(self, item):
"""Do a lookup on the EchoNest by MusicBrainz ID.
@ -267,7 +275,8 @@ class EchonestMetadataPlugin(plugins.BeetsPlugin):
if not item.mb_trackid:
raise Exception(u'musicbrainz ID not available')
mbid = 'musicbrainz:track:{0}'.format(item.mb_trackid)
track = self._echofun(pyechonest.track.track_from_id, identifier=mbid)
track = self._echofun(pyechonest.track.track_from_id,
identifier=mbid)
if not track:
raise Exception(u'could not get track from ID')
ids = track.song_id
@ -280,15 +289,14 @@ class EchonestMetadataPlugin(plugins.BeetsPlugin):
return self._pick_song(songs, item)
except Exception as exc:
log.debug(u'echonest: profile failed: {0}'.format(str(exc)))
return None
def fetch_song(self, item):
"""Try all methods, to get a matching song object from the EchoNest.
"""
methods = [self.profile, self.search]
if config['echonest']['codegen'].get() is not None:
if config['echonest']['codegen']:
methods.append(self.identify)
if config['echonest']['upload'].get(bool):
if config['echonest']['upload']:
methods.append(self.analyze)
for method in methods:
try:
@ -384,22 +392,17 @@ def diff(item1, item2, attributes):
for attr in attributes:
try:
result += abs(
float(item1.get(attr, None)) -
float(item2.get(attr, None))
)
float(item1.get(attr, None)) -
float(item2.get(attr, None))
)
except TypeError:
result += 1.0
return result
def similar(lib, src_item, threshold=0.15):
attributes = []
for attr in ['energy', 'danceability', 'valence', 'speechiness',
'acousticness', 'liveness']:
if ATTRIBUTES[attr] is not None:
attributes.append(ATTRIBUTES[attr])
for item in lib.items():
if not item.path == src_item.path:
d = diff(item, src_item, attributes)
if item.path != src_item.path:
d = diff(item, src_item, ATTRIBUTES.values())
if d < threshold:
print(u'{1:2.2f}: {0}'.format(item.path, d))
@ -414,5 +417,3 @@ class EchonestSimilarPlugin(plugins.BeetsPlugin):
cmd.func = func
return [cmd]
# eof