# This file is part of beets.
# Copyright 2016, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Fetches, embeds, and displays lyrics."""
from __future__ import annotations
import atexit
import itertools
import math
import re
import textwrap
from contextlib import contextmanager, suppress
from dataclasses import dataclass
from functools import cached_property, partial, total_ordering
from html import unescape
from http import HTTPStatus
from itertools import groupby
from pathlib import Path
from typing import TYPE_CHECKING, Iterable, Iterator, NamedTuple
from urllib.parse import quote, quote_plus, urlencode, urlparse
import langdetect
import requests
from bs4 import BeautifulSoup
from unidecode import unidecode
import beets
from beets import plugins, ui
from beets.autotag.distance import string_dist
from beets.util.config import sanitize_choices
if TYPE_CHECKING:
from beets.importer import ImportTask
from beets.library import Item, Library
from beets.logging import BeetsLogger as Logger
from ._typing import (
GeniusAPI,
GoogleCustomSearchAPI,
JSONDict,
LRCLibAPI,
TranslatorAPI,
)
#: Value of the ``User-Agent`` header installed on the module's shared session.
USER_AGENT = f"beets/{beets.__version__}"
#: Text returned by ``LRCLyrics.get_text`` for candidates flagged instrumental.
INSTRUMENTAL_LYRICS = "[Instrumental]"
class NotFoundError(requests.exceptions.HTTPError):
    """HTTP error raised by ``TimeoutSession`` for a 404 (Not Found) response."""
class CaptchaError(requests.exceptions.HTTPError):
    """HTTP error raised by ``TimeoutSession`` for a 3xx response.

    The session reports such responses with the message "Captcha is required".
    """
class TimeoutSession(requests.Session):
    """Session with a default request timeout that raises on error responses."""

    def request(self, *args, **kwargs):
        """Send the request and raise an exception for unsuccessful responses.

        A ``timeout`` of 10 is applied unless the caller supplies one.

        Raises:
            NotFoundError: the response status is 404.
            CaptchaError: the response status is in the 3xx range.
            requests.exceptions.HTTPError: any other status rejected by
                ``Response.raise_for_status``.
        """
        if "timeout" not in kwargs:
            kwargs["timeout"] = 10

        response = super().request(*args, **kwargs)
        status = response.status_code

        if status == HTTPStatus.NOT_FOUND:
            raise NotFoundError("HTTP Error: Not Found", response=response)
        if HTTPStatus.MULTIPLE_CHOICES <= status < HTTPStatus.BAD_REQUEST:
            raise CaptchaError("Captcha is required", response=response)

        response.raise_for_status()
        return response
# Module-wide HTTP session: ``RequestHandler``'s fetch/post helpers send their
# requests through it, and it is closed at interpreter exit.
r_session = TimeoutSession()
# Identify beets in the User-Agent header of requests sent via this session.
r_session.headers.update({"User-Agent": USER_AGENT})
@atexit.register
def close_session():
    """Release the shared ``r_session`` when the interpreter exits."""
    r_session.close()
