beets/beetsplug/spotify.py
2019-01-20 02:20:10 -08:00

421 lines
14 KiB
Python

# -*- coding: utf-8 -*-
from __future__ import division, absolute_import, print_function
import re
import json
import base64
import webbrowser
import requests
from beets import ui
from beets.plugins import BeetsPlugin
from beets.util import confit
from beets.autotag.hooks import AlbumInfo, TrackInfo
# Beets plugin built on the Spotify Web API. Its methods look up albums and
# tracks by Spotify ID or URL and implement the `spotify` subcommand, which
# matches library items against Spotify and prints or opens the result.
class SpotifyPlugin(BeetsPlugin):
# Base URLs for the Spotify API
# Documentation: https://developer.spotify.com/web-api
# OAuth endpoint that `_authenticate` POSTs to in order to get a token.
oauth_token_url = 'https://accounts.spotify.com/api/token'
# Prefix prepended to each track ID when `output_results` prints matches.
open_track_url = 'http://open.spotify.com/track/'
# Search endpoint queried once per library item by `query_spotify`.
search_url = 'https://api.spotify.com/v1/search'
# Album and track lookup endpoints; the Spotify ID is appended to them.
album_url = 'https://api.spotify.com/v1/albums/'
track_url = 'https://api.spotify.com/v1/tracks/'
# URI prefix that `output_results` joins with comma-separated track IDs and
# hands to `webbrowser.open` in "open" mode.
playlist_partial = 'spotify:trackset:Playlist:'
def __init__(self):
    """
    Register the plugin's default configuration, resolve the token file
    path and load (or request) an OAuth access token via ``setup()``.
    """
    super(SpotifyPlugin, self).__init__()

    defaults = {
        'mode': 'list',
        'tiebreak': 'popularity',
        'show_failures': False,
        'artist_field': 'albumartist',
        'album_field': 'album',
        'track_field': 'title',
        'region_filter': None,
        'regex': [],
        'client_id': 'N3dliiOOTBEEFqCH5NDDUmF5Eo8bl7AN',
        'client_secret': '6DRS7k66h4643yQEbepPxOuxeVW0yZpk',
        'tokenfile': 'spotify_token.json',
        'source_weight': 0.5,
        'user_token': '',
    }
    self.config.add(defaults)
    # Mark the secret as redacted on its config view.
    self.config['client_secret'].redact = True

    # Path to the JSON file for storing the OAuth access token.
    tokenfile_template = confit.Filename(in_app_dir=True)
    self.tokenfile = self.config['tokenfile'].get(tokenfile_template)

    self.setup()
def setup(self):
    """
    Retrieve previously saved OAuth token or generate a new one.

    The token is read from the JSON file at ``self.tokenfile`` and stored
    on ``self.access_token``. If that file is missing or unreadable, does
    not contain valid JSON, or has no ``access_token`` entry, a new token
    is requested with ``self._authenticate()`` instead of failing.
    """
    try:
        with open(self.tokenfile) as f:
            token_data = json.load(f)
        access_token = token_data['access_token']
    except (IOError, ValueError, KeyError, TypeError):
        # IOError: file missing or unreadable.
        # ValueError: contents are not valid JSON (e.g. truncated write).
        # KeyError/TypeError: valid JSON without an 'access_token' entry.
        self._authenticate()
    else:
        self.access_token = access_token
