Merge pull request #3123 from rhlahuja/spotify-oauth

Add Spotify OAuth support and autotagger integration
This commit is contained in:
Adrian Sampson 2019-01-21 15:10:38 -05:00
commit 1a14c854b2
3 changed files with 417 additions and 82 deletions

View file

@ -3,54 +3,334 @@
from __future__ import division, absolute_import, print_function
import re
import json
import base64
import webbrowser
import collections
import requests
from beets.plugins import BeetsPlugin
from beets.ui import decargs
from beets import ui
from requests.exceptions import HTTPError
from beets.plugins import BeetsPlugin
from beets.util import confit
from beets.autotag.hooks import AlbumInfo, TrackInfo, Distance
class SpotifyPlugin(BeetsPlugin):
# URL for the Web API of Spotify
# Documentation here: https://developer.spotify.com/web-api/search-item/
base_url = "https://api.spotify.com/v1/search"
open_url = "http://open.spotify.com/track/"
playlist_partial = "spotify:trackset:Playlist:"
# Base URLs for the Spotify API
# Documentation: https://developer.spotify.com/web-api
oauth_token_url = 'https://accounts.spotify.com/api/token'
open_track_url = 'http://open.spotify.com/track/'
search_url = 'https://api.spotify.com/v1/search'
album_url = 'https://api.spotify.com/v1/albums/'
track_url = 'https://api.spotify.com/v1/tracks/'
playlist_partial = 'spotify:trackset:Playlist:'
def __init__(self):
super(SpotifyPlugin, self).__init__()
self.config.add({
'mode': 'list',
'tiebreak': 'popularity',
'show_failures': False,
'artist_field': 'albumartist',
'album_field': 'album',
'track_field': 'title',
'region_filter': None,
'regex': []
})
self.config.add(
{
'mode': 'list',
'tiebreak': 'popularity',
'show_failures': False,
'artist_field': 'albumartist',
'album_field': 'album',
'track_field': 'title',
'region_filter': None,
'regex': [],
'client_id': '4e414367a1d14c75a5c5129a627fcab8',
'client_secret': 'f82bdc09b2254f1a8286815d02fd46dc',
'tokenfile': 'spotify_token.json',
'source_weight': 0.5,
}
)
self.config['client_secret'].redact = True
self.tokenfile = self.config['tokenfile'].get(
confit.Filename(in_app_dir=True)
) # Path to the JSON file for storing the OAuth access token.
self.setup()
def setup(self):
"""Retrieve previously saved OAuth token or generate a new one."""
try:
with open(self.tokenfile) as f:
token_data = json.load(f)
except IOError:
self._authenticate()
else:
self.access_token = token_data['access_token']
def _authenticate(self):
"""Request an access token via the Client Credentials Flow:
https://developer.spotify.com/documentation/general/guides/authorization-guide/#client-credentials-flow
"""
headers = {
'Authorization': 'Basic {}'.format(
base64.b64encode(
':'.join(
self.config[k].as_str()
for k in ('client_id', 'client_secret')
).encode()
).decode()
)
}
response = requests.post(
self.oauth_token_url,
data={'grant_type': 'client_credentials'},
headers=headers,
)
try:
response.raise_for_status()
except requests.exceptions.HTTPError as e:
raise ui.UserError(
u'Spotify authorization failed: {}\n{}'.format(
e, response.content
)
)
self.access_token = response.json()['access_token']
# Save the token for later use.
self._log.debug(u'Spotify access token: {}', self.access_token)
with open(self.tokenfile, 'w') as f:
json.dump({'access_token': self.access_token}, f)
def _handle_response(self, request_type, url, params=None):
"""Send a request, reauthenticating if necessary.
:param request_type: Type of :class:`Request` constructor,
e.g. ``requests.get``, ``requests.post``, etc.
:type request_type: function
:param url: URL for the new :class:`Request` object.
:type url: str
:param params: (optional) list of tuples or bytes to send
in the query string for the :class:`Request`.
:type params: dict
:return: class:`Response <Response>` object
:rtype: requests.Response
"""
response = request_type(
url,
headers={'Authorization': 'Bearer {}'.format(self.access_token)},
params=params,
)
if response.status_code != 200:
if u'token expired' in response.text:
self._log.debug(
'Spotify access token has expired. Reauthenticating.'
)
self._authenticate()
self._handle_response(request_type, url, params=params)
else:
raise ui.UserError(u'Spotify API error:\n{}', response.text)
return response
def _get_spotify_id(self, url_type, id_):
"""Parse a Spotify ID from its URL if necessary.
:param url_type: Type of Spotify URL, either 'album' or 'track'
:type url_type: str
:param id_: Spotify ID or URL
:type id_: str
:return: Spotify ID
:rtype: str
"""
# Spotify IDs consist of 22 alphanumeric characters
# (zero-left-padded base62 representation of randomly generated UUID4)
id_regex = r'(^|open\.spotify\.com/{}/)([0-9A-Za-z]{{22}})'
self._log.debug(u'Searching for {} {}', url_type, id_)
match = re.search(id_regex.format(url_type), id_)
return match.group(2) if match else None
def album_for_id(self, album_id):
"""Fetches an album by its Spotify ID or URL and returns an
AlbumInfo object or None if the album is not found.
:param album_id: Spotify ID or URL for the album
:type album_id: str
:return: AlbumInfo object for album
:rtype: beets.autotag.hooks.AlbumInfo
"""
spotify_id = self._get_spotify_id('album', album_id)
if spotify_id is None:
return None
response = self._handle_response(
requests.get, self.album_url + spotify_id
)
response_data = response.json()
artist, artist_id = self._get_artist(response_data['artists'])
date_parts = [
int(part) for part in response_data['release_date'].split('-')
]
release_date_precision = response_data['release_date_precision']
if release_date_precision == 'day':
year, month, day = date_parts
elif release_date_precision == 'month':
year, month = date_parts
day = None
elif release_date_precision == 'year':
year = date_parts
month = None
day = None
else:
raise ui.UserError(
u"Invalid `release_date_precision` returned "
u"from Spotify API: '{}'".format(release_date_precision)
)
tracks = []
medium_totals = collections.defaultdict(int)
for i, track_data in enumerate(response_data['tracks']['items']):
track = self._get_track(track_data)
track.index = i + 1
medium_totals[track.medium] += 1
tracks.append(track)
for track in tracks:
track.medium_total = medium_totals[track.medium]
return AlbumInfo(
album=response_data['name'],
album_id=spotify_id,
artist=artist,
artist_id=artist_id,
tracks=tracks,
albumtype=response_data['album_type'],
va=len(response_data['artists']) == 1
and artist.lower() == 'various artists',
year=year,
month=month,
day=day,
label=response_data['label'],
data_source='Spotify',
data_url=response_data['external_urls']['spotify'],
)
def _get_track(self, track_data):
"""Convert a Spotify track object dict to a TrackInfo object.
:param track_data: Simplified track object
(https://developer.spotify.com/documentation/web-api/reference/object-model/#track-object-simplified)
:type track_data: dict
:return: TrackInfo object for track
:rtype: beets.autotag.hooks.TrackInfo
"""
artist, artist_id = self._get_artist(track_data['artists'])
return TrackInfo(
title=track_data['name'],
track_id=track_data['id'],
artist=artist,
artist_id=artist_id,
length=track_data['duration_ms'] / 1000,
index=track_data['track_number'],
medium=track_data['disc_number'],
medium_index=track_data['track_number'],
data_source='Spotify',
data_url=track_data['external_urls']['spotify'],
)
def track_for_id(self, track_id):
"""Fetches a track by its Spotify ID or URL and returns a
TrackInfo object or None if the track is not found.
:param track_id: Spotify ID or URL for the track
:type track_id: str
:return: TrackInfo object for track
:rtype: beets.autotag.hooks.TrackInfo
"""
spotify_id = self._get_spotify_id('track', track_id)
if spotify_id is None:
return None
response_track = self._handle_response(
requests.get, self.track_url + spotify_id
)
response_data_track = response_track.json()
track = self._get_track(response_data_track)
# get album's tracks to set the track's index/position on
# the entire release
response_album = self._handle_response(
requests.get, self.album_url + response_data_track['album']['id']
)
response_data_album = response_album.json()
medium_total = 0
for i, track_data in enumerate(response_data_album['tracks']['items']):
if track_data['disc_number'] == track.medium:
medium_total += 1
if track_data['id'] == spotify_id:
track.index = i + 1
track.medium_total = medium_total
return track
@staticmethod
def _get_artist(artists):
"""Returns an artist string (all artists) and an artist_id (the main
artist) for a list of Spotify artist object dicts.
:param artists: Iterable of simplified Spotify artist objects
(https://developer.spotify.com/documentation/web-api/reference/object-model/#artist-object-simplified)
:type artists: list[dict]
:return: Normalized artist string
:rtype: str
"""
artist_id = None
artist_names = []
for artist in artists:
if not artist_id:
artist_id = artist['id']
name = artist['name']
# Strip disambiguation number.
name = re.sub(r' \(\d+\)$', '', name)
# Move articles to the front.
name = re.sub(r'^(.*?), (a|an|the)$', r'\2 \1', name, flags=re.I)
artist_names.append(name)
artist = ', '.join(artist_names).replace(' ,', ',') or None
return artist, artist_id
def album_distance(self, items, album_info, mapping):
"""Returns the Spotify source weight and the maximum source weight
for albums.
"""
dist = Distance()
if album_info.data_source == 'Spotify':
dist.add('source', self.config['source_weight'].as_number())
return dist
def track_distance(self, item, track_info):
"""Returns the Spotify source weight and the maximum source weight
for individual tracks.
"""
dist = Distance()
if track_info.data_source == 'Spotify':
dist.add('source', self.config['source_weight'].as_number())
return dist
def commands(self):
def queries(lib, opts, args):
success = self.parse_opts(opts)
if success:
results = self.query_spotify(lib, decargs(args))
results = self.query_spotify(lib, ui.decargs(args))
self.output_results(results)
spotify_cmd = ui.Subcommand(
'spotify',
help=u'build a Spotify playlist'
'spotify', help=u'build a Spotify playlist'
)
spotify_cmd.parser.add_option(
u'-m', u'--mode', action='store',
u'-m',
u'--mode',
action='store',
help=u'"open" to open Spotify with playlist, '
u'"list" to print (default)'
u'"list" to print (default)',
)
spotify_cmd.parser.add_option(
u'-f', u'--show-failures',
action='store_true', dest='show_failures',
help=u'list tracks that did not match a Spotify ID'
u'-f',
u'--show-failures',
action='store_true',
dest='show_failures',
help=u'list tracks that did not match a Spotify ID',
)
spotify_cmd.func = queries
return [spotify_cmd]
@ -63,35 +343,35 @@ class SpotifyPlugin(BeetsPlugin):
self.config['show_failures'].set(True)
if self.config['mode'].get() not in ['list', 'open']:
self._log.warning(u'{0} is not a valid mode',
self.config['mode'].get())
self._log.warning(
u'{0} is not a valid mode', self.config['mode'].get()
)
return False
self.opts = opts
return True
def query_spotify(self, lib, query):
results = []
failures = []
items = lib.items(query)
if not items:
self._log.debug(u'Your beets query returned no items, '
u'skipping spotify')
self._log.debug(
u'Your beets query returned no items, skipping Spotify'
)
return
self._log.info(u'Processing {0} tracks...', len(items))
for item in items:
# Apply regex transformations if provided
for regex in self.config['regex'].get():
if (
not regex['field'] or
not regex['search'] or
not regex['replace']
not regex['field']
or not regex['search']
or not regex['replace']
):
continue
@ -104,59 +384,76 @@ class SpotifyPlugin(BeetsPlugin):
artist = item[self.config['artist_field'].get()]
album = item[self.config['album_field'].get()]
query = item[self.config['track_field'].get()]
search_url = query + " album:" + album + " artist:" + artist
query_keywords = '{} album:{} artist:{}'.format(
query, album, artist
)
# Query the Web API for each track, look for the items' JSON data
r = requests.get(self.base_url, params={
"q": search_url, "type": "track"
})
self._log.debug('{}', r.url)
try:
r.raise_for_status()
except HTTPError as e:
self._log.debug(u'URL returned a {0} error',
e.response.status_code)
failures.append(search_url)
response = self._handle_response(
requests.get,
self.search_url,
params={'q': query_keywords, 'type': 'track'},
)
except ui.UserError:
failures.append(query_keywords)
continue
r_data = r.json()['tracks']['items']
response_data = response.json()['tracks']['items']
# Apply market filter if requested
region_filter = self.config['region_filter'].get()
if region_filter:
r_data = [x for x in r_data if region_filter
in x['available_markets']]
response_data = [
x
for x in response_data
if region_filter in x['available_markets']
]
# Simplest, take the first result
chosen_result = None
if len(r_data) == 1 or self.config['tiebreak'].get() == "first":
self._log.debug(u'Spotify track(s) found, count: {0}',
len(r_data))
chosen_result = r_data[0]
elif len(r_data) > 1:
if (
len(response_data) == 1
or self.config['tiebreak'].get() == 'first'
):
self._log.debug(
u'Spotify track(s) found, count: {0}', len(response_data)
)
chosen_result = response_data[0]
elif len(response_data) > 1:
# Use the popularity filter
self._log.debug(u'Most popular track chosen, count: {0}',
len(r_data))
chosen_result = max(r_data, key=lambda x: x['popularity'])
self._log.debug(
u'Most popular track chosen, count: {0}',
len(response_data),
)
chosen_result = max(
response_data, key=lambda x: x['popularity']
)
if chosen_result:
results.append(chosen_result)
else:
self._log.debug(u'No spotify track found: {0}', search_url)
failures.append(search_url)
self._log.debug(
u'No Spotify track found for the following query: {}',
query_keywords,
)
failures.append(query_keywords)
failure_count = len(failures)
if failure_count > 0:
if self.config['show_failures'].get():
self._log.info(u'{0} track(s) did not match a Spotify ID:',
failure_count)
self._log.info(
u'{0} track(s) did not match a Spotify ID:', failure_count
)
for track in failures:
self._log.info(u'track: {0}', track)
self._log.info(u'')
else:
self._log.warning(u'{0} track(s) did not match a Spotify ID;\n'
u'use --show-failures to display',
failure_count)
self._log.warning(
u'{0} track(s) did not match a Spotify ID;\n'
u'use --show-failures to display',
failure_count,
)
return results
@ -170,6 +467,6 @@ class SpotifyPlugin(BeetsPlugin):
else:
for item in ids:
print(self.open_url + item)
print(self.open_track_url + item)
else:
self._log.warning(u'No Spotify tracks found from beets query')

