diff --git a/beetsplug/spotify.py b/beetsplug/spotify.py index 246e65a6f..b94d0e8e6 100644 --- a/beetsplug/spotify.py +++ b/beetsplug/spotify.py @@ -8,6 +8,8 @@ import base64 import webbrowser import collections +import six +import unidecode import requests from beets import ui @@ -85,7 +87,7 @@ class SpotifyPlugin(BeetsPlugin): except requests.exceptions.HTTPError as e: raise ui.UserError( u'Spotify authorization failed: {}\n{}'.format( - e, response.content + e, response.text ) ) self.access_token = response.json()['access_token'] @@ -106,8 +108,8 @@ class SpotifyPlugin(BeetsPlugin): :param params: (optional) list of tuples or bytes to send in the query string for the :class:`Request`. :type params: dict - :return: class:`Response ` object - :rtype: requests.Response + :return: JSON data for the class:`Response ` object. + :rtype: dict """ response = request_type( url, @@ -120,19 +122,19 @@ class SpotifyPlugin(BeetsPlugin): 'Spotify access token has expired. Reauthenticating.' ) self._authenticate() - self._handle_response(request_type, url, params=params) + return self._handle_response(request_type, url, params=params) else: raise ui.UserError(u'Spotify API error:\n{}', response.text) - return response + return response.json() def _get_spotify_id(self, url_type, id_): """Parse a Spotify ID from its URL if necessary. - :param url_type: Type of Spotify URL, either 'album' or 'track' + :param url_type: Type of Spotify URL, either 'album' or 'track'. :type url_type: str - :param id_: Spotify ID or URL + :param id_: Spotify ID or URL. :type id_: str - :return: Spotify ID + :return: Spotify ID. :rtype: str """ # Spotify IDs consist of 22 alphanumeric characters @@ -143,22 +145,21 @@ class SpotifyPlugin(BeetsPlugin): return match.group(2) if match else None def album_for_id(self, album_id): - """Fetches an album by its Spotify ID or URL and returns an + """Fetch an album by its Spotify ID or URL and return an AlbumInfo object or None if the album is not found. :param album_id: Spotify ID or URL for the album :type album_id: str :return: AlbumInfo object for album - :rtype: beets.autotag.hooks.AlbumInfo + :rtype: beets.autotag.hooks.AlbumInfo or None """ spotify_id = self._get_spotify_id('album', album_id) if spotify_id is None: return None - response = self._handle_response( + response_data = self._handle_response( requests.get, self.album_url + spotify_id ) - response_data = response.json() artist, artist_id = self._get_artist(response_data['artists']) date_parts = [ @@ -178,7 +179,7 @@ class SpotifyPlugin(BeetsPlugin): else: raise ui.UserError( u"Invalid `release_date_precision` returned " - u"from Spotify API: '{}'".format(release_date_precision) + u"by Spotify API: '{}'".format(release_date_precision) ) tracks = [] @@ -231,36 +232,39 @@ class SpotifyPlugin(BeetsPlugin): data_url=track_data['external_urls']['spotify'], ) - def track_for_id(self, track_id): - """Fetches a track by its Spotify ID or URL and returns a + def track_for_id(self, track_id=None, track_data=None): + """Fetch a track by its Spotify ID or URL and return a TrackInfo object or None if the track is not found. - :param track_id: Spotify ID or URL for the track + :param track_id: (Optional) Spotify ID or URL for the track. Either + ``track_id`` or ``track_data`` must be provided. :type track_id: str + :param track_data: (Optional) Simplified track object dict. May be + provided instead of ``track_id`` to avoid unnecessary API calls. + :type track_data: dict :return: TrackInfo object for track - :rtype: beets.autotag.hooks.TrackInfo + :rtype: beets.autotag.hooks.TrackInfo or None """ - spotify_id = self._get_spotify_id('track', track_id) - if spotify_id is None: - return None + if track_data is None: + spotify_id = self._get_spotify_id('track', track_id) + if spotify_id is None: + return None + track_data = self._handle_response( + requests.get, self.track_url + spotify_id + ) + track = self._get_track(track_data) - response_track = self._handle_response( - requests.get, self.track_url + spotify_id + # Get album's tracks to set `track.index` (position on the entire + # release) and `track.medium_total` (total number of tracks on + # the track's disc). + album_data = self._handle_response( + requests.get, self.album_url + track_data['album']['id'] ) - response_data_track = response_track.json() - track = self._get_track(response_data_track) - - # get album's tracks to set the track's index/position on - # the entire release - response_album = self._handle_response( - requests.get, self.album_url + response_data_track['album']['id'] - ) - response_data_album = response_album.json() medium_total = 0 - for i, track_data in enumerate(response_data_album['tracks']['items']): + for i, track_data in enumerate(album_data['tracks']['items']): if track_data['disc_number'] == track.medium: medium_total += 1 - if track_data['id'] == spotify_id: + if track_data['id'] == track.track_id: track.index = i + 1 track.medium_total = medium_total return track @@ -308,12 +312,122 @@ class SpotifyPlugin(BeetsPlugin): dist.add('source', self.config['source_weight'].as_number()) return dist + def candidates(self, items, artist, album, va_likely): + """Returns a list of AlbumInfo objects for Spotify Search API results + matching an ``album`` and ``artist`` (if not various). + + :param items: List of items comprised by an album to be matched. + :type items: list[beets.library.Item] + :param artist: The artist of the album to be matched. + :type artist: str + :param album: The name of the album to be matched. + :type album: str + :param va_likely: True if the album to be matched likely has + Various Artists. + :type va_likely: bool + :return: Candidate AlbumInfo objects. + :rtype: list[beets.autotag.hooks.AlbumInfo] + """ + query_filters = {'album': album} + if not va_likely: + query_filters['artist'] = artist + response_data = self._search_spotify( + query_type='album', filters=query_filters + ) + if response_data is None: + return [] + return [ + self.album_for_id(album_id=album_data['id']) + for album_data in response_data['albums']['items'] + ] + + def item_candidates(self, item, artist, title): + """Returns a list of TrackInfo objects for Spotify Search API results + matching ``title`` and ``artist``. + + :param item: Singleton item to be matched. + :type item: beets.library.Item + :param artist: The artist of the track to be matched. + :type artist: str + :param title: The title of the track to be matched. + :type title: str + :return: Candidate TrackInfo objects. + :rtype: list[beets.autotag.hooks.TrackInfo] + """ + response_data = self._search_spotify( + query_type='track', keywords=title, filters={'artist': artist} + ) + if response_data is None: + return [] + return [ + self.track_for_id(track_data=track_data) + for track_data in response_data['tracks']['items'] + ] + + @staticmethod + def _construct_search_query(filters=None, keywords=''): + """Construct a query string with the specified filters and keywords to + be provided to the Spotify Search API + (https://developer.spotify.com/documentation/web-api/reference/search/search/#writing-a-query---guidelines). + + :param filters: (Optional) Field filters to apply. + :type filters: dict + :param keywords: (Optional) Query keywords to use. + :type keywords: str + :return: Query string to be provided to the Search API. + :rtype: str + """ + query_components = [ + keywords, + ' '.join(':'.join((k, v)) for k, v in filters.items()), + ] + query = ' '.join([q for q in query_components if q]) + if not isinstance(query, six.text_type): + query = query.decode('utf8') + return unidecode.unidecode(query) + + def _search_spotify(self, query_type, filters=None, keywords=''): + """Query the Spotify Search API for the specified ``keywords``, applying + the provided ``filters``. + + :param query_type: A comma-separated list of item types to search + across. Valid types are: 'album', 'artist', 'playlist', and + 'track'. Search results include hits from all the specified item + types. + :type query_type: str + :param filters: (Optional) Field filters to apply. + :type filters: dict + :param keywords: (Optional) Query keywords to use. + :type keywords: str + :return: JSON data for the class:`Response ` object or None + if no search results are returned. + :rtype: dict or None + """ + query = self._construct_search_query( + keywords=keywords, filters=filters + ) + if not query: + return None + self._log.debug(u"Searching Spotify for '{}'".format(query)) + response_data = self._handle_response( + requests.get, + self.search_url, + params={'q': query, 'type': query_type}, + ) + num_results = 0 + for result_type_data in response_data.values(): + num_results += len(result_type_data['items']) + self._log.debug( + u"Found {} results from Spotify for '{}'", num_results, query + ) + return response_data if num_results > 0 else None + def commands(self): def queries(lib, opts, args): - success = self.parse_opts(opts) + success = self._parse_opts(opts) if success: - results = self.query_spotify(lib, ui.decargs(args)) - self.output_results(results) + results = self._match_library_tracks(lib, ui.decargs(args)) + self._output_match_results(results) spotify_cmd = ui.Subcommand( 'spotify', help=u'build a Spotify playlist' @@ -335,7 +449,7 @@ class SpotifyPlugin(BeetsPlugin): spotify_cmd.func = queries return [spotify_cmd] - def parse_opts(self, opts): + def _parse_opts(self, opts): if opts.mode: self.config['mode'].set(opts.mode) @@ -351,19 +465,30 @@ class SpotifyPlugin(BeetsPlugin): self.opts = opts return True - def query_spotify(self, lib, query): + def _match_library_tracks(self, library, keywords): + """Get a list of simplified track object dicts for library tracks + matching the specified ``keywords``. + + :param library: beets library object to query. + :type library: beets.library.Library + :param keywords: Query to match library items against. + :type keywords: str + :return: List of simplified track object dicts for library items + matching the specified query. + :rtype: list[dict] + """ results = [] failures = [] - items = lib.items(query) + items = library.items(keywords) if not items: self._log.debug( - u'Your beets query returned no items, skipping Spotify' + u'Your beets query returned no items, skipping Spotify.' ) return - self._log.info(u'Processing {0} tracks...', len(items)) + self._log.info(u'Processing {} tracks...', len(items)) for item in items: # Apply regex transformations if provided @@ -383,90 +508,84 @@ class SpotifyPlugin(BeetsPlugin): # Custom values can be passed in the config (just in case) artist = item[self.config['artist_field'].get()] album = item[self.config['album_field'].get()] - query = item[self.config['track_field'].get()] - query_keywords = '{} album:{} artist:{}'.format( - query, album, artist - ) + keywords = item[self.config['track_field'].get()] # Query the Web API for each track, look for the items' JSON data - try: - response = self._handle_response( - requests.get, - self.search_url, - params={'q': query_keywords, 'type': 'track'}, + query_filters = {'artist': artist, 'album': album} + response_data = self._search_spotify( + query_type='track', keywords=keywords, filters=query_filters + ) + if response_data is None: + query = self._construct_search_query( + keywords=keywords, filters=query_filters ) - except ui.UserError: - failures.append(query_keywords) + failures.append(query) continue - - response_data = response.json()['tracks']['items'] + response_data_tracks = response_data['tracks']['items'] # Apply market filter if requested region_filter = self.config['region_filter'].get() if region_filter: - response_data = [ - x - for x in response_data - if region_filter in x['available_markets'] + response_data_tracks = [ + track_data + for track_data in response_data_tracks + if region_filter in track_data['available_markets'] ] - # Simplest, take the first result - chosen_result = None if ( - len(response_data) == 1 + len(response_data_tracks) == 1 or self.config['tiebreak'].get() == 'first' ): self._log.debug( - u'Spotify track(s) found, count: {0}', len(response_data) + u'Spotify track(s) found, count: {}', + len(response_data_tracks), ) - chosen_result = response_data[0] - elif len(response_data) > 1: + chosen_result = response_data_tracks[0] + else: # Use the popularity filter self._log.debug( - u'Most popular track chosen, count: {0}', - len(response_data), + u'Most popular track chosen, count: {}', + len(response_data_tracks), ) chosen_result = max( - response_data, key=lambda x: x['popularity'] + response_data_tracks, key=lambda x: x['popularity'] ) - - if chosen_result: - results.append(chosen_result) - else: - self._log.debug( - u'No Spotify track found for the following query: {}', - query_keywords, - ) - failures.append(query_keywords) + results.append(chosen_result) failure_count = len(failures) if failure_count > 0: if self.config['show_failures'].get(): self._log.info( - u'{0} track(s) did not match a Spotify ID:', failure_count + u'{} track(s) did not match a Spotify ID:', failure_count ) for track in failures: - self._log.info(u'track: {0}', track) + self._log.info(u'track: {}', track) self._log.info(u'') else: self._log.warning( - u'{0} track(s) did not match a Spotify ID;\n' + u'{} track(s) did not match a Spotify ID;\n' u'use --show-failures to display', failure_count, ) return results - def output_results(self, results): - if results: - ids = [x['id'] for x in results] - if self.config['mode'].get() == "open": - self._log.info(u'Attempting to open Spotify with playlist') - spotify_url = self.playlist_partial + ",".join(ids) - webbrowser.open(spotify_url) + def _output_match_results(self, results): + """Open a playlist or print Spotify URLs for the provided track + object dicts. + :param results: List of simplified track object dicts + (https://developer.spotify.com/documentation/web-api/reference/object-model/#track-object-simplified) + :type results: list[dict] + """ + if results: + spotify_ids = [track_data['id'] for track_data in results] + if self.config['mode'].get() == 'open': + self._log.info(u'Attempting to open Spotify with playlist') + spotify_url = self.playlist_partial + ",".join(spotify_ids) + webbrowser.open(spotify_url) else: - for item in ids: - print(self.open_track_url + item) + for spotify_id in spotify_ids: + print(self.open_track_url + spotify_id) else: self._log.warning(u'No Spotify tracks found from beets query') diff --git a/test/test_spotify.py b/test/test_spotify.py index 221c21e74..ea54a13db 100644 --- a/test/test_spotify.py +++ b/test/test_spotify.py @@ -47,19 +47,21 @@ class SpotifyPluginTest(_common.TestCase, TestHelper): ) self.spotify = spotify.SpotifyPlugin() opts = ArgumentsMock("list", False) - self.spotify.parse_opts(opts) + self.spotify._parse_opts(opts) def tearDown(self): self.teardown_beets() def test_args(self): opts = ArgumentsMock("fail", True) - self.assertEqual(False, self.spotify.parse_opts(opts)) + self.assertEqual(False, self.spotify._parse_opts(opts)) opts = ArgumentsMock("list", False) - self.assertEqual(True, self.spotify.parse_opts(opts)) + self.assertEqual(True, self.spotify._parse_opts(opts)) def test_empty_query(self): - self.assertEqual(None, self.spotify.query_spotify(self.lib, u"1=2")) + self.assertEqual( + None, self.spotify._match_library_tracks(self.lib, u"1=2") + ) @responses.activate def test_missing_request(self): @@ -84,13 +86,13 @@ class SpotifyPluginTest(_common.TestCase, TestHelper): length=10, ) item.add(self.lib) - self.assertEqual([], self.spotify.query_spotify(self.lib, u"")) + self.assertEqual([], self.spotify._match_library_tracks(self.lib, u"")) params = _params(responses.calls[0].request.url) - self.assertEqual( - params['q'], - [u'duifhjslkef album:lkajsdflakjsd artist:ujydfsuihse'], - ) + query = params['q'][0] + self.assertIn(u'duifhjslkef', query) + self.assertIn(u'artist:ujydfsuihse', query) + self.assertIn(u'album:lkajsdflakjsd', query) self.assertEqual(params['type'], [u'track']) @responses.activate @@ -116,16 +118,16 @@ class SpotifyPluginTest(_common.TestCase, TestHelper): length=10, ) item.add(self.lib) - results = self.spotify.query_spotify(self.lib, u"Happy") + results = self.spotify._match_library_tracks(self.lib, u"Happy") self.assertEqual(1, len(results)) self.assertEqual(u"6NPVjNh8Jhru9xOmyQigds", results[0]['id']) - self.spotify.output_results(results) + self.spotify._output_match_results(results) params = _params(responses.calls[0].request.url) - self.assertEqual( - params['q'], - [u'Happy album:Despicable Me 2 artist:Pharrell Williams'], - ) + query = params['q'][0] + self.assertIn(u'Happy', query) + self.assertIn(u'artist:Pharrell Williams', query) + self.assertIn(u'album:Despicable Me 2', query) self.assertEqual(params['type'], [u'track'])