From 0af2cdaa3a78493fa5483a8786b429e78cfe21fd Mon Sep 17 00:00:00 2001 From: Adrian Sampson Date: Sun, 5 Apr 2015 12:15:38 -0700 Subject: [PATCH] New `beets.art` image utilities This makes it cleaner to share the high-level image-embedding logic between the `embedart` and `convert` plugins. This resolves a regression, introduced in f504c786, that inadvertently activated the former plugin when the latter was enabled. I also like avoiding cross-plugin imports. --- beets/art.py | 197 ++++++++++++++++++++++++++++++++++++++++++ beetsplug/convert.py | 6 +- beetsplug/embedart.py | 191 +++------------------------------------- test/test_embedart.py | 22 ++--- 4 files changed, 224 insertions(+), 192 deletions(-) create mode 100644 beets/art.py diff --git a/beets/art.py b/beets/art.py new file mode 100644 index 000000000..c0c4a2b96 --- /dev/null +++ b/beets/art.py @@ -0,0 +1,197 @@ +# This file is part of beets. +# Copyright 2015, Adrian Sampson. +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. + +"""High-level utilities for manipulating image files associated with +music and items' embedded album art. +""" + +import subprocess +import platform +from tempfile import NamedTemporaryFile +import imghdr +import os + +from beets.util import displayable_path, syspath +from beets.util.artresizer import ArtResizer +from beets import mediafile +from beets import config + + +def mediafile_image(image_path, maxwidth=None): + """Return a `mediafile.Image` object for the path. + """ + + with open(syspath(image_path), 'rb') as f: + data = f.read() + return mediafile.Image(data, type=mediafile.ImageType.front) + + +def get_art(log, item): + # Extract the art. + try: + mf = mediafile.MediaFile(syspath(item.path)) + except mediafile.UnreadableFileError as exc: + log.warning(u'Could not extract art from {0}: {1}', + displayable_path(item.path), exc) + return + + return mf.art + + +def embed_item(log, item, imagepath, maxwidth=None, itempath=None, + compare_threshold=0, ifempty=False, as_album=False): + """Embed an image into the item's media file. + """ + if compare_threshold: + if not check_art_similarity(log, item, imagepath, compare_threshold): + log.info(u'Image not similar; skipping.') + return + if ifempty and get_art(log, item): + log.info(u'media file already contained art') + return + if maxwidth and not as_album: + imagepath = resize_image(log, imagepath, maxwidth) + + try: + log.debug(u'embedding {0}', displayable_path(imagepath)) + image = mediafile_image(imagepath, maxwidth) + except IOError as exc: + log.warning(u'could not read image file: {0}', exc) + return + item.try_write(path=itempath, tags={'images': [image]}) + + +def embed_album(log, album, maxwidth=None, quiet=False, + compare_threshold=0, ifempty=False): + """Embed album art into all of the album's items. + """ + imagepath = album.artpath + if not imagepath: + log.info(u'No album art present for {0}', album) + return + if not os.path.isfile(syspath(imagepath)): + log.info(u'Album art not found at {0} for {1}', + displayable_path(imagepath), album) + return + if maxwidth: + imagepath = resize_image(log, imagepath, maxwidth) + + log.info(u'Embedding album art into {0}', album) + + for item in album.items(): + embed_item(log, item, imagepath, maxwidth, None, + compare_threshold, ifempty, as_album=True) + + +def resize_image(log, imagepath, maxwidth): + """Returns path to an image resized to maxwidth. + """ + log.debug(u'Resizing album art to {0} pixels wide', maxwidth) + imagepath = ArtResizer.shared.resize(maxwidth, syspath(imagepath)) + return imagepath + + +def check_art_similarity(log, item, imagepath, compare_threshold): + """A boolean indicating if an image is similar to embedded item art. + """ + with NamedTemporaryFile(delete=True) as f: + art = extract(log, f.name, item) + + if art: + is_windows = platform.system() == "Windows" + + # Converting images to grayscale tends to minimize the weight + # of colors in the diff score. + convert_proc = subprocess.Popen( + [b'convert', syspath(imagepath), syspath(art), + b'-colorspace', b'gray', b'MIFF:-'], + stdout=subprocess.PIPE, + close_fds=not is_windows, + ) + compare_proc = subprocess.Popen( + [b'compare', b'-metric', b'PHASH', b'-', b'null:'], + stdin=convert_proc.stdout, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=not is_windows, + ) + convert_proc.stdout.close() + + stdout, stderr = compare_proc.communicate() + if compare_proc.returncode: + if compare_proc.returncode != 1: + log.debug(u'IM phashes compare failed for {0}, {1}', + displayable_path(imagepath), + displayable_path(art)) + return + out_str = stderr + else: + out_str = stdout + + try: + phash_diff = float(out_str) + except ValueError: + log.debug(u'IM output is not a number: {0!r}', out_str) + return + + log.debug(u'compare PHASH score is {0}', phash_diff) + return phash_diff <= compare_threshold + + return True + + +def extract(log, outpath, item): + art = get_art(log, item) + + if not art: + log.info(u'No album art present in {0}, skipping.', item) + return + + # Add an extension to the filename. + ext = imghdr.what(None, h=art) + if not ext: + log.warning(u'Unknown image type in {0}.', + displayable_path(item.path)) + return + outpath += b'.' + ext + + log.info(u'Extracting album art from: {0} to: {1}', + item, displayable_path(outpath)) + with open(syspath(outpath), 'wb') as f: + f.write(art) + return outpath + + +def extract_first(log, outpath, items): + for item in items: + real_path = extract(log, outpath, item) + if real_path: + return real_path + + +def clear(log, lib, query): + id3v23 = config['id3v23'].get(bool) + + items = lib.items(query) + log.info(u'Clearing album art from {0} items', len(items)) + for item in items: + log.debug(u'Clearing art for {0}', item) + try: + mf = mediafile.MediaFile(syspath(item.path), id3v23) + except mediafile.UnreadableFileError as exc: + log.warning(u'Could not read file {0}: {1}', + displayable_path(item.path), exc) + else: + del mf.art + mf.save() diff --git a/beetsplug/convert.py b/beetsplug/convert.py index 38191d79c..e08e473f0 100644 --- a/beetsplug/convert.py +++ b/beetsplug/convert.py @@ -26,8 +26,8 @@ from string import Template from beets import ui, util, plugins, config from beets.plugins import BeetsPlugin -from beetsplug.embedart import EmbedCoverArtPlugin from beets.util.confit import ConfigTypeError +from beets import art _fs_lock = threading.Lock() _temp_files = [] # Keep track of temporary transcoded files for deletion. @@ -286,8 +286,8 @@ class ConvertPlugin(BeetsPlugin): if self.config['embed']: album = item.get_album() if album and album.artpath: - EmbedCoverArtPlugin().embed_item(item, album.artpath, - itempath=converted) + art.embed_item(self._log, item, album.artpath, + itempath=converted) if keep_new: plugins.send('after_convert', item=item, diff --git a/beetsplug/embedart.py b/beetsplug/embedart.py index 9117201e8..66a1d81b2 100644 --- a/beetsplug/embedart.py +++ b/beetsplug/embedart.py @@ -17,18 +17,14 @@ from __future__ import (division, absolute_import, print_function, unicode_literals) import os.path -import imghdr -import subprocess -import platform -from tempfile import NamedTemporaryFile from beets.plugins import BeetsPlugin -from beets import mediafile from beets import ui from beets.ui import decargs from beets.util import syspath, normpath, displayable_path, bytestring_path from beets.util.artresizer import ArtResizer from beets import config +from beets import art class EmbedCoverArtPlugin(BeetsPlugin): @@ -75,11 +71,12 @@ class EmbedCoverArtPlugin(BeetsPlugin): displayable_path(imagepath) )) for item in lib.items(decargs(args)): - self.embed_item(item, imagepath, maxwidth, None, - compare_threshold, ifempty) + art.embed_item(self._log, item, imagepath, maxwidth, None, + compare_threshold, ifempty) else: for album in lib.albums(decargs(args)): - self.embed_album(album, maxwidth) + art.embed_album(self._log, album, maxwidth, False, + compare_threshold, ifempty) embed_cmd.func = embed_func @@ -98,8 +95,8 @@ class EmbedCoverArtPlugin(BeetsPlugin): def extract_func(lib, opts, args): if opts.outpath: - self.extract_first(normpath(opts.outpath), - lib.items(decargs(args))) + art.extract_first(self._log, normpath(opts.outpath), + lib.items(decargs(args))) else: filename = bytestring_path(opts.filename or config['art_filename'].get()) @@ -109,7 +106,8 @@ class EmbedCoverArtPlugin(BeetsPlugin): return for album in lib.albums(decargs(args)): artpath = normpath(os.path.join(album.path, filename)) - artpath = self.extract_first(artpath, album.items()) + artpath = art.extract_first(self._log, artpath, + album.items()) if artpath and opts.associate: album.set_art(artpath) album.store() @@ -120,7 +118,7 @@ class EmbedCoverArtPlugin(BeetsPlugin): help='remove images from file metadata') def clear_func(lib, opts, args): - self.clear(lib, decargs(args)) + art.clear(self._log, lib, decargs(args)) clear_cmd.func = clear_func return [embed_cmd, extract_cmd, clear_cmd] @@ -130,169 +128,6 @@ class EmbedCoverArtPlugin(BeetsPlugin): """ if self.config['auto']: max_width = self.config['maxwidth'].get(int) - self.embed_album(album, max_width, True) - - def embed_item(self, item, imagepath, maxwidth=None, itempath=None, - compare_threshold=0, ifempty=False, as_album=False): - """Embed an image into the item's media file. - """ - if compare_threshold: - if not self.check_art_similarity(item, imagepath, - compare_threshold): - self._log.info(u'Image not similar; skipping.') - return - if ifempty and self.get_art(item): - self._log.info(u'media file already contained art') - return - if maxwidth and not as_album: - imagepath = self.resize_image(imagepath, maxwidth) - - try: - self._log.debug(u'embedding {0}', displayable_path(imagepath)) - image = self._mediafile_image(imagepath, maxwidth) - except IOError as exc: - self._log.warning(u'could not read image file: {0}', exc) - return - item.try_write(path=itempath, tags={'images': [image]}) - - def embed_album(self, album, maxwidth=None, quiet=False): - """Embed album art into all of the album's items. - """ - imagepath = album.artpath - if not imagepath: - self._log.info(u'No album art present for {0}', album) - return - if not os.path.isfile(syspath(imagepath)): - self._log.info(u'Album art not found at {0} for {1}', - displayable_path(imagepath), album) - return - if maxwidth: - imagepath = self.resize_image(imagepath, maxwidth) - - self._log.info(u'Embedding album art into {0}', album) - - for item in album.items(): - thresh = self.config['compare_threshold'].get(int) - ifempty = self.config['ifempty'].get(bool) - self.embed_item(item, imagepath, maxwidth, None, - thresh, ifempty, as_album=True) - - def resize_image(self, imagepath, maxwidth): - """Returns path to an image resized to maxwidth. - """ - self._log.debug(u'Resizing album art to {0} pixels wide', maxwidth) - imagepath = ArtResizer.shared.resize(maxwidth, syspath(imagepath)) - return imagepath - - def check_art_similarity(self, item, imagepath, compare_threshold): - """A boolean indicating if an image is similar to embedded item art. - """ - with NamedTemporaryFile(delete=True) as f: - art = self.extract(f.name, item) - - if art: - is_windows = platform.system() == "Windows" - - # Converting images to grayscale tends to minimize the weight - # of colors in the diff score. - convert_proc = subprocess.Popen( - [b'convert', syspath(imagepath), syspath(art), - b'-colorspace', b'gray', b'MIFF:-'], - stdout=subprocess.PIPE, - close_fds=not is_windows, - ) - compare_proc = subprocess.Popen( - [b'compare', b'-metric', b'PHASH', b'-', b'null:'], - stdin=convert_proc.stdout, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=not is_windows, - ) - convert_proc.stdout.close() - - stdout, stderr = compare_proc.communicate() - if compare_proc.returncode: - if compare_proc.returncode != 1: - self._log.debug(u'IM phashes compare failed for {0}, ' - u'{1}', displayable_path(imagepath), - displayable_path(art)) - return - out_str = stderr - else: - out_str = stdout - - try: - phash_diff = float(out_str) - except ValueError: - self._log.debug(u'IM output is not a number: {0!r}', - out_str) - return - - self._log.debug(u'compare PHASH score is {0}', phash_diff) - return phash_diff <= compare_threshold - - return True - - def _mediafile_image(self, image_path, maxwidth=None): - """Return a `mediafile.Image` object for the path. - """ - - with open(syspath(image_path), 'rb') as f: - data = f.read() - return mediafile.Image(data, type=mediafile.ImageType.front) - - def get_art(self, item): - # Extract the art. - try: - mf = mediafile.MediaFile(syspath(item.path)) - except mediafile.UnreadableFileError as exc: - self._log.warning(u'Could not extract art from {0}: {1}', - displayable_path(item.path), exc) - return - - return mf.art - - # 'extractart' command. - def extract(self, outpath, item): - art = self.get_art(item) - - if not art: - self._log.info(u'No album art present in {0}, skipping.', item) - return - - # Add an extension to the filename. - ext = imghdr.what(None, h=art) - if not ext: - self._log.warning(u'Unknown image type in {0}.', - displayable_path(item.path)) - return - outpath += b'.' + ext - - self._log.info(u'Extracting album art from: {0} to: {1}', - item, displayable_path(outpath)) - with open(syspath(outpath), 'wb') as f: - f.write(art) - return outpath - - def extract_first(self, outpath, items): - for item in items: - real_path = self.extract(outpath, item) - if real_path: - return real_path - - # 'clearart' command. - def clear(self, lib, query): - id3v23 = config['id3v23'].get(bool) - - items = lib.items(query) - self._log.info(u'Clearing album art from {0} items', len(items)) - for item in items: - self._log.debug(u'Clearing art for {0}', item) - try: - mf = mediafile.MediaFile(syspath(item.path), id3v23) - except mediafile.UnreadableFileError as exc: - self._log.warning(u'Could not read file {0}: {1}', - displayable_path(item.path), exc) - else: - del mf.art - mf.save() + art.embed_album(self._log, album, max_width, True, + self.config['compare_threshold'].get(int), + self.config['ifempty'].get(bool)) diff --git a/test/test_embedart.py b/test/test_embedart.py index 729f5853d..16899975e 100644 --- a/test/test_embedart.py +++ b/test/test_embedart.py @@ -27,7 +27,7 @@ from beets.mediafile import MediaFile from beets import config, logging, ui from beets.util import syspath from beets.util.artresizer import ArtResizer -from beetsplug.embedart import EmbedCoverArtPlugin +from beets import art def require_artresizer_compare(test): @@ -129,35 +129,35 @@ class EmbedartCliTest(_common.TestCase, TestHelper): class EmbedartTest(unittest.TestCase): - @patch('beetsplug.embedart.subprocess') + @patch('beets.art.subprocess') def test_imagemagick_response(self, mock_subprocess): - embed = EmbedCoverArtPlugin() - embed.extract = Mock(return_value=True) + art.extract = Mock(return_value=True) proc = mock_subprocess.Popen.return_value + log = logging.getLogger('beets.embedart') # everything is fine proc.returncode = 0 proc.communicate.return_value = "10", "tagada" - self.assertTrue(embed.check_art_similarity(None, None, 20)) - self.assertFalse(embed.check_art_similarity(None, None, 5)) + self.assertTrue(art.check_art_similarity(log, None, None, 20)) + self.assertFalse(art.check_art_similarity(log, None, None, 5)) # small failure proc.returncode = 1 proc.communicate.return_value = "tagada", "10" - self.assertTrue(embed.check_art_similarity(None, None, 20)) - self.assertFalse(embed.check_art_similarity(None, None, 5)) + self.assertTrue(art.check_art_similarity(log, None, None, 20)) + self.assertFalse(art.check_art_similarity(log, None, None, 5)) # bigger failure proc.returncode = 2 - self.assertIsNone(embed.check_art_similarity(None, None, 20)) + self.assertIsNone(art.check_art_similarity(log, None, None, 20)) # IM result parsing problems proc.returncode = 0 proc.communicate.return_value = "foo", "bar" - self.assertIsNone(embed.check_art_similarity(None, None, 20)) + self.assertIsNone(art.check_art_similarity(log, None, None, 20)) proc.returncode = 1 - self.assertIsNone(embed.check_art_similarity(None, None, 20)) + self.assertIsNone(art.check_art_similarity(log, None, None, 20)) def suite():