View file

@ -1,10 +1,16 @@
Spotify Plugin
==============
The ``spotify`` plugin generates `Spotify`_ playlists from tracks in your library. Using the `Spotify Web API`_, any tracks that can be matched with a Spotify ID are returned, and the results can be either pasted in to a playlist or opened directly in the Spotify app.
The ``spotify`` plugin generates `Spotify`_ playlists from tracks in your
library with the ``beet spotify`` command using the `Spotify Search API`_.
Also, the plugin can use the Spotify `Album`_ and `Track`_ APIs to provide
metadata matches for the importer.
.. _Spotify: https://www.spotify.com/
.. _Spotify Web API: https://developer.spotify.com/web-api/search-item/
.. _Spotify Search API: https://developer.spotify.com/documentation/web-api/reference/search/search/
.. _Album: https://developer.spotify.com/documentation/web-api/reference/albums/get-album/
.. _Track: https://developer.spotify.com/documentation/web-api/reference/tracks/get-track/
Why Use This Plugin?
--------------------
@ -12,10 +18,10 @@ Why Use This Plugin?
* You're a Beets user and Spotify user already.
* You have playlists or albums you'd like to make available in Spotify from Beets without having to search for each artist/album/track.
* You want to check which tracks in your library are available on Spotify.
* You want to autotag music with metadata from the Spotify API.
Basic Usage
-----------
First, enable the ``spotify`` plugin (see :ref:`using-plugins`).
Then, use the ``spotify`` command with a beets query::
@ -37,6 +43,12 @@ Command-line options include:
* ``--show-failures`` or ``-f``: List the tracks that did not match a Spotify
ID.
You can enter the URL for an album or song on Spotify at the ``enter Id``
prompt during import::
Enter search, enter Id, aBort, eDit, edit Candidates, plaY? i
Enter release ID: https://open.spotify.com/album/2rFYTHFBLQN3AYlrymBPPA
Configuration
-------------
@ -67,10 +79,14 @@ in config.yaml under the ``spotify:`` section:
track/album/artist fields before sending them to Spotify. Can be useful for
changing certain abbreviations, like ft. -> feat. See the examples below.
Default: None.
- **source_weight**: Penalty applied to Spotify matches during import. Set to
0.0 to disable.
Default: ``0.5``.
Here's an example::
spotify:
source_weight: 0.7
mode: open
region_filter: US
show_failures: on