# Utilities.
def search_pairs(item):
    """Return (artist, titles) combinations to search lyrics for.

    Each result is a pair whose first element is an artist name and whose
    second element is a list of song titles.

    Besides the artist and title taken from ``item``, extra candidates are
    produced by stripping additional information such as parenthesized
    suffixes, featured artists and subtitles after a colon.

    The artist sort name is appended as a fallback candidate, which helps
    when the artist name includes special characters or is written in a
    non-latin script.

    Titles containing `` / `` are additionally offered split into their
    individual parts.

    An empty tuple is returned when the title or the artist is blank.
    """

    def expand(string, patterns):
        """Return ``string`` followed by group 1 of every matching pattern."""
        matches = (re.search(p, string, re.IGNORECASE) for p in patterns)
        return [string, *(m.group(1) for m in matches if m)]

    title = item.title.strip()
    artist = item.artist.strip()
    artist_sort = item.artist_sort.strip()
    if not (title and artist):
        return ()

    artist_patterns = [
        # Drop any featuring artists from the artist name
        rf"(.*?) {plugins.feat_tokens()}"
    ]

    artist_lc = artist.lower()
    sort_lc = artist_sort.lower()

    # "Various artists" style names are never searched for
    artists = []
    if "various" not in artist_lc:
        artists += expand(artist, artist_patterns)

    # The sort name is only a fallback when it differs from the artist, so
    # that the same search terms are not sent to the remote twice
    if artist_sort and artist_lc != sort_lc and "various" not in sort_lc:
        artists.append(artist_sort)

    title_patterns = [
        # Drop a parenthesized suffix such as (live), (remix) or (acoustic)
        r"(.+?)\s+[(].*[)]$",
        # Drop any featuring artists from the title
        rf"(.*?) {plugins.feat_tokens(for_artist=False)}",
        # Drop the subtitle following a colon ':'
        r"(.+?)\s*:.*",
    ]

    # Dual songs (e.g. Pink Floyd - Speak to Me / Breathe) are offered both
    # as a whole and as the list of their parts
    multi_titles = []
    for candidate in expand(title, title_patterns):
        multi_titles.append([candidate])
        if " / " in candidate:
            parts = candidate.split(" / ")
            multi_titles.append([part.strip() for part in parts])

    return itertools.product(artists, multi_titles)
def slug(text: str) -> str:
    """Return a URL-safe, human-readable version of ``text``.

    The steps, in order:
    1. transliterate unicode characters into ASCII
    2. convert to lowercase
    3. trim surrounding whitespace
    4. collapse every run of non-word characters into a single dash
    5. trim leading and trailing dashes
    """
    ascii_text = unidecode(text)
    normalized = ascii_text.lower().strip()
    dashed = re.sub(r"\W+", "-", normalized)
    return dashed.strip("-")
class RequestHandler:
    """Logging and HTTP helpers that send requests through ``r_session``.

    Users of this class are expected to provide the ``_log`` logger.
    """

    _log: Logger

    def debug(self, message: str, *args) -> None:
        """Emit a debug message prefixed with the class name."""
        prefix = self.__class__.__name__
        self._log.debug(f"{prefix}: {message}", *args)

    def info(self, message: str, *args) -> None:
        """Emit an info message prefixed with the class name."""
        prefix = self.__class__.__name__
        self._log.info(f"{prefix}: {message}", *args)

    def warn(self, message: str, *args) -> None:
        """Emit a warning prefixed with the class name."""
        prefix = self.__class__.__name__
        self._log.warning(f"{prefix}: {message}", *args)

    @staticmethod
    def format_url(url: str, params: JSONDict | None) -> str:
        """Return ``url`` with ``params`` appended as an encoded query string.

        ``url`` is returned untouched when ``params`` is empty or ``None``.
        """
        return f"{url}?{urlencode(params)}" if params else url

    def fetch_text(
        self, url: str, params: JSONDict | None = None, **kwargs
    ) -> str:
        """Return the text / HTML body of a GET request to the given URL.

        The response encoding is reset to ``None`` so that requests works it
        out by itself, because some sites declare it incorrectly.
        """
        full_url = self.format_url(url, params)
        self.debug("Fetching HTML from {}", full_url)
        response = r_session.get(full_url, **kwargs)
        response.encoding = None
        return response.text

    def fetch_json(self, url: str, params: JSONDict | None = None, **kwargs):
        """Return the decoded JSON body of a GET request to the given URL."""
        full_url = self.format_url(url, params)
        self.debug("Fetching JSON from {}", full_url)
        response = r_session.get(full_url, **kwargs)
        return response.json()

    def post_json(self, url: str, params: JSONDict | None = None, **kwargs):
        """Return the decoded JSON body of a POST request to the given URL."""
        full_url = self.format_url(url, params)
        self.debug("Posting JSON to {}", full_url)
        response = r_session.post(full_url, **kwargs)
        return response.json()

    @contextmanager
    def handle_request(self) -> Iterator[None]:
        """Log request failures raised in the managed block as warnings.

        JSON decoding errors and any other ``requests`` exception are
        reported through :meth:`warn` and then swallowed.
        """
        try:
            yield
        except requests.JSONDecodeError:
            self.warn("Could not decode response JSON data")
        except requests.RequestException as err:
            self.warn("Request error: {}", err)
