mirror of
https://github.com/beetbox/beets.git
synced 2025-12-06 08:39:17 +01:00
794 lines
27 KiB
Python
794 lines
27 KiB
Python
# This file is part of beets.
|
|
# Copyright 2019, Rahul Ahuja.
|
|
# Copyright 2022, Alok Saboo.
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining
|
|
# a copy of this software and associated documentation files (the
|
|
# "Software"), to deal in the Software without restriction, including
|
|
# without limitation the rights to use, copy, modify, merge, publish,
|
|
# distribute, sublicense, and/or sell copies of the Software, and to
|
|
# permit persons to whom the Software is furnished to do so, subject to
|
|
# the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be
|
|
# included in all copies or substantial portions of the Software.
|
|
|
|
"""Adds Spotify release and track search support to the autotagger.
|
|
|
|
Also includes Spotify playlist construction.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import collections
|
|
import json
|
|
import re
|
|
import threading
|
|
import time
|
|
import webbrowser
|
|
from typing import TYPE_CHECKING, Any, Literal, Sequence, Union
|
|
|
|
import confuse
|
|
import requests
|
|
|
|
from beets import ui
|
|
from beets.autotag.hooks import AlbumInfo, TrackInfo
|
|
from beets.dbcore import types
|
|
from beets.library import Library
|
|
from beets.metadata_plugins import (
|
|
IDResponse,
|
|
SearchApiMetadataSourcePlugin,
|
|
SearchFilter,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from beets.library import Library
|
|
from beetsplug._typing import JSONDict
|
|
|
|
DEFAULT_WAITING_TIME = 5
|
|
|
|
|
|
class SearchResponseAlbums(IDResponse):
|
|
"""A response returned by the Spotify API.
|
|
|
|
We only use items and disregard the pagination information. i.e.
|
|
res["albums"]["items"][0].
|
|
|
|
There are more fields in the response, but we only type the ones we
|
|
currently use.
|
|
|
|
see https://developer.spotify.com/documentation/web-api/reference/search
|
|
|
|
"""
|
|
|
|
album_type: str
|
|
available_markets: Sequence[str]
|
|
name: str
|
|
|
|
|
|
class SearchResponseTracks(IDResponse):
|
|
"""A track response returned by the Spotify API."""
|
|
|
|
album: SearchResponseAlbums
|
|
available_markets: Sequence[str]
|
|
popularity: int
|
|
name: str
|
|
|
|
|
|
class APIError(Exception):
|
|
pass
|
|
|
|
|
|
class AudioFeaturesUnavailableError(Exception):
|
|
"""Raised when audio features API returns 403 (deprecated)."""
|
|
|
|
pass
|
|
|
|
|
|
class SpotifyPlugin(
|
|
SearchApiMetadataSourcePlugin[
|
|
Union[SearchResponseAlbums, SearchResponseTracks]
|
|
]
|
|
):
|
|
item_types = {
|
|
"spotify_track_popularity": types.INTEGER,
|
|
"spotify_acousticness": types.FLOAT,
|
|
"spotify_danceability": types.FLOAT,
|
|
"spotify_energy": types.FLOAT,
|
|
"spotify_instrumentalness": types.FLOAT,
|
|
"spotify_key": types.FLOAT,
|
|
"spotify_liveness": types.FLOAT,
|
|
"spotify_loudness": types.FLOAT,
|
|
"spotify_mode": types.INTEGER,
|
|
"spotify_speechiness": types.FLOAT,
|
|
"spotify_tempo": types.FLOAT,
|
|
"spotify_time_signature": types.INTEGER,
|
|
"spotify_valence": types.FLOAT,
|
|
"spotify_updated": types.DATE,
|
|
}
|
|
|
|
# Base URLs for the Spotify API
|
|
# Documentation: https://developer.spotify.com/web-api
|
|
oauth_token_url = "https://accounts.spotify.com/api/token"
|
|
open_track_url = "https://open.spotify.com/track/"
|
|
search_url = "https://api.spotify.com/v1/search"
|
|
album_url = "https://api.spotify.com/v1/albums/"
|
|
track_url = "https://api.spotify.com/v1/tracks/"
|
|
audio_features_url = "https://api.spotify.com/v1/audio-features/"
|
|
|
|
spotify_audio_features = {
|
|
"acousticness": "spotify_acousticness",
|
|
"danceability": "spotify_danceability",
|
|
"energy": "spotify_energy",
|
|
"instrumentalness": "spotify_instrumentalness",
|
|
"key": "spotify_key",
|
|
"liveness": "spotify_liveness",
|
|
"loudness": "spotify_loudness",
|
|
"mode": "spotify_mode",
|
|
"speechiness": "spotify_speechiness",
|
|
"tempo": "spotify_tempo",
|
|
"time_signature": "spotify_time_signature",
|
|
"valence": "spotify_valence",
|
|
}
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.config.add(
|
|
{
|
|
"mode": "list",
|
|
"tiebreak": "popularity",
|
|
"show_failures": False,
|
|
"region_filter": None,
|
|
"regex": [],
|
|
"client_id": "4e414367a1d14c75a5c5129a627fcab8",
|
|
"client_secret": "f82bdc09b2254f1a8286815d02fd46dc",
|
|
"tokenfile": "spotify_token.json",
|
|
}
|
|
)
|
|
self.config["client_id"].redact = True
|
|
self.config["client_secret"].redact = True
|
|
|
|
self.audio_features_available = (
|
|
True # Track if audio features API is available
|
|
)
|
|
self._audio_features_lock = (
|
|
threading.Lock()
|
|
) # Protects audio_features_available
|
|
self.setup()
|
|
|
|
def setup(self):
|
|
"""Retrieve previously saved OAuth token or generate a new one."""
|
|
|
|
try:
|
|
with open(self._tokenfile()) as f:
|
|
token_data = json.load(f)
|
|
except OSError:
|
|
self._authenticate()
|
|
else:
|
|
self.access_token = token_data["access_token"]
|
|
|
|
def _tokenfile(self) -> str:
|
|
"""Get the path to the JSON file for storing the OAuth token."""
|
|
return self.config["tokenfile"].get(confuse.Filename(in_app_dir=True))
|
|
|
|
def _authenticate(self) -> None:
|
|
"""Request an access token via the Client Credentials Flow: https://developer.spotify.com/documentation/general/guides/authorization-guide/#client-credentials-flow"""
|
|
c_id: str = self.config["client_id"].as_str()
|
|
c_secret: str = self.config["client_secret"].as_str()
|
|
|
|
headers = {
|
|
"Authorization": (
|
|
"Basic"
|
|
f" {base64.b64encode(f'{c_id}:{c_secret}'.encode()).decode()}"
|
|
)
|
|
}
|
|
response = requests.post(
|
|
self.oauth_token_url,
|
|
data={"grant_type": "client_credentials"},
|
|
headers=headers,
|
|
timeout=10,
|
|
)
|
|
try:
|
|
response.raise_for_status()
|
|
except requests.exceptions.HTTPError as e:
|
|
raise ui.UserError(
|
|
f"Spotify authorization failed: {e}\n{response.text}"
|
|
)
|
|
self.access_token = response.json()["access_token"]
|
|
|
|
# Save the token for later use.
|
|
self._log.debug("{0.data_source} access token: {0.access_token}", self)
|
|
with open(self._tokenfile(), "w") as f:
|
|
json.dump({"access_token": self.access_token}, f)
|
|
|
|
def _handle_response(
|
|
self,
|
|
method: Literal["get", "post", "put", "delete"],
|
|
url: str,
|
|
params: Any = None,
|
|
retry_count: int = 0,
|
|
max_retries: int = 3,
|
|
) -> JSONDict:
|
|
"""Send a request, reauthenticating if necessary.
|
|
|
|
:param method: HTTP method to use for the request.
|
|
:param url: URL for the new :class:`Request` object.
|
|
:param dict params: (optional) list of tuples or bytes to send
|
|
in the query string for the :class:`Request`.
|
|
|
|
"""
|
|
|
|
if retry_count > max_retries:
|
|
raise APIError("Maximum retries reached.")
|
|
|
|
try:
|
|
response = requests.request(
|
|
method,
|
|
url,
|
|
headers={"Authorization": f"Bearer {self.access_token}"},
|
|
params=params,
|
|
timeout=10,
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.ReadTimeout:
|
|
self._log.error("ReadTimeout.")
|
|
raise APIError("Request timed out.")
|
|
except requests.exceptions.ConnectionError as e:
|
|
self._log.error("Network error: {}", e)
|
|
raise APIError("Network error.")
|
|
except requests.exceptions.RequestException as e:
|
|
if e.response is None:
|
|
self._log.error("Request failed: {}", e)
|
|
raise APIError("Request failed.")
|
|
if e.response.status_code == 401:
|
|
self._log.debug(
|
|
"{.data_source} access token has expired. Reauthenticating.",
|
|
self,
|
|
)
|
|
self._authenticate()
|
|
return self._handle_response(
|
|
method,
|
|
url,
|
|
params=params,
|
|
retry_count=retry_count + 1,
|
|
)
|
|
elif e.response.status_code == 404:
|
|
raise APIError(
|
|
f"API Error: {e.response.status_code}\n"
|
|
f"URL: {url}\nparams: {params}"
|
|
)
|
|
elif e.response.status_code == 403:
|
|
# Check if this is the audio features endpoint
|
|
if url.startswith(self.audio_features_url):
|
|
raise AudioFeaturesUnavailableError(
|
|
"Audio features API returned 403 "
|
|
"(deprecated or unavailable)"
|
|
)
|
|
raise APIError(
|
|
f"API Error: {e.response.status_code}\n"
|
|
f"URL: {url}\nparams: {params}"
|
|
)
|
|
elif e.response.status_code == 429:
|
|
seconds = e.response.headers.get(
|
|
"Retry-After", DEFAULT_WAITING_TIME
|
|
)
|
|
self._log.debug(
|
|
"Too many API requests. Retrying after {} seconds.", seconds
|
|
)
|
|
time.sleep(int(seconds) + 1)
|
|
return self._handle_response(
|
|
method,
|
|
url,
|
|
params=params,
|
|
retry_count=retry_count + 1,
|
|
)
|
|
elif e.response.status_code == 503:
|
|
self._log.error("Service Unavailable.")
|
|
raise APIError("Service Unavailable.")
|
|
elif e.response.status_code == 502:
|
|
self._log.error("Bad Gateway.")
|
|
raise APIError("Bad Gateway.")
|
|
elif e.response is not None:
|
|
raise APIError(
|
|
f"{self.data_source} API error:\n"
|
|
f"{e.response.text}\n"
|
|
f"URL:\n{url}\nparams:\n{params}"
|
|
)
|
|
else:
|
|
self._log.error("Request failed. Error: {}", e)
|
|
raise APIError("Request failed.")
|
|
|
|
def album_for_id(self, album_id: str) -> AlbumInfo | None:
|
|
"""Fetch an album by its Spotify ID or URL and return an
|
|
AlbumInfo object or None if the album is not found.
|
|
|
|
:param str album_id: Spotify ID or URL for the album
|
|
|
|
:returns: AlbumInfo object for album
|
|
:rtype: beets.autotag.hooks.AlbumInfo or None
|
|
|
|
"""
|
|
if not (spotify_id := self._extract_id(album_id)):
|
|
return None
|
|
|
|
album_data = self._handle_response(
|
|
"get", f"{self.album_url}{spotify_id}"
|
|
)
|
|
if album_data["name"] == "":
|
|
self._log.debug("Album removed from Spotify: {}", album_id)
|
|
return None
|
|
artist, artist_id = self.get_artist(album_data["artists"])
|
|
|
|
date_parts = [
|
|
int(part) for part in album_data["release_date"].split("-")
|
|
]
|
|
|
|
release_date_precision = album_data["release_date_precision"]
|
|
if release_date_precision == "day":
|
|
year, month, day = date_parts
|
|
elif release_date_precision == "month":
|
|
year, month = date_parts
|
|
day = None
|
|
elif release_date_precision == "year":
|
|
year = date_parts[0]
|
|
month = None
|
|
day = None
|
|
else:
|
|
raise ui.UserError(
|
|
"Invalid `release_date_precision` returned "
|
|
f"by {self.data_source} API: '{release_date_precision}'"
|
|
)
|
|
|
|
tracks_data = album_data["tracks"]
|
|
tracks_items = tracks_data["items"]
|
|
while tracks_data["next"]:
|
|
tracks_data = self._handle_response("get", tracks_data["next"])
|
|
tracks_items.extend(tracks_data["items"])
|
|
|
|
tracks = []
|
|
medium_totals: dict[int | None, int] = collections.defaultdict(int)
|
|
for i, track_data in enumerate(tracks_items, start=1):
|
|
track = self._get_track(track_data)
|
|
track.index = i
|
|
medium_totals[track.medium] += 1
|
|
tracks.append(track)
|
|
for track in tracks:
|
|
track.medium_total = medium_totals[track.medium]
|
|
|
|
return AlbumInfo(
|
|
album=album_data["name"],
|
|
album_id=spotify_id,
|
|
spotify_album_id=spotify_id,
|
|
artist=artist,
|
|
artist_id=artist_id,
|
|
spotify_artist_id=artist_id,
|
|
tracks=tracks,
|
|
albumtype=album_data["album_type"],
|
|
va=len(album_data["artists"]) == 1
|
|
and artist.lower() == "various artists",
|
|
year=year,
|
|
month=month,
|
|
day=day,
|
|
label=album_data["label"],
|
|
mediums=max(filter(None, medium_totals.keys())),
|
|
data_source=self.data_source,
|
|
data_url=album_data["external_urls"]["spotify"],
|
|
)
|
|
|
|
def _get_track(self, track_data: JSONDict) -> TrackInfo:
|
|
"""Convert a Spotify track object dict to a TrackInfo object.
|
|
|
|
:param track_data: Simplified track object
|
|
(https://developer.spotify.com/documentation/web-api/reference/object-model/#track-object-simplified)
|
|
|
|
:returns: TrackInfo object for track
|
|
|
|
"""
|
|
artist, artist_id = self.get_artist(track_data["artists"])
|
|
|
|
# Get album information for spotify tracks
|
|
try:
|
|
album = track_data["album"]["name"]
|
|
except (KeyError, TypeError):
|
|
album = None
|
|
return TrackInfo(
|
|
title=track_data["name"],
|
|
track_id=track_data["id"],
|
|
spotify_track_id=track_data["id"],
|
|
artist=artist,
|
|
album=album,
|
|
artist_id=artist_id,
|
|
spotify_artist_id=artist_id,
|
|
length=track_data["duration_ms"] / 1000,
|
|
index=track_data["track_number"],
|
|
medium=track_data["disc_number"],
|
|
medium_index=track_data["track_number"],
|
|
data_source=self.data_source,
|
|
data_url=track_data["external_urls"]["spotify"],
|
|
)
|
|
|
|
def track_for_id(self, track_id: str) -> None | TrackInfo:
|
|
"""Fetch a track by its Spotify ID or URL.
|
|
|
|
Returns a TrackInfo object or None if the track is not found.
|
|
|
|
"""
|
|
|
|
if not (spotify_id := self._extract_id(track_id)):
|
|
self._log.debug("Invalid Spotify ID: {}", track_id)
|
|
return None
|
|
|
|
if not (
|
|
track_data := self._handle_response(
|
|
"get", f"{self.track_url}{spotify_id}"
|
|
)
|
|
):
|
|
self._log.debug("Track not found: {}", track_id)
|
|
return None
|
|
|
|
track = self._get_track(track_data)
|
|
|
|
# Get album's tracks to set `track.index` (position on the entire
|
|
# release) and `track.medium_total` (total number of tracks on
|
|
# the track's disc).
|
|
album_data = self._handle_response(
|
|
"get", f"{self.album_url}{track_data['album']['id']}"
|
|
)
|
|
medium_total = 0
|
|
for i, track_data in enumerate(album_data["tracks"]["items"], start=1):
|
|
if track_data["disc_number"] == track.medium:
|
|
medium_total += 1
|
|
if track_data["id"] == track.track_id:
|
|
track.index = i
|
|
track.medium_total = medium_total
|
|
return track
|
|
|
|
def _search_api(
|
|
self,
|
|
query_type: Literal["album", "track"],
|
|
filters: SearchFilter,
|
|
query_string: str = "",
|
|
) -> Sequence[SearchResponseAlbums | SearchResponseTracks]:
|
|
"""Query the Spotify Search API for the specified ``query_string``,
|
|
applying the provided ``filters``.
|
|
|
|
:param query_type: Item type to search across. Valid types are: 'album',
|
|
'artist', 'playlist', and 'track'.
|
|
:param filters: Field filters to apply.
|
|
:param query_string: Additional query to include in the search.
|
|
|
|
"""
|
|
query = self._construct_search_query(
|
|
filters=filters, query_string=query_string
|
|
)
|
|
|
|
self._log.debug("Searching {.data_source} for '{}'", self, query)
|
|
try:
|
|
response = self._handle_response(
|
|
"get",
|
|
self.search_url,
|
|
params={
|
|
"q": query,
|
|
"type": query_type,
|
|
"limit": self.config["search_limit"].get(),
|
|
},
|
|
)
|
|
except APIError as e:
|
|
self._log.debug("Spotify API error: {}", e)
|
|
return ()
|
|
response_data = response.get(f"{query_type}s", {}).get("items", [])
|
|
self._log.debug(
|
|
"Found {} result(s) from {.data_source} for '{}'",
|
|
len(response_data),
|
|
self,
|
|
query,
|
|
)
|
|
return response_data
|
|
|
|
def commands(self) -> list[ui.Subcommand]:
|
|
# autotagger import command
|
|
def queries(lib, opts, args):
|
|
success = self._parse_opts(opts)
|
|
if success:
|
|
results = self._match_library_tracks(lib, args)
|
|
self._output_match_results(results)
|
|
|
|
spotify_cmd = ui.Subcommand(
|
|
"spotify", help=f"build a {self.data_source} playlist"
|
|
)
|
|
spotify_cmd.parser.add_option(
|
|
"-m",
|
|
"--mode",
|
|
action="store",
|
|
help=(
|
|
f'"open" to open {self.data_source} with playlist, '
|
|
'"list" to print (default)'
|
|
),
|
|
)
|
|
spotify_cmd.parser.add_option(
|
|
"-f",
|
|
"--show-failures",
|
|
action="store_true",
|
|
dest="show_failures",
|
|
help=f"list tracks that did not match a {self.data_source} ID",
|
|
)
|
|
spotify_cmd.func = queries
|
|
|
|
# spotifysync command
|
|
sync_cmd = ui.Subcommand(
|
|
"spotifysync", help="fetch track attributes from Spotify"
|
|
)
|
|
sync_cmd.parser.add_option(
|
|
"-f",
|
|
"--force",
|
|
dest="force_refetch",
|
|
action="store_true",
|
|
default=False,
|
|
help="re-download data when already present",
|
|
)
|
|
|
|
def func(lib, opts, args):
|
|
items = lib.items(args)
|
|
self._fetch_info(items, ui.should_write(), opts.force_refetch)
|
|
|
|
sync_cmd.func = func
|
|
return [spotify_cmd, sync_cmd]
|
|
|
|
def _parse_opts(self, opts):
|
|
if opts.mode:
|
|
self.config["mode"].set(opts.mode)
|
|
|
|
if opts.show_failures:
|
|
self.config["show_failures"].set(True)
|
|
|
|
if self.config["mode"].get() not in ["list", "open"]:
|
|
self._log.warning(
|
|
"{} is not a valid mode", self.config["mode"].get()
|
|
)
|
|
return False
|
|
|
|
self.opts = opts
|
|
return True
|
|
|
|
def _match_library_tracks(self, library: Library, keywords: str):
|
|
"""Get simplified track object dicts for library tracks.
|
|
|
|
Matches tracks based on the specified ``keywords``.
|
|
|
|
:param library: beets library object to query.
|
|
:param keywords: Query to match library items against.
|
|
|
|
:returns: List of simplified track object dicts for library
|
|
items matching the specified query.
|
|
|
|
"""
|
|
results = []
|
|
failures = []
|
|
|
|
items = library.items(keywords)
|
|
|
|
if not items:
|
|
self._log.debug(
|
|
"Your beets query returned no items, skipping {.data_source}.",
|
|
self,
|
|
)
|
|
return
|
|
|
|
self._log.info("Processing {} tracks...", len(items))
|
|
|
|
for item in items:
|
|
# Apply regex transformations if provided
|
|
for regex in self.config["regex"].get():
|
|
if (
|
|
not regex["field"]
|
|
or not regex["search"]
|
|
or not regex["replace"]
|
|
):
|
|
continue
|
|
|
|
value = item[regex["field"]]
|
|
item[regex["field"]] = re.sub(
|
|
regex["search"], regex["replace"], value
|
|
)
|
|
|
|
artist = item["artist"] or item["albumartist"]
|
|
album = item["album"]
|
|
query_string = item["title"]
|
|
|
|
# Query the Web API for each track, look for the items' JSON data
|
|
query_filters: SearchFilter = {}
|
|
if artist:
|
|
query_filters["artist"] = artist
|
|
if album:
|
|
query_filters["album"] = album
|
|
|
|
response_data_tracks = self._search_api(
|
|
query_type="track",
|
|
query_string=query_string,
|
|
filters=query_filters,
|
|
)
|
|
if not response_data_tracks:
|
|
query = self._construct_search_query(
|
|
query_string=query_string, filters=query_filters
|
|
)
|
|
|
|
failures.append(query)
|
|
continue
|
|
|
|
# Apply market filter if requested
|
|
region_filter: str = self.config["region_filter"].get()
|
|
if region_filter:
|
|
response_data_tracks = [
|
|
track_data
|
|
for track_data in response_data_tracks
|
|
if region_filter in track_data["available_markets"]
|
|
]
|
|
|
|
if (
|
|
len(response_data_tracks) == 1
|
|
or self.config["tiebreak"].get() == "first"
|
|
):
|
|
self._log.debug(
|
|
"{.data_source} track(s) found, count: {}",
|
|
self,
|
|
len(response_data_tracks),
|
|
)
|
|
chosen_result = response_data_tracks[0]
|
|
else:
|
|
# Use the popularity filter
|
|
self._log.debug(
|
|
"Most popular track chosen, count: {}",
|
|
len(response_data_tracks),
|
|
)
|
|
chosen_result = max(
|
|
response_data_tracks,
|
|
key=lambda x: x[
|
|
# We are sure this is a track response!
|
|
"popularity" # type: ignore[typeddict-item]
|
|
],
|
|
)
|
|
results.append(chosen_result)
|
|
|
|
failure_count = len(failures)
|
|
if failure_count > 0:
|
|
if self.config["show_failures"].get():
|
|
self._log.info(
|
|
"{} track(s) did not match a {.data_source} ID:",
|
|
failure_count,
|
|
self,
|
|
)
|
|
for track in failures:
|
|
self._log.info("track: {}", track)
|
|
self._log.info("")
|
|
else:
|
|
self._log.warning(
|
|
"{} track(s) did not match a {.data_source} ID:\n"
|
|
"use --show-failures to display",
|
|
failure_count,
|
|
self,
|
|
)
|
|
|
|
return results
|
|
|
|
def _output_match_results(self, results):
|
|
"""Open a playlist or print Spotify URLs.
|
|
|
|
Uses the provided track object dicts.
|
|
|
|
:param list[dict] results: List of simplified track object dicts
|
|
(https://developer.spotify.com/documentation/web-api/
|
|
reference/object-model/#track-object-simplified)
|
|
|
|
"""
|
|
if results:
|
|
spotify_ids = [track_data["id"] for track_data in results]
|
|
if self.config["mode"].get() == "open":
|
|
self._log.info(
|
|
"Attempting to open {.data_source} with playlist", self
|
|
)
|
|
spotify_url = (
|
|
f"spotify:trackset:Playlist:{','.join(spotify_ids)}"
|
|
)
|
|
webbrowser.open(spotify_url)
|
|
else:
|
|
for spotify_id in spotify_ids:
|
|
print(f"{self.open_track_url}{spotify_id}")
|
|
else:
|
|
self._log.warning(
|
|
"No {.data_source} tracks found from beets query", self
|
|
)
|
|
|
|
def _fetch_info(self, items, write, force):
|
|
"""Obtain track information from Spotify."""
|
|
|
|
self._log.debug("Total {} tracks", len(items))
|
|
|
|
for index, item in enumerate(items, start=1):
|
|
self._log.info(
|
|
"Processing {}/{} tracks - {} ", index, len(items), item
|
|
)
|
|
# If we're not forcing re-downloading for all tracks, check
|
|
# whether the popularity data is already present
|
|
if not force:
|
|
if "spotify_track_popularity" in item:
|
|
self._log.debug("Popularity already present for: {}", item)
|
|
continue
|
|
try:
|
|
spotify_track_id = item.spotify_track_id
|
|
except AttributeError:
|
|
self._log.debug("No track_id present for: {}", item)
|
|
continue
|
|
|
|
popularity, isrc, ean, upc = self.track_info(spotify_track_id)
|
|
item["spotify_track_popularity"] = popularity
|
|
item["isrc"] = isrc
|
|
item["ean"] = ean
|
|
item["upc"] = upc
|
|
|
|
if self.audio_features_available:
|
|
audio_features = self.track_audio_features(spotify_track_id)
|
|
if audio_features is None:
|
|
self._log.info("No audio features found for: {}", item)
|
|
else:
|
|
for feature, value in audio_features.items():
|
|
if feature in self.spotify_audio_features:
|
|
item[self.spotify_audio_features[feature]] = value
|
|
else:
|
|
self._log.debug("Audio features API unavailable, skipping")
|
|
|
|
item["spotify_updated"] = time.time()
|
|
item.store()
|
|
if write:
|
|
item.try_write()
|
|
|
|
def track_info(self, track_id: str):
|
|
"""Fetch a track's popularity and external IDs using its Spotify ID."""
|
|
track_data = self._handle_response("get", f"{self.track_url}{track_id}")
|
|
external_ids = track_data.get("external_ids", {})
|
|
popularity = track_data.get("popularity")
|
|
self._log.debug(
|
|
"track_popularity: {} and track_isrc: {}",
|
|
popularity,
|
|
external_ids.get("isrc"),
|
|
)
|
|
return (
|
|
popularity,
|
|
external_ids.get("isrc"),
|
|
external_ids.get("ean"),
|
|
external_ids.get("upc"),
|
|
)
|
|
|
|
def track_audio_features(self, track_id: str):
|
|
"""Fetch track audio features by its Spotify ID.
|
|
|
|
Thread-safe: avoids redundant API calls and logs the 403 warning only
|
|
once.
|
|
|
|
"""
|
|
# Fast path: if we've already detected unavailability, skip the call.
|
|
with self._audio_features_lock:
|
|
if not self.audio_features_available:
|
|
return None
|
|
|
|
try:
|
|
return self._handle_response(
|
|
"get", f"{self.audio_features_url}{track_id}"
|
|
)
|
|
except AudioFeaturesUnavailableError:
|
|
# Disable globally in a thread-safe manner and warn once.
|
|
should_log = False
|
|
with self._audio_features_lock:
|
|
if self.audio_features_available:
|
|
self.audio_features_available = False
|
|
should_log = True
|
|
if should_log:
|
|
self._log.warning(
|
|
"Audio features API is unavailable (403 error). "
|
|
"Skipping audio features for remaining tracks."
|
|
)
|
|
return None
|
|
except APIError as e:
|
|
self._log.debug("Spotify API error: {}", e)
|
|
return None
|