View file

@ -29,10 +29,22 @@ def _params(url):
class SpotifyPluginTest(_common.TestCase, TestHelper):
@responses.activate
def setUp(self):
config.clear()
self.setup_beets()
responses.add(
responses.POST,
spotify.SpotifyPlugin.oauth_token_url,
status=200,
json={
'access_token': '3XyiC3raJySbIAV5LVYj1DaWbcocNi3LAJTNXRnYY'
'GVUl6mbbqXNhW3YcZnQgYXNWHFkVGSMlc0tMuvq8CF',
'token_type': 'Bearer',
'expires_in': 3600,
'scope': '',
},
)
self.spotify = spotify.SpotifyPlugin()
opts = ArgumentsMock("list", False)
self.spotify.parse_opts(opts)
@ -51,20 +63,25 @@ class SpotifyPluginTest(_common.TestCase, TestHelper):
@responses.activate
def test_missing_request(self):
json_file = os.path.join(_common.RSRC, b'spotify',
b'missing_request.json')
json_file = os.path.join(
_common.RSRC, b'spotify', b'missing_request.json'
)
with open(json_file, 'rb') as f:
response_body = f.read()
responses.add(responses.GET, 'https://api.spotify.com/v1/search',
body=response_body, status=200,
content_type='application/json')
responses.add(
responses.GET,
spotify.SpotifyPlugin.search_url,
body=response_body,
status=200,
content_type='application/json',
)
item = Item(
mb_trackid=u'01234',
album=u'lkajsdflakjsd',
albumartist=u'ujydfsuihse',
title=u'duifhjslkef',
length=10
length=10,
)
item.add(self.lib)
self.assertEqual([], self.spotify.query_spotify(self.lib, u""))
@ -78,21 +95,25 @@ class SpotifyPluginTest(_common.TestCase, TestHelper):
@responses.activate
def test_track_request(self):
json_file = os.path.join(_common.RSRC, b'spotify',
b'track_request.json')
json_file = os.path.join(
_common.RSRC, b'spotify', b'track_request.json'
)
with open(json_file, 'rb') as f:
response_body = f.read()
responses.add(responses.GET, 'https://api.spotify.com/v1/search',
body=response_body, status=200,
content_type='application/json')
responses.add(
responses.GET,
spotify.SpotifyPlugin.search_url,
body=response_body,
status=200,
content_type='application/json',
)
item = Item(
mb_trackid=u'01234',
album=u'Despicable Me 2',
albumartist=u'Pharrell Williams',
title=u'Happy',
length=10
length=10,
)
item.add(self.lib)
results = self.spotify.query_spotify(self.lib, u"Happy")
@ -111,5 +132,6 @@ class SpotifyPluginTest(_common.TestCase, TestHelper):
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == '__main__':
unittest.main(defaultTest='suite')