class BackendClass(type):
    """Metaclass giving every class created with it a ``name`` property."""

    @property
    def name(cls) -> str:
        """Return the class name converted to lowercase."""
        class_name: str = cls.__name__
        return class_name.lower()
class Backend(RequestHandler, metaclass=BackendClass):
    """Base class for lyrics backends; subclasses implement :meth:`fetch`."""

    def __init__(self, config, log):
        """Store the configuration and the logger used by the log helpers."""
        self.config = config
        self._log = log

    def fetch(
        self, artist: str, title: str, album: str, length: int
    ) -> tuple[str, str] | None:
        """Fetch lyrics for the given song data.

        The base implementation always raises ``NotImplementedError``.
        """
        raise NotImplementedError
@dataclass
@total_ordering
class LRCLyrics:
    """A lyrics candidate scored against the duration of the target item.

    Instances are ordered by :attr:`dist`, so ``min()`` over a collection
    returns the best match. Equality remains the field-wise comparison
    generated by ``dataclass``.
    """

    #: Percentage tolerance for max duration difference between lyrics and item.
    DURATION_DIFF_TOLERANCE = 0.05

    #: Duration of the item the lyrics are being matched against.
    target_duration: float
    #: Identifier of the candidate.
    id: int
    #: Duration reported for the candidate (0.0 when none is reported).
    duration: float
    #: Whether the candidate is flagged as instrumental.
    instrumental: bool
    #: Plain lyrics text.
    plain: str
    #: Synced lyrics text, if available.
    synced: str | None

    def __lt__(self, other: LRCLyrics) -> bool:
        """Return whether this item scores strictly better than ``other``.

        The strict comparison must live in ``__lt__``: ``total_ordering``
        derives the remaining operators from it, and implementing ``__le__``
        with ``<`` made ``a <= a`` false and ``>`` true in both directions
        for items with equal scores.
        """
        if not isinstance(other, LRCLyrics):
            return NotImplemented
        return self.dist < other.dist

    @classmethod
    def make(
        cls, candidate: LRCLibAPI.Item, target_duration: float
    ) -> LRCLyrics:
        """Build an instance from an API ``candidate`` mapping.

        A missing (falsy) candidate duration is stored as ``0.0``.
        """
        return cls(
            target_duration,
            candidate["id"],
            candidate["duration"] or 0.0,
            candidate["instrumental"],
            candidate["plainLyrics"],
            candidate["syncedLyrics"],
        )

    @cached_property
    def duration_dist(self) -> float:
        """Return the absolute difference between lyrics and target duration."""
        return abs(self.duration - self.target_duration)

    @cached_property
    def is_valid(self) -> bool:
        """Return whether the lyrics item is valid.

        Lyrics duration must be within the tolerance defined by
        :attr:`DURATION_DIFF_TOLERANCE`.
        """
        return (
            self.duration_dist
            <= self.target_duration * self.DURATION_DIFF_TOLERANCE
        )

    @cached_property
    def dist(self) -> tuple[bool, float]:
        """Distance/score of the given lyrics item; lower is better.

        Return a tuple with the following values:
        1. Boolean that is ``True`` when synced lyrics are NOT available.
        2. Absolute difference between lyrics and target duration.

        Because tuples compare element by element, items with synced lyrics
        always rank ahead of items without them, and the duration difference
        breaks ties within each group.
        """
        return not self.synced, self.duration_dist

    def get_text(self, want_synced: bool) -> str:
        """Return the lyrics text for this item.

        Instrumental items yield ``INSTRUMENTAL_LYRICS``. Otherwise the synced
        lyrics (each line stripped of surrounding whitespace) are returned
        when requested and available, falling back to the plain lyrics.
        """
        if self.instrumental:
            return INSTRUMENTAL_LYRICS

        if want_synced and self.synced:
            return "\n".join(map(str.strip, self.synced.splitlines()))

        return self.plain
