New beets.art image utilities

This makes it cleaner to share the high-level image-embedding logic between
the `embedart` and `convert` plugins. This resolves a regression, introduced
in f504c786, that inadvertently activated the former plugin when the latter
was enabled. I also like avoiding cross-plugin imports.
This commit is contained in:
Adrian Sampson 2015-04-05 12:15:38 -07:00
parent 677d1e0b15
commit 0af2cdaa3a
4 changed files with 224 additions and 192 deletions

197
beets/art.py Normal file
View file

@ -0,0 +1,197 @@
# This file is part of beets.
# Copyright 2015, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""High-level utilities for manipulating image files associated with
music and items' embedded album art.
"""
import subprocess
import platform
from tempfile import NamedTemporaryFile
import imghdr
import os
from beets.util import displayable_path, syspath
from beets.util.artresizer import ArtResizer
from beets import mediafile
from beets import config
def mediafile_image(image_path, maxwidth=None):
"""Return a `mediafile.Image` object for the path.
"""
with open(syspath(image_path), 'rb') as f:
data = f.read()
return mediafile.Image(data, type=mediafile.ImageType.front)
def get_art(log, item):
# Extract the art.
try:
mf = mediafile.MediaFile(syspath(item.path))
except mediafile.UnreadableFileError as exc:
log.warning(u'Could not extract art from {0}: {1}',
displayable_path(item.path), exc)
return
return mf.art
def embed_item(log, item, imagepath, maxwidth=None, itempath=None,
compare_threshold=0, ifempty=False, as_album=False):
"""Embed an image into the item's media file.
"""
if compare_threshold:
if not check_art_similarity(log, item, imagepath, compare_threshold):
log.info(u'Image not similar; skipping.')
return
if ifempty and get_art(log, item):
log.info(u'media file already contained art')
return
if maxwidth and not as_album:
imagepath = resize_image(log, imagepath, maxwidth)
try:
log.debug(u'embedding {0}', displayable_path(imagepath))
image = mediafile_image(imagepath, maxwidth)
except IOError as exc:
log.warning(u'could not read image file: {0}', exc)
return
item.try_write(path=itempath, tags={'images': [image]})
def embed_album(log, album, maxwidth=None, quiet=False,
compare_threshold=0, ifempty=False):
"""Embed album art into all of the album's items.
"""
imagepath = album.artpath
if not imagepath:
log.info(u'No album art present for {0}', album)
return
if not os.path.isfile(syspath(imagepath)):
log.info(u'Album art not found at {0} for {1}',
displayable_path(imagepath), album)
return
if maxwidth:
imagepath = resize_image(log, imagepath, maxwidth)
log.info(u'Embedding album art into {0}', album)
for item in album.items():
embed_item(log, item, imagepath, maxwidth, None,
compare_threshold, ifempty, as_album=True)
def resize_image(log, imagepath, maxwidth):
"""Returns path to an image resized to maxwidth.
"""
log.debug(u'Resizing album art to {0} pixels wide', maxwidth)
imagepath = ArtResizer.shared.resize(maxwidth, syspath(imagepath))
return imagepath
def check_art_similarity(log, item, imagepath, compare_threshold):
"""A boolean indicating if an image is similar to embedded item art.
"""
with NamedTemporaryFile(delete=True) as f:
art = extract(log, f.name, item)
if art:
is_windows = platform.system() == "Windows"
# Converting images to grayscale tends to minimize the weight
# of colors in the diff score.
convert_proc = subprocess.Popen(
[b'convert', syspath(imagepath), syspath(art),
b'-colorspace', b'gray', b'MIFF:-'],
stdout=subprocess.PIPE,
close_fds=not is_windows,
)
compare_proc = subprocess.Popen(
[b'compare', b'-metric', b'PHASH', b'-', b'null:'],
stdin=convert_proc.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=not is_windows,
)
convert_proc.stdout.close()
stdout, stderr = compare_proc.communicate()
if compare_proc.returncode:
if compare_proc.returncode != 1:
log.debug(u'IM phashes compare failed for {0}, {1}',
displayable_path(imagepath),
displayable_path(art))
return
out_str = stderr
else:
out_str = stdout
try:
phash_diff = float(out_str)
except ValueError:
log.debug(u'IM output is not a number: {0!r}', out_str)
return
log.debug(u'compare PHASH score is {0}', phash_diff)
return phash_diff <= compare_threshold
return True
def extract(log, outpath, item):
art = get_art(log, item)
if not art:
log.info(u'No album art present in {0}, skipping.', item)
return
# Add an extension to the filename.
ext = imghdr.what(None, h=art)
if not ext:
log.warning(u'Unknown image type in {0}.',
displayable_path(item.path))
return
outpath += b'.' + ext
log.info(u'Extracting album art from: {0} to: {1}',
item, displayable_path(outpath))
with open(syspath(outpath), 'wb') as f:
f.write(art)
return outpath
def extract_first(log, outpath, items):
for item in items:
real_path = extract(log, outpath, item)
if real_path:
return real_path
def clear(log, lib, query):
id3v23 = config['id3v23'].get(bool)
items = lib.items(query)
log.info(u'Clearing album art from {0} items', len(items))
for item in items:
log.debug(u'Clearing art for {0}', item)
try:
mf = mediafile.MediaFile(syspath(item.path), id3v23)
except mediafile.UnreadableFileError as exc:
log.warning(u'Could not read file {0}: {1}',
displayable_path(item.path), exc)
else:
del mf.art
mf.save()

View file

@ -26,8 +26,8 @@ from string import Template
from beets import ui, util, plugins, config
from beets.plugins import BeetsPlugin
from beetsplug.embedart import EmbedCoverArtPlugin
from beets.util.confit import ConfigTypeError
from beets import art
_fs_lock = threading.Lock()
_temp_files = [] # Keep track of temporary transcoded files for deletion.
@ -286,8 +286,8 @@ class ConvertPlugin(BeetsPlugin):
if self.config['embed']:
album = item.get_album()
if album and album.artpath:
EmbedCoverArtPlugin().embed_item(item, album.artpath,
itempath=converted)
art.embed_item(self._log, item, album.artpath,
itempath=converted)
if keep_new:
plugins.send('after_convert', item=item,

View file

@ -17,18 +17,14 @@ from __future__ import (division, absolute_import, print_function,
unicode_literals)
import os.path
import imghdr
import subprocess
import platform
from tempfile import NamedTemporaryFile
from beets.plugins import BeetsPlugin
from beets import mediafile
from beets import ui
from beets.ui import decargs
from beets.util import syspath, normpath, displayable_path, bytestring_path
from beets.util.artresizer import ArtResizer
from beets import config
from beets import art
class EmbedCoverArtPlugin(BeetsPlugin):
@ -75,11 +71,12 @@ class EmbedCoverArtPlugin(BeetsPlugin):
displayable_path(imagepath)
))
for item in lib.items(decargs(args)):
self.embed_item(item, imagepath, maxwidth, None,
compare_threshold, ifempty)
art.embed_item(self._log, item, imagepath, maxwidth, None,
compare_threshold, ifempty)
else:
for album in lib.albums(decargs(args)):
self.embed_album(album, maxwidth)
art.embed_album(self._log, album, maxwidth, False,
compare_threshold, ifempty)
embed_cmd.func = embed_func
@ -98,8 +95,8 @@ class EmbedCoverArtPlugin(BeetsPlugin):
def extract_func(lib, opts, args):
if opts.outpath:
self.extract_first(normpath(opts.outpath),
lib.items(decargs(args)))
art.extract_first(self._log, normpath(opts.outpath),
lib.items(decargs(args)))
else:
filename = bytestring_path(opts.filename or
config['art_filename'].get())
@ -109,7 +106,8 @@ class EmbedCoverArtPlugin(BeetsPlugin):
return
for album in lib.albums(decargs(args)):
artpath = normpath(os.path.join(album.path, filename))
artpath = self.extract_first(artpath, album.items())
artpath = art.extract_first(self._log, artpath,
album.items())
if artpath and opts.associate:
album.set_art(artpath)
album.store()
@ -120,7 +118,7 @@ class EmbedCoverArtPlugin(BeetsPlugin):
help='remove images from file metadata')
def clear_func(lib, opts, args):
self.clear(lib, decargs(args))
art.clear(self._log, lib, decargs(args))
clear_cmd.func = clear_func
return [embed_cmd, extract_cmd, clear_cmd]
@ -130,169 +128,6 @@ class EmbedCoverArtPlugin(BeetsPlugin):
"""
if self.config['auto']:
max_width = self.config['maxwidth'].get(int)
self.embed_album(album, max_width, True)
def embed_item(self, item, imagepath, maxwidth=None, itempath=None,
compare_threshold=0, ifempty=False, as_album=False):
"""Embed an image into the item's media file.
"""
if compare_threshold:
if not self.check_art_similarity(item, imagepath,
compare_threshold):
self._log.info(u'Image not similar; skipping.')
return
if ifempty and self.get_art(item):
self._log.info(u'media file already contained art')
return
if maxwidth and not as_album:
imagepath = self.resize_image(imagepath, maxwidth)
try:
self._log.debug(u'embedding {0}', displayable_path(imagepath))
image = self._mediafile_image(imagepath, maxwidth)
except IOError as exc:
self._log.warning(u'could not read image file: {0}', exc)
return
item.try_write(path=itempath, tags={'images': [image]})
def embed_album(self, album, maxwidth=None, quiet=False):
"""Embed album art into all of the album's items.
"""
imagepath = album.artpath
if not imagepath:
self._log.info(u'No album art present for {0}', album)
return
if not os.path.isfile(syspath(imagepath)):
self._log.info(u'Album art not found at {0} for {1}',
displayable_path(imagepath), album)
return
if maxwidth:
imagepath = self.resize_image(imagepath, maxwidth)
self._log.info(u'Embedding album art into {0}', album)
for item in album.items():
thresh = self.config['compare_threshold'].get(int)
ifempty = self.config['ifempty'].get(bool)
self.embed_item(item, imagepath, maxwidth, None,
thresh, ifempty, as_album=True)
def resize_image(self, imagepath, maxwidth):
"""Returns path to an image resized to maxwidth.
"""
self._log.debug(u'Resizing album art to {0} pixels wide', maxwidth)
imagepath = ArtResizer.shared.resize(maxwidth, syspath(imagepath))
return imagepath
def check_art_similarity(self, item, imagepath, compare_threshold):
"""A boolean indicating if an image is similar to embedded item art.
"""
with NamedTemporaryFile(delete=True) as f:
art = self.extract(f.name, item)
if art:
is_windows = platform.system() == "Windows"
# Converting images to grayscale tends to minimize the weight
# of colors in the diff score.
convert_proc = subprocess.Popen(
[b'convert', syspath(imagepath), syspath(art),
b'-colorspace', b'gray', b'MIFF:-'],
stdout=subprocess.PIPE,
close_fds=not is_windows,
)
compare_proc = subprocess.Popen(
[b'compare', b'-metric', b'PHASH', b'-', b'null:'],
stdin=convert_proc.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=not is_windows,
)
convert_proc.stdout.close()
stdout, stderr = compare_proc.communicate()
if compare_proc.returncode:
if compare_proc.returncode != 1:
self._log.debug(u'IM phashes compare failed for {0}, '
u'{1}', displayable_path(imagepath),
displayable_path(art))
return
out_str = stderr
else:
out_str = stdout
try:
phash_diff = float(out_str)
except ValueError:
self._log.debug(u'IM output is not a number: {0!r}',
out_str)
return
self._log.debug(u'compare PHASH score is {0}', phash_diff)
return phash_diff <= compare_threshold
return True
def _mediafile_image(self, image_path, maxwidth=None):
"""Return a `mediafile.Image` object for the path.
"""
with open(syspath(image_path), 'rb') as f:
data = f.read()
return mediafile.Image(data, type=mediafile.ImageType.front)
def get_art(self, item):
# Extract the art.
try:
mf = mediafile.MediaFile(syspath(item.path))
except mediafile.UnreadableFileError as exc:
self._log.warning(u'Could not extract art from {0}: {1}',
displayable_path(item.path), exc)
return
return mf.art
# 'extractart' command.
def extract(self, outpath, item):
art = self.get_art(item)
if not art:
self._log.info(u'No album art present in {0}, skipping.', item)
return
# Add an extension to the filename.
ext = imghdr.what(None, h=art)
if not ext:
self._log.warning(u'Unknown image type in {0}.',
displayable_path(item.path))
return
outpath += b'.' + ext
self._log.info(u'Extracting album art from: {0} to: {1}',
item, displayable_path(outpath))
with open(syspath(outpath), 'wb') as f:
f.write(art)
return outpath
def extract_first(self, outpath, items):
for item in items:
real_path = self.extract(outpath, item)
if real_path:
return real_path
# 'clearart' command.
def clear(self, lib, query):
id3v23 = config['id3v23'].get(bool)
items = lib.items(query)
self._log.info(u'Clearing album art from {0} items', len(items))
for item in items:
self._log.debug(u'Clearing art for {0}', item)
try:
mf = mediafile.MediaFile(syspath(item.path), id3v23)
except mediafile.UnreadableFileError as exc:
self._log.warning(u'Could not read file {0}: {1}',
displayable_path(item.path), exc)
else:
del mf.art
mf.save()
art.embed_album(self._log, album, max_width, True,
self.config['compare_threshold'].get(int),
self.config['ifempty'].get(bool))

