diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 748cf24d1..333706dc7 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -33,7 +33,7 @@ jobs: if: matrix.platform == 'ubuntu-latest' run: | sudo apt update - sudo apt install ffmpeg gobject-introspection libcairo2-dev libgirepository-2.0-dev pandoc + sudo apt install ffmpeg gobject-introspection libcairo2-dev libgirepository-2.0-dev pandoc imagemagick - name: Get changed lyrics files id: lyrics-update @@ -60,7 +60,7 @@ jobs: env: LYRICS_UPDATED: ${{ steps.lyrics-update.outputs.any_changed }} run: | - poetry install --extras=autobpm --extras=lyrics --extras=docs --extras=replaygain --extras=reflink + poetry install --extras=autobpm --extras=lyrics --extras=docs --extras=replaygain --extras=reflink --extras=fetchart poe docs poe test-with-coverage diff --git a/beets/test/helper.py b/beets/test/helper.py index a24836e84..b86db5b23 100644 --- a/beets/test/helper.py +++ b/beets/test/helper.py @@ -886,20 +886,43 @@ class FetchImageHelper: def run(self, *args, **kwargs): super().run(*args, **kwargs) - IMAGEHEADER = { + IMAGEHEADER: dict[str, bytes] = { "image/jpeg": b"\xff\xd8\xff" + b"\x00" * 3 + b"JFIF", "image/png": b"\211PNG\r\n\032\n", + "image/gif": b"GIF89a", + # dummy type that is definitely not a valid image content type + "image/watercolour": b"watercolour", + "text/html": ( + b"<!DOCTYPE html>\n<html>\n<head>\n</head>\n"
+ b"<body>\n</body>\n</html>" + ), } - def mock_response(self, url, content_type="image/jpeg", file_type=None): + def mock_response( + self, + url: str, + content_type: str = "image/jpeg", + file_type: None | str = None, + ) -> None: + # Potentially return a file of a type that differs from the + # server-advertised content type to mimic misbehaving servers. if file_type is None: file_type = content_type + + try: + # imghdr reads 32 bytes + header = self.IMAGEHEADER[file_type].ljust(32, b"\x00") + except KeyError: + # If we can't return a file that looks like a real file of the requested + # type, better fail the test than returning something else, which might + # violate assumptions made when writing a test. + raise AssertionError(f"Mocking {file_type} responses not supported") + responses.add( responses.GET, url, content_type=content_type, - # imghdr reads 32 bytes - body=self.IMAGEHEADER.get(file_type, b"").ljust(32, b"\x00"), + body=header, ) diff --git a/beetsplug/fetchart.py b/beetsplug/fetchart.py index 5451b4dbb..3473fe08b 100644 --- a/beetsplug/fetchart.py +++ b/beetsplug/fetchart.py @@ -14,10 +14,16 @@ """Fetches album art.""" +from __future__ import annotations + import os import re +from abc import ABC, abstractmethod from collections import OrderedDict from contextlib import closing +from enum import Enum +from functools import cached_property +from typing import TYPE_CHECKING, AnyStr, ClassVar, Literal, Tuple, Type import confuse import requests @@ -27,8 +33,15 @@ from beets import config, importer, plugins, ui, util from beets.util import bytestring_path, get_temp_filename, sorted_walk, syspath from beets.util.artresizer import ArtResizer +if TYPE_CHECKING: + from collections.abc import Iterable, Iterator, Sequence + from logging import Logger + + from beets.importer import ImportSession, ImportTask + from beets.library import Album, Library + try: - from bs4 import BeautifulSoup + from bs4 import BeautifulSoup, Tag HAS_BEAUTIFUL_SOUP = True except ImportError: @@ 
-39,33 +52,54 @@ CONTENT_TYPES = {"image/jpeg": [b"jpg", b"jpeg"], "image/png": [b"png"]} IMAGE_EXTENSIONS = [ext for exts in CONTENT_TYPES.values() for ext in exts] +class ImageAction(Enum): + """Indicates whether an image is useable or requires post-processing.""" + + BAD = 0 + EXACT = 1 + DOWNSCALE = 2 + DOWNSIZE = 3 + DEINTERLACE = 4 + REFORMAT = 5 + + +class MetadataMatch(Enum): + """Indicates whether a `Candidate` matches the search criteria exactly.""" + + EXACT = 0 + FALLBACK = 1 + + +SourceLocation = Literal["local", "remote"] + + class Candidate: """Holds information about a matching artwork, deals with validation of dimension restrictions and resizing. """ - CANDIDATE_BAD = 0 - CANDIDATE_EXACT = 1 - CANDIDATE_DOWNSCALE = 2 - CANDIDATE_DOWNSIZE = 3 - CANDIDATE_DEINTERLACE = 4 - CANDIDATE_REFORMAT = 5 - - MATCH_EXACT = 0 - MATCH_FALLBACK = 1 - def __init__( - self, log, path=None, url=None, source="", match=None, size=None + self, + log: Logger, + source_name: str, + path: None | bytes = None, + url: None | str = None, + match: None | MetadataMatch = None, + size: None | Tuple[int, int] = None, ): self._log = log self.path = path self.url = url - self.source = source - self.check = None + self.source_name = source_name + self._check: None | ImageAction = None self.match = match self.size = size - def _validate(self, plugin, skip_check_for=None): + def _validate( + self, + plugin: FetchArtPlugin, + skip_check_for: None | list[ImageAction] = None, + ) -> ImageAction: """Determine whether the candidate artwork is valid based on its dimensions (width and ratio). @@ -74,21 +108,16 @@ class Candidate: validated for a particular operation without changing plugin configuration. - Return `CANDIDATE_BAD` if the file is unusable. - Return `CANDIDATE_EXACT` if the file is usable as-is. - Return `CANDIDATE_DOWNSCALE` if the file must be rescaled. - Return `CANDIDATE_DOWNSIZE` if the file must be resized, and possibly + Return `ImageAction.BAD` if the file is unusable. 
+ Return `ImageAction.EXACT` if the file is usable as-is. + Return `ImageAction.DOWNSCALE` if the file must be rescaled. + Return `ImageAction.DOWNSIZE` if the file must be resized, and possibly also rescaled. - Return `CANDIDATE_DEINTERLACE` if the file must be deinterlaced. - Return `CANDIDATE_REFORMAT` if the file has to be converted. + Return `ImageAction.DEINTERLACE` if the file must be deinterlaced. + Return `ImageAction.REFORMAT` if the file has to be converted. """ if not self.path: - return self.CANDIDATE_BAD - - if skip_check_for is None: - skip_check_for = [] - if isinstance(skip_check_for, int): - skip_check_for = [skip_check_for] + return ImageAction.BAD if not ( plugin.enforce_ratio @@ -98,7 +127,7 @@ class Candidate: or plugin.deinterlace or plugin.cover_format ): - return self.CANDIDATE_EXACT + return ImageAction.EXACT # get_size returns None if no local imaging backend is available if not self.size: @@ -113,7 +142,7 @@ class Candidate: "`enforce_ratio` and `max_filesize` " "may be violated." ) - return self.CANDIDATE_EXACT + return ImageAction.EXACT short_edge = min(self.size) long_edge = max(self.size) @@ -123,7 +152,7 @@ class Candidate: self._log.debug( "image too small ({} < {})", self.size[0], plugin.minwidth ) - return self.CANDIDATE_BAD + return ImageAction.BAD # Check aspect ratio. edge_diff = long_edge - short_edge @@ -137,7 +166,7 @@ class Candidate: short_edge, plugin.margin_px, ) - return self.CANDIDATE_BAD + return ImageAction.BAD elif plugin.margin_percent: margin_px = plugin.margin_percent * long_edge if edge_diff > margin_px: @@ -148,13 +177,13 @@ class Candidate: short_edge, margin_px, ) - return self.CANDIDATE_BAD + return ImageAction.BAD elif edge_diff: # also reached for margin_px == 0 and margin_percent == 0.0 self._log.debug( "image is not square ({} != {})", self.size[0], self.size[1] ) - return self.CANDIDATE_BAD + return ImageAction.BAD # Check maximum dimension. 
downscale = False @@ -188,23 +217,29 @@ class Candidate: plugin.cover_format, ) - if downscale and (self.CANDIDATE_DOWNSCALE not in skip_check_for): - return self.CANDIDATE_DOWNSCALE - if reformat and (self.CANDIDATE_REFORMAT not in skip_check_for): - return self.CANDIDATE_REFORMAT + skip_check_for = skip_check_for or [] + + if downscale and (ImageAction.DOWNSCALE not in skip_check_for): + return ImageAction.DOWNSCALE + if reformat and (ImageAction.REFORMAT not in skip_check_for): + return ImageAction.REFORMAT if plugin.deinterlace and ( - self.CANDIDATE_DEINTERLACE not in skip_check_for + ImageAction.DEINTERLACE not in skip_check_for ): - return self.CANDIDATE_DEINTERLACE - if downsize and (self.CANDIDATE_DOWNSIZE not in skip_check_for): - return self.CANDIDATE_DOWNSIZE - return self.CANDIDATE_EXACT + return ImageAction.DEINTERLACE + if downsize and (ImageAction.DOWNSIZE not in skip_check_for): + return ImageAction.DOWNSIZE + return ImageAction.EXACT - def validate(self, plugin, skip_check_for=None): - self.check = self._validate(plugin, skip_check_for) - return self.check + def validate( + self, + plugin: FetchArtPlugin, + skip_check_for: None | list[ImageAction] = None, + ) -> ImageAction: + self._check = self._validate(plugin, skip_check_for) + return self._check - def resize(self, plugin): + def resize(self, plugin: FetchArtPlugin) -> None: """Resize the candidate artwork according to the plugin's configuration until it is valid or no further resizing is possible. 
@@ -214,25 +249,32 @@ class Candidate: checks_performed = [] # we don't want to resize the image if it's valid or bad - while current_check not in [self.CANDIDATE_BAD, self.CANDIDATE_EXACT]: + while current_check not in [ImageAction.BAD, ImageAction.EXACT]: self._resize(plugin, current_check) checks_performed.append(current_check) current_check = self.validate( plugin, skip_check_for=checks_performed ) - def _resize(self, plugin, check=None): + def _resize( + self, plugin: FetchArtPlugin, check: None | ImageAction = None + ) -> None: """Resize the candidate artwork according to the plugin's configuration and the specified check. """ - if check == self.CANDIDATE_DOWNSCALE: + # This must only be called when _validate returned something other than + # ImageAction.BAD or ImageAction.EXACT; then path and size are known. + assert self.path is not None + assert self.size is not None + + if check == ImageAction.DOWNSCALE: self.path = ArtResizer.shared.resize( plugin.maxwidth, self.path, quality=plugin.quality, max_filesize=plugin.max_filesize, ) - elif check == self.CANDIDATE_DOWNSIZE: + elif check == ImageAction.DOWNSIZE: # dimensions are correct, so maxwidth is set to maximum dimension self.path = ArtResizer.shared.resize( max(self.size), @@ -240,9 +282,9 @@ class Candidate: quality=plugin.quality, max_filesize=plugin.max_filesize, ) - elif check == self.CANDIDATE_DEINTERLACE: + elif check == ImageAction.DEINTERLACE: self.path = ArtResizer.shared.deinterlace(self.path) - elif check == self.CANDIDATE_REFORMAT: + elif check == ImageAction.REFORMAT: self.path = ArtResizer.shared.reformat( self.path, plugin.cover_format, @@ -250,7 +292,7 @@ class Candidate: ) -def _logged_get(log, *args, **kwargs): +def _logged_get(log: Logger, *args, **kwargs) -> requests.Response: """Like `requests.get`, but logs the effective URL to the specified `log` at the `DEBUG` level. @@ -295,7 +337,9 @@ class RequestMixin: must be named `self._log`. 
""" - def request(self, *args, **kwargs): + _log: Logger + + def request(self, *args, **kwargs) -> requests.Response: """Like `requests.get`, but uses the logger `self._log`. See also `_logged_get`. @@ -306,55 +350,88 @@ class RequestMixin: # ART SOURCES ################################################################ -class ArtSource(RequestMixin): - VALID_MATCHING_CRITERIA = ["default"] +class ArtSource(RequestMixin, ABC): + # Specify whether this source fetches local or remote images + LOC: ClassVar[SourceLocation] + # A list of methods to match metadata, sorted by descending accuracy + VALID_MATCHING_CRITERIA: list[str] = ["default"] + # A human-readable name for the art source + NAME: ClassVar[str] + # The key to select the art source in the config. This value will also be + # stored in the database. + ID: ClassVar[str] - def __init__(self, log, config, match_by=None): + def __init__( + self, + log: Logger, + config: confuse.ConfigView, + match_by: None | list[str] = None, + ) -> None: self._log = log self._config = config self.match_by = match_by or self.VALID_MATCHING_CRITERIA + @cached_property + def description(self) -> str: + return f"{self.ID}[{', '.join(self.match_by)}]" + @staticmethod - def add_default_config(config): + def add_default_config(config: confuse.ConfigView) -> None: pass @classmethod - def available(cls, log, config): + def available(cls, log: Logger, config: confuse.ConfigView) -> bool: """Return whether or not all dependencies are met and the art source is in fact usable. 
""" return True - def get(self, album, plugin, paths): - raise NotImplementedError() + @abstractmethod + def get( + self, + album: Album, + plugin: FetchArtPlugin, + paths: None | Sequence[bytes], + ) -> Iterator[Candidate]: + pass - def _candidate(self, **kwargs): - return Candidate(source=self, log=self._log, **kwargs) + def _candidate(self, **kwargs) -> Candidate: + return Candidate(source_name=self.ID, log=self._log, **kwargs) - def fetch_image(self, candidate, plugin): - raise NotImplementedError() + @abstractmethod + def fetch_image(self, candidate: Candidate, plugin: FetchArtPlugin) -> None: + """Fetch the image to a temporary file if it is not already available + as a local file. - def cleanup(self, candidate): + After calling this, `Candidate.path` is set to the image path if + successful, or to `None` otherwise. + """ + pass + + def cleanup(self, candidate: Candidate) -> None: pass class LocalArtSource(ArtSource): - IS_LOCAL = True - LOC_STR = "local" + LOC = "local" - def fetch_image(self, candidate, plugin): + def fetch_image(self, candidate: Candidate, plugin: FetchArtPlugin) -> None: pass class RemoteArtSource(ArtSource): - IS_LOCAL = False - LOC_STR = "remote" + LOC = "remote" - def fetch_image(self, candidate, plugin): + def fetch_image(self, candidate: Candidate, plugin: FetchArtPlugin) -> None: """Downloads an image from a URL and checks whether it seems to - actually be an image. If so, returns a path to the downloaded image. - Otherwise, returns None. + actually be an image. """ + # This must only be called for candidates that were returned by + # self.get, which are expected to have a url and no path (because they + # haven't been downloaded yet). 
+ assert candidate.path is None + assert candidate.url is not None + if plugin.maxwidth: candidate.url = ArtResizer.shared.proxy_url( plugin.maxwidth, candidate.url @@ -418,7 +495,7 @@ class RemoteArtSource(ArtSource): for chunk in data: fh.write(chunk) self._log.debug( - "downloaded art to: {0}", util.displayable_path(filename) + "downloaded art to: {}", util.displayable_path(filename) ) candidate.path = util.bytestring_path(filename) return @@ -429,7 +506,7 @@ class RemoteArtSource(ArtSource): self._log.debug("error fetching art: {}", exc) return - def cleanup(self, candidate): + def cleanup(self, candidate: Candidate) -> None: if candidate.path: try: util.remove(path=candidate.path) @@ -439,34 +516,39 @@ class RemoteArtSource(ArtSource): class CoverArtArchive(RemoteArtSource): NAME = "Cover Art Archive" + ID = "coverart" VALID_MATCHING_CRITERIA = ["release", "releasegroup"] VALID_THUMBNAIL_SIZES = [250, 500, 1200] URL = "https://coverartarchive.org/release/{mbid}" GROUP_URL = "https://coverartarchive.org/release-group/{mbid}" - def get(self, album, plugin, paths): + def get( + self, + album: Album, + plugin: FetchArtPlugin, + paths: None | Sequence[bytes], + ) -> Iterator[Candidate]: """Return the Cover Art Archive and Cover Art Archive release group URLs using album MusicBrainz release ID and release group ID. 
""" - def get_image_urls(url, preferred_width=None): + def get_image_urls( + url: str, + preferred_width: None | str = None, + ) -> Iterator[str]: try: response = self.request(url) except requests.RequestException: - self._log.debug( - "{}: error receiving response".format(self.NAME) - ) + self._log.debug("{}: error receiving response", self.NAME) return try: data = response.json() except ValueError: self._log.debug( - "{}: error loading response: {}".format( - self.NAME, response.text - ) + "{}: error loading response: {}", self.NAME, response.text ) return @@ -500,41 +582,53 @@ class CoverArtArchive(RemoteArtSource): if "release" in self.match_by and album.mb_albumid: for url in get_image_urls(release_url, preferred_width): - yield self._candidate(url=url, match=Candidate.MATCH_EXACT) + yield self._candidate(url=url, match=MetadataMatch.EXACT) if "releasegroup" in self.match_by and album.mb_releasegroupid: for url in get_image_urls(release_group_url, preferred_width): - yield self._candidate(url=url, match=Candidate.MATCH_FALLBACK) + yield self._candidate(url=url, match=MetadataMatch.FALLBACK) class Amazon(RemoteArtSource): NAME = "Amazon" + ID = "amazon" URL = "https://images.amazon.com/images/P/%s.%02i.LZZZZZZZ.jpg" INDICES = (1, 2) - def get(self, album, plugin, paths): + def get( + self, + album: Album, + plugin: FetchArtPlugin, + paths: None | Sequence[bytes], + ) -> Iterator[Candidate]: """Generate URLs using Amazon ID (ASIN) string.""" if album.asin: for index in self.INDICES: yield self._candidate( url=self.URL % (album.asin, index), - match=Candidate.MATCH_EXACT, + match=MetadataMatch.EXACT, ) class AlbumArtOrg(RemoteArtSource): NAME = "AlbumArt.org scraper" + ID = "albumart" URL = "https://www.albumart.org/index_detail.php" PAT = r'href\s*=\s*"([^>"]*)"[^>]*title\s*=\s*"View larger image"' - def get(self, album, plugin, paths): + def get( + self, + album: Album, + plugin: FetchArtPlugin, + paths: None | Sequence[bytes], + ): """Return art URL from 
AlbumArt.org using album ASIN.""" if not album.asin: return # Get the page from albumart.org. try: resp = self.request(self.URL, params={"asin": album.asin}) - self._log.debug("scraped art URL: {0}", resp.url) + self._log.debug("scraped art URL: {}", resp.url) except requests.RequestException: self._log.debug("error scraping art page") return @@ -543,13 +637,14 @@ class AlbumArtOrg(RemoteArtSource): m = re.search(self.PAT, resp.text) if m: image_url = m.group(1) - yield self._candidate(url=image_url, match=Candidate.MATCH_EXACT) + yield self._candidate(url=image_url, match=MetadataMatch.EXACT) else: self._log.debug("no image found on page") class GoogleImages(RemoteArtSource): NAME = "Google Images" + ID = "google" URL = "https://www.googleapis.com/customsearch/v1" def __init__(self, *args, **kwargs): @@ -558,7 +653,7 @@ class GoogleImages(RemoteArtSource): self.cx = (self._config["google_engine"].get(),) @staticmethod - def add_default_config(config): + def add_default_config(config: confuse.ConfigView): config.add( { "google_key": None, @@ -569,13 +664,18 @@ class GoogleImages(RemoteArtSource): config["google_engine"].redact = True @classmethod - def available(cls, log, config): + def available(cls, log: Logger, config: confuse.ConfigView) -> bool: has_key = bool(config["google_key"].get()) if not has_key: log.debug("google: Disabling art source due to missing key") return has_key - def get(self, album, plugin, paths): + def get( + self, + album: Album, + plugin: FetchArtPlugin, + paths: None | Sequence[bytes], + ) -> Iterator[Candidate]: """Return art URL from google custom search engine given an album title and interpreter. 
""" @@ -601,20 +701,18 @@ class GoogleImages(RemoteArtSource): try: data = response.json() except ValueError: - self._log.debug( - "google: error loading response: {}".format(response.text) - ) + self._log.debug("google: error loading response: {}", response.text) return if "error" in data: reason = data["error"]["errors"][0]["reason"] - self._log.debug("google fetchart error: {0}", reason) + self._log.debug("google fetchart error: {}", reason) return if "items" in data.keys(): for item in data["items"]: yield self._candidate( - url=item["link"], match=Candidate.MATCH_EXACT + url=item["link"], match=MetadataMatch.EXACT ) @@ -622,6 +720,7 @@ class FanartTV(RemoteArtSource): """Art from fanart.tv requested using their API""" NAME = "fanart.tv" + ID = "fanarttv" API_URL = "https://webservice.fanart.tv/v3/" API_ALBUMS = API_URL + "music/albums/" PROJECT_KEY = "61a7d0ab4e67162b7a0c7c35915cd48e" @@ -631,7 +730,7 @@ class FanartTV(RemoteArtSource): self.client_key = self._config["fanarttv_key"].get() @staticmethod - def add_default_config(config): + def add_default_config(config: confuse.ConfigView): config.add( { "fanarttv_key": None, @@ -639,7 +738,12 @@ class FanartTV(RemoteArtSource): ) config["fanarttv_key"].redact = True - def get(self, album, plugin, paths): + def get( + self, + album: Album, + plugin: FetchArtPlugin, + paths: None | Sequence[bytes], + ) -> Iterator[Candidate]: if not album.mb_releasegroupid: return @@ -695,15 +799,21 @@ class FanartTV(RemoteArtSource): # fanart.tv has a strict size requirement for album art to be # uploaded yield self._candidate( - url=item["url"], match=Candidate.MATCH_EXACT, size=(1000, 1000) + url=item["url"], match=MetadataMatch.EXACT, size=(1000, 1000) ) class ITunesStore(RemoteArtSource): NAME = "iTunes Store" + ID = "itunes" API_URL = "https://itunes.apple.com/search" - def get(self, album, plugin, paths): + def get( + self, + album: Album, + plugin: FetchArtPlugin, + paths: None | Sequence[bytes], + ) -> 
Iterator[Candidate]: """Return art URL from iTunes Store given an album title.""" if not (album.albumartist and album.album): return @@ -718,13 +828,13 @@ class ITunesStore(RemoteArtSource): r = self.request(self.API_URL, params=payload) r.raise_for_status() except requests.RequestException as e: - self._log.debug("iTunes search failed: {0}", e) + self._log.debug("iTunes search failed: {}", e) return try: candidates = r.json()["results"] except ValueError as e: - self._log.debug("Could not decode json response: {0}", e) + self._log.debug("Could not decode json response: {}", e) return except KeyError as e: self._log.debug( @@ -752,7 +862,7 @@ class ITunesStore(RemoteArtSource): art_url = c["artworkUrl100"] art_url = art_url.replace("100x100bb", image_suffix) yield self._candidate( - url=art_url, match=Candidate.MATCH_EXACT + url=art_url, match=MetadataMatch.EXACT ) except KeyError as e: self._log.debug( @@ -767,7 +877,7 @@ class ITunesStore(RemoteArtSource): "100x100bb", image_suffix ) yield self._candidate( - url=fallback_art_url, match=Candidate.MATCH_FALLBACK + url=fallback_art_url, match=MetadataMatch.FALLBACK ) except KeyError as e: self._log.debug( @@ -779,6 +889,7 @@ class ITunesStore(RemoteArtSource): class Wikipedia(RemoteArtSource): NAME = "Wikipedia (queried through DBpedia)" + ID = "wikipedia" DBPEDIA_URL = "https://dbpedia.org/sparql" WIKIPEDIA_URL = "https://en.wikipedia.org/w/api.php" SPARQL_QUERY = """PREFIX rdf: