diff --git a/beetsplug/_typing.py b/beetsplug/_typing.py
new file mode 100644
index 000000000..915ea77e8
--- /dev/null
+++ b/beetsplug/_typing.py
@@ -0,0 +1,115 @@
+from __future__ import annotations
+
+from typing import Any
+
+from typing_extensions import NotRequired, TypedDict
+
+JSONDict = dict[str, Any]
+
+
+class LRCLibAPI:
+    class Item(TypedDict):
+        """Lyrics data item returned by the LRCLib API."""
+
+        id: int
+        name: str
+        trackName: str
+        artistName: str
+        albumName: str
+        duration: float | None
+        instrumental: bool
+        plainLyrics: str
+        syncedLyrics: str | None
+
+
+class GeniusAPI:
+    """Genius API data types.
+
+    This documents *only* the fields that are used in the plugin.
+    :attr:`SearchResult` is an exception, since I thought some of the other
+    fields might be useful in the future.
+    """
+
+    class DateComponents(TypedDict):
+        year: int
+        month: int
+        day: int
+
+    class Artist(TypedDict):
+        api_path: str
+        header_image_url: str
+        id: int
+        image_url: str
+        is_meme_verified: bool
+        is_verified: bool
+        name: str
+        url: str
+
+    class Stats(TypedDict):
+        unreviewed_annotations: int
+        hot: bool
+
+    class SearchResult(TypedDict):
+        annotation_count: int
+        api_path: str
+        artist_names: str
+        full_title: str
+        header_image_thumbnail_url: str
+        header_image_url: str
+        id: int
+        lyrics_owner_id: int
+        lyrics_state: str
+        path: str
+        primary_artist_names: str
+        pyongs_count: int | None
+        relationships_index_url: str
+        release_date_components: GeniusAPI.DateComponents
+        release_date_for_display: str
+        release_date_with_abbreviated_month_for_display: str
+        song_art_image_thumbnail_url: str
+        song_art_image_url: str
+        stats: GeniusAPI.Stats
+        title: str
+        title_with_featured: str
+        url: str
+        featured_artists: list[GeniusAPI.Artist]
+        primary_artist: GeniusAPI.Artist
+        primary_artists: list[GeniusAPI.Artist]
+
+    class SearchHit(TypedDict):
+        result: GeniusAPI.SearchResult
+
+    class SearchResponse(TypedDict):
+        hits: list[GeniusAPI.SearchHit]
+
+    class Search(TypedDict):
+        response: GeniusAPI.SearchResponse
+
+
+class GoogleCustomSearchAPI:
+    class Response(TypedDict):
+        """Search response from the Google Custom Search API.
+
+        If the search returns no results, the :attr:`items` field is absent.
+        """
+
+        items: NotRequired[list[GoogleCustomSearchAPI.Item]]
+
+    class Item(TypedDict):
+        """A Google Custom Search API result item.
+
+        The :attr:`title` field is shown to the user in the search interface,
+        thus it gets truncated with an ellipsis for longer queries. For most
+        results, the full title is available as the ``og:title`` metatag
+        found under the :attr:`pagemap` field. Note that neither this metatag
+        nor the ``pagemap`` field is guaranteed to be present in the data.
+        """
+
+        title: str
+        link: str
+        pagemap: NotRequired[GoogleCustomSearchAPI.Pagemap]
+
+    class Pagemap(TypedDict):
+        """Pagemap data with a single meta tags dict in a list."""
+
+        metatags: list[JSONDict]
diff --git a/beetsplug/lyrics.py b/beetsplug/lyrics.py
index d1d715ce4..1732edbf7 100644
--- a/beetsplug/lyrics.py
+++ b/beetsplug/lyrics.py
@@ -16,52 +16,35 @@
 from __future__ import annotations
 
-import difflib
+import atexit
 import errno
 import itertools
-import json
+import math
 import os.path
 import re
-import struct
-import unicodedata
-import warnings
-from contextlib import suppress
+from contextlib import contextmanager, suppress
 from dataclasses import dataclass
 from functools import cached_property, partial, total_ordering
+from html import unescape
 from http import HTTPStatus
-from typing import TYPE_CHECKING, ClassVar, Iterable, Iterator
-from urllib.parse import quote, urlencode
+from typing import TYPE_CHECKING, Iterable, Iterator, NamedTuple
+from urllib.parse import quote, quote_plus, urlencode, urlparse
 
+import langdetect
 import requests
