diff --git a/beetsplug/spotify.py b/beetsplug/spotify.py index 36231f297..246e65a6f 100644 --- a/beetsplug/spotify.py +++ b/beetsplug/spotify.py @@ -3,54 +3,334 @@ from __future__ import division, absolute_import, print_function import re +import json +import base64 import webbrowser +import collections + import requests -from beets.plugins import BeetsPlugin -from beets.ui import decargs + from beets import ui -from requests.exceptions import HTTPError +from beets.plugins import BeetsPlugin +from beets.util import confit +from beets.autotag.hooks import AlbumInfo, TrackInfo, Distance class SpotifyPlugin(BeetsPlugin): - - # URL for the Web API of Spotify - # Documentation here: https://developer.spotify.com/web-api/search-item/ - base_url = "https://api.spotify.com/v1/search" - open_url = "http://open.spotify.com/track/" - playlist_partial = "spotify:trackset:Playlist:" + # Base URLs for the Spotify API + # Documentation: https://developer.spotify.com/web-api + oauth_token_url = 'https://accounts.spotify.com/api/token' + open_track_url = 'http://open.spotify.com/track/' + search_url = 'https://api.spotify.com/v1/search' + album_url = 'https://api.spotify.com/v1/albums/' + track_url = 'https://api.spotify.com/v1/tracks/' + playlist_partial = 'spotify:trackset:Playlist:' def __init__(self): super(SpotifyPlugin, self).__init__() - self.config.add({ - 'mode': 'list', - 'tiebreak': 'popularity', - 'show_failures': False, - 'artist_field': 'albumartist', - 'album_field': 'album', - 'track_field': 'title', - 'region_filter': None, - 'regex': [] - }) + self.config.add( + { + 'mode': 'list', + 'tiebreak': 'popularity', + 'show_failures': False, + 'artist_field': 'albumartist', + 'album_field': 'album', + 'track_field': 'title', + 'region_filter': None, + 'regex': [], + 'client_id': '4e414367a1d14c75a5c5129a627fcab8', + 'client_secret': 'f82bdc09b2254f1a8286815d02fd46dc', + 'tokenfile': 'spotify_token.json', + 'source_weight': 0.5, + } + ) + self.config['client_secret'].redact = True + + self.tokenfile = self.config['tokenfile'].get( + confit.Filename(in_app_dir=True) + ) # Path to the JSON file for storing the OAuth access token. + self.setup() + + def setup(self): + """Retrieve previously saved OAuth token or generate a new one.""" + try: + with open(self.tokenfile) as f: + token_data = json.load(f) + except IOError: + self._authenticate() + else: + self.access_token = token_data['access_token'] + + def _authenticate(self): + """Request an access token via the Client Credentials Flow: + https://developer.spotify.com/documentation/general/guides/authorization-guide/#client-credentials-flow + """ + headers = { + 'Authorization': 'Basic {}'.format( + base64.b64encode( + ':'.join( + self.config[k].as_str() + for k in ('client_id', 'client_secret') + ).encode() + ).decode() + ) + } + response = requests.post( + self.oauth_token_url, + data={'grant_type': 'client_credentials'}, + headers=headers, + ) + try: + response.raise_for_status() + except requests.exceptions.HTTPError as e: + raise ui.UserError( + u'Spotify authorization failed: {}\n{}'.format( + e, response.content + ) + ) + self.access_token = response.json()['access_token'] + + # Save the token for later use. + self._log.debug(u'Spotify access token: {}', self.access_token) + with open(self.tokenfile, 'w') as f: + json.dump({'access_token': self.access_token}, f) + + def _handle_response(self, request_type, url, params=None): + """Send a request, reauthenticating if necessary. + + :param request_type: Type of :class:`Request` constructor, + e.g. ``requests.get``, ``requests.post``, etc. + :type request_type: function + :param url: URL for the new :class:`Request` object. + :type url: str + :param params: (optional) list of tuples or bytes to send + in the query string for the :class:`Request`. + :type params: dict + :return: class:`Response ` object + :rtype: requests.Response + """ + response = request_type( + url, + headers={'Authorization': 'Bearer {}'.format(self.access_token)}, + params=params, + ) + if response.status_code != 200: + if u'token expired' in response.text: + self._log.debug( + 'Spotify access token has expired. Reauthenticating.' + ) + self._authenticate() + self._handle_response(request_type, url, params=params) + else: + raise ui.UserError(u'Spotify API error:\n{}', response.text) + return response + + def _get_spotify_id(self, url_type, id_): + """Parse a Spotify ID from its URL if necessary. + + :param url_type: Type of Spotify URL, either 'album' or 'track' + :type url_type: str + :param id_: Spotify ID or URL + :type id_: str + :return: Spotify ID + :rtype: str + """ + # Spotify IDs consist of 22 alphanumeric characters + # (zero-left-padded base62 representation of randomly generated UUID4) + id_regex = r'(^|open\.spotify\.com/{}/)([0-9A-Za-z]{{22}})' + self._log.debug(u'Searching for {} {}', url_type, id_) + match = re.search(id_regex.format(url_type), id_) + return match.group(2) if match else None + + def album_for_id(self, album_id): + """Fetches an album by its Spotify ID or URL and returns an + AlbumInfo object or None if the album is not found. + + :param album_id: Spotify ID or URL for the album + :type album_id: str + :return: AlbumInfo object for album + :rtype: beets.autotag.hooks.AlbumInfo + """ + spotify_id = self._get_spotify_id('album', album_id) + if spotify_id is None: + return None + + response = self._handle_response( + requests.get, self.album_url + spotify_id + ) + response_data = response.json() + artist, artist_id = self._get_artist(response_data['artists']) + + date_parts = [ + int(part) for part in response_data['release_date'].split('-') + ] + + release_date_precision = response_data['release_date_precision'] + if release_date_precision == 'day': + year, month, day = date_parts + elif release_date_precision == 'month': + year, month = date_parts + day = None + elif release_date_precision == 'year': + year = date_parts + month = None + day = None + else: + raise ui.UserError( + u"Invalid `release_date_precision` returned " + u"from Spotify API: '{}'".format(release_date_precision) + ) + + tracks = [] + medium_totals = collections.defaultdict(int) + for i, track_data in enumerate(response_data['tracks']['items']): + track = self._get_track(track_data) + track.index = i + 1 + medium_totals[track.medium] += 1 + tracks.append(track) + for track in tracks: + track.medium_total = medium_totals[track.medium] + + return AlbumInfo( + album=response_data['name'], + album_id=spotify_id, + artist=artist, + artist_id=artist_id, + tracks=tracks, + albumtype=response_data['album_type'], + va=len(response_data['artists']) == 1 + and artist.lower() == 'various artists', + year=year, + month=month, + day=day, + label=response_data['label'], + data_source='Spotify', + data_url=response_data['external_urls']['spotify'], + ) + + def _get_track(self, track_data): + """Convert a Spotify track object dict to a TrackInfo object. + + :param track_data: Simplified track object + (https://developer.spotify.com/documentation/web-api/reference/object-model/#track-object-simplified) + :type track_data: dict + :return: TrackInfo object for track + :rtype: beets.autotag.hooks.TrackInfo + """ + artist, artist_id = self._get_artist(track_data['artists']) + return TrackInfo( + title=track_data['name'], + track_id=track_data['id'], + artist=artist, + artist_id=artist_id, + length=track_data['duration_ms'] / 1000, + index=track_data['track_number'], + medium=track_data['disc_number'], + medium_index=track_data['track_number'], + data_source='Spotify', + data_url=track_data['external_urls']['spotify'], + ) + + def track_for_id(self, track_id): + """Fetches a track by its Spotify ID or URL and returns a + TrackInfo object or None if the track is not found. + + :param track_id: Spotify ID or URL for the track + :type track_id: str + :return: TrackInfo object for track + :rtype: beets.autotag.hooks.TrackInfo + """ + spotify_id = self._get_spotify_id('track', track_id) + if spotify_id is None: + return None + + response_track = self._handle_response( + requests.get, self.track_url + spotify_id + ) + response_data_track = response_track.json() + track = self._get_track(response_data_track) + + # get album's tracks to set the track's index/position on + # the entire release + response_album = self._handle_response( + requests.get, self.album_url + response_data_track['album']['id'] + ) + response_data_album = response_album.json() + medium_total = 0 + for i, track_data in enumerate(response_data_album['tracks']['items']): + if track_data['disc_number'] == track.medium: + medium_total += 1 + if track_data['id'] == spotify_id: + track.index = i + 1 + track.medium_total = medium_total + return track + + @staticmethod + def _get_artist(artists): + """Returns an artist string (all artists) and an artist_id (the main + artist) for a list of Spotify artist object dicts. + + :param artists: Iterable of simplified Spotify artist objects + (https://developer.spotify.com/documentation/web-api/reference/object-model/#artist-object-simplified) + :type artists: list[dict] + :return: Normalized artist string + :rtype: str + """ + artist_id = None + artist_names = [] + for artist in artists: + if not artist_id: + artist_id = artist['id'] + name = artist['name'] + # Strip disambiguation number. + name = re.sub(r' \(\d+\)$', '', name) + # Move articles to the front. + name = re.sub(r'^(.*?), (a|an|the)$', r'\2 \1', name, flags=re.I) + artist_names.append(name) + artist = ', '.join(artist_names).replace(' ,', ',') or None + return artist, artist_id + + def album_distance(self, items, album_info, mapping): + """Returns the Spotify source weight and the maximum source weight + for albums. + """ + dist = Distance() + if album_info.data_source == 'Spotify': + dist.add('source', self.config['source_weight'].as_number()) + return dist + + def track_distance(self, item, track_info): + """Returns the Spotify source weight and the maximum source weight + for individual tracks. + """ + dist = Distance() + if track_info.data_source == 'Spotify': + dist.add('source', self.config['source_weight'].as_number()) + return dist def commands(self): def queries(lib, opts, args): success = self.parse_opts(opts) if success: - results = self.query_spotify(lib, decargs(args)) + results = self.query_spotify(lib, ui.decargs(args)) self.output_results(results) + spotify_cmd = ui.Subcommand( - 'spotify', - help=u'build a Spotify playlist' + 'spotify', help=u'build a Spotify playlist' ) spotify_cmd.parser.add_option( - u'-m', u'--mode', action='store', + u'-m', + u'--mode', + action='store', help=u'"open" to open Spotify with playlist, ' - u'"list" to print (default)' + u'"list" to print (default)', ) spotify_cmd.parser.add_option( - u'-f', u'--show-failures', - action='store_true', dest='show_failures', - help=u'list tracks that did not match a Spotify ID' + u'-f', + u'--show-failures', + action='store_true', + dest='show_failures', + help=u'list tracks that did not match a Spotify ID', ) spotify_cmd.func = queries return [spotify_cmd] @@ -63,35 +343,35 @@ class SpotifyPlugin(BeetsPlugin): self.config['show_failures'].set(True) if self.config['mode'].get() not in ['list', 'open']: - self._log.warning(u'{0} is not a valid mode', - self.config['mode'].get()) + self._log.warning( + u'{0} is not a valid mode', self.config['mode'].get() + ) return False self.opts = opts return True def query_spotify(self, lib, query): - results = [] failures = [] items = lib.items(query) if not items: - self._log.debug(u'Your beets query returned no items, ' - u'skipping spotify') + self._log.debug( + u'Your beets query returned no items, skipping Spotify' + ) return self._log.info(u'Processing {0} tracks...', len(items)) for item in items: - # Apply regex transformations if provided for regex in self.config['regex'].get(): if ( - not regex['field'] or - not regex['search'] or - not regex['replace'] + not regex['field'] + or not regex['search'] + or not regex['replace'] ): continue @@ -104,59 +384,76 @@ class SpotifyPlugin(BeetsPlugin): artist = item[self.config['artist_field'].get()] album = item[self.config['album_field'].get()] query = item[self.config['track_field'].get()] - search_url = query + " album:" + album + " artist:" + artist + query_keywords = '{} album:{} artist:{}'.format( + query, album, artist + ) # Query the Web API for each track, look for the items' JSON data - r = requests.get(self.base_url, params={ - "q": search_url, "type": "track" - }) - self._log.debug('{}', r.url) try: - r.raise_for_status() - except HTTPError as e: - self._log.debug(u'URL returned a {0} error', - e.response.status_code) - failures.append(search_url) + response = self._handle_response( + requests.get, + self.search_url, + params={'q': query_keywords, 'type': 'track'}, + ) + except ui.UserError: + failures.append(query_keywords) continue - r_data = r.json()['tracks']['items'] + response_data = response.json()['tracks']['items'] # Apply market filter if requested region_filter = self.config['region_filter'].get() if region_filter: - r_data = [x for x in r_data if region_filter - in x['available_markets']] + response_data = [ + x + for x in response_data + if region_filter in x['available_markets'] + ] # Simplest, take the first result chosen_result = None - if len(r_data) == 1 or self.config['tiebreak'].get() == "first": - self._log.debug(u'Spotify track(s) found, count: {0}', - len(r_data)) - chosen_result = r_data[0] - elif len(r_data) > 1: + if ( + len(response_data) == 1 + or self.config['tiebreak'].get() == 'first' + ): + self._log.debug( + u'Spotify track(s) found, count: {0}', len(response_data) + ) + chosen_result = response_data[0] + elif len(response_data) > 1: # Use the popularity filter - self._log.debug(u'Most popular track chosen, count: {0}', - len(r_data)) - chosen_result = max(r_data, key=lambda x: x['popularity']) + self._log.debug( + u'Most popular track chosen, count: {0}', + len(response_data), + ) + chosen_result = max( + response_data, key=lambda x: x['popularity'] + ) if chosen_result: results.append(chosen_result) else: - self._log.debug(u'No spotify track found: {0}', search_url) - failures.append(search_url) + self._log.debug( + u'No Spotify track found for the following query: {}', + query_keywords, + ) + failures.append(query_keywords) failure_count = len(failures) if failure_count > 0: if self.config['show_failures'].get(): - self._log.info(u'{0} track(s) did not match a Spotify ID:', - failure_count) + self._log.info( + u'{0} track(s) did not match a Spotify ID:', failure_count + ) for track in failures: self._log.info(u'track: {0}', track) self._log.info(u'') else: - self._log.warning(u'{0} track(s) did not match a Spotify ID;\n' - u'use --show-failures to display', - failure_count) + self._log.warning( + u'{0} track(s) did not match a Spotify ID;\n' + u'use --show-failures to display', + failure_count, + ) return results @@ -170,6 +467,6 @@ class SpotifyPlugin(BeetsPlugin): else: for item in ids: - print(self.open_url + item) + print(self.open_track_url + item) else: self._log.warning(u'No Spotify tracks found from beets query') diff --git a/docs/plugins/spotify.rst b/docs/plugins/spotify.rst index b993a66d2..3f4c6c43d 100644 --- a/docs/plugins/spotify.rst +++ b/docs/plugins/spotify.rst @@ -1,10 +1,16 @@ Spotify Plugin ============== -The ``spotify`` plugin generates `Spotify`_ playlists from tracks in your library. Using the `Spotify Web API`_, any tracks that can be matched with a Spotify ID are returned, and the results can be either pasted in to a playlist or opened directly in the Spotify app. +The ``spotify`` plugin generates `Spotify`_ playlists from tracks in your +library with the ``beet spotify`` command using the `Spotify Search API`_. + +Also, the plugin can use the Spotify `Album`_ and `Track`_ APIs to provide +metadata matches for the importer. .. _Spotify: https://www.spotify.com/ -.. _Spotify Web API: https://developer.spotify.com/web-api/search-item/ +.. _Spotify Search API: https://developer.spotify.com/documentation/web-api/reference/search/search/ +.. _Album: https://developer.spotify.com/documentation/web-api/reference/albums/get-album/ +.. _Track: https://developer.spotify.com/documentation/web-api/reference/tracks/get-track/ Why Use This Plugin? -------------------- @@ -12,10 +18,10 @@ Why Use This Plugin? * You're a Beets user and Spotify user already. * You have playlists or albums you'd like to make available in Spotify from Beets without having to search for each artist/album/track. * You want to check which tracks in your library are available on Spotify. +* You want to autotag music with metadata from the Spotify API. Basic Usage ----------- - First, enable the ``spotify`` plugin (see :ref:`using-plugins`). Then, use the ``spotify`` command with a beets query:: @@ -37,6 +43,12 @@ Command-line options include: * ``--show-failures`` or ``-f``: List the tracks that did not match a Spotify ID. +You can enter the URL for an album or song on Spotify at the ``enter Id`` +prompt during import:: + + Enter search, enter Id, aBort, eDit, edit Candidates, plaY? i + Enter release ID: https://open.spotify.com/album/2rFYTHFBLQN3AYlrymBPPA + Configuration ------------- @@ -67,10 +79,14 @@ in config.yaml under the ``spotify:`` section: track/album/artist fields before sending them to Spotify. Can be useful for changing certain abbreviations, like ft. -> feat. See the examples below. Default: None. +- **source_weight**: Penalty applied to Spotify matches during import. Set to + 0.0 to disable. + Default: ``0.5``. Here's an example:: spotify: + source_weight: 0.7 mode: open region_filter: US show_failures: on diff --git a/test/test_spotify.py b/test/test_spotify.py index 17f3ef42f..221c21e74 100644 --- a/test/test_spotify.py +++ b/test/test_spotify.py @@ -29,10 +29,22 @@ def _params(url): class SpotifyPluginTest(_common.TestCase, TestHelper): - + @responses.activate def setUp(self): config.clear() self.setup_beets() + responses.add( + responses.POST, + spotify.SpotifyPlugin.oauth_token_url, + status=200, + json={ + 'access_token': '3XyiC3raJySbIAV5LVYj1DaWbcocNi3LAJTNXRnYY' + 'GVUl6mbbqXNhW3YcZnQgYXNWHFkVGSMlc0tMuvq8CF', + 'token_type': 'Bearer', + 'expires_in': 3600, + 'scope': '', + }, + ) self.spotify = spotify.SpotifyPlugin() opts = ArgumentsMock("list", False) self.spotify.parse_opts(opts) @@ -51,20 +63,25 @@ class SpotifyPluginTest(_common.TestCase, TestHelper): @responses.activate def test_missing_request(self): - json_file = os.path.join(_common.RSRC, b'spotify', - b'missing_request.json') + json_file = os.path.join( + _common.RSRC, b'spotify', b'missing_request.json' + ) with open(json_file, 'rb') as f: response_body = f.read() - responses.add(responses.GET, 'https://api.spotify.com/v1/search', - body=response_body, status=200, - content_type='application/json') + responses.add( + responses.GET, + spotify.SpotifyPlugin.search_url, + body=response_body, + status=200, + content_type='application/json', + ) item = Item( mb_trackid=u'01234', album=u'lkajsdflakjsd', albumartist=u'ujydfsuihse', title=u'duifhjslkef', - length=10 + length=10, ) item.add(self.lib) self.assertEqual([], self.spotify.query_spotify(self.lib, u"")) @@ -78,21 +95,25 @@ class SpotifyPluginTest(_common.TestCase, TestHelper): @responses.activate def test_track_request(self): - - json_file = os.path.join(_common.RSRC, b'spotify', - b'track_request.json') + json_file = os.path.join( + _common.RSRC, b'spotify', b'track_request.json' + ) with open(json_file, 'rb') as f: response_body = f.read() - responses.add(responses.GET, 'https://api.spotify.com/v1/search', - body=response_body, status=200, - content_type='application/json') + responses.add( + responses.GET, + spotify.SpotifyPlugin.search_url, + body=response_body, + status=200, + content_type='application/json', + ) item = Item( mb_trackid=u'01234', album=u'Despicable Me 2', albumartist=u'Pharrell Williams', title=u'Happy', - length=10 + length=10, ) item.add(self.lib) results = self.spotify.query_spotify(self.lib, u"Happy") @@ -111,5 +132,6 @@ class SpotifyPluginTest(_common.TestCase, TestHelper): def suite(): return unittest.TestLoader().loadTestsFromName(__name__) + if __name__ == '__main__': unittest.main(defaultTest='suite')