Merge pull request #4313 from wisp3rwind/pr_artresizer_oo

Refactor: ArtResizer backends are classes
This commit is contained in:
Benedikt 2022-03-23 23:05:43 +01:00 committed by GitHub
commit 1167453ed8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 537 additions and 483 deletions

View file

@ -16,6 +16,7 @@
public resizing proxy if neither is available.
"""
from itertools import chain
import subprocess
import os
import os.path
@ -27,11 +28,6 @@ from beets import logging
from beets import util
from beets.util import bytestring_path, displayable_path, py3_path, syspath
# Resizing methods
PIL = 1
IMAGEMAGICK = 2
WEBPROXY = 3
PROXY_URL = 'https://images.weserv.nl/'
log = logging.getLogger('beets')
@ -61,327 +57,413 @@ def temp_file_for(path):
return bytestring_path(f.name)
def pil_resize(artresizer, maxwidth, path_in, path_out=None, quality=0,
max_filesize=0):
"""Resize using Python Imaging Library (PIL). Return the output path
of resized image.
"""
path_out = path_out or temp_file_for(path_in)
from PIL import Image
class LocalBackendNotAvailableError(Exception):
pass
log.debug('artresizer: PIL resizing {0} to {1}',
displayable_path(path_in), displayable_path(path_out))
try:
im = Image.open(syspath(path_in))
size = maxwidth, maxwidth
im.thumbnail(size, Image.ANTIALIAS)
_NOT_AVAILABLE = object()
if quality == 0:
# Use PIL's default quality.
quality = -1
# progressive=False only affects JPEGs and is the default,
# but we include it here for explicitness.
im.save(py3_path(path_out), quality=quality, progressive=False)
class LocalBackend:
@classmethod
def available(cls):
try:
cls.version()
return True
except LocalBackendNotAvailableError:
return False
if max_filesize > 0:
# If maximum filesize is set, we attempt to lower the quality of
# jpeg conversion by a proportional amount, up to 3 attempts
# First, set the maximum quality to either provided, or 95
if quality > 0:
lower_qual = quality
else:
lower_qual = 95
for i in range(5):
# 5 attempts is an abitrary choice
filesize = os.stat(syspath(path_out)).st_size
log.debug("PIL Pass {0} : Output size: {1}B", i, filesize)
if filesize <= max_filesize:
return path_out
# The relationship between filesize & quality will be
# image dependent.
lower_qual -= 10
# Restrict quality dropping below 10
if lower_qual < 10:
lower_qual = 10
# Use optimize flag to improve filesize decrease
im.save(py3_path(path_out), quality=lower_qual,
optimize=True, progressive=False)
log.warning("PIL Failed to resize file to below {0}B",
max_filesize)
return path_out
class IMBackend(LocalBackend):
NAME = "ImageMagick"
# These fields are used as a cache for `version()`. `_legacy` indicates
# whether the modern `magick` binary is available or whether to fall back
# to the old-style `convert`, `identify`, etc. commands.
_version = None
_legacy = None
@classmethod
def version(cls):
"""Obtain and cache ImageMagick version.
Raises `LocalBackendNotAvailableError` if not available.
"""
if cls._version is None:
for cmd_name, legacy in (('magick', False), ('convert', True)):
try:
out = util.command_output([cmd_name, "--version"]).stdout
except (subprocess.CalledProcessError, OSError) as exc:
log.debug('ImageMagick version check failed: {}', exc)
cls._version = _NOT_AVAILABLE
else:
if b'imagemagick' in out.lower():
pattern = br".+ (\d+)\.(\d+)\.(\d+).*"
match = re.search(pattern, out)
if match:
cls._version = (int(match.group(1)),
int(match.group(2)),
int(match.group(3)))
cls._legacy = legacy
if cls._version is _NOT_AVAILABLE:
raise LocalBackendNotAvailableError()
else:
return path_out
except OSError:
log.error("PIL cannot create thumbnail for '{0}'",
displayable_path(path_in))
return path_in
return cls._version
def __init__(self):
"""Initialize a wrapper around ImageMagick for local image operations.
def im_resize(artresizer, maxwidth, path_in, path_out=None, quality=0,
max_filesize=0):
"""Resize using ImageMagick.
Stores the ImageMagick version and legacy flag. If ImageMagick is not
available, raise an Exception.
"""
self.version()
Use the ``magick`` program or ``convert`` on older versions. Return
the output path of resized image.
"""
path_out = path_out or temp_file_for(path_in)
log.debug('artresizer: ImageMagick resizing {0} to {1}',
displayable_path(path_in), displayable_path(path_out))
# Use ImageMagick's magick binary when it's available.
# If it's not, fall back to the older, separate convert
# and identify commands.
if self._legacy:
self.convert_cmd = ['convert']
self.identify_cmd = ['identify']
self.compare_cmd = ['compare']
else:
self.convert_cmd = ['magick']
self.identify_cmd = ['magick', 'identify']
self.compare_cmd = ['magick', 'compare']
# "-resize WIDTHx>" shrinks images with the width larger
# than the given width while maintaining the aspect ratio
# with regards to the height.
# ImageMagick already seems to default to no interlace, but we include it
# here for the sake of explicitness.
cmd = artresizer.im_convert_cmd + [
syspath(path_in, prefix=False),
'-resize', f'{maxwidth}x>',
'-interlace', 'none',
]
def resize(self, maxwidth, path_in, path_out=None, quality=0,
max_filesize=0):
"""Resize using ImageMagick.
if quality > 0:
cmd += ['-quality', f'{quality}']
Use the ``magick`` program or ``convert`` on older versions. Return
the output path of resized image.
"""
path_out = path_out or temp_file_for(path_in)
log.debug('artresizer: ImageMagick resizing {0} to {1}',
displayable_path(path_in), displayable_path(path_out))
# "-define jpeg:extent=SIZEb" sets the target filesize for imagemagick to
# SIZE in bytes.
if max_filesize > 0:
cmd += ['-define', f'jpeg:extent={max_filesize}b']
# "-resize WIDTHx>" shrinks images with the width larger
# than the given width while maintaining the aspect ratio
# with regards to the height.
# ImageMagick already seems to default to no interlace, but we include
# it here for the sake of explicitness.
cmd = self.convert_cmd + [
syspath(path_in, prefix=False),
'-resize', f'{maxwidth}x>',
'-interlace', 'none',
]
cmd.append(syspath(path_out, prefix=False))
if quality > 0:
cmd += ['-quality', f'{quality}']
try:
util.command_output(cmd)
except subprocess.CalledProcessError:
log.warning('artresizer: IM convert failed for {0}',
displayable_path(path_in))
return path_in
# "-define jpeg:extent=SIZEb" sets the target filesize for imagemagick
# to SIZE in bytes.
if max_filesize > 0:
cmd += ['-define', f'jpeg:extent={max_filesize}b']
return path_out
cmd.append(syspath(path_out, prefix=False))
try:
util.command_output(cmd)
except subprocess.CalledProcessError:
log.warning('artresizer: IM convert failed for {0}',
displayable_path(path_in))
return path_in
BACKEND_FUNCS = {
PIL: pil_resize,
IMAGEMAGICK: im_resize,
}
def pil_getsize(artresizer, path_in):
from PIL import Image
try:
im = Image.open(syspath(path_in))
return im.size
except OSError as exc:
log.error("PIL could not read file {}: {}",
displayable_path(path_in), exc)
return None
def im_getsize(artresizer, path_in):
cmd = artresizer.im_identify_cmd + \
['-format', '%w %h', syspath(path_in, prefix=False)]
try:
out = util.command_output(cmd).stdout
except subprocess.CalledProcessError as exc:
log.warning('ImageMagick size query failed')
log.debug(
'`convert` exited with (status {}) when '
'getting size with command {}:\n{}',
exc.returncode, cmd, exc.output.strip()
)
return None
try:
return tuple(map(int, out.split(b' ')))
except IndexError:
log.warning('Could not understand IM output: {0!r}', out)
return None
BACKEND_GET_SIZE = {
PIL: pil_getsize,
IMAGEMAGICK: im_getsize,
}
def pil_deinterlace(artresizer, path_in, path_out=None):
path_out = path_out or temp_file_for(path_in)
from PIL import Image
try:
im = Image.open(syspath(path_in))
im.save(py3_path(path_out), progressive=False)
return path_out
except IOError:
return path_in
def get_size(self, path_in):
cmd = self.identify_cmd + [
'-format', '%w %h', syspath(path_in, prefix=False)
]
def im_deinterlace(artresizer, path_in, path_out=None):
path_out = path_out or temp_file_for(path_in)
cmd = artresizer.im_convert_cmd + [
syspath(path_in, prefix=False),
'-interlace', 'none',
syspath(path_out, prefix=False),
]
try:
util.command_output(cmd)
return path_out
except subprocess.CalledProcessError:
return path_in
DEINTERLACE_FUNCS = {
PIL: pil_deinterlace,
IMAGEMAGICK: im_deinterlace,
}
def im_get_format(artresizer, filepath):
cmd = artresizer.im_identify_cmd + [
'-format', '%[magick]',
syspath(filepath)
]
try:
return util.command_output(cmd).stdout
except subprocess.CalledProcessError:
return None
def pil_get_format(artresizer, filepath):
from PIL import Image, UnidentifiedImageError
try:
with Image.open(syspath(filepath)) as im:
return im.format
except (ValueError, TypeError, UnidentifiedImageError, FileNotFoundError):
log.exception("failed to detect image format for {}", filepath)
return None
BACKEND_GET_FORMAT = {
PIL: pil_get_format,
IMAGEMAGICK: im_get_format,
}
def im_convert_format(artresizer, source, target, deinterlaced):
cmd = artresizer.im_convert_cmd + [
syspath(source),
*(["-interlace", "none"] if deinterlaced else []),
syspath(target),
]
try:
subprocess.check_call(
cmd,
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL
)
return target
except subprocess.CalledProcessError:
return source
def pil_convert_format(artresizer, source, target, deinterlaced):
from PIL import Image, UnidentifiedImageError
try:
with Image.open(syspath(source)) as im:
im.save(py3_path(target), progressive=not deinterlaced)
return target
except (ValueError, TypeError, UnidentifiedImageError, FileNotFoundError,
OSError):
log.exception("failed to convert image {} -> {}", source, target)
return source
BACKEND_CONVERT_IMAGE_FORMAT = {
PIL: pil_convert_format,
IMAGEMAGICK: im_convert_format,
}
def im_compare(artresizer, im1, im2, compare_threshold):
is_windows = platform.system() == "Windows"
# Converting images to grayscale tends to minimize the weight
# of colors in the diff score. So we first convert both images
# to grayscale and then pipe them into the `compare` command.
# On Windows, ImageMagick doesn't support the magic \\?\ prefix
# on paths, so we pass `prefix=False` to `syspath`.
convert_cmd = artresizer.im_convert_cmd + [
syspath(im2, prefix=False), syspath(im1, prefix=False),
'-colorspace', 'gray', 'MIFF:-'
]
compare_cmd = artresizer.im_compare_cmd + [
'-metric', 'PHASH', '-', 'null:',
]
log.debug('comparing images with pipeline {} | {}',
convert_cmd, compare_cmd)
convert_proc = subprocess.Popen(
convert_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=not is_windows,
)
compare_proc = subprocess.Popen(
compare_cmd,
stdin=convert_proc.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=not is_windows,
)
# Check the convert output. We're not interested in the
# standard output; that gets piped to the next stage.
convert_proc.stdout.close()
convert_stderr = convert_proc.stderr.read()
convert_proc.stderr.close()
convert_proc.wait()
if convert_proc.returncode:
log.debug(
'ImageMagick convert failed with status {}: {!r}',
convert_proc.returncode,
convert_stderr,
)
return None
# Check the compare output.
stdout, stderr = compare_proc.communicate()
if compare_proc.returncode:
if compare_proc.returncode != 1:
log.debug('ImageMagick compare failed: {0}, {1}',
displayable_path(im2), displayable_path(im1))
try:
out = util.command_output(cmd).stdout
except subprocess.CalledProcessError as exc:
log.warning('ImageMagick size query failed')
log.debug(
'`convert` exited with (status {}) when '
'getting size with command {}:\n{}',
exc.returncode, cmd, exc.output.strip()
)
return None
try:
return tuple(map(int, out.split(b' ')))
except IndexError:
log.warning('Could not understand IM output: {0!r}', out)
return None
out_str = stderr
else:
out_str = stdout
try:
phash_diff = float(out_str)
except ValueError:
log.debug('IM output is not a number: {0!r}', out_str)
return None
def deinterlace(self, path_in, path_out=None):
path_out = path_out or temp_file_for(path_in)
log.debug('ImageMagick compare score: {0}', phash_diff)
return phash_diff <= compare_threshold
cmd = self.convert_cmd + [
syspath(path_in, prefix=False),
'-interlace', 'none',
syspath(path_out, prefix=False),
]
try:
util.command_output(cmd)
return path_out
except subprocess.CalledProcessError:
# FIXME: Should probably issue a warning?
return path_in
def get_format(self, filepath):
cmd = self.identify_cmd + [
'-format', '%[magick]',
syspath(filepath)
]
try:
return util.command_output(cmd).stdout
except subprocess.CalledProcessError:
# FIXME: Should probably issue a warning?
return None
def convert_format(self, source, target, deinterlaced):
cmd = self.convert_cmd + [
syspath(source),
*(["-interlace", "none"] if deinterlaced else []),
syspath(target),
]
try:
subprocess.check_call(
cmd,
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL
)
return target
except subprocess.CalledProcessError:
# FIXME: Should probably issue a warning?
return source
@property
def can_compare(self):
return self.version() > (6, 8, 7)
def compare(self, im1, im2, compare_threshold):
is_windows = platform.system() == "Windows"
# Converting images to grayscale tends to minimize the weight
# of colors in the diff score. So we first convert both images
# to grayscale and then pipe them into the `compare` command.
# On Windows, ImageMagick doesn't support the magic \\?\ prefix
# on paths, so we pass `prefix=False` to `syspath`.
convert_cmd = self.convert_cmd + [
syspath(im2, prefix=False), syspath(im1, prefix=False),
'-colorspace', 'gray', 'MIFF:-'
]
compare_cmd = self.compare_cmd + [
'-metric', 'PHASH', '-', 'null:',
]
log.debug('comparing images with pipeline {} | {}',
convert_cmd, compare_cmd)
convert_proc = subprocess.Popen(
convert_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=not is_windows,
)
compare_proc = subprocess.Popen(
compare_cmd,
stdin=convert_proc.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=not is_windows,
)
# Check the convert output. We're not interested in the
# standard output; that gets piped to the next stage.
convert_proc.stdout.close()
convert_stderr = convert_proc.stderr.read()
convert_proc.stderr.close()
convert_proc.wait()
if convert_proc.returncode:
log.debug(
'ImageMagick convert failed with status {}: {!r}',
convert_proc.returncode,
convert_stderr,
)
return None
# Check the compare output.
stdout, stderr = compare_proc.communicate()
if compare_proc.returncode:
if compare_proc.returncode != 1:
log.debug('ImageMagick compare failed: {0}, {1}',
displayable_path(im2), displayable_path(im1))
return None
out_str = stderr
else:
out_str = stdout
try:
phash_diff = float(out_str)
except ValueError:
log.debug('IM output is not a number: {0!r}', out_str)
return None
log.debug('ImageMagick compare score: {0}', phash_diff)
return phash_diff <= compare_threshold
@property
def can_write_metadata(self):
return True
def write_metadata(self, file, metadata):
assignments = list(chain.from_iterable(
('-set', k, v) for k, v in metadata.items()
))
command = self.convert_cmd + [file, *assignments, file]
util.command_output(command)
def pil_compare(artresizer, im1, im2, compare_threshold):
# It is an error to call this when ArtResizer.can_compare is not True.
raise NotImplementedError()
class PILBackend(LocalBackend):
NAME = "PIL"
@classmethod
def version(cls):
try:
__import__('PIL', fromlist=['Image'])
except ImportError:
raise LocalBackendNotAvailableError()
BACKEND_COMPARE = {
PIL: pil_compare,
IMAGEMAGICK: im_compare,
}
def __init__(self):
"""Initialize a wrapper around PIL for local image operations.
If PIL is not available, raise an Exception.
"""
self.version()
def resize(self, maxwidth, path_in, path_out=None, quality=0,
max_filesize=0):
"""Resize using Python Imaging Library (PIL). Return the output path
of resized image.
"""
path_out = path_out or temp_file_for(path_in)
from PIL import Image
log.debug('artresizer: PIL resizing {0} to {1}',
displayable_path(path_in), displayable_path(path_out))
try:
im = Image.open(syspath(path_in))
size = maxwidth, maxwidth
im.thumbnail(size, Image.ANTIALIAS)
if quality == 0:
# Use PIL's default quality.
quality = -1
# progressive=False only affects JPEGs and is the default,
# but we include it here for explicitness.
im.save(py3_path(path_out), quality=quality, progressive=False)
if max_filesize > 0:
# If maximum filesize is set, we attempt to lower the quality
# of jpeg conversion by a proportional amount, up to 3 attempts
# First, set the maximum quality to either provided, or 95
if quality > 0:
lower_qual = quality
else:
lower_qual = 95
for i in range(5):
# 5 attempts is an abitrary choice
filesize = os.stat(syspath(path_out)).st_size
log.debug("PIL Pass {0} : Output size: {1}B", i, filesize)
if filesize <= max_filesize:
return path_out
# The relationship between filesize & quality will be
# image dependent.
lower_qual -= 10
# Restrict quality dropping below 10
if lower_qual < 10:
lower_qual = 10
# Use optimize flag to improve filesize decrease
im.save(py3_path(path_out), quality=lower_qual,
optimize=True, progressive=False)
log.warning("PIL Failed to resize file to below {0}B",
max_filesize)
return path_out
else:
return path_out
except OSError:
log.error("PIL cannot create thumbnail for '{0}'",
displayable_path(path_in))
return path_in
def get_size(self, path_in):
from PIL import Image
try:
im = Image.open(syspath(path_in))
return im.size
except OSError as exc:
log.error("PIL could not read file {}: {}",
displayable_path(path_in), exc)
return None
def deinterlace(self, path_in, path_out=None):
path_out = path_out or temp_file_for(path_in)
from PIL import Image
try:
im = Image.open(syspath(path_in))
im.save(py3_path(path_out), progressive=False)
return path_out
except IOError:
# FIXME: Should probably issue a warning?
return path_in
def get_format(self, filepath):
from PIL import Image, UnidentifiedImageError
try:
with Image.open(syspath(filepath)) as im:
return im.format
except (ValueError, TypeError, UnidentifiedImageError,
FileNotFoundError):
log.exception("failed to detect image format for {}", filepath)
return None
def convert_format(self, source, target, deinterlaced):
from PIL import Image, UnidentifiedImageError
try:
with Image.open(syspath(source)) as im:
im.save(py3_path(target), progressive=not deinterlaced)
return target
except (ValueError, TypeError, UnidentifiedImageError,
FileNotFoundError, OSError):
log.exception("failed to convert image {} -> {}", source, target)
return source
@property
def can_compare(self):
return False
def compare(self, im1, im2, compare_threshold):
# It is an error to call this when ArtResizer.can_compare is not True.
raise NotImplementedError()
@property
def can_write_metadata(self):
return True
def write_metadata(self, file, metadata):
from PIL import Image, PngImagePlugin
# FIXME: Detect and handle other file types (currently, the only user
# is the thumbnails plugin, which generates PNG images).
im = Image.open(file)
meta = PngImagePlugin.PngInfo()
for k, v in metadata.items():
meta.add_text(k, v, 0)
im.save(file, "PNG", pnginfo=meta)
class Shareable(type):
@ -402,6 +484,12 @@ class Shareable(type):
return cls._instance
BACKEND_CLASSES = [
IMBackend,
PILBackend,
]
class ArtResizer(metaclass=Shareable):
"""A singleton class that performs image resizes.
"""
@ -409,22 +497,25 @@ class ArtResizer(metaclass=Shareable):
def __init__(self):
"""Create a resizer object with an inferred method.
"""
self.method = self._check_method()
log.debug("artresizer: method is {0}", self.method)
# Check if a local backend is availabe, and store an instance of the
# backend class. Otherwise, fallback to the web proxy.
for backend_cls in BACKEND_CLASSES:
try:
self.local_method = backend_cls()
log.debug(f"artresizer: method is {self.local_method.NAME}")
break
except LocalBackendNotAvailableError:
continue
else:
log.debug("artresizer: method is WEBPROXY")
self.local_method = None
# Use ImageMagick's magick binary when it's available. If it's
# not, fall back to the older, separate convert and identify
# commands.
if self.method[0] == IMAGEMAGICK:
self.im_legacy = self.method[2]
if self.im_legacy:
self.im_convert_cmd = ['convert']
self.im_identify_cmd = ['identify']
self.im_compare_cmd = ['compare']
else:
self.im_convert_cmd = ['magick']
self.im_identify_cmd = ['magick', 'identify']
self.im_compare_cmd = ['magick', 'compare']
@property
def method(self):
if self.local:
return self.local_method.NAME
else:
return "WEBPROXY"
def resize(
self, maxwidth, path_in, path_out=None, quality=0, max_filesize=0
@ -435,17 +526,23 @@ class ArtResizer(metaclass=Shareable):
For WEBPROXY, returns `path_in` unmodified.
"""
if self.local:
func = BACKEND_FUNCS[self.method[0]]
return func(self, maxwidth, path_in, path_out,
quality=quality, max_filesize=max_filesize)
return self.local_method.resize(
maxwidth, path_in, path_out,
quality=quality, max_filesize=max_filesize
)
else:
# Handled by `proxy_url` already.
return path_in
def deinterlace(self, path_in, path_out=None):
"""Deinterlace an image.
Only available locally.
"""
if self.local:
func = DEINTERLACE_FUNCS[self.method[0]]
return func(self, path_in, path_out)
return self.local_method.deinterlace(path_in, path_out)
else:
# FIXME: Should probably issue a warning?
return path_in
def proxy_url(self, maxwidth, url, quality=0):
@ -454,6 +551,7 @@ class ArtResizer(metaclass=Shareable):
Otherwise, the URL is returned unmodified.
"""
if self.local:
# Going to be handled by `resize()`.
return url
else:
return resize_url(url, maxwidth, quality)
@ -463,7 +561,7 @@ class ArtResizer(metaclass=Shareable):
"""A boolean indicating whether the resizing method is performed
locally (i.e., PIL or ImageMagick).
"""
return self.method[0] in BACKEND_FUNCS
return self.local_method is not None
def get_size(self, path_in):
"""Return the size of an image file as an int couple (width, height)
@ -472,9 +570,10 @@ class ArtResizer(metaclass=Shareable):
Only available locally.
"""
if self.local:
func = BACKEND_GET_SIZE[self.method[0]]
return func(self, path_in)
return None
return self.local_method.get_size(path_in)
else:
# FIXME: Should probably issue a warning?
return path_in
def get_format(self, path_in):
"""Returns the format of the image as a string.
@ -482,9 +581,10 @@ class ArtResizer(metaclass=Shareable):
Only available locally.
"""
if self.local:
func = BACKEND_GET_FORMAT[self.method[0]]
return func(self, path_in)
return None
return self.local_method.get_format(path_in)
else:
# FIXME: Should probably issue a warning?
return None
def reformat(self, path_in, new_format, deinterlaced=True):
"""Converts image to desired format, updating its extension, but
@ -493,6 +593,7 @@ class ArtResizer(metaclass=Shareable):
Only available locally.
"""
if not self.local:
# FIXME: Should probably issue a warning?
return path_in
new_format = new_format.lower()
@ -503,13 +604,14 @@ class ArtResizer(metaclass=Shareable):
fname, ext = os.path.splitext(path_in)
path_new = fname + b'.' + new_format.encode('utf8')
func = BACKEND_CONVERT_IMAGE_FORMAT[self.method[0]]
# allows the exception to propagate, while still making sure a changed
# file path was removed
result_path = path_in
try:
result_path = func(self, path_in, path_new, deinterlaced)
result_path = self.local_method.convert_format(
path_in, path_new, deinterlaced
)
finally:
if result_path != path_in:
os.unlink(path_in)
@ -519,7 +621,10 @@ class ArtResizer(metaclass=Shareable):
def can_compare(self):
"""A boolean indicating whether image comparison is available"""
return self.method[0] == IMAGEMAGICK and self.method[1] > (6, 8, 7)
if self.local:
return self.local_method.can_compare
else:
return False
def compare(self, im1, im2, compare_threshold):
"""Return a boolean indicating whether two images are similar.
@ -527,63 +632,27 @@ class ArtResizer(metaclass=Shareable):
Only available locally.
"""
if self.local:
func = BACKEND_COMPARE[self.method[0]]
return func(self, im1, im2, compare_threshold)
return None
@staticmethod
def _check_method():
"""Return a tuple indicating an available method and its version.
The result has at least two elements:
- The method, eitehr WEBPROXY, PIL, or IMAGEMAGICK.
- The version.
If the method is IMAGEMAGICK, there is also a third element: a
bool flag indicating whether to use the `magick` binary or
legacy single-purpose executables (`convert`, `identify`, etc.)
"""
version = get_im_version()
if version:
version, legacy = version
return IMAGEMAGICK, version, legacy
version = get_pil_version()
if version:
return PIL, version
return WEBPROXY, (0)
def get_im_version():
"""Get the ImageMagick version and legacy flag as a pair. Or return
None if ImageMagick is not available.
"""
for cmd_name, legacy in ((['magick'], False), (['convert'], True)):
cmd = cmd_name + ['--version']
try:
out = util.command_output(cmd).stdout
except (subprocess.CalledProcessError, OSError) as exc:
log.debug('ImageMagick version check failed: {}', exc)
return self.local_method.compare(im1, im2, compare_threshold)
else:
if b'imagemagick' in out.lower():
pattern = br".+ (\d+)\.(\d+)\.(\d+).*"
match = re.search(pattern, out)
if match:
version = (int(match.group(1)),
int(match.group(2)),
int(match.group(3)))
return version, legacy
# FIXME: Should probably issue a warning?
return None
return None
@property
def can_write_metadata(self):
"""A boolean indicating whether writing image metadata is supported."""
if self.local:
return self.local_method.can_write_metadata
else:
return False
def get_pil_version():
"""Get the PIL/Pillow version, or None if it is unavailable.
"""
try:
__import__('PIL', fromlist=['Image'])
return (0,)
except ImportError:
return None
def write_metadata(self, file, metadata):
"""Write key-value metadata to the image file.
Only available locally. Currently, expects the image to be a PNG file.
"""
if self.local:
self.local_method.write_metadata(file, metadata)
else:
# FIXME: Should probably issue a warning?
pass

View file

@ -22,7 +22,6 @@ Spec: standards.freedesktop.org/thumbnail-spec/latest/index.html
from hashlib import md5
import os
import shutil
from itertools import chain
from pathlib import PurePosixPath
import ctypes
import ctypes.util
@ -32,7 +31,7 @@ from xdg import BaseDirectory
from beets.plugins import BeetsPlugin
from beets.ui import Subcommand, decargs
from beets import util
from beets.util.artresizer import ArtResizer, get_im_version, get_pil_version
from beets.util.artresizer import ArtResizer
BASE_DIR = os.path.join(BaseDirectory.xdg_cache_home, "thumbnails")
@ -49,7 +48,6 @@ class ThumbnailsPlugin(BeetsPlugin):
'dolphin': False,
})
self.write_metadata = None
if self.config['auto'] and self._check_local_ok():
self.register_listener('art_set', self.process_album)
@ -90,14 +88,12 @@ class ThumbnailsPlugin(BeetsPlugin):
if not os.path.exists(dir):
os.makedirs(dir)
if get_im_version():
self.write_metadata = write_metadata_im
tool = "IM"
else:
assert get_pil_version() # since we're local
self.write_metadata = write_metadata_pil
tool = "PIL"
self._log.debug("using {0} to write metadata", tool)
if not ArtResizer.shared.can_write_metadata:
raise RuntimeError(
f"Thumbnails: ArtResizer backend {ArtResizer.shared.method}"
f" unexpectedly cannot write image metadata."
)
self._log.debug(f"using {ArtResizer.shared.method} to write metadata")
uri_getter = GioURI()
if not uri_getter.available:
@ -171,7 +167,7 @@ class ThumbnailsPlugin(BeetsPlugin):
metadata = {"Thumb::URI": self.get_uri(album.artpath),
"Thumb::MTime": str(mtime)}
try:
self.write_metadata(image_path, metadata)
ArtResizer.shared.write_metadata(image_path, metadata)
except Exception:
self._log.exception("could not write metadata to {0}",
util.displayable_path(image_path))
@ -188,26 +184,6 @@ class ThumbnailsPlugin(BeetsPlugin):
self._log.debug("Wrote file {0}", util.displayable_path(outfilename))
def write_metadata_im(file, metadata):
"""Enrich the file metadata with `metadata` dict thanks to IM."""
command = ['convert', file] + \
list(chain.from_iterable(('-set', k, v)
for k, v in metadata.items())) + [file]
util.command_output(command)
return True
def write_metadata_pil(file, metadata):
"""Enrich the file metadata with `metadata` dict thanks to PIL."""
from PIL import Image, PngImagePlugin
im = Image.open(file)
meta = PngImagePlugin.PngInfo()
for k, v in metadata.items():
meta.add_text(k, v, 0)
im.save(file, "PNG", pnginfo=meta)
return True
class URIGetter:
available = False
name = "Abstract base"

View file

@ -31,7 +31,7 @@ from beets import library
from beets import importer
from beets import logging
from beets import util
from beets.util.artresizer import ArtResizer, WEBPROXY
from beets.util.artresizer import ArtResizer
import confuse
@ -787,7 +787,7 @@ class ArtForAlbumTest(UseThePlugin):
"""Skip the test if the art resizer doesn't have ImageMagick or
PIL (so comparisons and measurements are unavailable).
"""
if ArtResizer.shared.method[0] == WEBPROXY:
if not ArtResizer.shared.local:
self.skipTest("ArtResizer has no local imaging backend available")
def test_respect_minwidth(self):

View file

@ -16,20 +16,36 @@
import unittest
from unittest.mock import patch
import os
from test import _common
from test.helper import TestHelper
from beets.util import command_output, syspath
from beets.util.artresizer import (
pil_resize,
im_resize,
get_im_version,
get_pil_version,
pil_deinterlace,
im_deinterlace,
ArtResizer,
)
from beets.util.artresizer import IMBackend, PILBackend
class DummyIMBackend(IMBackend):
"""An `IMBackend` which pretends that ImageMagick is available.
The version is sufficiently recent to support image comparison.
"""
def __init__(self):
"""Init a dummy backend class for mocked ImageMagick tests."""
self.version = (7, 0, 0)
self.legacy = False
self.convert_cmd = ['magick']
self.identify_cmd = ['magick', 'identify']
self.compare_cmd = ['magick', 'compare']
class DummyPILBackend(PILBackend):
"""An `PILBackend` which pretends that PIL is available."""
def __init__(self):
"""Init a dummy backend class for mocked PIL tests."""
pass
class ArtResizerFileSizeTest(_common.TestCase, TestHelper):
@ -46,11 +62,10 @@ class ArtResizerFileSizeTest(_common.TestCase, TestHelper):
"""Called after each test, unloading all plugins."""
self.teardown_beets()
def _test_img_resize(self, resize_func):
def _test_img_resize(self, backend):
"""Test resizing based on file size, given a resize_func."""
# Check quality setting unaffected by new parameter
im_95_qual = resize_func(
ArtResizer.shared,
im_95_qual = backend.resize(
225,
self.IMG_225x225,
quality=95,
@ -60,8 +75,7 @@ class ArtResizerFileSizeTest(_common.TestCase, TestHelper):
self.assertExists(im_95_qual)
# Attempt a lower filesize with same quality
im_a = resize_func(
ArtResizer.shared,
im_a = backend.resize(
225,
self.IMG_225x225,
quality=95,
@ -73,8 +87,7 @@ class ArtResizerFileSizeTest(_common.TestCase, TestHelper):
os.stat(syspath(im_95_qual)).st_size)
# Attempt with lower initial quality
im_75_qual = resize_func(
ArtResizer.shared,
im_75_qual = backend.resize(
225,
self.IMG_225x225,
quality=75,
@ -82,8 +95,7 @@ class ArtResizerFileSizeTest(_common.TestCase, TestHelper):
)
self.assertExists(im_75_qual)
im_b = resize_func(
ArtResizer.shared,
im_b = backend.resize(
225,
self.IMG_225x225,
quality=95,
@ -94,42 +106,56 @@ class ArtResizerFileSizeTest(_common.TestCase, TestHelper):
self.assertLess(os.stat(syspath(im_b)).st_size,
os.stat(syspath(im_75_qual)).st_size)
@unittest.skipUnless(get_pil_version(), "PIL not available")
@unittest.skipUnless(PILBackend.available(), "PIL not available")
def test_pil_file_resize(self):
"""Test PIL resize function is lowering file size."""
self._test_img_resize(pil_resize)
self._test_img_resize(PILBackend())
@unittest.skipUnless(get_im_version(), "ImageMagick not available")
@unittest.skipUnless(IMBackend.available(), "ImageMagick not available")
def test_im_file_resize(self):
"""Test IM resize function is lowering file size."""
self._test_img_resize(im_resize)
self._test_img_resize(IMBackend())
@unittest.skipUnless(get_pil_version(), "PIL not available")
@unittest.skipUnless(PILBackend.available(), "PIL not available")
def test_pil_file_deinterlace(self):
"""Test PIL deinterlace function.
Check if pil_deinterlace function returns images
Check if the `PILBackend.deinterlace()` function returns images
that are non-progressive
"""
path = pil_deinterlace(ArtResizer.shared, self.IMG_225x225)
path = PILBackend().deinterlace(self.IMG_225x225)
from PIL import Image
with Image.open(path) as img:
self.assertFalse('progression' in img.info)
@unittest.skipUnless(get_im_version(), "ImageMagick not available")
@unittest.skipUnless(IMBackend.available(), "ImageMagick not available")
def test_im_file_deinterlace(self):
"""Test ImageMagick deinterlace function.
Check if im_deinterlace function returns images
Check if the `IMBackend.deinterlace()` function returns images
that are non-progressive.
"""
path = im_deinterlace(ArtResizer.shared, self.IMG_225x225)
cmd = ArtResizer.shared.im_identify_cmd + [
im = IMBackend()
path = im.deinterlace(self.IMG_225x225)
cmd = im.identify_cmd + [
'-format', '%[interlace]', syspath(path, prefix=False),
]
out = command_output(cmd).stdout
self.assertTrue(out == b'None')
@patch('beets.util.artresizer.util')
def test_write_metadata_im(self, mock_util):
"""Test writing image metadata."""
metadata = {"a": "A", "b": "B"}
im = DummyIMBackend()
im.write_metadata("foo", metadata)
try:
command = im.convert_cmd + "foo -set a A -set b B foo".split()
mock_util.command_output.assert_called_once_with(command)
except AssertionError:
command = im.convert_cmd + "foo -set b B -set a A foo".split()
mock_util.command_output.assert_called_once_with(command)
def suite():
"""Run this suite of tests."""

View file

@ -21,10 +21,11 @@ import unittest
from test import _common
from test.helper import TestHelper
from test.test_art_resize import DummyIMBackend
from mediafile import MediaFile
from beets import config, logging, ui
from beets.util import artresizer, syspath, displayable_path
from beets.util import syspath, displayable_path
from beets.util.artresizer import ArtResizer
from beets import art
@ -220,9 +221,8 @@ class DummyArtResizer(ArtResizer):
"""An `ArtResizer` which pretends that ImageMagick is available, and has
a sufficiently recent version to support image comparison.
"""
@staticmethod
def _check_method():
return artresizer.IMAGEMAGICK, (7, 0, 0), True
def __init__(self):
self.local_method = DummyIMBackend()
@patch('beets.util.artresizer.subprocess')

View file

@ -23,7 +23,6 @@ from test.helper import TestHelper
from beets.util import bytestring_path
from beetsplug.thumbnails import (ThumbnailsPlugin, NORMAL_DIR, LARGE_DIR,
write_metadata_im, write_metadata_pil,
PathlibURI, GioURI)
@ -34,22 +33,11 @@ class ThumbnailsTest(unittest.TestCase, TestHelper):
def tearDown(self):
self.teardown_beets()
@patch('beetsplug.thumbnails.util')
def test_write_metadata_im(self, mock_util):
metadata = {"a": "A", "b": "B"}
write_metadata_im("foo", metadata)
try:
command = "convert foo -set a A -set b B foo".split(' ')
mock_util.command_output.assert_called_once_with(command)
except AssertionError:
command = "convert foo -set b B -set a A foo".split(' ')
mock_util.command_output.assert_called_once_with(command)
@patch('beetsplug.thumbnails.ArtResizer')
@patch('beetsplug.thumbnails.ThumbnailsPlugin._check_local_ok')
@patch('beetsplug.thumbnails.os.stat')
def test_add_tags(self, mock_stat, _):
def test_add_tags(self, mock_stat, _, mock_artresizer):
plugin = ThumbnailsPlugin()
plugin.write_metadata = Mock()
plugin.get_uri = Mock(side_effect={b"/path/to/cover":
"COVER_URI"}.__getitem__)
album = Mock(artpath=b"/path/to/cover")
@ -59,24 +47,25 @@ class ThumbnailsTest(unittest.TestCase, TestHelper):
metadata = {"Thumb::URI": "COVER_URI",
"Thumb::MTime": "12345"}
plugin.write_metadata.assert_called_once_with(b"/path/to/thumbnail",
metadata)
mock_artresizer.shared.write_metadata.assert_called_once_with(
b"/path/to/thumbnail",
metadata,
)
mock_stat.assert_called_once_with(album.artpath)
@patch('beetsplug.thumbnails.os')
@patch('beetsplug.thumbnails.ArtResizer')
@patch('beetsplug.thumbnails.get_im_version')
@patch('beetsplug.thumbnails.get_pil_version')
@patch('beetsplug.thumbnails.GioURI')
def test_check_local_ok(self, mock_giouri, mock_pil, mock_im,
mock_artresizer, mock_os):
def test_check_local_ok(self, mock_giouri, mock_artresizer, mock_os):
# test local resizing capability
mock_artresizer.shared.local = False
mock_artresizer.shared.can_write_metadata = False
plugin = ThumbnailsPlugin()
self.assertFalse(plugin._check_local_ok())
# test dirs creation
mock_artresizer.shared.local = True
mock_artresizer.shared.can_write_metadata = True
def exists(path):
if path == NORMAL_DIR:
@ -91,20 +80,14 @@ class ThumbnailsTest(unittest.TestCase, TestHelper):
# test metadata writer function
mock_os.path.exists = lambda _: True
mock_pil.return_value = False
mock_im.return_value = False
with self.assertRaises(AssertionError):
mock_artresizer.shared.local = True
mock_artresizer.shared.can_write_metadata = False
with self.assertRaises(RuntimeError):
ThumbnailsPlugin()
mock_pil.return_value = True
self.assertEqual(ThumbnailsPlugin().write_metadata, write_metadata_pil)
mock_im.return_value = True
self.assertEqual(ThumbnailsPlugin().write_metadata, write_metadata_im)
mock_pil.return_value = False
self.assertEqual(ThumbnailsPlugin().write_metadata, write_metadata_im)
mock_artresizer.shared.local = True
mock_artresizer.shared.can_write_metadata = True
self.assertTrue(ThumbnailsPlugin()._check_local_ok())
# test URI getter function