-from typing_extensions import TypedDict
+from bs4 import BeautifulSoup
 from unidecode import unidecode
 
 import beets
 from beets import plugins, ui
+from beets.autotag.hooks import string_dist
 
 if TYPE_CHECKING:
     from beets.importer import ImportTask
     from beets.library import Item
 
-try:
-    import bs4
-    from bs4 import SoupStrainer
+    from ._typing import GeniusAPI, GoogleCustomSearchAPI, JSONDict, LRCLibAPI
 
-    HAS_BEAUTIFUL_SOUP = True
-except ImportError:
-    HAS_BEAUTIFUL_SOUP = False
-
-try:
-    import langdetect
-
-    HAS_LANGDETECT = True
-except ImportError:
-    HAS_LANGDETECT = False
-
-DIV_RE = re.compile(r"<(/?)div>?", re.I)
-COMMENT_RE = re.compile(r"<!--.*-->", re.S)
-TAG_RE = re.compile(r"<[^>]*>")
-BREAK_RE = re.compile(r"\n?\s*<br([\s|/][^>]*)*>\s*\n?", re.I)
 USER_AGENT = f"beets/{beets.__version__}"
 INSTRUMENTAL_LYRICS = "[Instrumental]"
 
 
@@ -105,39 +88,38 @@ class NotFoundError(requests.exceptions.HTTPError):
     pass
 
 
+class CaptchaError(requests.exceptions.HTTPError):
+    pass
+
+
+class TimeoutSession(requests.Session):
+    def request(self, *args, **kwargs):
+        """Wrap the request method to raise an exception on HTTP errors."""
+        kwargs.setdefault("timeout", 10)
+        r = super().request(*args, **kwargs)
+        if r.status_code == HTTPStatus.NOT_FOUND:
+            raise NotFoundError("HTTP Error: Not Found", response=r)
+        if 300 <= r.status_code < 400:
+            raise CaptchaError("Captcha is required", response=r)
+
+        r.raise_for_status()
+
+        return r
+
+
+r_session = TimeoutSession()
+r_session.headers.update({"User-Agent": USER_AGENT})
+
+
+@atexit.register
+def close_session():
+    """Close the requests session on shutdown."""
+    r_session.close()
+
+
 # Utilities.
 
 
-def unichar(i):
-    try:
-        return chr(i)
-    except ValueError:
-        return struct.pack("i", i).decode("utf-32")
-
-
-def unescape(text):
-    """Resolve &#xxx; HTML entities (and some others)."""
-    if isinstance(text, bytes):
-        text = text.decode("utf-8", "ignore")
-    out = text.replace("&nbsp;", " ")
-
-    def replchar(m):
-        num = m.group(1)
-        return unichar(int(num))
-
-    out = re.sub("&#(\\d+);", replchar, out)
-    return out
-
-
-def extract_text_between(html, start_marker, end_marker):
-    try:
-        _, html = html.split(start_marker, 1)
-        html, _ = html.split(end_marker, 1)
-    except ValueError:
-        return ""
-    return html
-
-
 def search_pairs(item):
     """Yield a pairs of artists and titles to search for.
 
@@ -176,10 +158,20 @@ def search_pairs(item):
         # Remove any featuring artists from the artists name
         rf"(.*?) {plugins.feat_tokens()}"
     ]
-    artists = generate_alternatives(artist, patterns)
+
+    # Skip various artists
+    artists = []
+    lower_artist = artist.lower()
+    if "various" not in lower_artist:
+        artists.extend(generate_alternatives(artist, patterns))
+
     # Use the artist_sort as fallback only if it differs from artist to avoid
     # repeated remote requests with the same search terms
-    if artist_sort and artist.lower() != artist_sort.lower():
+    artist_sort_lower = artist_sort.lower()
+    if (
+        artist_sort
+        and lower_artist != artist_sort_lower
+        and "various" not in artist_sort_lower
+    ):
         artists.append(artist_sort)
 
     patterns = [
@@ -198,13 +190,13 @@ def search_pairs(item):
     multi_titles = []
     for title in titles:
         multi_titles.append([title])
-        if "/" in title:
-            multi_titles.append([x.strip() for x in title.split("/")])
+        if " / " in title:
+            multi_titles.append([x.strip() for x in title.split(" / ")])
 
     return itertools.product(artists, multi_titles)
 
 
-def slug(text):
+def slug(text: str) -> str:
     """Make a URL-safe, human-readable version of the given text
 
     This will do the following:
@@ -214,81 +206,80 @@ def slug(text):
     3. strip whitespace
    4. replace other non-word characters with dashes
     5. strip extra dashes
