Merge pull request #3125 from rhlahuja/spotify-candidates

Add Spotify `candidates` and `item_candidates`
This commit is contained in:
Adrian Sampson 2019-01-23 13:48:20 -05:00 committed by GitHub
commit 11ae2679ee
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 224 additions and 103 deletions

View file

@ -8,6 +8,8 @@ import base64
import webbrowser
import collections
import six
import unidecode
import requests
from beets import ui
@ -85,7 +87,7 @@ class SpotifyPlugin(BeetsPlugin):
except requests.exceptions.HTTPError as e:
raise ui.UserError(
u'Spotify authorization failed: {}\n{}'.format(
e, response.content
e, response.text
)
)
self.access_token = response.json()['access_token']
@ -106,8 +108,8 @@ class SpotifyPlugin(BeetsPlugin):
:param params: (optional) list of tuples or bytes to send
in the query string for the :class:`Request`.
:type params: dict
:return: class:`Response <Response>` object
:rtype: requests.Response
:return: JSON data for the class:`Response <Response>` object.
:rtype: dict
"""
response = request_type(
url,
@ -120,19 +122,19 @@ class SpotifyPlugin(BeetsPlugin):
'Spotify access token has expired. Reauthenticating.'
)
self._authenticate()
self._handle_response(request_type, url, params=params)
return self._handle_response(request_type, url, params=params)
else:
raise ui.UserError(u'Spotify API error:\n{}', response.text)
return response
return response.json()
def _get_spotify_id(self, url_type, id_):
"""Parse a Spotify ID from its URL if necessary.
:param url_type: Type of Spotify URL, either 'album' or 'track'
:param url_type: Type of Spotify URL, either 'album' or 'track'.
:type url_type: str
:param id_: Spotify ID or URL
:param id_: Spotify ID or URL.
:type id_: str
:return: Spotify ID
:return: Spotify ID.
:rtype: str
"""
# Spotify IDs consist of 22 alphanumeric characters
@ -143,22 +145,21 @@ class SpotifyPlugin(BeetsPlugin):
return match.group(2) if match else None
def album_for_id(self, album_id):
"""Fetches an album by its Spotify ID or URL and returns an
"""Fetch an album by its Spotify ID or URL and return an
AlbumInfo object or None if the album is not found.
:param album_id: Spotify ID or URL for the album
:type album_id: str
:return: AlbumInfo object for album
:rtype: beets.autotag.hooks.AlbumInfo
:rtype: beets.autotag.hooks.AlbumInfo or None
"""
spotify_id = self._get_spotify_id('album', album_id)
if spotify_id is None:
return None
response = self._handle_response(
response_data = self._handle_response(
requests.get, self.album_url + spotify_id
)
response_data = response.json()
artist, artist_id = self._get_artist(response_data['artists'])
date_parts = [
@ -178,7 +179,7 @@ class SpotifyPlugin(BeetsPlugin):
else:
raise ui.UserError(
u"Invalid `release_date_precision` returned "
u"from Spotify API: '{}'".format(release_date_precision)
u"by Spotify API: '{}'".format(release_date_precision)
)
tracks = []
@ -231,36 +232,39 @@ class SpotifyPlugin(BeetsPlugin):
data_url=track_data['external_urls']['spotify'],
)
def track_for_id(self, track_id):
"""Fetches a track by its Spotify ID or URL and returns a
def track_for_id(self, track_id=None, track_data=None):
"""Fetch a track by its Spotify ID or URL and return a
TrackInfo object or None if the track is not found.
:param track_id: Spotify ID or URL for the track
:param track_id: (Optional) Spotify ID or URL for the track. Either
``track_id`` or ``track_data`` must be provided.
:type track_id: str
:param track_data: (Optional) Simplified track object dict. May be
provided instead of ``track_id`` to avoid unnecessary API calls.
:type track_data: dict
:return: TrackInfo object for track
:rtype: beets.autotag.hooks.TrackInfo
:rtype: beets.autotag.hooks.TrackInfo or None
"""
spotify_id = self._get_spotify_id('track', track_id)
if spotify_id is None:
return None
if track_data is None:
spotify_id = self._get_spotify_id('track', track_id)
if spotify_id is None:
return None
track_data = self._handle_response(
requests.get, self.track_url + spotify_id
)
track = self._get_track(track_data)
response_track = self._handle_response(
requests.get, self.track_url + spotify_id
# Get album's tracks to set `track.index` (position on the entire
# release) and `track.medium_total` (total number of tracks on
# the track's disc).
album_data = self._handle_response(
requests.get, self.album_url + track_data['album']['id']
)
response_data_track = response_track.json()
track = self._get_track(response_data_track)
# get album's tracks to set the track's index/position on
# the entire release
response_album = self._handle_response(
requests.get, self.album_url + response_data_track['album']['id']
)
response_data_album = response_album.json()
medium_total = 0
for i, track_data in enumerate(response_data_album['tracks']['items']):
for i, track_data in enumerate(album_data['tracks']['items']):
if track_data['disc_number'] == track.medium:
medium_total += 1
if track_data['id'] == spotify_id:
if track_data['id'] == track.track_id:
track.index = i + 1
track.medium_total = medium_total
return track
@ -308,12 +312,122 @@ class SpotifyPlugin(BeetsPlugin):
dist.add('source', self.config['source_weight'].as_number())
return dist
def candidates(self, items, artist, album, va_likely):
"""Returns a list of AlbumInfo objects for Spotify Search API results
matching an ``album`` and ``artist`` (if not various).
:param items: List of items comprised by an album to be matched.
:type items: list[beets.library.Item]
:param artist: The artist of the album to be matched.
:type artist: str
:param album: The name of the album to be matched.
:type album: str
:param va_likely: True if the album to be matched likely has
Various Artists.
:type va_likely: bool
:return: Candidate AlbumInfo objects.
:rtype: list[beets.autotag.hooks.AlbumInfo]
"""
query_filters = {'album': album}
if not va_likely:
query_filters['artist'] = artist
response_data = self._search_spotify(
query_type='album', filters=query_filters
)
if response_data is None:
return []
return [
self.album_for_id(album_id=album_data['id'])
for album_data in response_data['albums']['items']
]
def item_candidates(self, item, artist, title):
"""Returns a list of TrackInfo objects for Spotify Search API results
matching ``title`` and ``artist``.
:param item: Singleton item to be matched.
:type item: beets.library.Item
:param artist: The artist of the track to be matched.
:type artist: str
:param title: The title of the track to be matched.
:type title: str
:return: Candidate TrackInfo objects.
:rtype: list[beets.autotag.hooks.TrackInfo]
"""
response_data = self._search_spotify(
query_type='track', keywords=title, filters={'artist': artist}
)
if response_data is None:
return []
return [
self.track_for_id(track_data=track_data)
for track_data in response_data['tracks']['items']
]
@staticmethod
def _construct_search_query(filters=None, keywords=''):
"""Construct a query string with the specified filters and keywords to
be provided to the Spotify Search API
(https://developer.spotify.com/documentation/web-api/reference/search/search/#writing-a-query---guidelines).
:param filters: (Optional) Field filters to apply.
:type filters: dict
:param keywords: (Optional) Query keywords to use.
:type keywords: str
:return: Query string to be provided to the Search API.
:rtype: str
"""
query_components = [
keywords,
' '.join(':'.join((k, v)) for k, v in filters.items()),
]
query = ' '.join([q for q in query_components if q])
if not isinstance(query, six.text_type):
query = query.decode('utf8')
return unidecode.unidecode(query)
def _search_spotify(self, query_type, filters=None, keywords=''):
"""Query the Spotify Search API for the specified ``keywords``, applying
the provided ``filters``.
:param query_type: A comma-separated list of item types to search
across. Valid types are: 'album', 'artist', 'playlist', and
'track'. Search results include hits from all the specified item
types.
:type query_type: str
:param filters: (Optional) Field filters to apply.
:type filters: dict
:param keywords: (Optional) Query keywords to use.
:type keywords: str
:return: JSON data for the class:`Response <Response>` object or None
if no search results are returned.
:rtype: dict or None
"""
query = self._construct_search_query(
keywords=keywords, filters=filters
)
if not query:
return None
self._log.debug(u"Searching Spotify for '{}'".format(query))
response_data = self._handle_response(
requests.get,
self.search_url,
params={'q': query, 'type': query_type},
)
num_results = 0
for result_type_data in response_data.values():
num_results += len(result_type_data['items'])
self._log.debug(
u"Found {} results from Spotify for '{}'", num_results, query
)
return response_data if num_results > 0 else None
def commands(self):
def queries(lib, opts, args):
success = self.parse_opts(opts)
success = self._parse_opts(opts)
if success:
results = self.query_spotify(lib, ui.decargs(args))
self.output_results(results)
results = self._match_library_tracks(lib, ui.decargs(args))
self._output_match_results(results)
spotify_cmd = ui.Subcommand(
'spotify', help=u'build a Spotify playlist'
@ -335,7 +449,7 @@ class SpotifyPlugin(BeetsPlugin):
spotify_cmd.func = queries
return [spotify_cmd]
def parse_opts(self, opts):
def _parse_opts(self, opts):
if opts.mode:
self.config['mode'].set(opts.mode)
@ -351,19 +465,30 @@ class SpotifyPlugin(BeetsPlugin):
self.opts = opts
return True
def query_spotify(self, lib, query):
def _match_library_tracks(self, library, keywords):
"""Get a list of simplified track object dicts for library tracks
matching the specified ``keywords``.
:param library: beets library object to query.
:type library: beets.library.Library
:param keywords: Query to match library items against.
:type keywords: str
:return: List of simplified track object dicts for library items
matching the specified query.
:rtype: list[dict]
"""
results = []
failures = []
items = lib.items(query)
items = library.items(keywords)
if not items:
self._log.debug(
u'Your beets query returned no items, skipping Spotify'
u'Your beets query returned no items, skipping Spotify.'
)
return
self._log.info(u'Processing {0} tracks...', len(items))
self._log.info(u'Processing {} tracks...', len(items))
for item in items:
# Apply regex transformations if provided
@ -383,90 +508,84 @@ class SpotifyPlugin(BeetsPlugin):
# Custom values can be passed in the config (just in case)
artist = item[self.config['artist_field'].get()]
album = item[self.config['album_field'].get()]
query = item[self.config['track_field'].get()]
query_keywords = '{} album:{} artist:{}'.format(
query, album, artist
)
keywords = item[self.config['track_field'].get()]
# Query the Web API for each track, look for the items' JSON data
try:
response = self._handle_response(
requests.get,
self.search_url,
params={'q': query_keywords, 'type': 'track'},
query_filters = {'artist': artist, 'album': album}
response_data = self._search_spotify(
query_type='track', keywords=keywords, filters=query_filters
)
if response_data is None:
query = self._construct_search_query(
keywords=keywords, filters=query_filters
)
except ui.UserError:
failures.append(query_keywords)
failures.append(query)
continue
response_data = response.json()['tracks']['items']
response_data_tracks = response_data['tracks']['items']
# Apply market filter if requested
region_filter = self.config['region_filter'].get()
if region_filter:
response_data = [
x
for x in response_data
if region_filter in x['available_markets']
response_data_tracks = [
track_data
for track_data in response_data_tracks
if region_filter in track_data['available_markets']
]
# Simplest, take the first result
chosen_result = None
if (
len(response_data) == 1
len(response_data_tracks) == 1
or self.config['tiebreak'].get() == 'first'
):
self._log.debug(
u'Spotify track(s) found, count: {0}', len(response_data)
u'Spotify track(s) found, count: {}',
len(response_data_tracks),
)
chosen_result = response_data[0]
elif len(response_data) > 1:
chosen_result = response_data_tracks[0]
else:
# Use the popularity filter
self._log.debug(
u'Most popular track chosen, count: {0}',
len(response_data),
u'Most popular track chosen, count: {}',
len(response_data_tracks),
)
chosen_result = max(
response_data, key=lambda x: x['popularity']
response_data_tracks, key=lambda x: x['popularity']
)
if chosen_result:
results.append(chosen_result)
else:
self._log.debug(
u'No Spotify track found for the following query: {}',
query_keywords,
)
failures.append(query_keywords)
results.append(chosen_result)
failure_count = len(failures)
if failure_count > 0:
if self.config['show_failures'].get():
self._log.info(
u'{0} track(s) did not match a Spotify ID:', failure_count
u'{} track(s) did not match a Spotify ID:', failure_count
)
for track in failures:
self._log.info(u'track: {0}', track)
self._log.info(u'track: {}', track)
self._log.info(u'')
else:
self._log.warning(
u'{0} track(s) did not match a Spotify ID;\n'
u'{} track(s) did not match a Spotify ID;\n'
u'use --show-failures to display',
failure_count,
)
return results
def output_results(self, results):
if results:
ids = [x['id'] for x in results]
if self.config['mode'].get() == "open":
self._log.info(u'Attempting to open Spotify with playlist')
spotify_url = self.playlist_partial + ",".join(ids)
webbrowser.open(spotify_url)
def _output_match_results(self, results):
"""Open a playlist or print Spotify URLs for the provided track
object dicts.
:param results: List of simplified track object dicts
(https://developer.spotify.com/documentation/web-api/reference/object-model/#track-object-simplified)
:type results: list[dict]
"""
if results:
spotify_ids = [track_data['id'] for track_data in results]
if self.config['mode'].get() == 'open':
self._log.info(u'Attempting to open Spotify with playlist')
spotify_url = self.playlist_partial + ",".join(spotify_ids)
webbrowser.open(spotify_url)
else:
for item in ids:
print(self.open_track_url + item)
for spotify_id in spotify_ids:
print(self.open_track_url + spotify_id)
else:
self._log.warning(u'No Spotify tracks found from beets query')

View file

@ -47,19 +47,21 @@ class SpotifyPluginTest(_common.TestCase, TestHelper):
)
self.spotify = spotify.SpotifyPlugin()
opts = ArgumentsMock("list", False)
self.spotify.parse_opts(opts)
self.spotify._parse_opts(opts)
def tearDown(self):
self.teardown_beets()
def test_args(self):
opts = ArgumentsMock("fail", True)
self.assertEqual(False, self.spotify.parse_opts(opts))
self.assertEqual(False, self.spotify._parse_opts(opts))
opts = ArgumentsMock("list", False)
self.assertEqual(True, self.spotify.parse_opts(opts))
self.assertEqual(True, self.spotify._parse_opts(opts))
def test_empty_query(self):
self.assertEqual(None, self.spotify.query_spotify(self.lib, u"1=2"))
self.assertEqual(
None, self.spotify._match_library_tracks(self.lib, u"1=2")
)
@responses.activate
def test_missing_request(self):
@ -84,13 +86,13 @@ class SpotifyPluginTest(_common.TestCase, TestHelper):
length=10,
)
item.add(self.lib)
self.assertEqual([], self.spotify.query_spotify(self.lib, u""))
self.assertEqual([], self.spotify._match_library_tracks(self.lib, u""))
params = _params(responses.calls[0].request.url)
self.assertEqual(
params['q'],
[u'duifhjslkef album:lkajsdflakjsd artist:ujydfsuihse'],
)
query = params['q'][0]
self.assertIn(u'duifhjslkef', query)
self.assertIn(u'artist:ujydfsuihse', query)
self.assertIn(u'album:lkajsdflakjsd', query)
self.assertEqual(params['type'], [u'track'])
@responses.activate
@ -116,16 +118,16 @@ class SpotifyPluginTest(_common.TestCase, TestHelper):
length=10,
)
item.add(self.lib)
results = self.spotify.query_spotify(self.lib, u"Happy")
results = self.spotify._match_library_tracks(self.lib, u"Happy")
self.assertEqual(1, len(results))
self.assertEqual(u"6NPVjNh8Jhru9xOmyQigds", results[0]['id'])
self.spotify.output_results(results)
self.spotify._output_match_results(results)
params = _params(responses.calls[0].request.url)
self.assertEqual(
params['q'],
[u'Happy album:Despicable Me 2 artist:Pharrell Williams'],
)
query = params['q'][0]
self.assertIn(u'Happy', query)
self.assertIn(u'artist:Pharrell Williams', query)
self.assertIn(u'album:Despicable Me 2', query)
self.assertEqual(params['type'], [u'track'])