def _authenticate(self):
    """
    Request an access token via the Client Credentials Flow:
    https://developer.spotify.com/documentation/general/guides/authorization-guide/#client-credentials-flow

    On success the token is stored on ``self.access_token`` and written
    as JSON to ``self.tokenfile``.

    :raises beets.ui.UserError: if the token endpoint answers with an
        HTTP error status.
    """
    # HTTP Basic credentials: base64("<client_id>:<client_secret>").
    credentials = ':'.join(
        self.config[k].as_str() for k in ('client_id', 'client_secret')
    )
    headers = {
        'Authorization': 'Basic {}'.format(
            base64.b64encode(credentials.encode()).decode()
        )
    }
    response = requests.post(
        self.oauth_token_url,
        data={'grant_type': 'client_credentials'},
        headers=headers,
    )
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        # Use the decoded body: formatting the raw bytes would render as
        # "b'...'" on Python 3 and can fail to decode on Python 2.
        raise ui.UserError(
            u'Spotify authorization failed: {}\n{}'.format(
                e, response.text
            )
        )
    self.access_token = response.json()['access_token']
    # NOTE(review): this writes the bearer token to the debug log; consider
    # dropping or masking it.
    self._log.debug(u'Spotify access token: {}', self.access_token)
    # Save the token for later use.
    with open(self.tokenfile, 'w') as f:
        json.dump({'access_token': self.access_token}, f)
@property
def _auth_header(self):
return {'Authorization': 'Bearer {}'.format(self.access_token)}
def _handle_response(self, request_type, url, params=None):
response = request_type(url, headers=self._auth_header, params=params)
if response.status_code != 200:
if u'token expired' in response.text:
self._log.debug(
'Spotify access token has expired. Reauthenticating.'
)
self._authenticate()
self._handle_response(request_type, url, params=params)
else:
raise ui.UserError(u'Spotify API error:\n{}', response.text)
return response
def _get_spotify_id(self, url_type, id_):
"""
Parse a Spotify ID from its URL if necessary.
:param url_type: Type of Spotify URL, either 'album' or 'track'
:type url_type: str
:return: Spotify ID
:rtype: str
"""
# Spotify IDs consist of 22 alphanumeric characters
# (zero-left-padded base62 representation of randomly generated UUID4)
id_regex = r'(^|open\.spotify\.com/{}/)([0-9A-Za-z]{{22}})'
self._log.debug(u'Searching for {} {}', url_type, id_)
match = re.search(id_regex.format(url_type), id_)
return match.group(2) if match else None
def album_for_id(self, album_id):
    """
    Fetches an album by its Spotify ID or URL and returns an
    AlbumInfo object, or None if ``album_id`` does not contain a
    Spotify album ID.

    :param album_id: Spotify ID or URL for the album
    :type album_id: str
    :return: AlbumInfo object for album
    :rtype: beets.autotag.hooks.AlbumInfo
    :raises beets.ui.UserError: if the API request fails (raised by
        ``_handle_response``) or the API reports an unknown
        ``release_date_precision``
    """
    spotify_id = self._get_spotify_id('album', album_id)
    if spotify_id is None:
        return None

    response = self._handle_response(
        requests.get, self.album_url + spotify_id
    )
    response_data = response.json()
    artist, artist_id = self._get_artist(response_data['artists'])

    # `release_date` is 'YYYY', 'YYYY-MM' or 'YYYY-MM-DD'; the precision
    # field says how many of those components are present.
    date_parts = [
        int(part) for part in response_data['release_date'].split('-')
    ]
    release_date_precision = response_data['release_date_precision']
    if release_date_precision == 'day':
        year, month, day = date_parts
    elif release_date_precision == 'month':
        year, month = date_parts
        day = None
    elif release_date_precision == 'year':
        # Take the single element; `date_parts` itself is a list.
        year = date_parts[0]
        month = None
        day = None
    else:
        raise ui.UserError(
            u"Invalid `release_date_precision` returned "
            u"from Spotify API: '{}'".format(release_date_precision)
        )

    tracks = []
    for index, track_data in enumerate(
        response_data['tracks']['items'], start=1
    ):
        track = self._get_track(track_data)
        # Number tracks by their position in the album listing.
        track.index = index
        tracks.append(track)

    return AlbumInfo(
        album=response_data['name'],
        # Store the parsed ID, not the raw input, which may be a URL.
        album_id=spotify_id,
        artist=artist,
        artist_id=artist_id,
        tracks=tracks,
        albumtype=response_data['album_type'],
        va=len(response_data['artists']) == 1
        and artist.lower() == 'various artists',
        year=year,
        month=month,
        day=day,
        label=response_data['label'],
        data_source='Spotify',
        data_url=response_data['uri'],
    )
