diff --git a/beets/util/__init__.py b/beets/util/__init__.py index e17de1f51..02ab9cf56 100644 --- a/beets/util/__init__.py +++ b/beets/util/__init__.py @@ -40,6 +40,7 @@ from typing import ( Any, AnyStr, Callable, + Generic, Iterable, NamedTuple, TypeVar, @@ -1041,6 +1042,53 @@ class cached_classproperty: return self.cache[owner] +class LazySharedInstance(Generic[T]): + """A descriptor that provides access to a lazily-created shared instance of + the containing class, while calling the class constructor to construct a + new object works as usual. + + ``` + ID: int = 0 + + class Foo: + def __init__(): + global ID + + self.id = ID + ID += 1 + + def func(self): + print(self.id) + + shared: LazySharedInstance[Foo] = LazySharedInstance() + + a0 = Foo() + a1 = Foo.shared + a2 = Foo() + a3 = Foo.shared + + a0.func() # 0 + a1.func() # 1 + a2.func() # 2 + a3.func() # 1 + ``` + """ + + _instance: T | None = None + + def __get__(self, instance: T | None, owner: type[T]) -> T: + if instance is not None: + raise RuntimeError( + "shared instances must be obtained from the class property, " + "not an instance" + ) + + if self._instance is None: + self._instance = owner() + + return self._instance + + def get_module_tempdir(module: str) -> Path: """Return the temporary directory for the given module. diff --git a/beets/util/artresizer.py b/beets/util/artresizer.py index ffbc2edba..33b98c413 100644 --- a/beets/util/artresizer.py +++ b/beets/util/artresizer.py @@ -16,23 +16,33 @@ public resizing proxy if neither is available. """ +from __future__ import annotations + import os import os.path import platform import re import subprocess +from abc import ABC, abstractmethod +from enum import Enum from itertools import chain +from typing import Any, ClassVar, Mapping from urllib.parse import urlencode from beets import logging, util -from beets.util import displayable_path, get_temp_filename, syspath +from beets.util import ( + LazySharedInstance, + displayable_path, + get_temp_filename, + syspath, +) PROXY_URL = "https://images.weserv.nl/" log = logging.getLogger("beets") -def resize_url(url, maxwidth, quality=0): +def resize_url(url: str, maxwidth: int, quality: int = 0) -> str: """Return a proxied image URL that resizes the original image to maxwidth (preserving aspect ratio). """ @@ -51,18 +61,118 @@ class LocalBackendNotAvailableError(Exception): pass -_NOT_AVAILABLE = object() +# Singleton pattern that the typechecker understands: +# https://peps.python.org/pep-0484/#support-for-singleton-types-in-unions +class NotAvailable(Enum): + token = 0 -class LocalBackend: +_NOT_AVAILABLE = NotAvailable.token + + +class LocalBackend(ABC): + NAME: ClassVar[str] + @classmethod - def available(cls): + @abstractmethod + def version(cls) -> Any: + """Return the backend version if its dependencies are satisfied or + raise `LocalBackendNotAvailableError`. + """ + pass + + @classmethod + def available(cls) -> bool: + """Return `True` this backend's dependencies are satisfied and it can + be used, `False` otherwise.""" try: cls.version() return True except LocalBackendNotAvailableError: return False + @abstractmethod + def resize( + self, + maxwidth: int, + path_in: bytes, + path_out: bytes | None = None, + quality: int = 0, + max_filesize: int = 0, + ) -> bytes: + """Resize an image to the given width and return the output path. + + On error, logs a warning and returns `path_in`. + """ + pass + + @abstractmethod + def get_size(self, path_in: bytes) -> tuple[int, int] | None: + """Return the (width, height) of the image or None if unavailable.""" + pass + + @abstractmethod + def deinterlace( + self, + path_in: bytes, + path_out: bytes | None = None, + ) -> bytes: + """Remove interlacing from an image and return the output path. + + On error, logs a warning and returns `path_in`. + """ + pass + + @abstractmethod + def get_format(self, path_in: bytes) -> str | None: + """Return the image format (e.g., 'PNG') or None if undetectable.""" + pass + + @abstractmethod + def convert_format( + self, + source: bytes, + target: bytes, + deinterlaced: bool, + ) -> bytes: + """Convert an image to a new format and return the new file path. + + On error, logs a warning and returns `source`. + """ + pass + + @property + def can_compare(self) -> bool: + """Indicate whether image comparison is supported by this backend.""" + return False + + def compare( + self, + im1: bytes, + im2: bytes, + compare_threshold: float, + ) -> bool | None: + """Compare two images and return `True` if they are similar enough, or + `None` if there is an error. + + This must only be called if `self.can_compare()` returns `True`. + """ + # It is an error to call this when ArtResizer.can_compare is not True. + raise NotImplementedError() + + @property + def can_write_metadata(self) -> bool: + """Indicate whether writing metadata to images is supported.""" + return False + + def write_metadata(self, file: bytes, metadata: Mapping[str, str]) -> None: + """Write key-value metadata into the image file. + + This must only be called if `self.can_write_metadata()` returns `True`. + """ + # It is an error to call this when ArtResizer.can_write_metadata is not True. + raise NotImplementedError() + class IMBackend(LocalBackend): NAME = "ImageMagick" @@ -70,11 +180,11 @@ class IMBackend(LocalBackend): # These fields are used as a cache for `version()`. `_legacy` indicates # whether the modern `magick` binary is available or whether to fall back # to the old-style `convert`, `identify`, etc. commands. - _version = None - _legacy = None + _version: tuple[int, int, int] | NotAvailable | None = None + _legacy: bool | None = None @classmethod - def version(cls): + def version(cls) -> tuple[int, int, int]: """Obtain and cache ImageMagick version. Raises `LocalBackendNotAvailableError` if not available. @@ -98,12 +208,17 @@ class IMBackend(LocalBackend): ) cls._legacy = legacy - if cls._version is _NOT_AVAILABLE: + # cls._version is never None here, but mypy doesn't get that + if cls._version is _NOT_AVAILABLE or cls._version is None: raise LocalBackendNotAvailableError() else: return cls._version - def __init__(self): + convert_cmd: list[str | bytes] + identify_cmd: list[str | bytes] + compare_cmd: list[str | bytes] + + def __init__(self) -> None: """Initialize a wrapper around ImageMagick for local image operations. Stores the ImageMagick version and legacy flag. If ImageMagick is not @@ -124,8 +239,13 @@ class IMBackend(LocalBackend): self.compare_cmd = ["magick", "compare"] def resize( - self, maxwidth, path_in, path_out=None, quality=0, max_filesize=0 - ): + self, + maxwidth: int, + path_in: bytes, + path_out: bytes | None = None, + quality: int = 0, + max_filesize: int = 0, + ) -> bytes: """Resize using ImageMagick. Use the ``magick`` program or ``convert`` on older versions. Return @@ -145,7 +265,7 @@ class IMBackend(LocalBackend): # with regards to the height. # ImageMagick already seems to default to no interlace, but we include # it here for the sake of explicitness. - cmd = self.convert_cmd + [ + cmd: list[str | bytes] = self.convert_cmd + [ syspath(path_in, prefix=False), "-resize", f"{maxwidth}x>", @@ -174,8 +294,8 @@ class IMBackend(LocalBackend): return path_out - def get_size(self, path_in): - cmd = self.identify_cmd + [ + def get_size(self, path_in: bytes) -> tuple[int, int] | None: + cmd: list[str | bytes] = self.identify_cmd + [ "-format", "%w %h", syspath(path_in, prefix=False), @@ -194,12 +314,22 @@ class IMBackend(LocalBackend): ) return None try: - return tuple(map(int, out.split(b" "))) + size = tuple(map(int, out.split(b" "))) except IndexError: log.warning("Could not understand IM output: {0!r}", out) return None - def deinterlace(self, path_in, path_out=None): + if len(size) != 2: + log.warning("Could not understand IM output: {0!r}", out) + return None + + return size + + def deinterlace( + self, + path_in: bytes, + path_out: bytes | None = None, + ) -> bytes: if not path_out: path_out = get_temp_filename(__name__, "deinterlace_IM_", path_in) @@ -217,16 +347,24 @@ class IMBackend(LocalBackend): # FIXME: Should probably issue a warning? return path_in - def get_format(self, filepath): - cmd = self.identify_cmd + ["-format", "%[magick]", syspath(filepath)] + def get_format(self, path_in: bytes) -> str | None: + cmd = self.identify_cmd + ["-format", "%[magick]", syspath(path_in)] try: - return util.command_output(cmd).stdout - except subprocess.CalledProcessError: + # Image formats should really only be ASCII strings such as "PNG", + # if anything else is returned, something is off and we return + # None for safety. + return util.command_output(cmd).stdout.decode("ascii", "strict") + except (subprocess.CalledProcessError, UnicodeError): # FIXME: Should probably issue a warning? return None - def convert_format(self, source, target, deinterlaced): + def convert_format( + self, + source: bytes, + target: bytes, + deinterlaced: bool, + ) -> bytes: cmd = self.convert_cmd + [ syspath(source), *(["-interlace", "none"] if deinterlaced else []), @@ -243,10 +381,15 @@ class IMBackend(LocalBackend): return source @property - def can_compare(self): + def can_compare(self) -> bool: return self.version() > (6, 8, 7) - def compare(self, im1, im2, compare_threshold): + def compare( + self, + im1: bytes, + im2: bytes, + compare_threshold: float, + ) -> bool | None: is_windows = platform.system() == "Windows" # Converting images to grayscale tends to minimize the weight @@ -286,6 +429,10 @@ class IMBackend(LocalBackend): close_fds=not is_windows, ) + # help out mypy + assert convert_proc.stdout is not None + assert convert_proc.stderr is not None + # Check the convert output. We're not interested in the # standard output; that gets piped to the next stage. convert_proc.stdout.close() @@ -329,10 +476,10 @@ class IMBackend(LocalBackend): return phash_diff <= compare_threshold @property - def can_write_metadata(self): + def can_write_metadata(self) -> bool: return True - def write_metadata(self, file, metadata): + def write_metadata(self, file: bytes, metadata: Mapping[str, str]) -> None: assignments = list( chain.from_iterable(("-set", k, v) for k, v in metadata.items()) ) @@ -345,13 +492,13 @@ class PILBackend(LocalBackend): NAME = "PIL" @classmethod - def version(cls): + def version(cls) -> None: try: __import__("PIL", fromlist=["Image"]) except ImportError: raise LocalBackendNotAvailableError() - def __init__(self): + def __init__(self) -> None: """Initialize a wrapper around PIL for local image operations. If PIL is not available, raise an Exception. @@ -359,8 +506,13 @@ class PILBackend(LocalBackend): self.version() def resize( - self, maxwidth, path_in, path_out=None, quality=0, max_filesize=0 - ): + self, + maxwidth: int, + path_in: bytes, + path_out: bytes | None = None, + quality: int = 0, + max_filesize: int = 0, + ) -> bytes: """Resize using Python Imaging Library (PIL). Return the output path of resized image. """ @@ -429,7 +581,7 @@ class PILBackend(LocalBackend): ) return path_in - def get_size(self, path_in): + def get_size(self, path_in: bytes) -> tuple[int, int] | None: from PIL import Image try: @@ -441,7 +593,11 @@ class PILBackend(LocalBackend): ) return None - def deinterlace(self, path_in, path_out=None): + def deinterlace( + self, + path_in: bytes, + path_out: bytes | None = None, + ) -> bytes: if not path_out: path_out = get_temp_filename(__name__, "deinterlace_PIL_", path_in) @@ -455,11 +611,11 @@ class PILBackend(LocalBackend): # FIXME: Should probably issue a warning? return path_in - def get_format(self, filepath): + def get_format(self, path_in: bytes) -> str | None: from PIL import Image, UnidentifiedImageError try: - with Image.open(syspath(filepath)) as im: + with Image.open(syspath(path_in)) as im: return im.format except ( ValueError, @@ -467,10 +623,15 @@ class PILBackend(LocalBackend): UnidentifiedImageError, FileNotFoundError, ): - log.exception("failed to detect image format for {}", filepath) + log.exception("failed to detect image format for {}", path_in) return None - def convert_format(self, source, target, deinterlaced): + def convert_format( + self, + source: bytes, + target: bytes, + deinterlaced: bool, + ) -> bytes: from PIL import Image, UnidentifiedImageError try: @@ -488,18 +649,23 @@ class PILBackend(LocalBackend): return source @property - def can_compare(self): + def can_compare(self) -> bool: return False - def compare(self, im1, im2, compare_threshold): + def compare( + self, + im1: bytes, + im2: bytes, + compare_threshold: float, + ) -> bool | None: # It is an error to call this when ArtResizer.can_compare is not True. raise NotImplementedError() @property - def can_write_metadata(self): + def can_write_metadata(self) -> bool: return True - def write_metadata(self, file, metadata): + def write_metadata(self, file: bytes, metadata: Mapping[str, str]) -> None: from PIL import Image, PngImagePlugin # FIXME: Detect and handle other file types (currently, the only user @@ -507,38 +673,22 @@ class PILBackend(LocalBackend): im = Image.open(syspath(file)) meta = PngImagePlugin.PngInfo() for k, v in metadata.items(): - meta.add_text(k, v, 0) + meta.add_text(k, v, zip=False) im.save(os.fsdecode(file), "PNG", pnginfo=meta) -class Shareable(type): - """A pseudo-singleton metaclass that allows both shared and - non-shared instances. The ``MyClass.shared`` property holds a - lazily-created shared instance of ``MyClass`` while calling - ``MyClass()`` to construct a new object works as usual. - """ - - def __init__(cls, name, bases, dict): - super().__init__(name, bases, dict) - cls._instance = None - - @property - def shared(cls): - if cls._instance is None: - cls._instance = cls() - return cls._instance - - -BACKEND_CLASSES = [ +BACKEND_CLASSES: list[type[LocalBackend]] = [ IMBackend, PILBackend, ] -class ArtResizer(metaclass=Shareable): - """A singleton class that performs image resizes.""" +class ArtResizer: + """A class that dispatches image operations to an available backend.""" - def __init__(self): + local_method: LocalBackend | None + + def __init__(self) -> None: """Create a resizer object with an inferred method.""" # Check if a local backend is available, and store an instance of the # backend class. Otherwise, fallback to the web proxy. @@ -550,25 +700,41 @@ class ArtResizer(metaclass=Shareable): except LocalBackendNotAvailableError: continue else: + # FIXME: Turn WEBPROXY into a backend class as well to remove all + # the special casing. Then simply delegate all methods to the + # backends. (How does proxy_url fit in here, however?) + # Use an ABC (or maybe a typing Protocol?) for backend + # methods, such that both individual backends as well as + # ArtResizer implement it. + # It should probably be configurable which backends classes to + # consider, similar to fetchart or lyrics backends (i.e. a list + # of backends sorted by priority). log.debug("artresizer: method is WEBPROXY") self.local_method = None + shared: LazySharedInstance[ArtResizer] = LazySharedInstance() + @property - def method(self): - if self.local: + def method(self) -> str: + if self.local_method is not None: return self.local_method.NAME else: return "WEBPROXY" def resize( - self, maxwidth, path_in, path_out=None, quality=0, max_filesize=0 - ): + self, + maxwidth: int, + path_in: bytes, + path_out: bytes | None = None, + quality: int = 0, + max_filesize: int = 0, + ) -> bytes: """Manipulate an image file according to the method, returning a new path. For PIL or IMAGEMAGIC methods, resizes the image to a temporary file and encodes with the specified quality level. For WEBPROXY, returns `path_in` unmodified. """ - if self.local: + if self.local_method is not None: return self.local_method.resize( maxwidth, path_in, @@ -580,18 +746,22 @@ class ArtResizer(metaclass=Shareable): # Handled by `proxy_url` already. return path_in - def deinterlace(self, path_in, path_out=None): + def deinterlace( + self, + path_in: bytes, + path_out: bytes | None = None, + ) -> bytes: """Deinterlace an image. Only available locally. """ - if self.local: + if self.local_method is not None: return self.local_method.deinterlace(path_in, path_out) else: # FIXME: Should probably issue a warning? return path_in - def proxy_url(self, maxwidth, url, quality=0): + def proxy_url(self, maxwidth: int, url: str, quality: int = 0) -> str: """Modifies an image URL according the method, returning a new URL. For WEBPROXY, a URL on the proxy server is returned. Otherwise, the URL is returned unmodified. @@ -603,42 +773,48 @@ class ArtResizer(metaclass=Shareable): return resize_url(url, maxwidth, quality) @property - def local(self): + def local(self) -> bool: """A boolean indicating whether the resizing method is performed locally (i.e., PIL or ImageMagick). """ return self.local_method is not None - def get_size(self, path_in): + def get_size(self, path_in: bytes) -> tuple[int, int] | None: """Return the size of an image file as an int couple (width, height) in pixels. Only available locally. """ - if self.local: + if self.local_method is not None: return self.local_method.get_size(path_in) else: - # FIXME: Should probably issue a warning? - return path_in + raise RuntimeError( + "image cannot be obtained without artresizer backend" + ) - def get_format(self, path_in): + def get_format(self, path_in: bytes) -> str | None: """Returns the format of the image as a string. Only available locally. """ - if self.local: + if self.local_method is not None: return self.local_method.get_format(path_in) else: # FIXME: Should probably issue a warning? return None - def reformat(self, path_in, new_format, deinterlaced=True): + def reformat( + self, + path_in: bytes, + new_format: str, + deinterlaced: bool = True, + ) -> bytes: """Converts image to desired format, updating its extension, but keeping the same filename. Only available locally. """ - if not self.local: + if self.local_method is None: # FIXME: Should probably issue a warning? return path_in @@ -664,40 +840,45 @@ class ArtResizer(metaclass=Shareable): return result_path @property - def can_compare(self): + def can_compare(self) -> bool: """A boolean indicating whether image comparison is available""" - if self.local: + if self.local_method is not None: return self.local_method.can_compare else: return False - def compare(self, im1, im2, compare_threshold): + def compare( + self, + im1: bytes, + im2: bytes, + compare_threshold: float, + ) -> bool | None: """Return a boolean indicating whether two images are similar. Only available locally. """ - if self.local: + if self.local_method is not None: return self.local_method.compare(im1, im2, compare_threshold) else: # FIXME: Should probably issue a warning? return None @property - def can_write_metadata(self): + def can_write_metadata(self) -> bool: """A boolean indicating whether writing image metadata is supported.""" - if self.local: + if self.local_method is not None: return self.local_method.can_write_metadata else: return False - def write_metadata(self, file, metadata): + def write_metadata(self, file: bytes, metadata: Mapping[str, str]) -> None: """Write key-value metadata to the image file. Only available locally. Currently, expects the image to be a PNG file. """ - if self.local: + if self.local_method is not None: self.local_method.write_metadata(file, metadata) else: # FIXME: Should probably issue a warning?