View file

@ -27,7 +27,7 @@ from beets.mediafile import MediaFile
from beets import config, logging, ui
from beets.util import syspath
from beets.util.artresizer import ArtResizer
from beetsplug.embedart import EmbedCoverArtPlugin
from beets import art
def require_artresizer_compare(test):
@ -129,35 +129,35 @@ class EmbedartCliTest(_common.TestCase, TestHelper):
class EmbedartTest(unittest.TestCase):
@patch('beetsplug.embedart.subprocess')
@patch('beets.art.subprocess')
def test_imagemagick_response(self, mock_subprocess):
embed = EmbedCoverArtPlugin()
embed.extract = Mock(return_value=True)
art.extract = Mock(return_value=True)
proc = mock_subprocess.Popen.return_value
log = logging.getLogger('beets.embedart')
# everything is fine
proc.returncode = 0
proc.communicate.return_value = "10", "tagada"
self.assertTrue(embed.check_art_similarity(None, None, 20))
self.assertFalse(embed.check_art_similarity(None, None, 5))
self.assertTrue(art.check_art_similarity(log, None, None, 20))
self.assertFalse(art.check_art_similarity(log, None, None, 5))
# small failure
proc.returncode = 1
proc.communicate.return_value = "tagada", "10"
self.assertTrue(embed.check_art_similarity(None, None, 20))
self.assertFalse(embed.check_art_similarity(None, None, 5))
self.assertTrue(art.check_art_similarity(log, None, None, 20))
self.assertFalse(art.check_art_similarity(log, None, None, 5))
# bigger failure
proc.returncode = 2
self.assertIsNone(embed.check_art_similarity(None, None, 20))
self.assertIsNone(art.check_art_similarity(log, None, None, 20))
# IM result parsing problems
proc.returncode = 0
proc.communicate.return_value = "foo", "bar"
self.assertIsNone(embed.check_art_similarity(None, None, 20))
self.assertIsNone(art.check_art_similarity(log, None, None, 20))
proc.returncode = 1
self.assertIsNone(embed.check_art_similarity(None, None, 20))
self.assertIsNone(art.check_art_similarity(log, None, None, 20))
def suite():