def _get_track(self, track_data):
    """
    Build a TrackInfo object from a Spotify track object dict.

    :param track_data: Simplified track object
        (https://developer.spotify.com/documentation/web-api/reference/object-model/#track-object-simplified)
    :type track_data: dict
    :return: TrackInfo object for track
    :rtype: beets.autotag.hooks.TrackInfo
    """
    artist, artist_id = self._get_artist(track_data['artists'])
    fields = dict(
        title=track_data['name'],
        track_id=track_data['id'],
        artist=artist,
        artist_id=artist_id,
        # Spotify reports the duration in milliseconds.
        length=track_data['duration_ms'] / 1000,
        index=track_data['track_number'],
        medium_index=track_data['track_number'],
        data_source='Spotify',
        data_url=track_data['uri'],
    )
    return TrackInfo(**fields)
def track_for_id(self, track_id):
    """
    Look up a single track by its Spotify ID or URL.

    :param track_id: Spotify ID or URL for the track
    :type track_id: str
    :return: TrackInfo object for track, or None if ``track_id`` does
        not contain a Spotify track ID
    :rtype: beets.autotag.hooks.TrackInfo
    """
    spotify_id = self._get_spotify_id('track', track_id)
    if spotify_id is not None:
        endpoint = self.track_url + spotify_id
        track_data = self._handle_response(requests.get, endpoint).json()
        return self._get_track(track_data)
    return None
def _get_artist(self, artists):
"""
Returns an artist string (all artists) and an artist_id (the main
artist) for a list of Spotify artist object dicts.
:param artists: Iterable of simplified Spotify artist objects
(https://developer.spotify.com/documentation/web-api/reference/object-model/#artist-object-simplified)
:type artists: list[dict]
:return: Normalized artist string
:rtype: str
"""
artist_id = None
artist_names = []
for artist in artists:
if not artist_id:
artist_id = artist['id']
name = artist['name']
# Strip disambiguation number.
name = re.sub(r' \(\d+\)$', '', name)
# Move articles to the front.
name = re.sub(r'^(.*?), (a|an|the)$', r'\2 \1', name, flags=re.I)
artist_names.append(name)
artist = ', '.join(artist_names).replace(' ,', ',') or None
return artist, artist_id
def commands(self):
    """
    Create the ``spotify`` subcommand.

    :return: List containing the single ``spotify`` subcommand
    :rtype: list
    """
    def run(lib, opts, args):
        # Stop early when the options are rejected by `parse_opts`.
        if not self.parse_opts(opts):
            return
        results = self.query_spotify(lib, ui.decargs(args))
        self.output_results(results)

    spotify_cmd = ui.Subcommand(
        'spotify', help=u'build a Spotify playlist'
    )
    parser = spotify_cmd.parser
    parser.add_option(
        u'-m',
        u'--mode',
        action='store',
        help=u'"open" to open Spotify with playlist, '
        u'"list" to print (default)',
    )
    parser.add_option(
        u'-f',
        u'--show-failures',
        action='store_true',
        dest='show_failures',
        help=u'list tracks that did not match a Spotify ID',
    )
    spotify_cmd.func = run
    return [spotify_cmd]
def parse_opts(self, opts):
    """
    Copy command-line options into the plugin configuration and check
    that the resulting mode is supported.

    :param opts: Parsed command-line options with ``mode`` and
        ``show_failures`` attributes
    :return: True if the mode is 'list' or 'open' (``opts`` is then kept
        on ``self.opts``), False otherwise
    :rtype: bool
    """
    if opts.mode:
        self.config['mode'].set(opts.mode)
    if opts.show_failures:
        self.config['show_failures'].set(True)

    mode = self.config['mode'].get()
    if mode in ('list', 'open'):
        self.opts = opts
        return True

    self._log.warning(u'{0} is not a valid mode', mode)
    return False
