diff --git a/beetsplug/lyrics.py b/beetsplug/lyrics.py
index 8e9452a2c..5a35519b5 100644
--- a/beetsplug/lyrics.py
+++ b/beetsplug/lyrics.py
@@ -19,13 +19,12 @@ from __future__ import annotations
import atexit
import errno
import itertools
-import json
import math
import os.path
import re
import struct
import unicodedata
-from contextlib import suppress
+from contextlib import contextmanager, suppress
from dataclasses import dataclass
from functools import cached_property, partial, total_ordering
from http import HTTPStatus
@@ -108,8 +107,14 @@ class NotFoundError(requests.exceptions.HTTPError):
class TimeoutSession(requests.Session):
def request(self, *args, **kwargs):
+ """Wrap the request method to raise an exception on HTTP errors."""
kwargs.setdefault("timeout", 10)
- return super().request(*args, **kwargs)
+ r = super().request(*args, **kwargs)
+ if r.status_code == HTTPStatus.NOT_FOUND:
+ raise NotFoundError("HTTP Error: Not Found", response=r)
+ r.raise_for_status()
+
+ return r
r_session = TimeoutSession()
@@ -250,28 +255,36 @@ else:
return None
-class Backend:
+class RequestHandler:
+ _log: beets.logging.Logger
+
+ def fetch_text(self, url: str, **kwargs) -> str:
+ """Return text / HTML data from the given URL."""
+ self._log.debug("Fetching HTML from {}", url)
+ return r_session.get(url, **kwargs).text
+
+ def fetch_json(self, url: str, **kwargs):
+ """Return JSON data from the given URL."""
+ self._log.debug("Fetching JSON from {}", url)
+ return r_session.get(url, **kwargs).json()
+
+ @contextmanager
+ def handle_request(self) -> Iterator[None]:
+ try:
+ yield
+ except requests.JSONDecodeError:
+ self._log.warning("Could not decode response JSON data")
+ except requests.RequestException as exc:
+ self._log.warning("Request error: {}", exc)
+
+
+class Backend(RequestHandler):
REQUIRES_BS = False
def __init__(self, config, log):
self._log = log
self.config = config
- def fetch_url(self, url, **kwargs):
- """Retrieve the content at a given URL, or return None if the source
- is unreachable.
- """
- try:
- r = r_session.get(url)
- except requests.RequestException as exc:
- self._log.debug("lyrics request failed: {0}", exc)
- return
- if r.status_code == requests.codes.ok:
- return r.text
- else:
- self._log.debug("failed to fetch: {0} ({1})", url, r.status_code)
- return None
-
def fetch(
self, artist: str, title: str, album: str, length: int
) -> str | None:
@@ -368,15 +381,6 @@ class LRCLib(Backend):
"""Log a warning message with the class name."""
self._log.warning(f"{self.__class__.__name__}: {message}", *args)
- def fetch_json(self, *args, **kwargs):
- """Wrap the request method to raise an exception on HTTP errors."""
- r = r_session.get(*args, **kwargs)
- if r.status_code == HTTPStatus.NOT_FOUND:
- raise NotFoundError("HTTP Error: Not Found", response=r)
- r.raise_for_status()
-
- return r.json()
-
def fetch_candidates(
self, artist: str, title: str, album: str, length: int
) -> Iterator[list[LRCLibItem]]:
@@ -417,13 +421,7 @@ class LRCLib(Backend):
if item := self.pick_best_match(candidates):
return item.get_text(self.config["synced"])
except StopIteration:
- pass
- except requests.JSONDecodeError:
- self.warn("Could not decode response JSON data")
- except requests.RequestException as exc:
- self.warn("Request error: {}", exc)
-
- return None
+ return None
class DirectBackend(Backend):
@@ -463,9 +461,7 @@ class MusiXmatch(DirectBackend):
def fetch(self, artist: str, title: str, *_) -> str | None:
url = self.build_url(artist, title)
- html = self.fetch_url(url)
- if not html:
- return None
+ html = self.fetch_text(url)
if "We detected that your IP is blocked" in html:
self._log.warning(
"we are blocked at MusixMatch: url %s failed" % url
@@ -531,6 +527,7 @@ class Genius(SearchBackend):
"""
base_url = "https://api.genius.com"
+ search_url = f"{base_url}/search"
def __init__(self, config, log):
super().__init__(config, log)
@@ -553,10 +550,9 @@ class Genius(SearchBackend):
for hit in json["response"]["hits"]:
result = hit["result"]
if check(result["primary_artist"]["name"], result["title"]):
- html = self.fetch_url(result["url"])
- if not html:
- return None
- return self._scrape_lyrics_from_html(html)
+ return self._scrape_lyrics_from_html(
+ self.fetch_text(result["url"])
+ )
return None
@@ -567,29 +563,20 @@ class Genius(SearchBackend):
:returns: json response
"""
- search_url = self.base_url + "/search"
- data = {"q": title + " " + artist.lower()}
- try:
- r = r_session.get(search_url, params=data, headers=self.headers)
- except requests.RequestException as exc:
- self._log.debug("Genius API request failed: {0}", exc)
- return None
-
- try:
- return r.json()
- except ValueError:
- return None
+ return self.fetch_json(
+ self.search_url,
+ params={"q": f"{title} {artist.lower()}"},
+ headers=self.headers,
+ )
def replace_br(self, lyrics_div):
for br in lyrics_div.find_all("br"):
br.replace_with("\n")
- def _scrape_lyrics_from_html(self, html):
+ def _scrape_lyrics_from_html(self, html: str) -> str | None:
"""Scrape lyrics from a given genius.com html"""
soup = try_parse_html(html)
- if not soup:
- return
# Remove script tags that they put in the middle of the lyrics.
[h.extract() for h in soup("script")]
@@ -660,10 +647,12 @@ class Tekstowo(DirectBackend):
return cls.non_alpha_to_underscore(unidecode(text.lower()))
def fetch(self, artist: str, title: str, *_) -> str | None:
- if html := self.fetch_url(self.build_url(artist, title)):
- return self.extract_lyrics(html)
-
- return None
+ # We are expecting to receive a 404 since we are guessing the URL.
+ # Thus suppress the error so that it does not end up in the logs.
+ with suppress(NotFoundError):
+ return self.extract_lyrics(
+ self.fetch_text(self.build_url(artist, title))
+ )
def extract_lyrics(self, html: str) -> str | None:
html = _scrape_strip_cruft(html)
@@ -717,7 +706,7 @@ def _scrape_merge_paragraphs(html):
return re.sub(r"
\s*
", "\n", html)
-def scrape_lyrics_from_html(html):
+def scrape_lyrics_from_html(html: str) -> str | None:
"""Scrape lyrics from a URL. If no lyrics can be found, return None
instead.
"""
@@ -737,8 +726,6 @@ def scrape_lyrics_from_html(html):
# extract all long text blocks that are not code
soup = try_parse_html(html, parse_only=SoupStrainer(string=is_text_notcode))
- if not soup:
- return None
# Get the longest text element (if any).
strings = sorted(soup.stripped_strings, key=len, reverse=True)
@@ -831,39 +818,26 @@ class Google(SearchBackend):
"q": f"{artist} {title}",
}
- data = self.fetch_url(self.SEARCH_URL, params=params)
- if not data:
- self._log.debug("google backend returned no data")
- return None
- try:
- data = json.loads(data)
- except ValueError as exc:
- self._log.debug("google backend returned malformed JSON: {}", exc)
- if "error" in data:
- reason = data["error"]["errors"][0]["reason"]
- self._log.debug("google backend error: {0}", reason)
- return None
-
check_candidate = partial(self.is_page_candidate, artist, title)
- for item in data.get("items", []):
+ for item in self.fetch_json(self.SEARCH_URL, params=params).get(
+ "items", []
+ ):
url_link = item["link"]
if not check_candidate(url_link, item.get("title", "")):
continue
- html = self.fetch_url(url_link)
- if not html:
- continue
- lyrics = scrape_lyrics_from_html(html)
- if not lyrics:
- continue
+ with self.handle_request():
+ lyrics = scrape_lyrics_from_html(self.fetch_text(url_link))
+ if not lyrics:
+ continue
- if self.is_lyrics(lyrics, artist):
- self._log.debug("got lyrics from {0}", item["displayLink"])
- return lyrics
+ if self.is_lyrics(lyrics, artist):
+ self._log.debug("got lyrics from {0}", item["displayLink"])
+ return lyrics
return None
-class LyricsPlugin(plugins.BeetsPlugin):
+class LyricsPlugin(RequestHandler, plugins.BeetsPlugin):
SOURCES = ["lrclib", "google", "musixmatch", "genius", "tekstowo"]
SOURCE_BACKENDS = {
"google": Google,
@@ -934,7 +908,6 @@ class LyricsPlugin(plugins.BeetsPlugin):
self.config["bing_lang_from"] = [
x.lower() for x in self.config["bing_lang_from"].as_str_seq()
]
- self.bing_auth_token = None
if not HAS_LANGDETECT and self.config["bing_client_secret"].get():
self._log.warning(
@@ -962,7 +935,8 @@ class LyricsPlugin(plugins.BeetsPlugin):
return enabled_sources
- def get_bing_access_token(self):
+ @cached_property
+ def bing_access_token(self) -> str | None:
params = {
"client_id": "beets",
"client_secret": self.config["bing_client_secret"],
@@ -971,14 +945,11 @@ class LyricsPlugin(plugins.BeetsPlugin):
}
oauth_url = "https://datamarket.accesscontrol.windows.net/v2/OAuth2-13"
- oauth_token = r_session.post(oauth_url, params=params).json()
- if "access_token" in oauth_token:
- return "Bearer " + oauth_token["access_token"]
- else:
- self._log.warning(
- "Could not get Bing Translate API access token."
- ' Check your "bing_client_secret" password'
- )
+ with self.handle_request():
+ r = r_session.post(oauth_url, params=params)
+ return r.json()["access_token"]
+
+ return None
def commands(self):
cmd = ui.Subcommand("lyrics", help="fetch song lyrics")
@@ -1167,44 +1138,39 @@ class LyricsPlugin(plugins.BeetsPlugin):
None if no lyrics were found.
"""
for backend in self.backends:
- lyrics = backend.fetch(artist, title, *args)
- if lyrics:
- self._log.debug(
- "got lyrics from backend: {0}", backend.__class__.__name__
- )
- return _scrape_strip_cruft(lyrics, True)
+ with backend.handle_request():
+ if lyrics := backend.fetch(artist, title, *args):
+ self._log.debug(
+ "got lyrics from backend: {0}",
+ backend.__class__.__name__,
+ )
+ return _scrape_strip_cruft(lyrics, True)
return None
def append_translation(self, text, to_lang):
from xml.etree import ElementTree
- if not self.bing_auth_token:
- self.bing_auth_token = self.get_bing_access_token()
- if self.bing_auth_token:
- # Extract unique lines to limit API request size per song
- text_lines = set(text.split("\n"))
- url = (
- "https://api.microsofttranslator.com/v2/Http.svc/"
- "Translate?text=%s&to=%s" % ("|".join(text_lines), to_lang)
+ if not (token := self.bing_access_token):
+ self._log.warning(
+ "Could not get Bing Translate API access token. "
+ "Check your 'bing_client_secret' password."
)
- r = r_session.get(
- url, headers={"Authorization": self.bing_auth_token}
+ return text
+
+ # Extract unique lines to limit API request size per song
+ lines = text.split("\n")
+ unique_lines = set(lines)
+ url = "https://api.microsofttranslator.com/v2/Http.svc/Translate"
+ with self.handle_request():
+ text = self.fetch_text(
+ url,
+ headers={"Authorization": f"Bearer {token}"},
+ params={"text": "|".join(unique_lines), "to": to_lang},
)
- if r.status_code != 200:
- self._log.debug(
- "translation API error {}: {}", r.status_code, r.text
- )
- if "token has expired" in r.text:
- self.bing_auth_token = None
- return self.append_translation(text, to_lang)
- return text
- lines_translated = ElementTree.fromstring(
- r.text.encode("utf-8")
- ).text
- # Use a translation mapping dict to build resulting lyrics
- translations = dict(zip(text_lines, lines_translated.split("|")))
- result = ""
- for line in text.split("\n"):
- result += "{} / {}\n".format(line, translations[line])
- return result
+ if translated := ElementTree.fromstring(text.encode("utf-8")).text:
+ # Use a translation mapping dict to build resulting lyrics
+ translations = dict(zip(unique_lines, translated.split("|")))
+ return "".join(f"{ln} / {translations[ln]}\n" for ln in lines)
+
+ return text
diff --git a/test/plugins/test_lyrics.py b/test/plugins/test_lyrics.py
index 812399698..b77054b52 100644
--- a/test/plugins/test_lyrics.py
+++ b/test/plugins/test_lyrics.py
@@ -202,7 +202,7 @@ def lyrics_root_dir(pytestconfig: pytest.Config):
return pytestconfig.rootpath / "test" / "rsrc" / "lyrics"
-class LyricsBackendTest(PluginMixin):
+class LyricsPluginMixin(PluginMixin):
plugin = "lyrics"
@pytest.fixture
@@ -218,6 +218,42 @@ class LyricsBackendTest(PluginMixin):
return lyrics.LyricsPlugin()
+
+class TestLyricsPlugin(LyricsPluginMixin):
+ @pytest.fixture
+ def backend_name(self):
+ """Return lyrics configuration to test."""
+ return "lrclib"
+
+ @pytest.mark.parametrize(
+ "request_kwargs, expected_log_match",
+ [
+ (
+ {"status_code": HTTPStatus.BAD_GATEWAY},
+ r"lyrics: Request error: 502",
+ ),
+ ({"text": "invalid"}, r"lyrics: Could not decode.*JSON"),
+ ],
+ )
+ def test_error_handling(
+ self,
+ requests_mock,
+ lyrics_plugin,
+ caplog,
+ request_kwargs,
+ expected_log_match,
+ ):
+ """Errors are logged with the plugin name."""
+ requests_mock.get(lyrics.LRCLib.SEARCH_URL, **request_kwargs)
+
+ assert lyrics_plugin.get_lyrics("", "", "", 0.0) is None
+ assert caplog.messages
+ last_log = caplog.messages[-1]
+ assert last_log
+ assert re.search(expected_log_match, last_log, re.I)
+
+
+class LyricsBackendTest(LyricsPluginMixin):
@pytest.fixture
def backend(self, lyrics_plugin):
"""Return a lyrics backend instance."""
@@ -399,13 +435,9 @@ class TestLRCLibLyrics(LyricsBackendTest):
return "lrclib"
@pytest.fixture
- def request_kwargs(self, response_data):
- return {"json": response_data}
-
- @pytest.fixture
- def fetch_lyrics(self, backend, requests_mock, request_kwargs):
+ def fetch_lyrics(self, backend, requests_mock, response_data):
requests_mock.get(backend.GET_URL, status_code=HTTPStatus.NOT_FOUND)
- requests_mock.get(backend.SEARCH_URL, **request_kwargs)
+ requests_mock.get(backend.SEARCH_URL, json=response_data)
return partial(backend.fetch, "la", "la", "la", self.ITEM_DURATION)
@@ -478,19 +510,3 @@ class TestLRCLibLyrics(LyricsBackendTest):
@pytest.mark.parametrize("plugin_config", [{"synced": True}])
def test_fetch_lyrics(self, fetch_lyrics, expected_lyrics):
assert fetch_lyrics() == expected_lyrics
-
- @pytest.mark.parametrize(
- "request_kwargs, expected_log_match",
- [
- (
- {"status_code": HTTPStatus.BAD_GATEWAY},
- r"LRCLib: Request error: 502",
- ),
- ({"text": "invalid"}, r"LRCLib: Could not decode.*JSON"),
- ],
- )
- def test_error(self, caplog, fetch_lyrics, expected_log_match):
- assert fetch_lyrics() is None
- assert caplog.messages
- assert (last_log := caplog.messages[-1])
- assert re.search(expected_log_match, last_log, re.I)