class LRCLib(Backend):
    """Fetch lyrics from the LRCLib API."""

    BASE_URL = "https://lrclib.net/api"
    GET_URL = f"{BASE_URL}/get"
    SEARCH_URL = f"{BASE_URL}/search"

    def fetch_candidates(
        self, artist: str, title: str, album: str, length: int
    ) -> Iterator[list[LRCLibAPI.Item]]:
        """Yield groups of lyrics candidates for the given song data.

        The ``/get`` endpoint was found to sometimes return inaccurate or
        unsynced lyrics, whereas ``search`` gives more suitable candidates.
        The search results are therefore yielded first, to be ranked by our
        own algorithm, and the single ``/get`` result is only yielded
        afterwards as a fallback. A "not found" answer from ``/get`` simply
        ends the iteration.
        """
        search_params = {"artist_name": artist, "track_name": title}
        exact_params = dict(search_params, duration=length)
        if album:
            exact_params["album_name"] = album

        yield self.fetch_json(self.SEARCH_URL, params=search_params)

        with suppress(NotFoundError):
            exact_match = self.fetch_json(self.GET_URL, params=exact_params)
            yield [exact_match]

    @classmethod
    def pick_best_match(cls, lyrics: Iterable[LRCLyrics]) -> LRCLyrics | None:
        """Return the smallest valid lyrics item, or ``None`` if none is valid."""
        valid = [entry for entry in lyrics if entry.is_valid]
        return min(valid, default=None)

    def fetch(
        self, artist: str, title: str, album: str, length: int
    ) -> tuple[str, str] | None:
        """Return the lyrics text and its URL for the given song data.

        Candidate groups are tried in order and the first group containing a
        valid match wins. ``None`` is returned when no group has one.
        """
        for group in self.fetch_candidates(artist, title, album, length):
            candidates = [
                LRCLyrics.make(raw, target_duration=length) for raw in group
            ]
            best = self.pick_best_match(candidates)
            if not best:
                continue

            text = best.get_text(self.config["synced"])
            return text, f"{self.GET_URL}/{best.id}"

        return None
class MusiXmatch(Backend):
URL_TEMPLATE = "https://www.musixmatch.com/lyrics/{}/{}"
REPLACEMENTS = {
r"\s+": "-",
"<": "Less_Than",
">": "Greater_Than",
"#": "Number_",
r"[\[\{]": "(",
r"[\]\}]": ")",
}
@classmethod
def encode(cls, text: str) -> str:
    """Return ``text`` encoded for use as a URL path segment.

    Every ``REPLACEMENTS`` pattern is substituted in declaration order, then
    the result is transliterated to ASCII and percent-quoted.
    """
    encoded = text
    for pattern, replacement in cls.REPLACEMENTS.items():
        encoded = re.sub(pattern, replacement, encoded)

    return quote(unidecode(encoded))
@classmethod
def build_url(cls, *args: str) -> str:
    """Return ``URL_TEMPLATE`` filled with the encoded form of each argument."""
    encoded_parts = [cls.encode(part) for part in args]
    return cls.URL_TEMPLATE.format(*encoded_parts)
def fetch(self, artist: str, title: str, *_) -> tuple[str, str] | None:
url = self.build_url(artist, title)
html = self.fetch_text(url)
if "We detected that your IP is blocked" in html:
self.warn("Failed: Blocked IP address")
return None
html_parts = html.split('
]+>|
.*", "", html_part))
lyrics = "\n".join(lyrics_parts)
lyrics = lyrics.strip(',"').replace("\\n", "\n")
# another odd case: sometimes only that string remains, for
# missing songs. this seems to happen after being blocked
# above, when filling in the CAPTCHA.
if "Instant lyrics for all your music." in lyrics:
return None
# sometimes there are non-existent lyrics with some content
if "Lyrics | Musixmatch" in lyrics:
return None
return lyrics, url
class Html:
collapse_space = partial(re.compile(r"(^| ) +", re.M).sub, r"\1")
expand_br = partial(re.compile(r"\s* ]*>\s*", re.I).sub, "\n")
#: two newlines between paragraphs on the same line (musica, letras.mus.br)
merge_blocks = partial(re.compile(r"(?)
]*>").sub, "\n\n")
#: a single new line between paragraphs on separate lines
#: (paroles.net, sweetslyrics.com, lacoccinelle.net)
merge_lines = partial(re.compile(r"