Art resizer typings v2 (#5734)

A rebased & amended version of
https://github.com/beetbox/beets/pull/4649.

I left a FIXME note regarding a cleanup that I feel should be done at
some point, but that is out-of-scope here when just adding initial
typings.
This commit is contained in:
Benedikt 2025-05-13 12:18:53 +02:00 committed by GitHub
commit 589bd6d599
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 317 additions and 88 deletions

View file

@ -40,6 +40,7 @@ from typing import (
Any,
AnyStr,
Callable,
Generic,
Iterable,
NamedTuple,
TypeVar,
@ -1041,6 +1042,53 @@ class cached_classproperty:
return self.cache[owner]
class LazySharedInstance(Generic[T]):
"""A descriptor that provides access to a lazily-created shared instance of
the containing class, while calling the class constructor to construct a
new object works as usual.
```
ID: int = 0
class Foo:
def __init__():
global ID
self.id = ID
ID += 1
def func(self):
print(self.id)
shared: LazySharedInstance[Foo] = LazySharedInstance()
a0 = Foo()
a1 = Foo.shared
a2 = Foo()
a3 = Foo.shared
a0.func() # 0
a1.func() # 1
a2.func() # 2
a3.func() # 1
```
"""
_instance: T | None = None
def __get__(self, instance: T | None, owner: type[T]) -> T:
if instance is not None:
raise RuntimeError(
"shared instances must be obtained from the class property, "
"not an instance"
)
if self._instance is None:
self._instance = owner()
return self._instance
def get_module_tempdir(module: str) -> Path:
"""Return the temporary directory for the given module.

View file

@ -16,23 +16,33 @@
public resizing proxy if neither is available.
"""
from __future__ import annotations
import os
import os.path
import platform
import re
import subprocess
from abc import ABC, abstractmethod
from enum import Enum
from itertools import chain
from typing import Any, ClassVar, Mapping
from urllib.parse import urlencode
from beets import logging, util
from beets.util import displayable_path, get_temp_filename, syspath
from beets.util import (
LazySharedInstance,
displayable_path,
get_temp_filename,
syspath,
)
PROXY_URL = "https://images.weserv.nl/"
log = logging.getLogger("beets")
def resize_url(url, maxwidth, quality=0):
def resize_url(url: str, maxwidth: int, quality: int = 0) -> str:
"""Return a proxied image URL that resizes the original image to
maxwidth (preserving aspect ratio).
"""
@ -51,18 +61,118 @@ class LocalBackendNotAvailableError(Exception):
pass
_NOT_AVAILABLE = object()
# Singleton pattern that the typechecker understands:
# https://peps.python.org/pep-0484/#support-for-singleton-types-in-unions
class NotAvailable(Enum):
token = 0
class LocalBackend:
_NOT_AVAILABLE = NotAvailable.token
class LocalBackend(ABC):
NAME: ClassVar[str]
@classmethod
def available(cls):
@abstractmethod
def version(cls) -> Any:
"""Return the backend version if its dependencies are satisfied or
raise `LocalBackendNotAvailableError`.
"""
pass
@classmethod
def available(cls) -> bool:
"""Return `True` this backend's dependencies are satisfied and it can
be used, `False` otherwise."""
try:
cls.version()
return True
except LocalBackendNotAvailableError:
return False
@abstractmethod
def resize(
self,
maxwidth: int,
path_in: bytes,
path_out: bytes | None = None,
quality: int = 0,
max_filesize: int = 0,
) -> bytes:
"""Resize an image to the given width and return the output path.
On error, logs a warning and returns `path_in`.
"""
pass
@abstractmethod
def get_size(self, path_in: bytes) -> tuple[int, int] | None:
"""Return the (width, height) of the image or None if unavailable."""
pass
@abstractmethod
def deinterlace(
self,
path_in: bytes,
path_out: bytes | None = None,
) -> bytes:
"""Remove interlacing from an image and return the output path.
On error, logs a warning and returns `path_in`.
"""
pass
@abstractmethod
def get_format(self, path_in: bytes) -> str | None:
"""Return the image format (e.g., 'PNG') or None if undetectable."""
pass
@abstractmethod
def convert_format(
self,
source: bytes,
target: bytes,
deinterlaced: bool,
) -> bytes:
"""Convert an image to a new format and return the new file path.
On error, logs a warning and returns `source`.
"""
pass
@property
def can_compare(self) -> bool:
"""Indicate whether image comparison is supported by this backend."""
return False
def compare(
self,
im1: bytes,
im2: bytes,
compare_threshold: float,
) -> bool | None:
"""Compare two images and return `True` if they are similar enough, or
`None` if there is an error.
This must only be called if `self.can_compare()` returns `True`.
"""
# It is an error to call this when ArtResizer.can_compare is not True.
raise NotImplementedError()
@property
def can_write_metadata(self) -> bool:
"""Indicate whether writing metadata to images is supported."""
return False
def write_metadata(self, file: bytes, metadata: Mapping[str, str]) -> None:
"""Write key-value metadata into the image file.
This must only be called if `self.can_write_metadata()` returns `True`.
"""
# It is an error to call this when ArtResizer.can_write_metadata is not True.
raise NotImplementedError()
class IMBackend(LocalBackend):
NAME = "ImageMagick"
@ -70,11 +180,11 @@ class IMBackend(LocalBackend):
# These fields are used as a cache for `version()`. `_legacy` indicates
# whether the modern `magick` binary is available or whether to fall back
# to the old-style `convert`, `identify`, etc. commands.
_version = None
_legacy = None
_version: tuple[int, int, int] | NotAvailable | None = None
_legacy: bool | None = None
@classmethod
def version(cls):
def version(cls) -> tuple[int, int, int]:
"""Obtain and cache ImageMagick version.
Raises `LocalBackendNotAvailableError` if not available.
@ -98,12 +208,17 @@ class IMBackend(LocalBackend):
)
cls._legacy = legacy
if cls._version is _NOT_AVAILABLE:
# cls._version is never None here, but mypy doesn't get that
if cls._version is _NOT_AVAILABLE or cls._version is None:
raise LocalBackendNotAvailableError()
else:
return cls._version
def __init__(self):
convert_cmd: list[str | bytes]
identify_cmd: list[str | bytes]
compare_cmd: list[str | bytes]
def __init__(self) -> None:
"""Initialize a wrapper around ImageMagick for local image operations.
Stores the ImageMagick version and legacy flag. If ImageMagick is not
@ -124,8 +239,13 @@ class IMBackend(LocalBackend):
self.compare_cmd = ["magick", "compare"]
def resize(
self, maxwidth, path_in, path_out=None, quality=0, max_filesize=0
):
self,
maxwidth: int,
path_in: bytes,
path_out: bytes | None = None,
quality: int = 0,
max_filesize: int = 0,
) -> bytes:
"""Resize using ImageMagick.
Use the ``magick`` program or ``convert`` on older versions. Return
@ -145,7 +265,7 @@ class IMBackend(LocalBackend):
# with regards to the height.
# ImageMagick already seems to default to no interlace, but we include
# it here for the sake of explicitness.
cmd = self.convert_cmd + [
cmd: list[str | bytes] = self.convert_cmd + [
syspath(path_in, prefix=False),
"-resize",
f"{maxwidth}x>",
@ -174,8 +294,8 @@ class IMBackend(LocalBackend):
return path_out
def get_size(self, path_in):
cmd = self.identify_cmd + [
def get_size(self, path_in: bytes) -> tuple[int, int] | None:
cmd: list[str | bytes] = self.identify_cmd + [
"-format",
"%w %h",
syspath(path_in, prefix=False),
@ -194,12 +314,22 @@ class IMBackend(LocalBackend):
)
return None
try:
return tuple(map(int, out.split(b" ")))
size = tuple(map(int, out.split(b" ")))
except IndexError:
log.warning("Could not understand IM output: {0!r}", out)
return None
def deinterlace(self, path_in, path_out=None):
if len(size) != 2:
log.warning("Could not understand IM output: {0!r}", out)
return None
return size
def deinterlace(
self,
path_in: bytes,
path_out: bytes | None = None,
) -> bytes:
if not path_out:
path_out = get_temp_filename(__name__, "deinterlace_IM_", path_in)
@ -217,16 +347,24 @@ class IMBackend(LocalBackend):
# FIXME: Should probably issue a warning?
return path_in
def get_format(self, filepath):
cmd = self.identify_cmd + ["-format", "%[magick]", syspath(filepath)]
def get_format(self, path_in: bytes) -> str | None:
cmd = self.identify_cmd + ["-format", "%[magick]", syspath(path_in)]
try:
return util.command_output(cmd).stdout
except subprocess.CalledProcessError:
# Image formats should really only be ASCII strings such as "PNG",
# if anything else is returned, something is off and we return
# None for safety.
return util.command_output(cmd).stdout.decode("ascii", "strict")
except (subprocess.CalledProcessError, UnicodeError):
# FIXME: Should probably issue a warning?
return None
def convert_format(self, source, target, deinterlaced):
def convert_format(
self,
source: bytes,
target: bytes,
deinterlaced: bool,
) -> bytes:
cmd = self.convert_cmd + [
syspath(source),
*(["-interlace", "none"] if deinterlaced else []),
@ -243,10 +381,15 @@ class IMBackend(LocalBackend):
return source
@property
def can_compare(self):
def can_compare(self) -> bool:
return self.version() > (6, 8, 7)
def compare(self, im1, im2, compare_threshold):
def compare(
self,
im1: bytes,
im2: bytes,
compare_threshold: float,
) -> bool | None:
is_windows = platform.system() == "Windows"
# Converting images to grayscale tends to minimize the weight
@ -286,6 +429,10 @@ class IMBackend(LocalBackend):
close_fds=not is_windows,
)
# help out mypy
assert convert_proc.stdout is not None
assert convert_proc.stderr is not None
# Check the convert output. We're not interested in the
# standard output; that gets piped to the next stage.
convert_proc.stdout.close()
@ -329,10 +476,10 @@ class IMBackend(LocalBackend):
return phash_diff <= compare_threshold
@property
def can_write_metadata(self):
def can_write_metadata(self) -> bool:
return True
def write_metadata(self, file, metadata):
def write_metadata(self, file: bytes, metadata: Mapping[str, str]) -> None:
assignments = list(
chain.from_iterable(("-set", k, v) for k, v in metadata.items())
)
@ -345,13 +492,13 @@ class PILBackend(LocalBackend):
NAME = "PIL"
@classmethod
def version(cls):
def version(cls) -> None:
try:
__import__("PIL", fromlist=["Image"])
except ImportError:
raise LocalBackendNotAvailableError()
def __init__(self):
def __init__(self) -> None:
"""Initialize a wrapper around PIL for local image operations.
If PIL is not available, raise an Exception.
@ -359,8 +506,13 @@ class PILBackend(LocalBackend):
self.version()
def resize(
self, maxwidth, path_in, path_out=None, quality=0, max_filesize=0
):
self,
maxwidth: int,
path_in: bytes,
path_out: bytes | None = None,
quality: int = 0,
max_filesize: int = 0,
) -> bytes:
"""Resize using Python Imaging Library (PIL). Return the output path
of resized image.
"""
@ -429,7 +581,7 @@ class PILBackend(LocalBackend):
)
return path_in
def get_size(self, path_in):
def get_size(self, path_in: bytes) -> tuple[int, int] | None:
from PIL import Image
try:
@ -441,7 +593,11 @@ class PILBackend(LocalBackend):
)
return None
def deinterlace(self, path_in, path_out=None):
def deinterlace(
self,
path_in: bytes,
path_out: bytes | None = None,
) -> bytes:
if not path_out:
path_out = get_temp_filename(__name__, "deinterlace_PIL_", path_in)
@ -455,11 +611,11 @@ class PILBackend(LocalBackend):
# FIXME: Should probably issue a warning?
return path_in
def get_format(self, filepath):
def get_format(self, path_in: bytes) -> str | None:
from PIL import Image, UnidentifiedImageError
try:
with Image.open(syspath(filepath)) as im:
with Image.open(syspath(path_in)) as im:
return im.format
except (
ValueError,
@ -467,10 +623,15 @@ class PILBackend(LocalBackend):
UnidentifiedImageError,
FileNotFoundError,
):
log.exception("failed to detect image format for {}", filepath)
log.exception("failed to detect image format for {}", path_in)
return None
def convert_format(self, source, target, deinterlaced):
def convert_format(
self,
source: bytes,
target: bytes,
deinterlaced: bool,
) -> bytes:
from PIL import Image, UnidentifiedImageError
try:
@ -488,18 +649,23 @@ class PILBackend(LocalBackend):
return source
@property
def can_compare(self):
def can_compare(self) -> bool:
return False
def compare(self, im1, im2, compare_threshold):
def compare(
self,
im1: bytes,
im2: bytes,
compare_threshold: float,
) -> bool | None:
# It is an error to call this when ArtResizer.can_compare is not True.
raise NotImplementedError()
@property
def can_write_metadata(self):
def can_write_metadata(self) -> bool:
return True
def write_metadata(self, file, metadata):
def write_metadata(self, file: bytes, metadata: Mapping[str, str]) -> None:
from PIL import Image, PngImagePlugin
# FIXME: Detect and handle other file types (currently, the only user
@ -507,38 +673,22 @@ class PILBackend(LocalBackend):
im = Image.open(syspath(file))
meta = PngImagePlugin.PngInfo()
for k, v in metadata.items():
meta.add_text(k, v, 0)
meta.add_text(k, v, zip=False)
im.save(os.fsdecode(file), "PNG", pnginfo=meta)
class Shareable(type):
"""A pseudo-singleton metaclass that allows both shared and
non-shared instances. The ``MyClass.shared`` property holds a
lazily-created shared instance of ``MyClass`` while calling
``MyClass()`` to construct a new object works as usual.
"""
def __init__(cls, name, bases, dict):
super().__init__(name, bases, dict)
cls._instance = None
@property
def shared(cls):
if cls._instance is None:
cls._instance = cls()
return cls._instance
BACKEND_CLASSES = [
BACKEND_CLASSES: list[type[LocalBackend]] = [
IMBackend,
PILBackend,
]
class ArtResizer(metaclass=Shareable):
"""A singleton class that performs image resizes."""
class ArtResizer:
"""A class that dispatches image operations to an available backend."""
def __init__(self):
local_method: LocalBackend | None
def __init__(self) -> None:
"""Create a resizer object with an inferred method."""
# Check if a local backend is available, and store an instance of the
# backend class. Otherwise, fallback to the web proxy.
@ -550,25 +700,41 @@ class ArtResizer(metaclass=Shareable):
except LocalBackendNotAvailableError:
continue
else:
# FIXME: Turn WEBPROXY into a backend class as well to remove all
# the special casing. Then simply delegate all methods to the
# backends. (How does proxy_url fit in here, however?)
# Use an ABC (or maybe a typing Protocol?) for backend
# methods, such that both individual backends as well as
# ArtResizer implement it.
# It should probably be configurable which backends classes to
# consider, similar to fetchart or lyrics backends (i.e. a list
# of backends sorted by priority).
log.debug("artresizer: method is WEBPROXY")
self.local_method = None
shared: LazySharedInstance[ArtResizer] = LazySharedInstance()
@property
def method(self):
if self.local:
def method(self) -> str:
if self.local_method is not None:
return self.local_method.NAME
else:
return "WEBPROXY"
def resize(
self, maxwidth, path_in, path_out=None, quality=0, max_filesize=0
):
self,
maxwidth: int,
path_in: bytes,
path_out: bytes | None = None,
quality: int = 0,
max_filesize: int = 0,
) -> bytes:
"""Manipulate an image file according to the method, returning a
new path. For PIL or IMAGEMAGIC methods, resizes the image to a
temporary file and encodes with the specified quality level.
For WEBPROXY, returns `path_in` unmodified.
"""
if self.local:
if self.local_method is not None:
return self.local_method.resize(
maxwidth,
path_in,
@ -580,18 +746,22 @@ class ArtResizer(metaclass=Shareable):
# Handled by `proxy_url` already.
return path_in
def deinterlace(self, path_in, path_out=None):
def deinterlace(
self,
path_in: bytes,
path_out: bytes | None = None,
) -> bytes:
"""Deinterlace an image.
Only available locally.
"""
if self.local:
if self.local_method is not None:
return self.local_method.deinterlace(path_in, path_out)
else:
# FIXME: Should probably issue a warning?
return path_in
def proxy_url(self, maxwidth, url, quality=0):
def proxy_url(self, maxwidth: int, url: str, quality: int = 0) -> str:
"""Modifies an image URL according the method, returning a new
URL. For WEBPROXY, a URL on the proxy server is returned.
Otherwise, the URL is returned unmodified.
@ -603,42 +773,48 @@ class ArtResizer(metaclass=Shareable):
return resize_url(url, maxwidth, quality)
@property
def local(self):
def local(self) -> bool:
"""A boolean indicating whether the resizing method is performed
locally (i.e., PIL or ImageMagick).
"""
return self.local_method is not None
def get_size(self, path_in):
def get_size(self, path_in: bytes) -> tuple[int, int] | None:
"""Return the size of an image file as an int couple (width, height)
in pixels.
Only available locally.
"""
if self.local:
if self.local_method is not None:
return self.local_method.get_size(path_in)
else:
# FIXME: Should probably issue a warning?
return path_in
raise RuntimeError(
"image cannot be obtained without artresizer backend"
)
def get_format(self, path_in):
def get_format(self, path_in: bytes) -> str | None:
"""Returns the format of the image as a string.
Only available locally.
"""
if self.local:
if self.local_method is not None:
return self.local_method.get_format(path_in)
else:
# FIXME: Should probably issue a warning?
return None
def reformat(self, path_in, new_format, deinterlaced=True):
def reformat(
self,
path_in: bytes,
new_format: str,
deinterlaced: bool = True,
) -> bytes:
"""Converts image to desired format, updating its extension, but
keeping the same filename.
Only available locally.
"""
if not self.local:
if self.local_method is None:
# FIXME: Should probably issue a warning?
return path_in
@ -664,40 +840,45 @@ class ArtResizer(metaclass=Shareable):
return result_path
@property
def can_compare(self):
def can_compare(self) -> bool:
"""A boolean indicating whether image comparison is available"""
if self.local:
if self.local_method is not None:
return self.local_method.can_compare
else:
return False
def compare(self, im1, im2, compare_threshold):
def compare(
self,
im1: bytes,
im2: bytes,
compare_threshold: float,
) -> bool | None:
"""Return a boolean indicating whether two images are similar.
Only available locally.
"""
if self.local:
if self.local_method is not None:
return self.local_method.compare(im1, im2, compare_threshold)
else:
# FIXME: Should probably issue a warning?
return None
@property
def can_write_metadata(self):
def can_write_metadata(self) -> bool:
"""A boolean indicating whether writing image metadata is supported."""
if self.local:
if self.local_method is not None:
return self.local_method.can_write_metadata
else:
return False
def write_metadata(self, file, metadata):
def write_metadata(self, file: bytes, metadata: Mapping[str, str]) -> None:
"""Write key-value metadata to the image file.
Only available locally. Currently, expects the image to be a PNG file.
"""
if self.local:
if self.local_method is not None:
self.local_method.write_metadata(file, metadata)
else:
# FIXME: Should probably issue a warning?