diff --git a/beets/util/artresizer.py b/beets/util/artresizer.py index 0041db230..225280a94 100644 --- a/beets/util/artresizer.py +++ b/beets/util/artresizer.py @@ -16,6 +16,7 @@ public resizing proxy if neither is available. """ +from itertools import chain import subprocess import os import os.path @@ -27,11 +28,6 @@ from beets import logging from beets import util from beets.util import bytestring_path, displayable_path, py3_path, syspath -# Resizing methods -PIL = 1 -IMAGEMAGICK = 2 -WEBPROXY = 3 - PROXY_URL = 'https://images.weserv.nl/' log = logging.getLogger('beets') @@ -61,327 +57,413 @@ def temp_file_for(path): return bytestring_path(f.name) -def pil_resize(artresizer, maxwidth, path_in, path_out=None, quality=0, - max_filesize=0): - """Resize using Python Imaging Library (PIL). Return the output path - of resized image. - """ - path_out = path_out or temp_file_for(path_in) - from PIL import Image +class LocalBackendNotAvailableError(Exception): + pass - log.debug('artresizer: PIL resizing {0} to {1}', - displayable_path(path_in), displayable_path(path_out)) - try: - im = Image.open(syspath(path_in)) - size = maxwidth, maxwidth - im.thumbnail(size, Image.ANTIALIAS) +_NOT_AVAILABLE = object() - if quality == 0: - # Use PIL's default quality. - quality = -1 - # progressive=False only affects JPEGs and is the default, - # but we include it here for explicitness. - im.save(py3_path(path_out), quality=quality, progressive=False) +class LocalBackend: + @classmethod + def available(cls): + try: + cls.version() + return True + except LocalBackendNotAvailableError: + return False - if max_filesize > 0: - # If maximum filesize is set, we attempt to lower the quality of - # jpeg conversion by a proportional amount, up to 3 attempts - # First, set the maximum quality to either provided, or 95 - if quality > 0: - lower_qual = quality - else: - lower_qual = 95 - for i in range(5): - # 5 attempts is an abitrary choice - filesize = os.stat(syspath(path_out)).st_size - log.debug("PIL Pass {0} : Output size: {1}B", i, filesize) - if filesize <= max_filesize: - return path_out - # The relationship between filesize & quality will be - # image dependent. - lower_qual -= 10 - # Restrict quality dropping below 10 - if lower_qual < 10: - lower_qual = 10 - # Use optimize flag to improve filesize decrease - im.save(py3_path(path_out), quality=lower_qual, - optimize=True, progressive=False) - log.warning("PIL Failed to resize file to below {0}B", - max_filesize) - return path_out +class IMBackend(LocalBackend): + NAME = "ImageMagick" + + # These fields are used as a cache for `version()`. `_legacy` indicates + # whether the modern `magick` binary is available or whether to fall back + # to the old-style `convert`, `identify`, etc. commands. + _version = None + _legacy = None + + @classmethod + def version(cls): + """Obtain and cache ImageMagick version. + + Raises `LocalBackendNotAvailableError` if not available. + """ + if cls._version is None: + for cmd_name, legacy in (('magick', False), ('convert', True)): + try: + out = util.command_output([cmd_name, "--version"]).stdout + except (subprocess.CalledProcessError, OSError) as exc: + log.debug('ImageMagick version check failed: {}', exc) + cls._version = _NOT_AVAILABLE + else: + if b'imagemagick' in out.lower(): + pattern = br".+ (\d+)\.(\d+)\.(\d+).*" + match = re.search(pattern, out) + if match: + cls._version = (int(match.group(1)), + int(match.group(2)), + int(match.group(3))) + cls._legacy = legacy + + if cls._version is _NOT_AVAILABLE: + raise LocalBackendNotAvailableError() else: - return path_out - except OSError: - log.error("PIL cannot create thumbnail for '{0}'", - displayable_path(path_in)) - return path_in + return cls._version + def __init__(self): + """Initialize a wrapper around ImageMagick for local image operations. -def im_resize(artresizer, maxwidth, path_in, path_out=None, quality=0, - max_filesize=0): - """Resize using ImageMagick. + Stores the ImageMagick version and legacy flag. If ImageMagick is not + available, raise an Exception. + """ + self.version() - Use the ``magick`` program or ``convert`` on older versions. Return - the output path of resized image. - """ - path_out = path_out or temp_file_for(path_in) - log.debug('artresizer: ImageMagick resizing {0} to {1}', - displayable_path(path_in), displayable_path(path_out)) + # Use ImageMagick's magick binary when it's available. + # If it's not, fall back to the older, separate convert + # and identify commands. + if self._legacy: + self.convert_cmd = ['convert'] + self.identify_cmd = ['identify'] + self.compare_cmd = ['compare'] + else: + self.convert_cmd = ['magick'] + self.identify_cmd = ['magick', 'identify'] + self.compare_cmd = ['magick', 'compare'] - # "-resize WIDTHx>" shrinks images with the width larger - # than the given width while maintaining the aspect ratio - # with regards to the height. - # ImageMagick already seems to default to no interlace, but we include it - # here for the sake of explicitness. - cmd = artresizer.im_convert_cmd + [ - syspath(path_in, prefix=False), - '-resize', f'{maxwidth}x>', - '-interlace', 'none', - ] + def resize(self, maxwidth, path_in, path_out=None, quality=0, + max_filesize=0): + """Resize using ImageMagick. - if quality > 0: - cmd += ['-quality', f'{quality}'] + Use the ``magick`` program or ``convert`` on older versions. Return + the output path of resized image. + """ + path_out = path_out or temp_file_for(path_in) + log.debug('artresizer: ImageMagick resizing {0} to {1}', + displayable_path(path_in), displayable_path(path_out)) - # "-define jpeg:extent=SIZEb" sets the target filesize for imagemagick to - # SIZE in bytes. - if max_filesize > 0: - cmd += ['-define', f'jpeg:extent={max_filesize}b'] + # "-resize WIDTHx>" shrinks images with the width larger + # than the given width while maintaining the aspect ratio + # with regards to the height. + # ImageMagick already seems to default to no interlace, but we include + # it here for the sake of explicitness. + cmd = self.convert_cmd + [ + syspath(path_in, prefix=False), + '-resize', f'{maxwidth}x>', + '-interlace', 'none', + ] - cmd.append(syspath(path_out, prefix=False)) + if quality > 0: + cmd += ['-quality', f'{quality}'] - try: - util.command_output(cmd) - except subprocess.CalledProcessError: - log.warning('artresizer: IM convert failed for {0}', - displayable_path(path_in)) - return path_in + # "-define jpeg:extent=SIZEb" sets the target filesize for imagemagick + # to SIZE in bytes. + if max_filesize > 0: + cmd += ['-define', f'jpeg:extent={max_filesize}b'] - return path_out + cmd.append(syspath(path_out, prefix=False)) + try: + util.command_output(cmd) + except subprocess.CalledProcessError: + log.warning('artresizer: IM convert failed for {0}', + displayable_path(path_in)) + return path_in -BACKEND_FUNCS = { - PIL: pil_resize, - IMAGEMAGICK: im_resize, -} - - -def pil_getsize(artresizer, path_in): - from PIL import Image - - try: - im = Image.open(syspath(path_in)) - return im.size - except OSError as exc: - log.error("PIL could not read file {}: {}", - displayable_path(path_in), exc) - return None - - -def im_getsize(artresizer, path_in): - cmd = artresizer.im_identify_cmd + \ - ['-format', '%w %h', syspath(path_in, prefix=False)] - - try: - out = util.command_output(cmd).stdout - except subprocess.CalledProcessError as exc: - log.warning('ImageMagick size query failed') - log.debug( - '`convert` exited with (status {}) when ' - 'getting size with command {}:\n{}', - exc.returncode, cmd, exc.output.strip() - ) - return None - try: - return tuple(map(int, out.split(b' '))) - except IndexError: - log.warning('Could not understand IM output: {0!r}', out) - return None - - -BACKEND_GET_SIZE = { - PIL: pil_getsize, - IMAGEMAGICK: im_getsize, -} - - -def pil_deinterlace(artresizer, path_in, path_out=None): - path_out = path_out or temp_file_for(path_in) - from PIL import Image - - try: - im = Image.open(syspath(path_in)) - im.save(py3_path(path_out), progressive=False) return path_out - except IOError: - return path_in + def get_size(self, path_in): + cmd = self.identify_cmd + [ + '-format', '%w %h', syspath(path_in, prefix=False) + ] -def im_deinterlace(artresizer, path_in, path_out=None): - path_out = path_out or temp_file_for(path_in) - - cmd = artresizer.im_convert_cmd + [ - syspath(path_in, prefix=False), - '-interlace', 'none', - syspath(path_out, prefix=False), - ] - - try: - util.command_output(cmd) - return path_out - except subprocess.CalledProcessError: - return path_in - - -DEINTERLACE_FUNCS = { - PIL: pil_deinterlace, - IMAGEMAGICK: im_deinterlace, -} - - -def im_get_format(artresizer, filepath): - cmd = artresizer.im_identify_cmd + [ - '-format', '%[magick]', - syspath(filepath) - ] - - try: - return util.command_output(cmd).stdout - except subprocess.CalledProcessError: - return None - - -def pil_get_format(artresizer, filepath): - from PIL import Image, UnidentifiedImageError - - try: - with Image.open(syspath(filepath)) as im: - return im.format - except (ValueError, TypeError, UnidentifiedImageError, FileNotFoundError): - log.exception("failed to detect image format for {}", filepath) - return None - - -BACKEND_GET_FORMAT = { - PIL: pil_get_format, - IMAGEMAGICK: im_get_format, -} - - -def im_convert_format(artresizer, source, target, deinterlaced): - cmd = artresizer.im_convert_cmd + [ - syspath(source), - *(["-interlace", "none"] if deinterlaced else []), - syspath(target), - ] - - try: - subprocess.check_call( - cmd, - stderr=subprocess.DEVNULL, - stdout=subprocess.DEVNULL - ) - return target - except subprocess.CalledProcessError: - return source - - -def pil_convert_format(artresizer, source, target, deinterlaced): - from PIL import Image, UnidentifiedImageError - - try: - with Image.open(syspath(source)) as im: - im.save(py3_path(target), progressive=not deinterlaced) - return target - except (ValueError, TypeError, UnidentifiedImageError, FileNotFoundError, - OSError): - log.exception("failed to convert image {} -> {}", source, target) - return source - - -BACKEND_CONVERT_IMAGE_FORMAT = { - PIL: pil_convert_format, - IMAGEMAGICK: im_convert_format, -} - - -def im_compare(artresizer, im1, im2, compare_threshold): - is_windows = platform.system() == "Windows" - - # Converting images to grayscale tends to minimize the weight - # of colors in the diff score. So we first convert both images - # to grayscale and then pipe them into the `compare` command. - # On Windows, ImageMagick doesn't support the magic \\?\ prefix - # on paths, so we pass `prefix=False` to `syspath`. - convert_cmd = artresizer.im_convert_cmd + [ - syspath(im2, prefix=False), syspath(im1, prefix=False), - '-colorspace', 'gray', 'MIFF:-' - ] - compare_cmd = artresizer.im_compare_cmd + [ - '-metric', 'PHASH', '-', 'null:', - ] - log.debug('comparing images with pipeline {} | {}', - convert_cmd, compare_cmd) - convert_proc = subprocess.Popen( - convert_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=not is_windows, - ) - compare_proc = subprocess.Popen( - compare_cmd, - stdin=convert_proc.stdout, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=not is_windows, - ) - - # Check the convert output. We're not interested in the - # standard output; that gets piped to the next stage. - convert_proc.stdout.close() - convert_stderr = convert_proc.stderr.read() - convert_proc.stderr.close() - convert_proc.wait() - if convert_proc.returncode: - log.debug( - 'ImageMagick convert failed with status {}: {!r}', - convert_proc.returncode, - convert_stderr, - ) - return None - - # Check the compare output. - stdout, stderr = compare_proc.communicate() - if compare_proc.returncode: - if compare_proc.returncode != 1: - log.debug('ImageMagick compare failed: {0}, {1}', - displayable_path(im2), displayable_path(im1)) + try: + out = util.command_output(cmd).stdout + except subprocess.CalledProcessError as exc: + log.warning('ImageMagick size query failed') + log.debug( + '`convert` exited with (status {}) when ' + 'getting size with command {}:\n{}', + exc.returncode, cmd, exc.output.strip() + ) + return None + try: + return tuple(map(int, out.split(b' '))) + except IndexError: + log.warning('Could not understand IM output: {0!r}', out) return None - out_str = stderr - else: - out_str = stdout - try: - phash_diff = float(out_str) - except ValueError: - log.debug('IM output is not a number: {0!r}', out_str) - return None + def deinterlace(self, path_in, path_out=None): + path_out = path_out or temp_file_for(path_in) - log.debug('ImageMagick compare score: {0}', phash_diff) - return phash_diff <= compare_threshold + cmd = self.convert_cmd + [ + syspath(path_in, prefix=False), + '-interlace', 'none', + syspath(path_out, prefix=False), + ] + + try: + util.command_output(cmd) + return path_out + except subprocess.CalledProcessError: + # FIXME: Should probably issue a warning? + return path_in + + def get_format(self, filepath): + cmd = self.identify_cmd + [ + '-format', '%[magick]', + syspath(filepath) + ] + + try: + return util.command_output(cmd).stdout + except subprocess.CalledProcessError: + # FIXME: Should probably issue a warning? + return None + + def convert_format(self, source, target, deinterlaced): + cmd = self.convert_cmd + [ + syspath(source), + *(["-interlace", "none"] if deinterlaced else []), + syspath(target), + ] + + try: + subprocess.check_call( + cmd, + stderr=subprocess.DEVNULL, + stdout=subprocess.DEVNULL + ) + return target + except subprocess.CalledProcessError: + # FIXME: Should probably issue a warning? + return source + + @property + def can_compare(self): + return self.version() > (6, 8, 7) + + def compare(self, im1, im2, compare_threshold): + is_windows = platform.system() == "Windows" + + # Converting images to grayscale tends to minimize the weight + # of colors in the diff score. So we first convert both images + # to grayscale and then pipe them into the `compare` command. + # On Windows, ImageMagick doesn't support the magic \\?\ prefix + # on paths, so we pass `prefix=False` to `syspath`. + convert_cmd = self.convert_cmd + [ + syspath(im2, prefix=False), syspath(im1, prefix=False), + '-colorspace', 'gray', 'MIFF:-' + ] + compare_cmd = self.compare_cmd + [ + '-metric', 'PHASH', '-', 'null:', + ] + log.debug('comparing images with pipeline {} | {}', + convert_cmd, compare_cmd) + convert_proc = subprocess.Popen( + convert_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=not is_windows, + ) + compare_proc = subprocess.Popen( + compare_cmd, + stdin=convert_proc.stdout, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=not is_windows, + ) + + # Check the convert output. We're not interested in the + # standard output; that gets piped to the next stage. + convert_proc.stdout.close() + convert_stderr = convert_proc.stderr.read() + convert_proc.stderr.close() + convert_proc.wait() + if convert_proc.returncode: + log.debug( + 'ImageMagick convert failed with status {}: {!r}', + convert_proc.returncode, + convert_stderr, + ) + return None + + # Check the compare output. + stdout, stderr = compare_proc.communicate() + if compare_proc.returncode: + if compare_proc.returncode != 1: + log.debug('ImageMagick compare failed: {0}, {1}', + displayable_path(im2), displayable_path(im1)) + return None + out_str = stderr + else: + out_str = stdout + + try: + phash_diff = float(out_str) + except ValueError: + log.debug('IM output is not a number: {0!r}', out_str) + return None + + log.debug('ImageMagick compare score: {0}', phash_diff) + return phash_diff <= compare_threshold + + @property + def can_write_metadata(self): + return True + + def write_metadata(self, file, metadata): + assignments = list(chain.from_iterable( + ('-set', k, v) for k, v in metadata.items() + )) + command = self.convert_cmd + [file, *assignments, file] + + util.command_output(command) -def pil_compare(artresizer, im1, im2, compare_threshold): - # It is an error to call this when ArtResizer.can_compare is not True. - raise NotImplementedError() +class PILBackend(LocalBackend): + NAME = "PIL" + @classmethod + def version(cls): + try: + __import__('PIL', fromlist=['Image']) + except ImportError: + raise LocalBackendNotAvailableError() -BACKEND_COMPARE = { - PIL: pil_compare, - IMAGEMAGICK: im_compare, -} + def __init__(self): + """Initialize a wrapper around PIL for local image operations. + + If PIL is not available, raise an Exception. + """ + self.version() + + def resize(self, maxwidth, path_in, path_out=None, quality=0, + max_filesize=0): + """Resize using Python Imaging Library (PIL). Return the output path + of resized image. + """ + path_out = path_out or temp_file_for(path_in) + from PIL import Image + + log.debug('artresizer: PIL resizing {0} to {1}', + displayable_path(path_in), displayable_path(path_out)) + + try: + im = Image.open(syspath(path_in)) + size = maxwidth, maxwidth + im.thumbnail(size, Image.ANTIALIAS) + + if quality == 0: + # Use PIL's default quality. + quality = -1 + + # progressive=False only affects JPEGs and is the default, + # but we include it here for explicitness. + im.save(py3_path(path_out), quality=quality, progressive=False) + + if max_filesize > 0: + # If maximum filesize is set, we attempt to lower the quality + # of jpeg conversion by a proportional amount, up to 3 attempts + # First, set the maximum quality to either provided, or 95 + if quality > 0: + lower_qual = quality + else: + lower_qual = 95 + for i in range(5): + # 5 attempts is an abitrary choice + filesize = os.stat(syspath(path_out)).st_size + log.debug("PIL Pass {0} : Output size: {1}B", i, filesize) + if filesize <= max_filesize: + return path_out + # The relationship between filesize & quality will be + # image dependent. + lower_qual -= 10 + # Restrict quality dropping below 10 + if lower_qual < 10: + lower_qual = 10 + # Use optimize flag to improve filesize decrease + im.save(py3_path(path_out), quality=lower_qual, + optimize=True, progressive=False) + log.warning("PIL Failed to resize file to below {0}B", + max_filesize) + return path_out + + else: + return path_out + except OSError: + log.error("PIL cannot create thumbnail for '{0}'", + displayable_path(path_in)) + return path_in + + def get_size(self, path_in): + from PIL import Image + + try: + im = Image.open(syspath(path_in)) + return im.size + except OSError as exc: + log.error("PIL could not read file {}: {}", + displayable_path(path_in), exc) + return None + + def deinterlace(self, path_in, path_out=None): + path_out = path_out or temp_file_for(path_in) + from PIL import Image + + try: + im = Image.open(syspath(path_in)) + im.save(py3_path(path_out), progressive=False) + return path_out + except IOError: + # FIXME: Should probably issue a warning? + return path_in + + def get_format(self, filepath): + from PIL import Image, UnidentifiedImageError + + try: + with Image.open(syspath(filepath)) as im: + return im.format + except (ValueError, TypeError, UnidentifiedImageError, + FileNotFoundError): + log.exception("failed to detect image format for {}", filepath) + return None + + def convert_format(self, source, target, deinterlaced): + from PIL import Image, UnidentifiedImageError + + try: + with Image.open(syspath(source)) as im: + im.save(py3_path(target), progressive=not deinterlaced) + return target + except (ValueError, TypeError, UnidentifiedImageError, + FileNotFoundError, OSError): + log.exception("failed to convert image {} -> {}", source, target) + return source + + @property + def can_compare(self): + return False + + def compare(self, im1, im2, compare_threshold): + # It is an error to call this when ArtResizer.can_compare is not True. + raise NotImplementedError() + + @property + def can_write_metadata(self): + return True + + def write_metadata(self, file, metadata): + from PIL import Image, PngImagePlugin + + # FIXME: Detect and handle other file types (currently, the only user + # is the thumbnails plugin, which generates PNG images). + im = Image.open(file) + meta = PngImagePlugin.PngInfo() + for k, v in metadata.items(): + meta.add_text(k, v, 0) + im.save(file, "PNG", pnginfo=meta) class Shareable(type): @@ -402,6 +484,12 @@ class Shareable(type): return cls._instance +BACKEND_CLASSES = [ + IMBackend, + PILBackend, +] + + class ArtResizer(metaclass=Shareable): """A singleton class that performs image resizes. """ @@ -409,22 +497,25 @@ class ArtResizer(metaclass=Shareable): def __init__(self): """Create a resizer object with an inferred method. """ - self.method = self._check_method() - log.debug("artresizer: method is {0}", self.method) + # Check if a local backend is availabe, and store an instance of the + # backend class. Otherwise, fallback to the web proxy. + for backend_cls in BACKEND_CLASSES: + try: + self.local_method = backend_cls() + log.debug(f"artresizer: method is {self.local_method.NAME}") + break + except LocalBackendNotAvailableError: + continue + else: + log.debug("artresizer: method is WEBPROXY") + self.local_method = None - # Use ImageMagick's magick binary when it's available. If it's - # not, fall back to the older, separate convert and identify - # commands. - if self.method[0] == IMAGEMAGICK: - self.im_legacy = self.method[2] - if self.im_legacy: - self.im_convert_cmd = ['convert'] - self.im_identify_cmd = ['identify'] - self.im_compare_cmd = ['compare'] - else: - self.im_convert_cmd = ['magick'] - self.im_identify_cmd = ['magick', 'identify'] - self.im_compare_cmd = ['magick', 'compare'] + @property + def method(self): + if self.local: + return self.local_method.NAME + else: + return "WEBPROXY" def resize( self, maxwidth, path_in, path_out=None, quality=0, max_filesize=0 @@ -435,17 +526,23 @@ class ArtResizer(metaclass=Shareable): For WEBPROXY, returns `path_in` unmodified. """ if self.local: - func = BACKEND_FUNCS[self.method[0]] - return func(self, maxwidth, path_in, path_out, - quality=quality, max_filesize=max_filesize) + return self.local_method.resize( + maxwidth, path_in, path_out, + quality=quality, max_filesize=max_filesize + ) else: + # Handled by `proxy_url` already. return path_in def deinterlace(self, path_in, path_out=None): + """Deinterlace an image. + + Only available locally. + """ if self.local: - func = DEINTERLACE_FUNCS[self.method[0]] - return func(self, path_in, path_out) + return self.local_method.deinterlace(path_in, path_out) else: + # FIXME: Should probably issue a warning? return path_in def proxy_url(self, maxwidth, url, quality=0): @@ -454,6 +551,7 @@ class ArtResizer(metaclass=Shareable): Otherwise, the URL is returned unmodified. """ if self.local: + # Going to be handled by `resize()`. return url else: return resize_url(url, maxwidth, quality) @@ -463,7 +561,7 @@ class ArtResizer(metaclass=Shareable): """A boolean indicating whether the resizing method is performed locally (i.e., PIL or ImageMagick). """ - return self.method[0] in BACKEND_FUNCS + return self.local_method is not None def get_size(self, path_in): """Return the size of an image file as an int couple (width, height) @@ -472,9 +570,10 @@ class ArtResizer(metaclass=Shareable): Only available locally. """ if self.local: - func = BACKEND_GET_SIZE[self.method[0]] - return func(self, path_in) - return None + return self.local_method.get_size(path_in) + else: + # FIXME: Should probably issue a warning? + return path_in def get_format(self, path_in): """Returns the format of the image as a string. @@ -482,9 +581,10 @@ class ArtResizer(metaclass=Shareable): Only available locally. """ if self.local: - func = BACKEND_GET_FORMAT[self.method[0]] - return func(self, path_in) - return None + return self.local_method.get_format(path_in) + else: + # FIXME: Should probably issue a warning? + return None def reformat(self, path_in, new_format, deinterlaced=True): """Converts image to desired format, updating its extension, but @@ -493,6 +593,7 @@ class ArtResizer(metaclass=Shareable): Only available locally. """ if not self.local: + # FIXME: Should probably issue a warning? return path_in new_format = new_format.lower() @@ -503,13 +604,14 @@ class ArtResizer(metaclass=Shareable): fname, ext = os.path.splitext(path_in) path_new = fname + b'.' + new_format.encode('utf8') - func = BACKEND_CONVERT_IMAGE_FORMAT[self.method[0]] # allows the exception to propagate, while still making sure a changed # file path was removed result_path = path_in try: - result_path = func(self, path_in, path_new, deinterlaced) + result_path = self.local_method.convert_format( + path_in, path_new, deinterlaced + ) finally: if result_path != path_in: os.unlink(path_in) @@ -519,7 +621,10 @@ class ArtResizer(metaclass=Shareable): def can_compare(self): """A boolean indicating whether image comparison is available""" - return self.method[0] == IMAGEMAGICK and self.method[1] > (6, 8, 7) + if self.local: + return self.local_method.can_compare + else: + return False def compare(self, im1, im2, compare_threshold): """Return a boolean indicating whether two images are similar. @@ -527,63 +632,27 @@ class ArtResizer(metaclass=Shareable): Only available locally. """ if self.local: - func = BACKEND_COMPARE[self.method[0]] - return func(self, im1, im2, compare_threshold) - return None - - @staticmethod - def _check_method(): - """Return a tuple indicating an available method and its version. - - The result has at least two elements: - - The method, eitehr WEBPROXY, PIL, or IMAGEMAGICK. - - The version. - - If the method is IMAGEMAGICK, there is also a third element: a - bool flag indicating whether to use the `magick` binary or - legacy single-purpose executables (`convert`, `identify`, etc.) - """ - version = get_im_version() - if version: - version, legacy = version - return IMAGEMAGICK, version, legacy - - version = get_pil_version() - if version: - return PIL, version - - return WEBPROXY, (0) - - -def get_im_version(): - """Get the ImageMagick version and legacy flag as a pair. Or return - None if ImageMagick is not available. - """ - for cmd_name, legacy in ((['magick'], False), (['convert'], True)): - cmd = cmd_name + ['--version'] - - try: - out = util.command_output(cmd).stdout - except (subprocess.CalledProcessError, OSError) as exc: - log.debug('ImageMagick version check failed: {}', exc) + return self.local_method.compare(im1, im2, compare_threshold) else: - if b'imagemagick' in out.lower(): - pattern = br".+ (\d+)\.(\d+)\.(\d+).*" - match = re.search(pattern, out) - if match: - version = (int(match.group(1)), - int(match.group(2)), - int(match.group(3))) - return version, legacy + # FIXME: Should probably issue a warning? + return None - return None + @property + def can_write_metadata(self): + """A boolean indicating whether writing image metadata is supported.""" + if self.local: + return self.local_method.can_write_metadata + else: + return False -def get_pil_version(): - """Get the PIL/Pillow version, or None if it is unavailable. - """ - try: - __import__('PIL', fromlist=['Image']) - return (0,) - except ImportError: - return None + def write_metadata(self, file, metadata): + """Write key-value metadata to the image file. + + Only available locally. Currently, expects the image to be a PNG file. + """ + if self.local: + self.local_method.write_metadata(file, metadata) + else: + # FIXME: Should probably issue a warning? + pass diff --git a/beetsplug/thumbnails.py b/beetsplug/thumbnails.py index 6bd9cbac6..b81957593 100644 --- a/beetsplug/thumbnails.py +++ b/beetsplug/thumbnails.py @@ -22,7 +22,6 @@ Spec: standards.freedesktop.org/thumbnail-spec/latest/index.html from hashlib import md5 import os import shutil -from itertools import chain from pathlib import PurePosixPath import ctypes import ctypes.util @@ -32,7 +31,7 @@ from xdg import BaseDirectory from beets.plugins import BeetsPlugin from beets.ui import Subcommand, decargs from beets import util -from beets.util.artresizer import ArtResizer, get_im_version, get_pil_version +from beets.util.artresizer import ArtResizer BASE_DIR = os.path.join(BaseDirectory.xdg_cache_home, "thumbnails") @@ -49,7 +48,6 @@ class ThumbnailsPlugin(BeetsPlugin): 'dolphin': False, }) - self.write_metadata = None if self.config['auto'] and self._check_local_ok(): self.register_listener('art_set', self.process_album) @@ -90,14 +88,12 @@ class ThumbnailsPlugin(BeetsPlugin): if not os.path.exists(dir): os.makedirs(dir) - if get_im_version(): - self.write_metadata = write_metadata_im - tool = "IM" - else: - assert get_pil_version() # since we're local - self.write_metadata = write_metadata_pil - tool = "PIL" - self._log.debug("using {0} to write metadata", tool) + if not ArtResizer.shared.can_write_metadata: + raise RuntimeError( + f"Thumbnails: ArtResizer backend {ArtResizer.shared.method}" + f" unexpectedly cannot write image metadata." + ) + self._log.debug(f"using {ArtResizer.shared.method} to write metadata") uri_getter = GioURI() if not uri_getter.available: @@ -171,7 +167,7 @@ class ThumbnailsPlugin(BeetsPlugin): metadata = {"Thumb::URI": self.get_uri(album.artpath), "Thumb::MTime": str(mtime)} try: - self.write_metadata(image_path, metadata) + ArtResizer.shared.write_metadata(image_path, metadata) except Exception: self._log.exception("could not write metadata to {0}", util.displayable_path(image_path)) @@ -188,26 +184,6 @@ class ThumbnailsPlugin(BeetsPlugin): self._log.debug("Wrote file {0}", util.displayable_path(outfilename)) -def write_metadata_im(file, metadata): - """Enrich the file metadata with `metadata` dict thanks to IM.""" - command = ['convert', file] + \ - list(chain.from_iterable(('-set', k, v) - for k, v in metadata.items())) + [file] - util.command_output(command) - return True - - -def write_metadata_pil(file, metadata): - """Enrich the file metadata with `metadata` dict thanks to PIL.""" - from PIL import Image, PngImagePlugin - im = Image.open(file) - meta = PngImagePlugin.PngInfo() - for k, v in metadata.items(): - meta.add_text(k, v, 0) - im.save(file, "PNG", pnginfo=meta) - return True - - class URIGetter: available = False name = "Abstract base" diff --git a/test/test_art.py b/test/test_art.py index 498c4cedc..b32285e70 100644 --- a/test/test_art.py +++ b/test/test_art.py @@ -31,7 +31,7 @@ from beets import library from beets import importer from beets import logging from beets import util -from beets.util.artresizer import ArtResizer, WEBPROXY +from beets.util.artresizer import ArtResizer import confuse @@ -787,7 +787,7 @@ class ArtForAlbumTest(UseThePlugin): """Skip the test if the art resizer doesn't have ImageMagick or PIL (so comparisons and measurements are unavailable). """ - if ArtResizer.shared.method[0] == WEBPROXY: + if not ArtResizer.shared.local: self.skipTest("ArtResizer has no local imaging backend available") def test_respect_minwidth(self): diff --git a/test/test_art_resize.py b/test/test_art_resize.py index 4600bab77..9660d96a2 100644 --- a/test/test_art_resize.py +++ b/test/test_art_resize.py @@ -16,20 +16,36 @@ import unittest +from unittest.mock import patch import os from test import _common from test.helper import TestHelper from beets.util import command_output, syspath -from beets.util.artresizer import ( - pil_resize, - im_resize, - get_im_version, - get_pil_version, - pil_deinterlace, - im_deinterlace, - ArtResizer, -) +from beets.util.artresizer import IMBackend, PILBackend + + +class DummyIMBackend(IMBackend): + """An `IMBackend` which pretends that ImageMagick is available. + + The version is sufficiently recent to support image comparison. + """ + + def __init__(self): + """Init a dummy backend class for mocked ImageMagick tests.""" + self.version = (7, 0, 0) + self.legacy = False + self.convert_cmd = ['magick'] + self.identify_cmd = ['magick', 'identify'] + self.compare_cmd = ['magick', 'compare'] + + +class DummyPILBackend(PILBackend): + """An `PILBackend` which pretends that PIL is available.""" + + def __init__(self): + """Init a dummy backend class for mocked PIL tests.""" + pass class ArtResizerFileSizeTest(_common.TestCase, TestHelper): @@ -46,11 +62,10 @@ class ArtResizerFileSizeTest(_common.TestCase, TestHelper): """Called after each test, unloading all plugins.""" self.teardown_beets() - def _test_img_resize(self, resize_func): + def _test_img_resize(self, backend): """Test resizing based on file size, given a resize_func.""" # Check quality setting unaffected by new parameter - im_95_qual = resize_func( - ArtResizer.shared, + im_95_qual = backend.resize( 225, self.IMG_225x225, quality=95, @@ -60,8 +75,7 @@ class ArtResizerFileSizeTest(_common.TestCase, TestHelper): self.assertExists(im_95_qual) # Attempt a lower filesize with same quality - im_a = resize_func( - ArtResizer.shared, + im_a = backend.resize( 225, self.IMG_225x225, quality=95, @@ -73,8 +87,7 @@ class ArtResizerFileSizeTest(_common.TestCase, TestHelper): os.stat(syspath(im_95_qual)).st_size) # Attempt with lower initial quality - im_75_qual = resize_func( - ArtResizer.shared, + im_75_qual = backend.resize( 225, self.IMG_225x225, quality=75, @@ -82,8 +95,7 @@ class ArtResizerFileSizeTest(_common.TestCase, TestHelper): ) self.assertExists(im_75_qual) - im_b = resize_func( - ArtResizer.shared, + im_b = backend.resize( 225, self.IMG_225x225, quality=95, @@ -94,42 +106,56 @@ class ArtResizerFileSizeTest(_common.TestCase, TestHelper): self.assertLess(os.stat(syspath(im_b)).st_size, os.stat(syspath(im_75_qual)).st_size) - @unittest.skipUnless(get_pil_version(), "PIL not available") + @unittest.skipUnless(PILBackend.available(), "PIL not available") def test_pil_file_resize(self): """Test PIL resize function is lowering file size.""" - self._test_img_resize(pil_resize) + self._test_img_resize(PILBackend()) - @unittest.skipUnless(get_im_version(), "ImageMagick not available") + @unittest.skipUnless(IMBackend.available(), "ImageMagick not available") def test_im_file_resize(self): """Test IM resize function is lowering file size.""" - self._test_img_resize(im_resize) + self._test_img_resize(IMBackend()) - @unittest.skipUnless(get_pil_version(), "PIL not available") + @unittest.skipUnless(PILBackend.available(), "PIL not available") def test_pil_file_deinterlace(self): """Test PIL deinterlace function. - Check if pil_deinterlace function returns images + Check if the `PILBackend.deinterlace()` function returns images that are non-progressive """ - path = pil_deinterlace(ArtResizer.shared, self.IMG_225x225) + path = PILBackend().deinterlace(self.IMG_225x225) from PIL import Image with Image.open(path) as img: self.assertFalse('progression' in img.info) - @unittest.skipUnless(get_im_version(), "ImageMagick not available") + @unittest.skipUnless(IMBackend.available(), "ImageMagick not available") def test_im_file_deinterlace(self): """Test ImageMagick deinterlace function. - Check if im_deinterlace function returns images + Check if the `IMBackend.deinterlace()` function returns images that are non-progressive. """ - path = im_deinterlace(ArtResizer.shared, self.IMG_225x225) - cmd = ArtResizer.shared.im_identify_cmd + [ + im = IMBackend() + path = im.deinterlace(self.IMG_225x225) + cmd = im.identify_cmd + [ '-format', '%[interlace]', syspath(path, prefix=False), ] out = command_output(cmd).stdout self.assertTrue(out == b'None') + @patch('beets.util.artresizer.util') + def test_write_metadata_im(self, mock_util): + """Test writing image metadata.""" + metadata = {"a": "A", "b": "B"} + im = DummyIMBackend() + im.write_metadata("foo", metadata) + try: + command = im.convert_cmd + "foo -set a A -set b B foo".split() + mock_util.command_output.assert_called_once_with(command) + except AssertionError: + command = im.convert_cmd + "foo -set b B -set a A foo".split() + mock_util.command_output.assert_called_once_with(command) + def suite(): """Run this suite of tests.""" diff --git a/test/test_embedart.py b/test/test_embedart.py index 0fed08f98..f41180ec1 100644 --- a/test/test_embedart.py +++ b/test/test_embedart.py @@ -21,10 +21,11 @@ import unittest from test import _common from test.helper import TestHelper +from test.test_art_resize import DummyIMBackend from mediafile import MediaFile from beets import config, logging, ui -from beets.util import artresizer, syspath, displayable_path +from beets.util import syspath, displayable_path from beets.util.artresizer import ArtResizer from beets import art @@ -220,9 +221,8 @@ class DummyArtResizer(ArtResizer): """An `ArtResizer` which pretends that ImageMagick is available, and has a sufficiently recent version to support image comparison. """ - @staticmethod - def _check_method(): - return artresizer.IMAGEMAGICK, (7, 0, 0), True + def __init__(self): + self.local_method = DummyIMBackend() @patch('beets.util.artresizer.subprocess') diff --git a/test/test_thumbnails.py b/test/test_thumbnails.py index e8ab21d72..891411535 100644 --- a/test/test_thumbnails.py +++ b/test/test_thumbnails.py @@ -23,7 +23,6 @@ from test.helper import TestHelper from beets.util import bytestring_path from beetsplug.thumbnails import (ThumbnailsPlugin, NORMAL_DIR, LARGE_DIR, - write_metadata_im, write_metadata_pil, PathlibURI, GioURI) @@ -34,22 +33,11 @@ class ThumbnailsTest(unittest.TestCase, TestHelper): def tearDown(self): self.teardown_beets() - @patch('beetsplug.thumbnails.util') - def test_write_metadata_im(self, mock_util): - metadata = {"a": "A", "b": "B"} - write_metadata_im("foo", metadata) - try: - command = "convert foo -set a A -set b B foo".split(' ') - mock_util.command_output.assert_called_once_with(command) - except AssertionError: - command = "convert foo -set b B -set a A foo".split(' ') - mock_util.command_output.assert_called_once_with(command) - + @patch('beetsplug.thumbnails.ArtResizer') @patch('beetsplug.thumbnails.ThumbnailsPlugin._check_local_ok') @patch('beetsplug.thumbnails.os.stat') - def test_add_tags(self, mock_stat, _): + def test_add_tags(self, mock_stat, _, mock_artresizer): plugin = ThumbnailsPlugin() - plugin.write_metadata = Mock() plugin.get_uri = Mock(side_effect={b"/path/to/cover": "COVER_URI"}.__getitem__) album = Mock(artpath=b"/path/to/cover") @@ -59,24 +47,25 @@ class ThumbnailsTest(unittest.TestCase, TestHelper): metadata = {"Thumb::URI": "COVER_URI", "Thumb::MTime": "12345"} - plugin.write_metadata.assert_called_once_with(b"/path/to/thumbnail", - metadata) + mock_artresizer.shared.write_metadata.assert_called_once_with( + b"/path/to/thumbnail", + metadata, + ) mock_stat.assert_called_once_with(album.artpath) @patch('beetsplug.thumbnails.os') @patch('beetsplug.thumbnails.ArtResizer') - @patch('beetsplug.thumbnails.get_im_version') - @patch('beetsplug.thumbnails.get_pil_version') @patch('beetsplug.thumbnails.GioURI') - def test_check_local_ok(self, mock_giouri, mock_pil, mock_im, - mock_artresizer, mock_os): + def test_check_local_ok(self, mock_giouri, mock_artresizer, mock_os): # test local resizing capability mock_artresizer.shared.local = False + mock_artresizer.shared.can_write_metadata = False plugin = ThumbnailsPlugin() self.assertFalse(plugin._check_local_ok()) # test dirs creation mock_artresizer.shared.local = True + mock_artresizer.shared.can_write_metadata = True def exists(path): if path == NORMAL_DIR: @@ -91,20 +80,14 @@ class ThumbnailsTest(unittest.TestCase, TestHelper): # test metadata writer function mock_os.path.exists = lambda _: True - mock_pil.return_value = False - mock_im.return_value = False - with self.assertRaises(AssertionError): + + mock_artresizer.shared.local = True + mock_artresizer.shared.can_write_metadata = False + with self.assertRaises(RuntimeError): ThumbnailsPlugin() - mock_pil.return_value = True - self.assertEqual(ThumbnailsPlugin().write_metadata, write_metadata_pil) - - mock_im.return_value = True - self.assertEqual(ThumbnailsPlugin().write_metadata, write_metadata_im) - - mock_pil.return_value = False - self.assertEqual(ThumbnailsPlugin().write_metadata, write_metadata_im) - + mock_artresizer.shared.local = True + mock_artresizer.shared.can_write_metadata = True self.assertTrue(ThumbnailsPlugin()._check_local_ok()) # test URI getter function