# This file is part of beets. # Copyright 2016, Adrian Sampson. # # Permission is hereby granted, free of charge, to any person obtaining # a copy of this software and associated documentation files (the # "Software"), to deal in the Software without restriction, including # without limitation the rights to use, copy, modify, merge, publish, # distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so, subject to # the following conditions: # # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. """Fetches, embeds, and displays lyrics.""" from __future__ import annotations import atexit import itertools import math import re import textwrap from contextlib import contextmanager, suppress from dataclasses import dataclass from functools import cached_property, partial, total_ordering from html import unescape from http import HTTPStatus from itertools import groupby from pathlib import Path from typing import TYPE_CHECKING, NamedTuple from urllib.parse import quote, quote_plus, urlencode, urlparse import langdetect import requests from bs4 import BeautifulSoup from unidecode import unidecode import beets from beets import plugins, ui from beets.autotag.distance import string_dist from beets.util.config import sanitize_choices if TYPE_CHECKING: from collections.abc import Iterable, Iterator from beets.importer import ImportTask from beets.library import Item, Library from beets.logging import BeetsLogger as Logger from ._typing import ( GeniusAPI, GoogleCustomSearchAPI, JSONDict, LRCLibAPI, TranslatorAPI, ) USER_AGENT = f"beets/{beets.__version__}" INSTRUMENTAL_LYRICS = "[Instrumental]" class NotFoundError(requests.exceptions.HTTPError): pass class CaptchaError(requests.exceptions.HTTPError): pass class TimeoutSession(requests.Session): def request(self, *args, **kwargs): """Wrap the request method to raise an exception on HTTP errors.""" kwargs.setdefault("timeout", 10) r = super().request(*args, **kwargs) if r.status_code == HTTPStatus.NOT_FOUND: raise NotFoundError("HTTP Error: Not Found", response=r) if 300 <= r.status_code < 400: raise CaptchaError("Captcha is required", response=r) r.raise_for_status() return r r_session = TimeoutSession() r_session.headers.update({"User-Agent": USER_AGENT}) @atexit.register def close_session(): """Close the requests session on shut down.""" r_session.close() # Utilities. def search_pairs(item): """Yield a pairs of artists and titles to search for. The first item in the pair is the name of the artist, the second item is a list of song names. In addition to the artist and title obtained from the `item` the method tries to strip extra information like paranthesized suffixes and featured artists from the strings and add them as candidates. The artist sort name is added as a fallback candidate to help in cases where artist name includes special characters or is in a non-latin script. The method also tries to split multiple titles separated with `/`. """ def generate_alternatives(string, patterns): """Generate string alternatives by extracting first matching group for each given pattern. """ alternatives = [string] for pattern in patterns: match = re.search(pattern, string, re.IGNORECASE) if match: alternatives.append(match.group(1)) return alternatives title, artist, artist_sort = ( item.title.strip(), item.artist.strip(), item.artist_sort.strip(), ) if not title or not artist: return () patterns = [ # Remove any featuring artists from the artists name rf"(.*?) {plugins.feat_tokens()}" ] # Skip various artists artists = [] lower_artist = artist.lower() if "various" not in lower_artist: artists.extend(generate_alternatives(artist, patterns)) # Use the artist_sort as fallback only if it differs from artist to avoid # repeated remote requests with the same search terms artist_sort_lower = artist_sort.lower() if ( artist_sort and lower_artist != artist_sort_lower and "various" not in artist_sort_lower ): artists.append(artist_sort) patterns = [ # Remove a parenthesized suffix from a title string. Common # examples include (live), (remix), and (acoustic). r"(.+?)\s+[(].*[)]$", # Remove any featuring artists from the title rf"(.*?) {plugins.feat_tokens(for_artist=False)}", # Remove part of title after colon ':' for songs with subtitles r"(.+?)\s*:.*", ] titles = generate_alternatives(title, patterns) # Check for a dual song (e.g. Pink Floyd - Speak to Me / Breathe) # and each of them. multi_titles = [] for title in titles: multi_titles.append([title]) if " / " in title: multi_titles.append([x.strip() for x in title.split(" / ")]) return itertools.product(artists, multi_titles) def slug(text: str) -> str: """Make a URL-safe, human-readable version of the given text This will do the following: 1. decode unicode characters into ASCII 2. shift everything to lowercase 3. strip whitespace 4. replace other non-word characters with dashes 5. strip extra dashes """ return re.sub(r"\W+", "-", unidecode(text).lower().strip()).strip("-") class RequestHandler: _log: Logger def debug(self, message: str, *args) -> None: """Log a debug message with the class name.""" self._log.debug(f"{self.__class__.__name__}: {message}", *args) def info(self, message: str, *args) -> None: """Log an info message with the class name.""" self._log.info(f"{self.__class__.__name__}: {message}", *args) def warn(self, message: str, *args) -> None: """Log warning with the class name.""" self._log.warning(f"{self.__class__.__name__}: {message}", *args) @staticmethod def format_url(url: str, params: JSONDict | None) -> str: if not params: return url return f"{url}?{urlencode(params)}" def fetch_text( self, url: str, params: JSONDict | None = None, **kwargs ) -> str: """Return text / HTML data from the given URL. Set the encoding to None to let requests handle it because some sites set it incorrectly. """ url = self.format_url(url, params) self.debug("Fetching HTML from {}", url) r = r_session.get(url, **kwargs) r.encoding = None return r.text def fetch_json(self, url: str, params: JSONDict | None = None, **kwargs): """Return JSON data from the given URL.""" url = self.format_url(url, params) self.debug("Fetching JSON from {}", url) return r_session.get(url, **kwargs).json() def post_json(self, url: str, params: JSONDict | None = None, **kwargs): """Send POST request and return JSON response.""" url = self.format_url(url, params) self.debug("Posting JSON to {}", url) return r_session.post(url, **kwargs).json() @contextmanager def handle_request(self) -> Iterator[None]: try: yield except requests.JSONDecodeError: self.warn("Could not decode response JSON data") except requests.RequestException as exc: self.warn("Request error: {}", exc) class BackendClass(type): @property def name(cls) -> str: """Return lowercase name of the backend class.""" return cls.__name__.lower() class Backend(RequestHandler, metaclass=BackendClass): def __init__(self, config, log): self._log = log self.config = config def fetch( self, artist: str, title: str, album: str, length: int ) -> tuple[str, str] | None: raise NotImplementedError @dataclass @total_ordering class LRCLyrics: #: Percentage tolerance for max duration difference between lyrics and item. DURATION_DIFF_TOLERANCE = 0.05 target_duration: float id: int duration: float instrumental: bool plain: str synced: str | None def __le__(self, other: LRCLyrics) -> bool: """Compare two lyrics items by their score.""" return self.dist < other.dist @classmethod def make( cls, candidate: LRCLibAPI.Item, target_duration: float ) -> LRCLyrics: return cls( target_duration, candidate["id"], candidate["duration"] or 0.0, candidate["instrumental"], candidate["plainLyrics"], candidate["syncedLyrics"], ) @cached_property def duration_dist(self) -> float: """Return the absolute difference between lyrics and target duration.""" return abs(self.duration - self.target_duration) @cached_property def is_valid(self) -> bool: """Return whether the lyrics item is valid. Lyrics duration must be within the tolerance defined by :attr:`DURATION_DIFF_TOLERANCE`. """ return ( self.duration_dist <= self.target_duration * self.DURATION_DIFF_TOLERANCE ) @cached_property def dist(self) -> tuple[bool, float]: """Distance/score of the given lyrics item. Return a tuple with the following values: 1. Absolute difference between lyrics and target duration 2. Boolean telling whether synced lyrics are available. Best lyrics match is the one that has the closest duration to ``target_duration`` and has synced lyrics available. """ return not self.synced, self.duration_dist def get_text(self, want_synced: bool) -> str: if self.instrumental: return INSTRUMENTAL_LYRICS if want_synced and self.synced: return "\n".join(map(str.strip, self.synced.splitlines())) return self.plain class LRCLib(Backend): """Fetch lyrics from the LRCLib API.""" BASE_URL = "https://lrclib.net/api" GET_URL = f"{BASE_URL}/get" SEARCH_URL = f"{BASE_URL}/search" def fetch_candidates( self, artist: str, title: str, album: str, length: int ) -> Iterator[list[LRCLibAPI.Item]]: """Yield lyrics candidates for the given song data. I found that the ``/get`` endpoint sometimes returns inaccurate or unsynced lyrics, while ``search`` yields more suitable candidates. Therefore, we prioritize the latter and rank the results using our own algorithm. If the search does not give suitable lyrics, we fall back to the ``/get`` endpoint. Return an iterator over lists of candidates. """ base_params = {"artist_name": artist, "track_name": title} get_params = {**base_params, "duration": length} if album: get_params["album_name"] = album yield self.fetch_json(self.SEARCH_URL, params=base_params) with suppress(NotFoundError): yield [self.fetch_json(self.GET_URL, params=get_params)] @classmethod def pick_best_match(cls, lyrics: Iterable[LRCLyrics]) -> LRCLyrics | None: """Return best matching lyrics item from the given list.""" return min((li for li in lyrics if li.is_valid), default=None) def fetch( self, artist: str, title: str, album: str, length: int ) -> tuple[str, str] | None: """Fetch lyrics text for the given song data.""" evaluate_item = partial(LRCLyrics.make, target_duration=length) for group in self.fetch_candidates(artist, title, album, length): candidates = [evaluate_item(item) for item in group] if item := self.pick_best_match(candidates): lyrics = item.get_text(self.config["synced"]) return lyrics, f"{self.GET_URL}/{item.id}" return None class MusiXmatch(Backend): URL_TEMPLATE = "https://www.musixmatch.com/lyrics/{}/{}" REPLACEMENTS = { r"\s+": "-", "<": "Less_Than", ">": "Greater_Than", "#": "Number_", r"[\[\{]": "(", r"[\]\}]": ")", } @classmethod def encode(cls, text: str) -> str: for old, new in cls.REPLACEMENTS.items(): text = re.sub(old, new, text) return quote(unidecode(text)) @classmethod def build_url(cls, *args: str) -> str: return cls.URL_TEMPLATE.format(*map(cls.encode, args)) def fetch(self, artist: str, title: str, *_) -> tuple[str, str] | None: url = self.build_url(artist, title) html = self.fetch_text(url) if "We detected that your IP is blocked" in html: self.warn("Failed: Blocked IP address") return None html_parts = html.split('

]+>|

.*", "", html_part)) lyrics = "\n".join(lyrics_parts) lyrics = lyrics.strip(',"').replace("\\n", "\n") # another odd case: sometimes only that string remains, for # missing songs. this seems to happen after being blocked # above, when filling in the CAPTCHA. if "Instant lyrics for all your music." in lyrics: return None # sometimes there are non-existent lyrics with some content if "Lyrics | Musixmatch" in lyrics: return None return lyrics, url class Html: collapse_space = partial(re.compile(r"(^| ) +", re.M).sub, r"\1") expand_br = partial(re.compile(r"\s*]*>\s*", re.I).sub, "\n") #: two newlines between paragraphs on the same line (musica, letras.mus.br) merge_blocks = partial(re.compile(r"(?)

]*>").sub, "\n\n") #: a single new line between paragraphs on separate lines #: (paroles.net, sweetslyrics.com, lacoccinelle.net) merge_lines = partial(re.compile(r"

\s+]*>(?!___)").sub, "\n") #: remove empty divs (lacoccinelle.net) remove_empty_tags = partial( re.compile(r"(<(div|span)[^>]*>\s*)").sub, "" ) #: remove Google Ads tags (musica.com) remove_aside = partial(re.compile("