From 8fe254f0866fc83cd81957b107dd4ec520632f78 Mon Sep 17 00:00:00 2001 From: Adrian Sampson Date: Tue, 26 Nov 2013 15:20:18 -0800 Subject: [PATCH] echo nest: top-level code review pass General Python/beets style changes: - "x is None" can generally be replaced with "not x" - prefer iteration over repetitive lines - config[foo].get(bool) can be replaced with config[foo] in conditions - removed extraneous "return None", which is what Python functions do automatically if you don't return anything - 79-column wrap --- beetsplug/echonest.py | 187 +++++++++++++++++++++--------------------- 1 file changed, 94 insertions(+), 93 deletions(-) diff --git a/beetsplug/echonest.py b/beetsplug/echonest.py index 61cbd44a6..331add383 100644 --- a/beetsplug/echonest.py +++ b/beetsplug/echonest.py @@ -1,10 +1,28 @@ # This file is part of beets. +# Copyright 2013, Adrian Sampson. +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. + +"""Fetch a variety of acoustic metrics from The Echo Nest. +""" import time import logging import socket -import math +import os +import tempfile +from string import Template +from subprocess import Popen -from beets import util, config, plugins, ui, library +from beets import util, config, plugins, ui import pyechonest import pyechonest.song import pyechonest.track @@ -16,29 +34,27 @@ log = logging.getLogger('beets') RETRIES = 10 RETRY_INTERVAL = 10 -# for converting files -import os -import tempfile -from string import Template -from subprocess import Popen DEVNULL = open(os.devnull, 'wb') +ALLOWED_FORMATS = ('MP3', 'OGG', 'AAC') -# The attributes we can import and where to store them -# Note: We use echonest_id (song_id) and echonest_fingerprint to speed up -# lookups. They are not listed as attributes here. +# The attributes we can import and where to store them in beets fields. +# Note: We also use echonest_id (song_id) and echonest_fingerprint to speed up +# lookups. They are not listed as attributes here. ATTRIBUTES = { - 'energy' : 'energy', - 'liveness' : 'liveness', - 'speechiness' : 'speechiness', - 'acousticness' : 'acousticness', - 'danceability' : 'danceability', - 'valence' : 'valence', - 'tempo' : 'bpm', - } + 'energy': 'energy', + 'liveness': 'liveness', + 'speechiness': 'speechiness', + 'acousticness': 'acousticness', + 'danceability': 'danceability', + 'valence': 'valence', + 'tempo': 'bpm', +} -def _splitstrip(string): - """Split string at comma and return the stripped values as array.""" - return [ s.strip() for s in string.split(u',') ] +def _splitstrip(string, delim=u','): + """Split string (at commas by default) and strip whitespace from the + pieces. + """ + return [s.strip() for s in string.split(delim)] class EchonestMetadataPlugin(plugins.BeetsPlugin): _songs = {} @@ -48,29 +64,30 @@ class EchonestMetadataPlugin(plugins.BeetsPlugin): def __init__(self): super(EchonestMetadataPlugin, self).__init__() self.config.add({ - 'auto' : True, - 'apikey' : u'NY2KTZHQ0QDSHBAP6', - 'codegen' : None, - 'upload' : True, - 'convert' : True, - }) - for k, v in ATTRIBUTES.iteritems(): - self.config.add({k:v}) + 'auto': True, + 'apikey': u'NY2KTZHQ0QDSHBAP6', + 'codegen': None, + 'upload': True, + 'convert': True, + }) + self.config.add(ATTRIBUTES) pyechonest.config.ECHO_NEST_API_KEY = \ config['echonest']['apikey'].get(unicode) - if config['echonest']['codegen'].get() is not None: + if config['echonest']['codegen']: pyechonest.config.CODEGEN_BINARY_OVERRIDE = \ config['echonest']['codegen'].get(unicode) - if config['echonest']['auto'].get(bool): + if self.config['auto']: self.register_listener('import_task_start', self.fetch_song_task) - self.register_listener('import_task_apply', self.apply_metadata_task) + self.register_listener('import_task_apply', + self.apply_metadata_task) def _echofun(self, func, **kwargs): - """Wrapper for requests to the EchoNest API. Will retry up to RETRIES - times and wait between retries for RETRY_INTERVAL seconds. + """Wrapper for requests to the EchoNest API. Will retry up to + RETRIES times and wait between retries for RETRY_INTERVAL + seconds. """ for i in range(RETRIES): try: @@ -95,24 +112,24 @@ class EchonestMetadataPlugin(plugins.BeetsPlugin): else: # If we exited the loop without breaking, then we used up all # our allotted retries. - raise Exception(u'exceeded retries') + raise ui.UserError(u'echonest request failed repeatedly') return None return result - def fingerprint(self, item): - """Get the fingerprint for this item from the EchoNest. If we already - have a fingerprint, return it and don't calculate it again. + def fingerprint(self, item, key='echonest_fingerprint'): + """Get the fingerprint for this item from the EchoNest. If we + already have a fingerprint, return it and don't calculate it + again. """ - if item.get('echonest_fingerprint', None) is None: - try: - code = self._echofun(pyechonest.util.codegen, filename=item.path.decode('utf-8')) - item['echonest_fingerprint'] = code[0]['code'] - item.write() - except Exception as exc: - log.error(u'echonest: fingerprinting failed: {0}' - .format(str(exc))) - return None - return item.get('echonest_fingerprint') + if key in item: + return item[key] + try: + code = self._echofun(pyechonest.util.codegen, + filename=item.path.decode('utf-8')) + item['echonest_fingerprint'] = code[0]['code'] + item.write() + except Exception as exc: + log.error(u'echonest: fingerprinting failed: {0}'.format(exc)) def convert(self, item): """Converts an item in an unsupported media format to ogg. Config @@ -155,66 +172,59 @@ class EchonestMetadataPlugin(plugins.BeetsPlugin): return dest def analyze(self, item): - """Upload the item to the EchoNest for analysis. May require to + """Upload the item to the EchoNest for analysis. May require to convert the item to a supported media format. """ try: source = item.path - if item.format.lower() not in ['wav', 'mp3', 'au', 'ogg', 'mp4', 'm4a']: - if not config['echonest']['convert'].get(bool): - raise Exception(u'format {} not supported for upload' - .format(item.format)) - else: + if item.format.lower() not in ALLOWED_FORMATS: + if config['echonest']['convert']: source = self.convert(item) if source is None: raise Exception(u'failed to convert file' .format(item.format)) + else: + raise Exception(u'format {} not supported for upload' + .format(item.format)) log.info(u'echonest: uploading file, be patient') track = self._echofun(pyechonest.track.track_from_filename, - filename=source) - if track is None: + filename=source) + if not track: raise Exception(u'failed to upload file') - # Sometimes we have a track but no song. I guess this happens for - # new / unverified songs. We need to 'extract' the audio_summary - # from the track object 'manually'. I don't know why the + # Sometimes we have a track but no song. I guess this happens for + # new / unverified songs. We need to "extract" the audio_summary + # from the track object manually. I don't know why the # pyechonest API handles tracks (merge audio_summary to __dict__) # and songs (keep audio_summary in an extra attribute) # differently. # Maybe a patch for pyechonest could help? from_track = {} - from_track['energy'] = track.energy - from_track['liveness'] = track.liveness - from_track['speechiness'] = track.speechiness - from_track['acousticness'] = track.acousticness - from_track['danceability'] = track.danceability - from_track['valence'] = track.valence - from_track['tempo'] = track.tempo - from_track['duration'] = track.duration + for key in ATTRIBUTES: + from_track[key] = getattr(track, key) ids = [] try: ids = [track.song_id] except Exception: return from_track songs = self._echofun(pyechonest.song.profile, - ids=ids, track_ids=[track.id], - buckets=['audio_summary']) - if songs is None: + ids=ids, track_ids=[track.id], + buckets=['audio_summary']) + if not songs: raise Exception(u'failed to retrieve info from upload') pick = self._pick_song(songs, item) - if pick is None: + if not pick: return from_track return pick except Exception as exc: log.error(u'echonest: analysis failed: {0}'.format(str(exc))) - return None def identify(self, item): """Try to identify the song at the EchoNest. """ try: code = self.fingerprint(item) - if code is None: + if not code: raise Exception(u'can not identify without a fingerprint') songs = self._echofun(pyechonest.song.identify, code=code) if not songs: @@ -222,7 +232,6 @@ class EchonestMetadataPlugin(plugins.BeetsPlugin): return max(songs, key=lambda s: s.score) except Exception as exc: log.error(u'echonest: identification failed: {0}'.format(str(exc))) - return None def _pick_song(self, songs, item): """Helper method to pick the best matching song from a list of songs @@ -249,15 +258,14 @@ class EchonestMetadataPlugin(plugins.BeetsPlugin): """ try: songs = self._echofun(pyechonest.song.search, title=item.title, - results=100, artist=item.artist, - buckets=['id:musicbrainz', 'tracks']) + results=100, artist=item.artist, + buckets=['id:musicbrainz', 'tracks']) pick = self._pick_song(songs, item) if pick is None: raise Exception(u'no (matching) songs found') return pick except Exception as exc: log.error(u'echonest: search failed: {0}'.format(str(exc))) - return None def profile(self, item): """Do a lookup on the EchoNest by MusicBrainz ID. @@ -267,7 +275,8 @@ class EchonestMetadataPlugin(plugins.BeetsPlugin): if not item.mb_trackid: raise Exception(u'musicbrainz ID not available') mbid = 'musicbrainz:track:{0}'.format(item.mb_trackid) - track = self._echofun(pyechonest.track.track_from_id, identifier=mbid) + track = self._echofun(pyechonest.track.track_from_id, + identifier=mbid) if not track: raise Exception(u'could not get track from ID') ids = track.song_id @@ -280,15 +289,14 @@ class EchonestMetadataPlugin(plugins.BeetsPlugin): return self._pick_song(songs, item) except Exception as exc: log.debug(u'echonest: profile failed: {0}'.format(str(exc))) - return None def fetch_song(self, item): """Try all methods, to get a matching song object from the EchoNest. """ methods = [self.profile, self.search] - if config['echonest']['codegen'].get() is not None: + if config['echonest']['codegen']: methods.append(self.identify) - if config['echonest']['upload'].get(bool): + if config['echonest']['upload']: methods.append(self.analyze) for method in methods: try: @@ -384,22 +392,17 @@ def diff(item1, item2, attributes): for attr in attributes: try: result += abs( - float(item1.get(attr, None)) - - float(item2.get(attr, None)) - ) + float(item1.get(attr, None)) - + float(item2.get(attr, None)) + ) except TypeError: result += 1.0 return result def similar(lib, src_item, threshold=0.15): - attributes = [] - for attr in ['energy', 'danceability', 'valence', 'speechiness', - 'acousticness', 'liveness']: - if ATTRIBUTES[attr] is not None: - attributes.append(ATTRIBUTES[attr]) for item in lib.items(): - if not item.path == src_item.path: - d = diff(item, src_item, attributes) + if item.path != src_item.path: + d = diff(item, src_item, ATTRIBUTES.values()) if d < threshold: print(u'{1:2.2f}: {0}'.format(item.path, d)) @@ -414,5 +417,3 @@ class EchonestSimilarPlugin(plugins.BeetsPlugin): cmd.func = func return [cmd] - -# eof