# -*- coding: utf-8 -*- from __future__ import division, absolute_import, print_function import re import json import base64 import webbrowser import requests from beets import ui from beets.plugins import BeetsPlugin from beets.util import confit from beets.autotag.hooks import AlbumInfo, TrackInfo class SpotifyPlugin(BeetsPlugin): # Base URLs for the Spotify API # Documentation: https://developer.spotify.com/web-api oauth_token_url = 'https://accounts.spotify.com/api/token' open_track_url = 'http://open.spotify.com/track/' search_url = 'https://api.spotify.com/v1/search' album_url = 'https://api.spotify.com/v1/albums/' track_url = 'https://api.spotify.com/v1/tracks/' playlist_partial = 'spotify:trackset:Playlist:' id_regex = r'(^|open\.spotify\.com/{}/)([0-9A-Za-z]{{22}})' def __init__(self): super(SpotifyPlugin, self).__init__() self.config.add( { 'mode': 'list', 'tiebreak': 'popularity', 'show_failures': False, 'artist_field': 'albumartist', 'album_field': 'album', 'track_field': 'title', 'region_filter': None, 'regex': [], 'client_id': 'N3dliiOOTBEEFqCH5NDDUmF5Eo8bl7AN', 'client_secret': '6DRS7k66h4643yQEbepPxOuxeVW0yZpk', 'tokenfile': 'spotify_token.json', 'source_weight': 0.5, 'user_token': '', } ) self.config['client_secret'].redact = True """Path to the JSON file for storing the OAuth access token.""" self.tokenfile = self.config['tokenfile'].get( confit.Filename(in_app_dir=True) ) self.setup() def setup(self): """ Retrieve previously saved OAuth token or generate a new one. """ try: with open(self.tokenfile) as f: token_data = json.load(f) except IOError: self._authenticate() else: self.access_token = token_data['access_token'] def _authenticate(self): """ Request an access token via the Client Credentials Flow: https://developer.spotify.com/documentation/general/guides/authorization-guide/#client-credentials-flow """ headers = { 'Authorization': 'Basic {}'.format( base64.b64encode( ':'.join( self.config[k].as_str() for k in ('client_id', 'client_secret') ).encode() ).decode() ) } response = requests.post( self.oauth_token_url, data={'grant_type': 'client_credentials'}, headers=headers, ) try: response.raise_for_status() except requests.exceptions.HTTPError as e: raise ui.UserError( u'Spotify authorization failed: {}\n{}'.format( e, response.content ) ) self.access_token = response.json()['access_token'] # Save the token for later use. self._log.debug(u'Spotify access token: {}', self.access_token) with open(self.tokenfile, 'w') as f: json.dump({'access_token': self.access_token}, f) @property def _auth_header(self): return {'Authorization': 'Bearer {}'.format(self.access_token)} def _handle_response(self, request_type, url, params=None): response = request_type(url, headers=self._auth_header, params=params) if response.status_code != 200: if u'token expired' in response.text: self._log.debug( 'Spotify access token has expired. Reauthenticating.' ) self._authenticate() self._handle_response(request_type, url, params=params) else: raise ui.UserError(u'Spotify API error:\n{}', response.text) return response def _get_spotify_id(self, url_type, id_): """ Parse a Spotify ID from its URL if necessary. :param url_type: Type of Spotify URL, either 'album' or 'track' :type url_type: str :return: Spotify ID :rtype: str """ self._log.debug(u'Searching for {} {}', url_type, id_) match = re.search(self.id_regex.format(url_type), id_) return match.group(2) if match else None def album_for_id(self, album_id): """ Fetches an album by its Spotify ID or URL and returns an AlbumInfo object or None if the album is not found. """ spotify_id = self._get_spotify_id('album', album_id) if spotify_id is None: return None response = self._handle_response( requests.get, self.album_url + spotify_id ) response_data = response.json() artist, artist_id = self._get_artist(response_data['artists']) date_parts = [ int(part) for part in response_data['release_date'].split('-') ] release_date_precision = response_data['release_date_precision'] if release_date_precision == 'day': year, month, day = date_parts elif release_date_precision == 'month': year, month = date_parts day = None elif release_date_precision == 'year': year = date_parts month = None day = None else: raise ui.UserError( u"Invalid `release_date_precision` returned " u"from Spotify API: '{}'".format(release_date_precision) ) tracks = [] for i, track_data in enumerate(response_data['tracks']['items']): track = self._get_track(track_data) track.index = i + 1 tracks.append(track) return AlbumInfo( album=response_data['name'], album_id=album_id, artist=artist, artist_id=artist_id, tracks=tracks, albumtype=response_data['album_type'], va=len(response_data['artists']) == 1 and artist.lower() == 'various artists', year=year, month=month, day=day, label=response_data['label'], data_source='Spotify', data_url=response_data['uri'], ) def _get_track(self, track_data): """ Convert a Spotify track object dict to a TrackInfo object. :param track_data: Simplified track object (https://developer.spotify.com/documentation/web-api/reference/object-model/#track-object-simplified) :type track_data: dict :return: TrackInfo object for track :rtype: beets.autotag.hooks.TrackInfo """ artist, artist_id = self._get_artist(track_data['artists']) return TrackInfo( title=track_data['name'], track_id=track_data['id'], artist=artist, artist_id=artist_id, length=track_data['duration_ms'] / 1000, index=track_data['track_number'], medium_index=track_data['track_number'], data_source='Spotify', data_url=track_data['uri'], ) def track_for_id(self, track_id): """ Fetches a track by its Spotify ID or URL and returns a TrackInfo object or None if the track is not found. :param track_id: Spotify ID or URL for the track :type track_id: str :return: TrackInfo object for track :rtype: beets.autotag.hooks.TrackInfo """ spotify_id = self._get_spotify_id('track', track_id) if spotify_id is None: return None response = self._handle_response( requests.get, self.track_url + spotify_id ) return self._get_track(response.json()) def _get_artist(self, artists): """ Returns an artist string (all artists) and an artist_id (the main artist) for a list of Spotify artist object dicts. :param artists: Iterable of simplified Spotify artist objects (https://developer.spotify.com/documentation/web-api/reference/object-model/#artist-object-simplified) :type artists: list[dict] :return: Normalized artist string :rtype: str """ artist_id = None artist_names = [] for artist in artists: if not artist_id: artist_id = artist['id'] name = artist['name'] # Strip disambiguation number. name = re.sub(r' \(\d+\)$', '', name) # Move articles to the front. name = re.sub(r'^(.*?), (a|an|the)$', r'\2 \1', name, flags=re.I) artist_names.append(name) artist = ', '.join(artist_names).replace(' ,', ',') or None return artist, artist_id def commands(self): def queries(lib, opts, args): success = self.parse_opts(opts) if success: results = self.query_spotify(lib, ui.decargs(args)) self.output_results(results) spotify_cmd = ui.Subcommand( 'spotify', help=u'build a Spotify playlist' ) spotify_cmd.parser.add_option( u'-m', u'--mode', action='store', help=u'"open" to open Spotify with playlist, ' u'"list" to print (default)', ) spotify_cmd.parser.add_option( u'-f', u'--show-failures', action='store_true', dest='show_failures', help=u'list tracks that did not match a Spotify ID', ) spotify_cmd.func = queries return [spotify_cmd] def parse_opts(self, opts): if opts.mode: self.config['mode'].set(opts.mode) if opts.show_failures: self.config['show_failures'].set(True) if self.config['mode'].get() not in ['list', 'open']: self._log.warning( u'{0} is not a valid mode', self.config['mode'].get() ) return False self.opts = opts return True def query_spotify(self, lib, query): results = [] failures = [] items = lib.items(query) if not items: self._log.debug( u'Your beets query returned no items, ' u'skipping spotify' ) return self._log.info(u'Processing {0} tracks...', len(items)) for item in items: # Apply regex transformations if provided for regex in self.config['regex'].get(): if ( not regex['field'] or not regex['search'] or not regex['replace'] ): continue value = item[regex['field']] item[regex['field']] = re.sub( regex['search'], regex['replace'], value ) # Custom values can be passed in the config (just in case) artist = item[self.config['artist_field'].get()] album = item[self.config['album_field'].get()] query = item[self.config['track_field'].get()] query_keywords = '{} album:{} artist:{}'.format( query, album, artist ) # Query the Web API for each track, look for the items' JSON data try: response = self._handle_response( requests.get, self.search_url, params={'q': query_keywords, 'type': 'track'}, ) except ui.UserError: failures.append(query_keywords) continue response_data = response.json()['tracks']['items'] # Apply market filter if requested region_filter = self.config['region_filter'].get() if region_filter: response_data = [ x for x in response_data if region_filter in x['available_markets'] ] # Simplest, take the first result chosen_result = None if ( len(response_data) == 1 or self.config['tiebreak'].get() == 'first' ): self._log.debug( u'Spotify track(s) found, count: {0}', len(response_data) ) chosen_result = response_data[0] elif len(response_data) > 1: # Use the popularity filter self._log.debug( u'Most popular track chosen, count: {0}', len(response_data), ) chosen_result = max( response_data, key=lambda x: x['popularity'] ) if chosen_result: results.append(chosen_result) else: self._log.debug( u'No Spotify track found for the following query: {}', query_keywords, ) failures.append(query_keywords) failure_count = len(failures) if failure_count > 0: if self.config['show_failures'].get(): self._log.info( u'{0} track(s) did not match a Spotify ID:', failure_count ) for track in failures: self._log.info(u'track: {0}', track) self._log.info(u'') else: self._log.warning( u'{0} track(s) did not match a Spotify ID;\n' u'use --show-failures to display', failure_count, ) return results def output_results(self, results): if results: ids = [x['id'] for x in results] if self.config['mode'].get() == "open": self._log.info(u'Attempting to open Spotify with playlist') spotify_url = self.playlist_partial + ",".join(ids) webbrowser.open(spotify_url) else: for item in ids: print(self.open_track_url + item) else: self._log.warning(u'No Spotify tracks found from beets query')