Add candidates and item_candidates, modularize Search API queries

This commit is contained in:
Rahul Ahuja 2019-01-21 21:24:41 -08:00
parent 1087740580
commit 5472a49991
2 changed files with 127 additions and 62 deletions

View file

@ -85,7 +85,7 @@ class SpotifyPlugin(BeetsPlugin):
except requests.exceptions.HTTPError as e:
raise ui.UserError(
u'Spotify authorization failed: {}\n{}'.format(
e, response.content
e, response.text
)
)
self.access_token = response.json()['access_token']
@ -106,8 +106,8 @@ class SpotifyPlugin(BeetsPlugin):
:param params: (optional) list of tuples or bytes to send
in the query string for the :class:`Request`.
:type params: dict
:return: class:`Response <Response>` object
:rtype: requests.Response
:return: JSON data for the class:`Response <Response>` object
:rtype: dict
"""
response = request_type(
url,
@ -123,7 +123,7 @@ class SpotifyPlugin(BeetsPlugin):
self._handle_response(request_type, url, params=params)
else:
raise ui.UserError(u'Spotify API error:\n{}', response.text)
return response
return response.json()
def _get_spotify_id(self, url_type, id_):
"""Parse a Spotify ID from its URL if necessary.
@ -155,10 +155,9 @@ class SpotifyPlugin(BeetsPlugin):
if spotify_id is None:
return None
response = self._handle_response(
response_data = self._handle_response(
requests.get, self.album_url + spotify_id
)
response_data = response.json()
artist, artist_id = self._get_artist(response_data['artists'])
date_parts = [
@ -244,18 +243,16 @@ class SpotifyPlugin(BeetsPlugin):
if spotify_id is None:
return None
response_track = self._handle_response(
response_data_track = self._handle_response(
requests.get, self.track_url + spotify_id
)
response_data_track = response_track.json()
track = self._get_track(response_data_track)
# get album's tracks to set the track's index/position on
# the entire release
response_album = self._handle_response(
response_data_album = self._handle_response(
requests.get, self.album_url + response_data_track['album']['id']
)
response_data_album = response_album.json()
medium_total = 0
for i, track_data in enumerate(response_data_album['tracks']['items']):
if track_data['disc_number'] == track.medium:
@ -308,12 +305,92 @@ class SpotifyPlugin(BeetsPlugin):
dist.add('source', self.config['source_weight'].as_number())
return dist
def candidates(self, items, artist, album, va_likely):
"""Returns a list of AlbumInfo objects for Spotify Search results
matching an ``album`` and ``artist`` (if not various).
"""
query_filters = {'album': album}
if not va_likely:
query_filters['artist'] = artist
response_data = self._search_spotify(
query_type='album', filters=query_filters
)
return [
self.album_for_id(album_id=album_data['id'])
for album_data in response_data['albums']['items']
]
def item_candidates(self, item, artist, title):
"""Returns a list of TrackInfo objects for Spotify Search results
matching ``title`` and ``artist``.
"""
response_data = self._search_spotify(
query_type='track', keywords=title, filters={'artist': artist}
)
return [
self._get_track(track_data)
for track_data in response_data['tracks']['items']
]
@staticmethod
def _construct_search_query(filters=None, keywords=''):
"""Construct a query string with the specified filters and keywords to
be provided to the Spotify Search API
(https://developer.spotify.com/documentation/web-api/reference/search/search/).
:param filters: (Optional) Field filters to apply.
:type filters: dict
:param keywords: (Optional) Query keywords to use.
:type keywords: str
:return: Search query string to be provided to the Search API
(https://developer.spotify.com/documentation/web-api/reference/search/search/#writing-a-query---guidelines)
:rtype: str
"""
query_string = keywords
if filters is not None:
query_string += ' ' + ' '.join(
':'.join((k, v)) for k, v in filters.items()
)
return query_string
def _search_spotify(self, query_type, filters=None, keywords=''):
"""Query the Spotify Search API for the specified ``keywords``, applying
the provided filters.
:param query_type: A comma-separated list of item types to search
across. Valid types are: 'album', 'artist', 'playlist', and
'track'. Search results include hits from all the specified item
types.
:type query_type: str
:param filters: (Optional) Field filters to apply.
:type filters: dict
:param keywords: (Optional) Query keywords to use.
:return: JSON data for the class:`Response <Response>` object
:rtype: dict
"""
query = self._construct_search_query(
keywords=keywords, filters=filters
)
if not query:
return None
self._log.debug(u'Searching Spotify for "{}"'.format(query))
response_data = self._handle_response(
requests.get,
self.search_url,
params={'q': query, 'type': query_type},
)
num_results = 0
for result_type_data in response_data.values():
num_results += len(result_type_data['items'])
self._log.debug(u'Found {} results from Spotify'.format(num_results))
return response_data if num_results > 0 else None
def commands(self):
def queries(lib, opts, args):
success = self.parse_opts(opts)
success = self._parse_opts(opts)
if success:
results = self.query_spotify(lib, ui.decargs(args))
self.output_results(results)
results = self._query_spotify(lib, ui.decargs(args))
self._output_results(results)
spotify_cmd = ui.Subcommand(
'spotify', help=u'build a Spotify playlist'
@ -335,7 +412,7 @@ class SpotifyPlugin(BeetsPlugin):
spotify_cmd.func = queries
return [spotify_cmd]
def parse_opts(self, opts):
def _parse_opts(self, opts):
if opts.mode:
self.config['mode'].set(opts.mode)
@ -351,19 +428,19 @@ class SpotifyPlugin(BeetsPlugin):
self.opts = opts
return True
def query_spotify(self, lib, query):
def _query_spotify(self, lib, keywords):
results = []
failures = []
items = lib.items(query)
items = lib.items(keywords)
if not items:
self._log.debug(
u'Your beets query returned no items, skipping Spotify'
u'Your beets query returned no items, skipping Spotify.'
)
return
self._log.info(u'Processing {0} tracks...', len(items))
self._log.info(u'Processing {} tracks...', len(items))
for item in items:
# Apply regex transformations if provided
@ -383,81 +460,69 @@ class SpotifyPlugin(BeetsPlugin):
# Custom values can be passed in the config (just in case)
artist = item[self.config['artist_field'].get()]
album = item[self.config['album_field'].get()]
query = item[self.config['track_field'].get()]
query_keywords = '{} album:{} artist:{}'.format(
query, album, artist
)
keywords = item[self.config['track_field'].get()]
# Query the Web API for each track, look for the items' JSON data
try:
response = self._handle_response(
requests.get,
self.search_url,
params={'q': query_keywords, 'type': 'track'},
query_filters = {'artist': artist, 'album': album}
response_data = self._search_spotify(
query_type='track', keywords=keywords, filters=query_filters
)
if response_data is None:
query = self._construct_search_query(
keywords=keywords, filters=query_filters
)
except ui.UserError:
failures.append(query_keywords)
failures.append(query)
continue
response_data = response.json()['tracks']['items']
response_data_tracks = response_data['tracks']['items']
# Apply market filter if requested
region_filter = self.config['region_filter'].get()
if region_filter:
response_data = [
response_data_tracks = [
x
for x in response_data
for x in response_data_tracks
if region_filter in x['available_markets']
]
# Simplest, take the first result
chosen_result = None
if (
len(response_data) == 1
len(response_data_tracks) == 1
or self.config['tiebreak'].get() == 'first'
):
self._log.debug(
u'Spotify track(s) found, count: {0}', len(response_data)
u'Spotify track(s) found, count: {}',
len(response_data_tracks),
)
chosen_result = response_data[0]
elif len(response_data) > 1:
chosen_result = response_data_tracks[0]
else:
# Use the popularity filter
self._log.debug(
u'Most popular track chosen, count: {0}',
len(response_data),
u'Most popular track chosen, count: {}',
len(response_data_tracks),
)
chosen_result = max(
response_data, key=lambda x: x['popularity']
response_data_tracks, key=lambda x: x['popularity']
)
if chosen_result:
results.append(chosen_result)
else:
self._log.debug(
u'No Spotify track found for the following query: {}',
query_keywords,
)
failures.append(query_keywords)
results.append(chosen_result)
failure_count = len(failures)
if failure_count > 0:
if self.config['show_failures'].get():
self._log.info(
u'{0} track(s) did not match a Spotify ID:', failure_count
u'{} track(s) did not match a Spotify ID:', failure_count
)
for track in failures:
self._log.info(u'track: {0}', track)
self._log.info(u'track: {}', track)
self._log.info(u'')
else:
self._log.warning(
u'{0} track(s) did not match a Spotify ID;\n'
u'{} track(s) did not match a Spotify ID;\n'
u'use --show-failures to display',
failure_count,
)
return results
def output_results(self, results):
def _output_results(self, results):
if results:
ids = [x['id'] for x in results]
if self.config['mode'].get() == "open":

View file

@ -47,19 +47,19 @@ class SpotifyPluginTest(_common.TestCase, TestHelper):
)
self.spotify = spotify.SpotifyPlugin()
opts = ArgumentsMock("list", False)
self.spotify.parse_opts(opts)
self.spotify._parse_opts(opts)
def tearDown(self):
self.teardown_beets()
def test_args(self):
opts = ArgumentsMock("fail", True)
self.assertEqual(False, self.spotify.parse_opts(opts))
self.assertEqual(False, self.spotify._parse_opts(opts))
opts = ArgumentsMock("list", False)
self.assertEqual(True, self.spotify.parse_opts(opts))
self.assertEqual(True, self.spotify._parse_opts(opts))
def test_empty_query(self):
self.assertEqual(None, self.spotify.query_spotify(self.lib, u"1=2"))
self.assertEqual(None, self.spotify._query_spotify(self.lib, u"1=2"))
@responses.activate
def test_missing_request(self):
@ -84,7 +84,7 @@ class SpotifyPluginTest(_common.TestCase, TestHelper):
length=10,
)
item.add(self.lib)
self.assertEqual([], self.spotify.query_spotify(self.lib, u""))
self.assertEqual([], self.spotify._query_spotify(self.lib, u""))
params = _params(responses.calls[0].request.url)
self.assertEqual(
@ -116,10 +116,10 @@ class SpotifyPluginTest(_common.TestCase, TestHelper):
length=10,
)
item.add(self.lib)
results = self.spotify.query_spotify(self.lib, u"Happy")
results = self.spotify._query_spotify(self.lib, u"Happy")
self.assertEqual(1, len(results))
self.assertEqual(u"6NPVjNh8Jhru9xOmyQigds", results[0]['id'])
self.spotify.output_results(results)
self.spotify._output_results(results)
params = _params(responses.calls[0].request.url)
self.assertEqual(