Centralise request error handling

This commit is contained in:
Šarūnas Nejus 2024-10-13 16:14:15 +01:00
parent 06eac79c0d
commit 283c513c72
No known key found for this signature in database
GPG key ID: DD28F6704DBE3435
2 changed files with 136 additions and 154 deletions

View file

@ -19,13 +19,12 @@ from __future__ import annotations
import atexit
import errno
import itertools
import json
import math
import os.path
import re
import struct
import unicodedata
from contextlib import suppress
from contextlib import contextmanager, suppress
from dataclasses import dataclass
from functools import cached_property, partial, total_ordering
from http import HTTPStatus
@ -108,8 +107,14 @@ class NotFoundError(requests.exceptions.HTTPError):
class TimeoutSession(requests.Session):
def request(self, *args, **kwargs):
"""Wrap the request method to raise an exception on HTTP errors."""
kwargs.setdefault("timeout", 10)
return super().request(*args, **kwargs)
r = super().request(*args, **kwargs)
if r.status_code == HTTPStatus.NOT_FOUND:
raise NotFoundError("HTTP Error: Not Found", response=r)
r.raise_for_status()
return r
r_session = TimeoutSession()
@ -250,28 +255,36 @@ else:
return None
class Backend:
class RequestHandler:
_log: beets.logging.Logger
def fetch_text(self, url: str, **kwargs) -> str:
"""Return text / HTML data from the given URL."""
self._log.debug("Fetching HTML from {}", url)
return r_session.get(url, **kwargs).text
def fetch_json(self, url: str, **kwargs):
"""Return JSON data from the given URL."""
self._log.debug("Fetching JSON from {}", url)
return r_session.get(url, **kwargs).json()
@contextmanager
def handle_request(self) -> Iterator[None]:
try:
yield
except requests.JSONDecodeError:
self._log.warning("Could not decode response JSON data")
except requests.RequestException as exc:
self._log.warning("Request error: {}", exc)
class Backend(RequestHandler):
REQUIRES_BS = False
def __init__(self, config, log):
self._log = log
self.config = config
def fetch_url(self, url, **kwargs):
"""Retrieve the content at a given URL, or return None if the source
is unreachable.
"""
try:
r = r_session.get(url)
except requests.RequestException as exc:
self._log.debug("lyrics request failed: {0}", exc)
return
if r.status_code == requests.codes.ok:
return r.text
else:
self._log.debug("failed to fetch: {0} ({1})", url, r.status_code)
return None
def fetch(
self, artist: str, title: str, album: str, length: int
) -> str | None:
@ -368,15 +381,6 @@ class LRCLib(Backend):
"""Log a warning message with the class name."""
self._log.warning(f"{self.__class__.__name__}: {message}", *args)
def fetch_json(self, *args, **kwargs):
"""Wrap the request method to raise an exception on HTTP errors."""
r = r_session.get(*args, **kwargs)
if r.status_code == HTTPStatus.NOT_FOUND:
raise NotFoundError("HTTP Error: Not Found", response=r)
r.raise_for_status()
return r.json()
def fetch_candidates(
self, artist: str, title: str, album: str, length: int
) -> Iterator[list[LRCLibItem]]:
@ -417,13 +421,7 @@ class LRCLib(Backend):
if item := self.pick_best_match(candidates):
return item.get_text(self.config["synced"])
except StopIteration:
pass
except requests.JSONDecodeError:
self.warn("Could not decode response JSON data")
except requests.RequestException as exc:
self.warn("Request error: {}", exc)
return None
return None
class DirectBackend(Backend):
@ -463,9 +461,7 @@ class MusiXmatch(DirectBackend):
def fetch(self, artist: str, title: str, *_) -> str | None:
url = self.build_url(artist, title)
html = self.fetch_url(url)
if not html:
return None
html = self.fetch_text(url)
if "We detected that your IP is blocked" in html:
self._log.warning(
"we are blocked at MusixMatch: url %s failed" % url
@ -531,6 +527,7 @@ class Genius(SearchBackend):
"""
base_url = "https://api.genius.com"
search_url = f"{base_url}/search"
def __init__(self, config, log):
super().__init__(config, log)
@ -553,10 +550,9 @@ class Genius(SearchBackend):
for hit in json["response"]["hits"]:
result = hit["result"]
if check(result["primary_artist"]["name"], result["title"]):
html = self.fetch_url(result["url"])
if not html:
return None
return self._scrape_lyrics_from_html(html)
return self._scrape_lyrics_from_html(
self.fetch_text(result["url"])
)
return None
@ -567,29 +563,20 @@ class Genius(SearchBackend):
:returns: json response
"""
search_url = self.base_url + "/search"
data = {"q": title + " " + artist.lower()}
try:
r = r_session.get(search_url, params=data, headers=self.headers)
except requests.RequestException as exc:
self._log.debug("Genius API request failed: {0}", exc)
return None
try:
return r.json()
except ValueError:
return None
return self.fetch_json(
self.search_url,
params={"q": f"{title} {artist.lower()}"},
headers=self.headers,
)
def replace_br(self, lyrics_div):
for br in lyrics_div.find_all("br"):
br.replace_with("\n")
def _scrape_lyrics_from_html(self, html):
def _scrape_lyrics_from_html(self, html: str) -> str | None:
"""Scrape lyrics from a given genius.com html"""
soup = try_parse_html(html)
if not soup:
return
# Remove script tags that they put in the middle of the lyrics.
[h.extract() for h in soup("script")]
@ -660,10 +647,12 @@ class Tekstowo(DirectBackend):
return cls.non_alpha_to_underscore(unidecode(text.lower()))
def fetch(self, artist: str, title: str, *_) -> str | None:
if html := self.fetch_url(self.build_url(artist, title)):
return self.extract_lyrics(html)
return None
# We are expecting to receive a 404 since we are guessing the URL.
# Thus suppress the error so that it does not end up in the logs.
with suppress(NotFoundError):
return self.extract_lyrics(
self.fetch_text(self.build_url(artist, title))
)
def extract_lyrics(self, html: str) -> str | None:
html = _scrape_strip_cruft(html)
@ -717,7 +706,7 @@ def _scrape_merge_paragraphs(html):
return re.sub(r"<div .*>\s*</div>", "\n", html)
def scrape_lyrics_from_html(html):
def scrape_lyrics_from_html(html: str) -> str | None:
"""Scrape lyrics from a URL. If no lyrics can be found, return None
instead.
"""
@ -737,8 +726,6 @@ def scrape_lyrics_from_html(html):
# extract all long text blocks that are not code
soup = try_parse_html(html, parse_only=SoupStrainer(string=is_text_notcode))
if not soup:
return None
# Get the longest text element (if any).
strings = sorted(soup.stripped_strings, key=len, reverse=True)
@ -831,39 +818,26 @@ class Google(SearchBackend):
"q": f"{artist} {title}",
}
data = self.fetch_url(self.SEARCH_URL, params=params)
if not data:
self._log.debug("google backend returned no data")
return None
try:
data = json.loads(data)
except ValueError as exc:
self._log.debug("google backend returned malformed JSON: {}", exc)
if "error" in data:
reason = data["error"]["errors"][0]["reason"]
self._log.debug("google backend error: {0}", reason)
return None
check_candidate = partial(self.is_page_candidate, artist, title)
for item in data.get("items", []):
for item in self.fetch_json(self.SEARCH_URL, params=params).get(
"items", []
):
url_link = item["link"]
if not check_candidate(url_link, item.get("title", "")):
continue
html = self.fetch_url(url_link)
if not html:
continue
lyrics = scrape_lyrics_from_html(html)
if not lyrics:
continue
with self.handle_request():
lyrics = scrape_lyrics_from_html(self.fetch_text(url_link))
if not lyrics:
continue
if self.is_lyrics(lyrics, artist):
self._log.debug("got lyrics from {0}", item["displayLink"])
return lyrics
if self.is_lyrics(lyrics, artist):
self._log.debug("got lyrics from {0}", item["displayLink"])
return lyrics
return None
class LyricsPlugin(plugins.BeetsPlugin):
class LyricsPlugin(RequestHandler, plugins.BeetsPlugin):
SOURCES = ["lrclib", "google", "musixmatch", "genius", "tekstowo"]
SOURCE_BACKENDS = {
"google": Google,
@ -934,7 +908,6 @@ class LyricsPlugin(plugins.BeetsPlugin):
self.config["bing_lang_from"] = [
x.lower() for x in self.config["bing_lang_from"].as_str_seq()
]
self.bing_auth_token = None
if not HAS_LANGDETECT and self.config["bing_client_secret"].get():
self._log.warning(
@ -962,7 +935,8 @@ class LyricsPlugin(plugins.BeetsPlugin):
return enabled_sources
def get_bing_access_token(self):
@cached_property
def bing_access_token(self) -> str | None:
params = {
"client_id": "beets",
"client_secret": self.config["bing_client_secret"],
@ -971,14 +945,11 @@ class LyricsPlugin(plugins.BeetsPlugin):
}
oauth_url = "https://datamarket.accesscontrol.windows.net/v2/OAuth2-13"
oauth_token = r_session.post(oauth_url, params=params).json()
if "access_token" in oauth_token:
return "Bearer " + oauth_token["access_token"]
else:
self._log.warning(
"Could not get Bing Translate API access token."
' Check your "bing_client_secret" password'
)
with self.handle_request():
r = r_session.post(oauth_url, params=params)
return r.json()["access_token"]
return None
def commands(self):
cmd = ui.Subcommand("lyrics", help="fetch song lyrics")
@ -1167,44 +1138,39 @@ class LyricsPlugin(plugins.BeetsPlugin):
None if no lyrics were found.
"""
for backend in self.backends:
lyrics = backend.fetch(artist, title, *args)
if lyrics:
self._log.debug(
"got lyrics from backend: {0}", backend.__class__.__name__
)
return _scrape_strip_cruft(lyrics, True)
with backend.handle_request():
if lyrics := backend.fetch(artist, title, *args):
self._log.debug(
"got lyrics from backend: {0}",
backend.__class__.__name__,
)
return _scrape_strip_cruft(lyrics, True)
return None
def append_translation(self, text, to_lang):
from xml.etree import ElementTree
if not self.bing_auth_token:
self.bing_auth_token = self.get_bing_access_token()
if self.bing_auth_token:
# Extract unique lines to limit API request size per song
text_lines = set(text.split("\n"))
url = (
"https://api.microsofttranslator.com/v2/Http.svc/"
"Translate?text=%s&to=%s" % ("|".join(text_lines), to_lang)
if not (token := self.bing_access_token):
self._log.warning(
"Could not get Bing Translate API access token. "
"Check your 'bing_client_secret' password."
)
r = r_session.get(
url, headers={"Authorization": self.bing_auth_token}
return text
# Extract unique lines to limit API request size per song
lines = text.split("\n")
unique_lines = set(lines)
url = "https://api.microsofttranslator.com/v2/Http.svc/Translate"
with self.handle_request():
text = self.fetch_text(
url,
headers={"Authorization": f"Bearer {token}"},
params={"text": "|".join(unique_lines), "to": to_lang},
)
if r.status_code != 200:
self._log.debug(
"translation API error {}: {}", r.status_code, r.text
)
if "token has expired" in r.text:
self.bing_auth_token = None
return self.append_translation(text, to_lang)
return text
lines_translated = ElementTree.fromstring(
r.text.encode("utf-8")
).text
# Use a translation mapping dict to build resulting lyrics
translations = dict(zip(text_lines, lines_translated.split("|")))
result = ""
for line in text.split("\n"):
result += "{} / {}\n".format(line, translations[line])
return result
if translated := ElementTree.fromstring(text.encode("utf-8")).text:
# Use a translation mapping dict to build resulting lyrics
translations = dict(zip(unique_lines, translated.split("|")))
return "".join(f"{ln} / {translations[ln]}\n" for ln in lines)
return text

View file

@ -202,7 +202,7 @@ def lyrics_root_dir(pytestconfig: pytest.Config):
return pytestconfig.rootpath / "test" / "rsrc" / "lyrics"
class LyricsBackendTest(PluginMixin):
class LyricsPluginMixin(PluginMixin):
plugin = "lyrics"
@pytest.fixture
@ -218,6 +218,42 @@ class LyricsBackendTest(PluginMixin):
return lyrics.LyricsPlugin()
class TestLyricsPlugin(LyricsPluginMixin):
@pytest.fixture
def backend_name(self):
"""Return lyrics configuration to test."""
return "lrclib"
@pytest.mark.parametrize(
"request_kwargs, expected_log_match",
[
(
{"status_code": HTTPStatus.BAD_GATEWAY},
r"lyrics: Request error: 502",
),
({"text": "invalid"}, r"lyrics: Could not decode.*JSON"),
],
)
def test_error_handling(
self,
requests_mock,
lyrics_plugin,
caplog,
request_kwargs,
expected_log_match,
):
"""Errors are logged with the plugin name."""
requests_mock.get(lyrics.LRCLib.SEARCH_URL, **request_kwargs)
assert lyrics_plugin.get_lyrics("", "", "", 0.0) is None
assert caplog.messages
last_log = caplog.messages[-1]
assert last_log
assert re.search(expected_log_match, last_log, re.I)
class LyricsBackendTest(LyricsPluginMixin):
@pytest.fixture
def backend(self, lyrics_plugin):
"""Return a lyrics backend instance."""
@ -399,13 +435,9 @@ class TestLRCLibLyrics(LyricsBackendTest):
return "lrclib"
@pytest.fixture
def request_kwargs(self, response_data):
return {"json": response_data}
@pytest.fixture
def fetch_lyrics(self, backend, requests_mock, request_kwargs):
def fetch_lyrics(self, backend, requests_mock, response_data):
requests_mock.get(backend.GET_URL, status_code=HTTPStatus.NOT_FOUND)
requests_mock.get(backend.SEARCH_URL, **request_kwargs)
requests_mock.get(backend.SEARCH_URL, json=response_data)
return partial(backend.fetch, "la", "la", "la", self.ITEM_DURATION)
@ -478,19 +510,3 @@ class TestLRCLibLyrics(LyricsBackendTest):
@pytest.mark.parametrize("plugin_config", [{"synced": True}])
def test_fetch_lyrics(self, fetch_lyrics, expected_lyrics):
assert fetch_lyrics() == expected_lyrics
@pytest.mark.parametrize(
"request_kwargs, expected_log_match",
[
(
{"status_code": HTTPStatus.BAD_GATEWAY},
r"LRCLib: Request error: 502",
),
({"text": "invalid"}, r"LRCLib: Could not decode.*JSON"),
],
)
def test_error(self, caplog, fetch_lyrics, expected_log_match):
assert fetch_lyrics() is None
assert caplog.messages
assert (last_log := caplog.messages[-1])
assert re.search(expected_log_match, last_log, re.I)