-
-    This somewhat duplicates the :func:`Google.slugify` function but
-    slugify is not as generic as this one, which can be reused
-    elsewhere.
     """
     return re.sub(r"\W+", "-", unidecode(text).lower().strip()).strip("-")
 
 
-if HAS_BEAUTIFUL_SOUP:
+class RequestHandler:
+    _log: beets.logging.Logger
 
-    def try_parse_html(html, **kwargs):
-        return bs4.BeautifulSoup(html, "html.parser", **kwargs)
+    def debug(self, message: str, *args) -> None:
+        """Log a debug message with the class name."""
+        self._log.debug(f"{self.__class__.__name__}: {message}", *args)
 
-else:
+    def info(self, message: str, *args) -> None:
+        """Log an info message with the class name."""
+        self._log.info(f"{self.__class__.__name__}: {message}", *args)
 
-    def try_parse_html(html, **kwargs):
-        return None
+    def warn(self, message: str, *args) -> None:
+        """Log a warning message with the class name."""
+        self._log.warning(f"{self.__class__.__name__}: {message}", *args)
+
+    @staticmethod
+    def format_url(url: str, params: JSONDict | None) -> str:
+        if not params:
+            return url
+
+        return f"{url}?{urlencode(params)}"
+
+    def fetch_text(
+        self, url: str, params: JSONDict | None = None, **kwargs
+    ) -> str:
+        """Return text / HTML data from the given URL.
+
+        Set the encoding to None to let requests handle it because some sites
+        set it incorrectly.
+ """ + url = self.format_url(url, params) + self.debug("Fetching HTML from {}", url) + r = r_session.get(url, **kwargs) + r.encoding = None + return r.text + + def fetch_json(self, url: str, params: JSONDict | None = None, **kwargs): + """Return JSON data from the given URL.""" + url = self.format_url(url, params) + self.debug("Fetching JSON from {}", url) + return r_session.get(url, **kwargs).json() + + @contextmanager + def handle_request(self) -> Iterator[None]: + try: + yield + except requests.JSONDecodeError: + self.warn("Could not decode response JSON data") + except requests.RequestException as exc: + self.warn("Request error: {}", exc) -class Backend: - REQUIRES_BS = False +class BackendClass(type): + @property + def name(cls) -> str: + """Return lowercase name of the backend class.""" + return cls.__name__.lower() + +class Backend(RequestHandler, metaclass=BackendClass): def __init__(self, config, log): self._log = log self.config = config - def fetch_url(self, url, **kwargs): - """Retrieve the content at a given URL, or return None if the source - is unreachable. - """ - try: - # Disable the InsecureRequestWarning that comes from using - # `verify=false`. - # https://github.com/kennethreitz/requests/issues/2214 - # We're not overly worried about the NSA MITMing our lyrics scraper - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - r = requests.get( - url, - verify=False, - headers={ - "User-Agent": USER_AGENT, - }, - timeout=10, - **kwargs, - ) - except requests.RequestException as exc: - self._log.debug("lyrics request failed: {0}", exc) - return - if r.status_code == requests.codes.ok: - return r.text - else: - self._log.debug("failed to fetch: {0} ({1})", url, r.status_code) - return None - def fetch( self, artist: str, title: str, album: str, length: int - ) -> str | None: + ) -> tuple[str, str] | None: raise NotImplementedError -class LRCLibItem(TypedDict): - """Lyrics data item returned by the LRCLib API.""" - - id: int - name: str - trackName: str - artistName: str - albumName: str - duration: float | None - instrumental: bool - plainLyrics: str - syncedLyrics: str | None - - @dataclass @total_ordering class LRCLyrics: @@ -296,6 +287,7 @@ class LRCLyrics: DURATION_DIFF_TOLERANCE = 0.05 target_duration: float + id: int duration: float instrumental: bool plain: str @@ -306,9 +298,12 @@ class LRCLyrics: return self.dist < other.dist @classmethod - def make(cls, candidate: LRCLibItem, target_duration: float) -> LRCLyrics: + def make( + cls, candidate: LRCLibAPI.Item, target_duration: float + ) -> LRCLyrics: return cls( target_duration, + candidate["id"], candidate["duration"] or 0.0, candidate["instrumental"], candidate["plainLyrics"], @@ -361,24 +356,9 @@ class LRCLib(Backend): GET_URL = f"{BASE_URL}/get" SEARCH_URL = f"{BASE_URL}/search" - def warn(self, message: str, *args) -> None: - """Log a warning message with the class name.""" - self._log.warning(f"{self.__class__.__name__}: {message}", *args) - - def fetch_json(self, *args, **kwargs): - """Wrap the request method to raise an exception on HTTP errors.""" - kwargs.setdefault("timeout", 10) - kwargs.setdefault("headers", {"User-Agent": USER_AGENT}) - r = requests.get(*args, **kwargs) - if r.status_code == HTTPStatus.NOT_FOUND: - raise NotFoundError("HTTP Error: Not Found", response=r) - r.raise_for_status() - - return r.json() - def fetch_candidates( self, artist: str, title: str, album: str, length: int - ) -> Iterator[list[LRCLibItem]]: + ) -> Iterator[list[LRCLibAPI.Item]]: """Yield lyrics 
candidates for the given song data. I found that the ``/get`` endpoint sometimes returns inaccurate or @@ -406,41 +386,20 @@ class LRCLib(Backend): def fetch( self, artist: str, title: str, album: str, length: int - ) -> str | None: + ) -> tuple[str, str] | None: """Fetch lyrics text for the given song data.""" evaluate_item = partial(LRCLyrics.make, target_duration=length) - try: - for group in self.fetch_candidates(artist, title, album, length): - candidates = [evaluate_item(item) for item in group] - if item := self.pick_best_match(candidates): - return item.get_text(self.config["synced"]) - except StopIteration: - pass - except requests.JSONDecodeError: - self.warn("Could not decode response JSON data") - except requests.RequestException as exc: - self.warn("Request error: {}", exc) + for group in self.fetch_candidates(artist, title, album, length): + candidates = [evaluate_item(item) for item in group] + if item := self.pick_best_match(candidates): + lyrics = item.get_text(self.config["synced"]) + return lyrics, f"{self.GET_URL}/{item.id}" return None -class DirectBackend(Backend): - """A backend for fetching lyrics directly.""" - - URL_TEMPLATE: ClassVar[str] #: May include formatting placeholders - - @classmethod - def encode(cls, text: str) -> str: - """Encode the string for inclusion in a URL.""" - raise NotImplementedError - - @classmethod - def build_url(cls, *args: str) -> str: - return cls.URL_TEMPLATE.format(*map(cls.encode, args)) - - -class MusiXmatch(DirectBackend): +class MusiXmatch(Backend): URL_TEMPLATE = "https://www.musixmatch.com/lyrics/{}/{}" REPLACEMENTS = { @@ -459,22 +418,22 @@ class MusiXmatch(DirectBackend): return quote(unidecode(text)) - def fetch(self, artist: str, title: str, *_) -> str | None: + @classmethod + def build_url(cls, *args: str) -> str: + return cls.URL_TEMPLATE.format(*map(cls.encode, args)) + + def fetch(self, artist: str, title: str, *_) -> tuple[str, str] | None: url = self.build_url(artist, title) - html = self.fetch_url(url) - if not html: - return None + html = self.fetch_text(url) if "We detected that your IP is blocked" in html: - self._log.warning( - "we are blocked at MusixMatch: url %s failed" % url - ) + self.warn("Failed: Blocked IP address") return None html_parts = html.split('

