# This file is part of beets.
# Copyright 2016, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.

"""Fetches, embeds, and displays lyrics."""

from __future__ import annotations

import atexit
import errno
import itertools
import math
import os.path
import re
from contextlib import contextmanager, suppress
from dataclasses import dataclass
from functools import cached_property, partial, total_ordering
from html import unescape
from http import HTTPStatus
from typing import TYPE_CHECKING, ClassVar, Iterable, Iterator, NamedTuple
from urllib.parse import quote, urlencode

import requests
from unidecode import unidecode

import beets
from beets import plugins, ui
from beets.autotag.hooks import string_dist

if TYPE_CHECKING:
    from beets.importer import ImportTask
    from beets.library import Item

    from ._typing import GeniusAPI, GoogleCustomSearchAPI, JSONDict, LRCLibAPI

try:
    from bs4 import BeautifulSoup

    HAS_BEAUTIFUL_SOUP = True
except ImportError:
    HAS_BEAUTIFUL_SOUP = False

try:
    import langdetect

    HAS_LANGDETECT = True
except ImportError:
    HAS_LANGDETECT = False

# Matches an HTML <br> tag (with optional attributes / self-closing slash)
# together with any surrounding newlines, so it can be collapsed to "\n".
# NOTE(review): the literal "<br" tag text was lost in this copy of the file;
# restored from the pattern's visible remainder — confirm against upstream.
BREAK_RE = re.compile(r"\n?\s*<br([\s|/][^>]*)*>\s*\n?", re.I)
USER_AGENT = f"beets/{beets.__version__}"
INSTRUMENTAL_LYRICS = "[Instrumental]"

# The content for the base index.rst generated in ReST mode.
# NOTE(review): the "<genindex>" ref target was lost in this copy; restored so
# the ":ref:" role has an explicit target — confirm against upstream.
REST_INDEX_TEMPLATE = """Lyrics
======

* :ref:`Song index <genindex>`
* :ref:`search`

Artist index:

.. toctree::
   :maxdepth: 1
   :glob:

   artists/*
"""