def query_spotify(self, lib, query):
    """
    Search Spotify for every library item matching a beets query.

    For each item a search string is built from the configured track,
    album and artist fields (after applying the configured regex
    substitutions) and sent to the search endpoint. When several tracks
    match, the ``tiebreak`` setting decides between the first result and
    the most popular one. Items without a match, or whose request fails,
    are counted as failures and reported through the log.

    :param lib: beets library to query; must provide ``items(query)``
    :param query: beets query selecting the items to look up
    :return: List of Spotify track object dicts, one per matched item, or
        None if the query matched no library items
    :rtype: list[dict]
    """
    results = []
    failures = []

    items = lib.items(query)
    if not items:
        self._log.debug(
            u'Your beets query returned no items, skipping spotify'
        )
        return None

    self._log.info(u'Processing {0} tracks...', len(items))

    # These settings do not change while the loop runs; read them once.
    regexes = self.config['regex'].get()
    artist_field = self.config['artist_field'].get()
    album_field = self.config['album_field'].get()
    track_field = self.config['track_field'].get()
    region_filter = self.config['region_filter'].get()
    tiebreak = self.config['tiebreak'].get()

    for item in items:
        # Apply regex transformations if provided
        for regex in regexes:
            if (
                not regex['field']
                or not regex['search']
                or not regex['replace']
            ):
                continue
            value = item[regex['field']]
            item[regex['field']] = re.sub(
                regex['search'], regex['replace'], value
            )

        # Custom values can be passed in the config (just in case)
        artist = item[artist_field]
        album = item[album_field]
        title = item[track_field]
        query_keywords = '{} album:{} artist:{}'.format(
            title, album, artist
        )

        # Query the Web API for each track, look for the items' JSON data
        try:
            response = self._handle_response(
                requests.get,
                self.search_url,
                params={'q': query_keywords, 'type': 'track'},
            )
        except ui.UserError:
            failures.append(query_keywords)
            continue
        response_data = response.json()['tracks']['items']

        # Apply market filter if requested
        if region_filter:
            response_data = [
                x
                for x in response_data
                if region_filter in x['available_markets']
            ]

        chosen_result = None
        # Guard on a non-empty list first: with tiebreak 'first' and no
        # results, indexing [0] would raise IndexError.
        if response_data and (
            len(response_data) == 1 or tiebreak == 'first'
        ):
            # Simplest, take the first result
            self._log.debug(
                u'Spotify track(s) found, count: {0}', len(response_data)
            )
            chosen_result = response_data[0]
        elif len(response_data) > 1:
            # Use the popularity filter
            self._log.debug(
                u'Most popular track chosen, count: {0}',
                len(response_data),
            )
            chosen_result = max(
                response_data, key=lambda x: x['popularity']
            )

        if chosen_result:
            results.append(chosen_result)
        else:
            self._log.debug(
                u'No Spotify track found for the following query: {}',
                query_keywords,
            )
            failures.append(query_keywords)

    failure_count = len(failures)
    if failure_count > 0:
        if self.config['show_failures'].get():
            self._log.info(
                u'{0} track(s) did not match a Spotify ID:', failure_count
            )
            for track in failures:
                self._log.info(u'track: {0}', track)
            self._log.info(u'')
        else:
            self._log.warning(
                u'{0} track(s) did not match a Spotify ID;\n'
                u'use --show-failures to display',
                failure_count,
            )

    return results
def output_results(self, results):
    """
    Present matched Spotify tracks according to the configured mode.

    In "open" mode the track IDs are joined into one playlist URI that is
    handed to ``webbrowser.open``; otherwise one track URL per line is
    printed. A warning is logged when ``results`` is empty or None.

    :param results: Spotify track object dicts, each with an ``id`` key
    :type results: list[dict]
    """
    if not results:
        self._log.warning(u'No Spotify tracks found from beets query')
        return

    track_ids = [track['id'] for track in results]
    if self.config['mode'].get() == "open":
        self._log.info(u'Attempting to open Spotify with playlist')
        webbrowser.open(self.playlist_partial + ",".join(track_ids))
        return

    for track_id in track_ids:
        print(self.open_track_url + track_id)