", "

")) + lyrics_parts.append(re.sub(r"^[^>]+>|

.*", "", html_part)) lyrics = "\n".join(lyrics_parts) lyrics = lyrics.strip(',"').replace("\\n", "\n") # another odd case: sometimes only that string remains, for @@ -485,170 +444,184 @@ class MusiXmatch(DirectBackend): # sometimes there are non-existent lyrics with some content if "Lyrics | Musixmatch" in lyrics: return None - return lyrics + return lyrics, url -class Genius(Backend): - """Fetch lyrics from Genius via genius-api. - - Simply adapted from - bigishdata.com/2016/09/27/getting-song-lyrics-from-geniuss-api-scraping/ - """ - - REQUIRES_BS = True - - base_url = "https://api.genius.com" - - def __init__(self, config, log): - super().__init__(config, log) - self.api_key = config["genius_api_key"].as_str() - self.headers = { - "Authorization": "Bearer %s" % self.api_key, - "User-Agent": USER_AGENT, - } - - def fetch(self, artist: str, title: str, *_) -> str | None: - """Fetch lyrics from genius.com - - Because genius doesn't allow accessing lyrics via the api, - we first query the api for a url matching our artist & title, - then attempt to scrape that url for the lyrics. - """ - json = self._search(artist, title) - if not json: - self._log.debug("Genius API request returned invalid JSON") - return None - - # find a matching artist in the json - for hit in json["response"]["hits"]: - hit_artist = hit["result"]["primary_artist"]["name"] - - if slug(hit_artist) == slug(artist): - html = self.fetch_url(hit["result"]["url"]) - if not html: - return None - return self._scrape_lyrics_from_html(html) - - self._log.debug( - "Genius failed to find a matching artist for '{0}'", artist - ) - return None - - def _search(self, artist, title): - """Searches the genius api for a given artist and title - - https://docs.genius.com/#search-h2 - - :returns: json response - """ - search_url = self.base_url + "/search" - data = {"q": title + " " + artist.lower()} - try: - response = requests.get( - search_url, - params=data, - headers=self.headers, - timeout=10, - ) - except requests.RequestException as exc: - self._log.debug("Genius API request failed: {0}", exc) - return None - - try: - return response.json() - except ValueError: - return None - - def replace_br(self, lyrics_div): - for br in lyrics_div.find_all("br"): - br.replace_with("\n") - - def _scrape_lyrics_from_html(self, html): - """Scrape lyrics from a given genius.com html""" - - soup = try_parse_html(html) - if not soup: - return - - # Remove script tags that they put in the middle of the lyrics. 
-        [h.extract() for h in soup("script")]
-
-        # Most of the time, the page contains a div with class="lyrics" where
-        # all of the lyrics can be found already correctly formatted
-        # Sometimes, though, it packages the lyrics into separate divs, most
-        # likely for easier ad placement
-
-        lyrics_divs = soup.find_all("div", {"data-lyrics-container": True})
-        if not lyrics_divs:
-            self._log.debug("Received unusual song page html")
-            return self._try_extracting_lyrics_from_non_data_lyrics_container(
-                soup
-            )
-        lyrics = ""
-        for lyrics_div in lyrics_divs:
-            self.replace_br(lyrics_div)
-            lyrics += lyrics_div.get_text() + "\n\n"
-        while lyrics[-1] == "\n":
-            lyrics = lyrics[:-1]
-        return lyrics
-
-    def _try_extracting_lyrics_from_non_data_lyrics_container(self, soup):
-        """Extract lyrics from a div without attribute data-lyrics-container
-        This is the second most common layout on genius.com
-        """
-        verse_div = soup.find("div", class_=re.compile("Lyrics__Container"))
-        if not verse_div:
-            if soup.find(
-                "div",
-                class_=re.compile("LyricsPlaceholder__Message"),
-                string="This song is an instrumental",
-            ):
-                self._log.debug("Detected instrumental")
-                return INSTRUMENTAL_LYRICS
-            else:
-                self._log.debug("Couldn't scrape page using known layouts")
-                return None
-
-        lyrics_div = verse_div.parent
-        self.replace_br(lyrics_div)
-
-        ads = lyrics_div.find_all(
-            "div", class_=re.compile("InreadAd__Container")
-        )
-        for ad in ads:
-            ad.replace_with("\n")
-
-        footers = lyrics_div.find_all(
-            "div", class_=re.compile("Lyrics__Footer")
-        )
-        for footer in footers:
-            footer.replace_with("")
-        return lyrics_div.get_text()
-
-
-class Tekstowo(DirectBackend):
-    """Fetch lyrics from Tekstowo.pl."""
-
-    REQUIRES_BS = True
-    URL_TEMPLATE = "https://www.tekstowo.pl/piosenka,{},{}.html"
-
-    non_alpha_to_underscore = partial(re.compile(r"\W").sub, "_")
+class Html:
+    collapse_space = partial(re.compile(r"(^| ) +", re.M).sub, r"\1")
+    expand_br = partial(re.compile(r"\s*<br[^>]*>\s*", re.I).sub, "\n")
+    #: two newlines between paragraphs on the same line (musica, letras.mus.br)
+    merge_blocks = partial(re.compile(r"(?<!>)<p[^>]*>").sub, "\n\n")
+    #: a single new line between paragraphs on separate lines
+    #: (paroles.net, sweetslyrics.com, lacoccinelle.net)
+    merge_lines = partial(re.compile(r"</p>\s+<p[^>]*>(?!___)").sub, "\n")
+    #: remove empty divs (lacoccinelle.net)
+    remove_empty_tags = partial(
+        re.compile(r"(<(div|span)[^>]*>\s*</\2>)").sub, ""
+    )
+    #: remove Google Ads tags (musica.com)
+    remove_aside = partial(re.compile("<aside .+?</aside>").sub, "")