# The content for the base conf.py generated.
REST_CONF_TEMPLATE = """# -*- coding: utf-8 -*- master_doc = 'index' project = 'Lyrics' copyright = 'none' author = 'Various Authors' latex_documents = [ (master_doc, 'Lyrics.tex', project, author, 'manual'), ] epub_title = project epub_author = author epub_publisher = author epub_copyright = copyright epub_exclude_files = ['search.html'] epub_tocdepth = 1 epub_tocdup = False """ class NotFoundError(requests.exceptions.HTTPError): pass class TimeoutSession(requests.Session): def request(self, *args, **kwargs): """Wrap the request method to raise an exception on HTTP errors.""" kwargs.setdefault("timeout", 10) r = super().request(*args, **kwargs) if r.status_code == HTTPStatus.NOT_FOUND: raise NotFoundError("HTTP Error: Not Found", response=r) r.raise_for_status() return r r_session = TimeoutSession() r_session.headers.update({"User-Agent": USER_AGENT}) @atexit.register def close_session(): """Close the requests session on shut down.""" r_session.close() # Utilities. def search_pairs(item): """Yield a pairs of artists and titles to search for. The first item in the pair is the name of the artist, the second item is a list of song names. In addition to the artist and title obtained from the `item` the method tries to strip extra information like paranthesized suffixes and featured artists from the strings and add them as candidates. The artist sort name is added as a fallback candidate to help in cases where artist name includes special characters or is in a non-latin script. The method also tries to split multiple titles separated with `/`. """ def generate_alternatives(string, patterns): """Generate string alternatives by extracting first matching group for each given pattern. 
""" alternatives = [string] for pattern in patterns: match = re.search(pattern, string, re.IGNORECASE) if match: alternatives.append(match.group(1)) return alternatives title, artist, artist_sort = ( item.title.strip(), item.artist.strip(), item.artist_sort.strip(), ) if not title or not artist: return () patterns = [ # Remove any featuring artists from the artists name rf"(.*?) {plugins.feat_tokens()}" ] artists = generate_alternatives(artist, patterns) # Use the artist_sort as fallback only if it differs from artist to avoid # repeated remote requests with the same search terms if artist_sort and artist.lower() != artist_sort.lower(): artists.append(artist_sort) patterns = [ # Remove a parenthesized suffix from a title string. Common # examples include (live), (remix), and (acoustic). r"(.+?)\s+[(].*[)]$", # Remove any featuring artists from the title r"(.*?) {}".format(plugins.feat_tokens(for_artist=False)), # Remove part of title after colon ':' for songs with subtitles r"(.+?)\s*:.*", ] titles = generate_alternatives(title, patterns) # Check for a dual song (e.g. Pink Floyd - Speak to Me / Breathe) # and each of them. multi_titles = [] for title in titles: multi_titles.append([title]) if "/" in title: multi_titles.append([x.strip() for x in title.split("/")]) return itertools.product(artists, multi_titles) def slug(text: str) -> str: """Make a URL-safe, human-readable version of the given text This will do the following: 1. decode unicode characters into ASCII 2. shift everything to lowercase 3. strip whitespace 4. replace other non-word characters with dashes 5. 
strip extra dashes """ return re.sub(r"\W+", "-", unidecode(text).lower().strip()).strip("-") class RequestHandler: _log: beets.logging.Logger def debug(self, message: str, *args) -> None: """Log a debug message with the class name.""" self._log.debug(f"{self.__class__.__name__}: {message}", *args) def info(self, message: str, *args) -> None: """Log an info message with the class name.""" self._log.info(f"{self.__class__.__name__}: {message}", *args) def warn(self, message: str, *args) -> None: """Log warning with the class name.""" self._log.warning(f"{self.__class__.__name__}: {message}", *args) @staticmethod def format_url(url: str, params: JSONDict | None) -> str: if not params: return url return f"{url}?{urlencode(params)}" def fetch_text( self, url: str, params: JSONDict | None = None, **kwargs ) -> str: """Return text / HTML data from the given URL.""" url = self.format_url(url, params) self.debug("Fetching HTML from {}", url) return r_session.get(url, **kwargs).text def fetch_json(self, url: str, params: JSONDict | None = None, **kwargs): """Return JSON data from the given URL.""" url = self.format_url(url, params) self.debug("Fetching JSON from {}", url) return r_session.get(url, **kwargs).json() @contextmanager def handle_request(self) -> Iterator[None]: try: yield except requests.JSONDecodeError: self.warn("Could not decode response JSON data") except requests.RequestException as exc: self.warn("Request error: {}", exc) class Backend(RequestHandler): REQUIRES_BS = False def __init__(self, config, log): self._log = log self.config = config def fetch( self, artist: str, title: str, album: str, length: int ) -> str | None: raise NotImplementedError @dataclass @total_ordering class LRCLyrics: #: Percentage tolerance for max duration difference between lyrics and item. 
DURATION_DIFF_TOLERANCE = 0.05 target_duration: float duration: float instrumental: bool plain: str synced: str | None def __le__(self, other: LRCLyrics) -> bool: """Compare two lyrics items by their score.""" return self.dist < other.dist @classmethod def make( cls, candidate: LRCLibAPI.Item, target_duration: float ) -> LRCLyrics: return cls( target_duration, candidate["duration"] or 0.0, candidate["instrumental"], candidate["plainLyrics"], candidate["syncedLyrics"], ) @cached_property def duration_dist(self) -> float: """Return the absolute difference between lyrics and target duration.""" return abs(self.duration - self.target_duration) @cached_property def is_valid(self) -> bool: """Return whether the lyrics item is valid. Lyrics duration must be within the tolerance defined by :attr:`DURATION_DIFF_TOLERANCE`. """ return ( self.duration_dist <= self.target_duration * self.DURATION_DIFF_TOLERANCE ) @cached_property def dist(self) -> tuple[bool, float]: """Distance/score of the given lyrics item. Return a tuple with the following values: 1. Absolute difference between lyrics and target duration 2. Boolean telling whether synced lyrics are available. Best lyrics match is the one that has the closest duration to ``target_duration`` and has synced lyrics available. """ return not self.synced, self.duration_dist def get_text(self, want_synced: bool) -> str: if self.instrumental: return INSTRUMENTAL_LYRICS if want_synced and self.synced: return "\n".join(map(str.strip, self.synced.splitlines())) return self.plain class LRCLib(Backend): """Fetch lyrics from the LRCLib API.""" BASE_URL = "https://lrclib.net/api" GET_URL = f"{BASE_URL}/get" SEARCH_URL = f"{BASE_URL}/search" def fetch_candidates( self, artist: str, title: str, album: str, length: int ) -> Iterator[list[LRCLibAPI.Item]]: """Yield lyrics candidates for the given song data. I found that the ``/get`` endpoint sometimes returns inaccurate or unsynced lyrics, while ``search`` yields more suitable candidates. 
Therefore, we prioritize the latter and rank the results using our own algorithm. If the search does not give suitable lyrics, we fall back to the ``/get`` endpoint. Return an iterator over lists of candidates. """ base_params = {"artist_name": artist, "track_name": title} get_params = {**base_params, "duration": length} if album: get_params["album_name"] = album yield self.fetch_json(self.SEARCH_URL, params=base_params) with suppress(NotFoundError): yield [self.fetch_json(self.GET_URL, params=get_params)] @classmethod def pick_best_match(cls, lyrics: Iterable[LRCLyrics]) -> LRCLyrics | None: """Return best matching lyrics item from the given list.""" return min((li for li in lyrics if li.is_valid), default=None) def fetch( self, artist: str, title: str, album: str, length: int ) -> str | None: """Fetch lyrics text for the given song data.""" evaluate_item = partial(LRCLyrics.make, target_duration=length) for group in self.fetch_candidates(artist, title, album, length): candidates = [evaluate_item(item) for item in group] if item := self.pick_best_match(candidates): return item.get_text(self.config["synced"]) return None class DirectBackend(Backend): """A backend for fetching lyrics directly.""" URL_TEMPLATE: ClassVar[str] #: May include formatting placeholders @classmethod def encode(cls, text: str) -> str: """Encode the string for inclusion in a URL.""" raise NotImplementedError @classmethod def build_url(cls, *args: str) -> str: return cls.URL_TEMPLATE.format(*map(cls.encode, args)) class MusiXmatch(DirectBackend): URL_TEMPLATE = "https://www.musixmatch.com/lyrics/{}/{}" REPLACEMENTS = { r"\s+": "-", "<": "Less_Than", ">": "Greater_Than", "#": "Number_", r"[\[\{]": "(", r"[\]\}]": ")", } @classmethod def encode(cls, text: str) -> str: for old, new in cls.REPLACEMENTS.items(): text = re.sub(old, new, text) return quote(unidecode(text)) def fetch(self, artist: str, title: str, *_) -> str | None: url = self.build_url(artist, title) html = self.fetch_text(url) if 
"We detected that your IP is blocked" in html: self.warn("Failed: Blocked IP address") return None html_parts = html.split('

]+>|

.*", "", html_part)) lyrics = "\n".join(lyrics_parts) lyrics = lyrics.strip(',"').replace("\\n", "\n") # another odd case: sometimes only that string remains, for # missing songs. this seems to happen after being blocked # above, when filling in the CAPTCHA. if "Instant lyrics for all your music." in lyrics: return None # sometimes there are non-existent lyrics with some content if "Lyrics | Musixmatch" in lyrics: return None return lyrics class SearchResult(NamedTuple): artist: str title: str url: str class SearchBackend(Backend): REQUIRES_BS = True @cached_property def dist_thresh(self) -> float: return self.config["dist_thresh"].get(float) def check_match( self, target_artist: str, target_title: str, result: SearchResult ) -> bool: """Check if the given search result is a 'good enough' match.""" max_dist = max( string_dist(target_artist, result.artist), string_dist(target_title, result.title), ) if (max_dist := round(max_dist, 2)) <= self.dist_thresh: return True if math.isclose(max_dist, self.dist_thresh, abs_tol=0.4): # log out the candidate that did not make it but was close. 
# This may show a matching candidate with some noise in the name self.debug( "({}, {}) does not match ({}, {}) but dist was close: {:.2f}", result.artist, result.title, target_artist, target_title, max_dist, ) return False def search(self, artist: str, title: str) -> Iterable[SearchResult]: """Search for the given query and yield search results.""" raise NotImplementedError def get_results(self, artist: str, title: str) -> Iterable[SearchResult]: check_match = partial(self.check_match, artist, title) for candidate in self.search(artist, title): if check_match(candidate): yield candidate def fetch(self, artist: str, title: str, *_) -> str | None: """Fetch lyrics for the given artist and title.""" for result in self.get_results(artist, title): if (html := self.fetch_text(result.url)) and ( lyrics := self.scrape(html) ): return lyrics return None @classmethod def scrape(cls, html: str) -> str | None: """Scrape the lyrics from the given HTML.""" raise NotImplementedError class Genius(SearchBackend): """Fetch lyrics from Genius via genius-api. Because genius doesn't allow accessing lyrics via the api, we first query the api for a url matching our artist & title, then scrape the HTML text for the JSON data containing the lyrics. """ SEARCH_URL = "https://api.genius.com/search" LYRICS_IN_JSON_RE = re.compile(r'(?<=.\\"html\\":\\").*?(?=(? 
dict[str, str]: return {"Authorization": f'Bearer {self.config["genius_api_key"]}'} def search(self, artist: str, title: str) -> Iterable[SearchResult]: search_data: GeniusAPI.Search = self.fetch_json( self.SEARCH_URL, params={"q": f"{artist} {title}"}, headers=self.headers, ) for r in (hit["result"] for hit in search_data["response"]["hits"]): yield SearchResult(r["artist_names"], r["title"], r["url"]) @classmethod def scrape(cls, html: str) -> str | None: if m := cls.LYRICS_IN_JSON_RE.search(html): html_text = cls.remove_backslash(m[0]).replace(r"\n", "\n") return get_soup(html_text).get_text().strip() return None class Tekstowo(DirectBackend): """Fetch lyrics from Tekstowo.pl.""" REQUIRES_BS = True URL_TEMPLATE = "https://www.tekstowo.pl/piosenka,{},{}.html" non_alpha_to_underscore = partial(re.compile(r"\W").sub, "_") @classmethod def encode(cls, text: str) -> str: return cls.non_alpha_to_underscore(unidecode(text.lower())) def fetch(self, artist: str, title: str, *_) -> str | None: # We are expecting to receive a 404 since we are guessing the URL. # Thus suppress the error so that it does not end up in the logs. with suppress(NotFoundError): return self.scrape(self.fetch_text(self.build_url(artist, title))) return None @classmethod def scrape(cls, html: str) -> str | None: soup = get_soup(html) if lyrics_div := soup.select_one("div.song-text > div.inner-text"): return lyrics_div.get_text() return None collapse_newlines = partial(re.compile(r"\n{3,}").sub, r"\n\n") def _scrape_strip_cruft(html: str) -> str: """Clean up HTML""" html = unescape(html) html = html.replace("\r", "\n") # Normalize EOL. html = re.sub(r" +", " ", html) # Whitespaces collapse. html = BREAK_RE.sub("\n", html) #
eats up surrounding '\n'. html = re.sub(r"(?s)<(script).*?", "", html) # Strip script tags. html = re.sub("\u2005", " ", html) # replace unicode with regular space html = re.sub("