Update 3 plugins: func → methods, listeners

- functions turn into method in order to have the logger object
- registering the listener has to be updated too
This commit is contained in:
Bruno Cauet 2015-01-06 16:54:15 +01:00
parent 203b325ee7
commit 860e7e1483
3 changed files with 288 additions and 303 deletions

View file

@ -29,9 +29,6 @@ from beets.util.artresizer import ArtResizer
from beets import config
log = logging.getLogger(__name__)
class EmbedCoverArtPlugin(BeetsPlugin):
"""Allows albumart to be embedded into the actual files.
"""
@ -46,13 +43,15 @@ class EmbedCoverArtPlugin(BeetsPlugin):
if self.config['maxwidth'].get(int) and not ArtResizer.shared.local:
self.config['maxwidth'] = 0
log.warn(u"ImageMagick or PIL not found; "
u"'maxwidth' option ignored")
self._log.warn(u"ImageMagick or PIL not found; "
u"'maxwidth' option ignored")
if self.config['compare_threshold'].get(int) and not \
ArtResizer.shared.can_compare:
self.config['compare_threshold'] = 0
log.warn(u"ImageMagick 6.8.7 or higher not installed; "
u"'compare_threshold' option ignored")
self._log.warn(u"ImageMagick 6.8.7 or higher not installed; "
u"'compare_threshold' option ignored")
self.register_listener('album_imported', self.album_imported)
def commands(self):
# Embed command.
@ -70,11 +69,11 @@ class EmbedCoverArtPlugin(BeetsPlugin):
if opts.file:
imagepath = normpath(opts.file)
for item in lib.items(decargs(args)):
embed_item(item, imagepath, maxwidth, None,
compare_threshold, ifempty)
self.embed_item(item, imagepath, maxwidth, None,
compare_threshold, ifempty)
else:
for album in lib.albums(decargs(args)):
embed_album(album, maxwidth)
self.embed_album(album, maxwidth)
embed_cmd.func = embed_func
@ -87,7 +86,7 @@ class EmbedCoverArtPlugin(BeetsPlugin):
def extract_func(lib, opts, args):
outpath = normpath(opts.outpath or 'cover')
item = lib.items(decargs(args)).get()
extract(outpath, item)
self.extract(outpath, item)
extract_cmd.func = extract_func
# Clear command.
@ -95,179 +94,173 @@ class EmbedCoverArtPlugin(BeetsPlugin):
help='remove images from file metadata')
def clear_func(lib, opts, args):
clear(lib, decargs(args))
self.clear(lib, decargs(args))
clear_cmd.func = clear_func
return [embed_cmd, extract_cmd, clear_cmd]
def album_imported(self, lib, album):
"""Automatically embed art into imported albums.
"""
if album.artpath and config['embedart']['auto']:
max_width = config['embedart']['maxwidth'].get(int)
self.embed_album(album, max_width, True)
@EmbedCoverArtPlugin.listen('album_imported')
def album_imported(lib, album):
"""Automatically embed art into imported albums.
"""
if album.artpath and config['embedart']['auto']:
embed_album(album, config['embedart']['maxwidth'].get(int), True)
def embed_item(item, imagepath, maxwidth=None, itempath=None,
compare_threshold=0, ifempty=False, as_album=False):
"""Embed an image into the item's media file.
"""
if compare_threshold:
if not check_art_similarity(item, imagepath, compare_threshold):
log.warn(u'Image not similar; skipping.')
return
if ifempty:
art = get_art(item)
if not art:
pass
else:
log.debug(u'media file contained art already {0}',
displayable_path(imagepath))
return
if maxwidth and not as_album:
imagepath = resize_image(imagepath, maxwidth)
try:
log.debug(u'embedding {0}', displayable_path(imagepath))
item['images'] = [_mediafile_image(imagepath, maxwidth)]
except IOError as exc:
log.error(u'could not read image file: {0}', exc)
else:
# We don't want to store the image in the database.
item.try_write(itempath)
del item['images']
def embed_album(album, maxwidth=None, quiet=False):
"""Embed album art into all of the album's items.
"""
imagepath = album.artpath
if not imagepath:
log.info(u'No album art present: {0} - {1}',
album.albumartist, album.album)
return
if not os.path.isfile(syspath(imagepath)):
log.error(u'Album art not found at {0}', displayable_path(imagepath))
return
if maxwidth:
imagepath = resize_image(imagepath, maxwidth)
log.log(
logging.DEBUG if quiet else logging.INFO,
u'Embedding album art into {0.albumartist} - {0.album}.', album
)
for item in album.items():
embed_item(item, imagepath, maxwidth, None,
config['embedart']['compare_threshold'].get(int),
config['embedart']['ifempty'].get(bool), as_album=True)
def resize_image(imagepath, maxwidth):
"""Returns path to an image resized to maxwidth.
"""
log.info(u'Resizing album art to {0} pixels wide', maxwidth)
imagepath = ArtResizer.shared.resize(maxwidth, syspath(imagepath))
return imagepath
def check_art_similarity(item, imagepath, compare_threshold):
"""A boolean indicating if an image is similar to embedded item art.
"""
with NamedTemporaryFile(delete=True) as f:
art = extract(f.name, item)
if art:
# Converting images to grayscale tends to minimize the weight
# of colors in the diff score
cmd = 'convert {0} {1} -colorspace gray MIFF:- | ' \
'compare -metric PHASH - null:'.format(syspath(imagepath),
syspath(art))
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=platform.system() != 'Windows',
shell=True)
stdout, stderr = proc.communicate()
if proc.returncode:
if proc.returncode != 1:
log.warn(u'IM phashes compare failed for {0}, {1}',
displayable_path(imagepath),
displayable_path(art))
return
phashDiff = float(stderr)
def embed_item(self, item, imagepath, maxwidth=None, itempath=None,
compare_threshold=0, ifempty=False, as_album=False):
"""Embed an image into the item's media file.
"""
if compare_threshold:
if not self.check_art_similarity(item, imagepath,
compare_threshold):
self._log.warn(u'Image not similar; skipping.')
return
if ifempty:
art = self.get_art(item)
if not art:
pass
else:
phashDiff = float(stdout)
self._log.debug(u'media file contained art already {0}',
displayable_path(imagepath))
return
if maxwidth and not as_album:
imagepath = self.resize_image(imagepath, maxwidth)
log.info(u'compare PHASH score is {0}', phashDiff)
if phashDiff > compare_threshold:
return False
return True
def _mediafile_image(image_path, maxwidth=None):
"""Return a `mediafile.Image` object for the path.
"""
with open(syspath(image_path), 'rb') as f:
data = f.read()
return mediafile.Image(data, type=mediafile.ImageType.front)
def get_art(item):
# Extract the art.
try:
mf = mediafile.MediaFile(syspath(item.path))
except mediafile.UnreadableFileError as exc:
log.error(u'Could not extract art from {0}: {1}',
displayable_path(item.path), exc)
return
return mf.art
# 'extractart' command.
def extract(outpath, item):
if not item:
log.error(u'No item matches query.')
return
art = get_art(item)
if not art:
log.error(u'No album art present in {0} - {1}.',
item.artist, item.title)
return
# Add an extension to the filename.
ext = imghdr.what(None, h=art)
if not ext:
log.error(u'Unknown image type.')
return
outpath += '.' + ext
log.info(u'Extracting album art from: {0.artist} - {0.title} to: {1}',
item, displayable_path(outpath))
with open(syspath(outpath), 'wb') as f:
f.write(art)
return outpath
# 'clearart' command.
def clear(lib, query):
log.info(u'Clearing album art from items:')
for item in lib.items(query):
log.info(u'{0} - {1}', item.artist, item.title)
try:
mf = mediafile.MediaFile(syspath(item.path),
config['id3v23'].get(bool))
self._log.debug(u'embedding {0}', displayable_path(imagepath))
item['images'] = [self._mediafile_image(imagepath, maxwidth)]
except IOError as exc:
self._log.error(u'could not read image file: {0}', exc)
else:
# We don't want to store the image in the database.
item.try_write(itempath)
del item['images']
def embed_album(self, album, maxwidth=None, quiet=False):
"""Embed album art into all of the album's items.
"""
imagepath = album.artpath
if not imagepath:
self._log.info(u'No album art present: {0} - {1}',
album.albumartist, album.album)
return
if not os.path.isfile(syspath(imagepath)):
self._log.error(u'Album art not found at {0}',
displayable_path(imagepath))
return
if maxwidth:
imagepath = self.resize_image(imagepath, maxwidth)
self._log.log(
logging.DEBUG if quiet else logging.INFO,
u'Embedding album art into {0.albumartist} - {0.album}.', album
)
for item in album.items():
thresh = config['embedart']['compare_threshold'].get(int)
ifempty = config['embedart']['ifempty'].get(bool)
self.embed_item(item, imagepath, maxwidth, None,
thresh, ifempty, as_album=True)
def resize_image(self, imagepath, maxwidth):
"""Returns path to an image resized to maxwidth.
"""
self._log.info(u'Resizing album art to {0} pixels wide', maxwidth)
imagepath = ArtResizer.shared.resize(maxwidth, syspath(imagepath))
return imagepath
def check_art_similarity(self, item, imagepath, compare_threshold):
"""A boolean indicating if an image is similar to embedded item art.
"""
with NamedTemporaryFile(delete=True) as f:
art = self.extract(f.name, item)
if art:
# Converting images to grayscale tends to minimize the weight
# of colors in the diff score
cmd = 'convert {0} {1} -colorspace gray MIFF:- | ' \
'compare -metric PHASH - null:' \
.format(syspath(imagepath), syspath(art))
is_windows = platform.system() != "Windows"
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=is_windows,
shell=True)
stdout, stderr = proc.communicate()
if proc.returncode:
if proc.returncode != 1:
self._log.warn(u'IM phashes compare failed for {0}, '
u'{1}', displayable_path(imagepath),
displayable_path(art))
return
phashDiff = float(stderr)
else:
phashDiff = float(stdout)
self._log.info(u'compare PHASH score is {0}', phashDiff)
if phashDiff > compare_threshold:
return False
return True
def _mediafile_image(self, image_path, maxwidth=None):
"""Return a `mediafile.Image` object for the path.
"""
with open(syspath(image_path), 'rb') as f:
data = f.read()
return mediafile.Image(data, type=mediafile.ImageType.front)
def get_art(self, item):
# Extract the art.
try:
mf = mediafile.MediaFile(syspath(item.path))
except mediafile.UnreadableFileError as exc:
log.error(u'Could not clear art from {0}: {1}',
displayable_path(item.path), exc)
continue
del mf.art
mf.save()
self._log.error(u'Could not extract art from {0}: {1}',
displayable_path(item.path), exc)
return
return mf.art
# 'extractart' command.
def extract(self, outpath, item):
if not item:
self._log.error(u'No item matches query.')
return
art = self.get_art(item)
if not art:
self._log.error(u'No album art present in {0} - {1}.',
item.artist, item.title)
return
# Add an extension to the filename.
ext = imghdr.what(None, h=art)
if not ext:
self._log.error(u'Unknown image type.')
return
outpath += '.' + ext
self._log.info(u'Extracting album art from: {0.artist} - {0.title} '
u'to: {1}', item, displayable_path(outpath))
with open(syspath(outpath), 'wb') as f:
f.write(art)
return outpath
# 'clearart' command.
def clear(self, lib, query):
self._log.info(u'Clearing album art from items:')
for item in lib.items(query):
self._log.info(u'{0} - {1}', item.artist, item.title)
try:
mf = mediafile.MediaFile(syspath(item.path),
config['id3v23'].get(bool))
except mediafile.UnreadableFileError as exc:
self._log.error(u'Could not clear art from {0}: {1}',
displayable_path(item.path), exc)
continue
del mf.art
mf.save()

View file

@ -22,36 +22,9 @@ import re
from beets.plugins import BeetsPlugin
from beets.util import normpath, syspath, bytestring_path
from beets import config, logging
from beets import config
M3U_DEFAULT_NAME = 'imported.m3u'
log = logging.getLogger(__name__)
class ImportFeedsPlugin(BeetsPlugin):
def __init__(self):
super(ImportFeedsPlugin, self).__init__()
self.config.add({
'formats': [],
'm3u_name': u'imported.m3u',
'dir': None,
'relative_to': None,
'absolute_path': False,
})
feeds_dir = self.config['dir'].get()
if feeds_dir:
feeds_dir = os.path.expanduser(bytestring_path(feeds_dir))
self.config['dir'] = feeds_dir
if not os.path.exists(syspath(feeds_dir)):
os.makedirs(syspath(feeds_dir))
relative_to = self.config['relative_to'].get()
if relative_to:
self.config['relative_to'] = normpath(relative_to)
else:
self.config['relative_to'] = feeds_dir
def _get_feeds_dir(lib):
@ -89,62 +62,85 @@ def _write_m3u(m3u_path, items_paths):
f.write(path + '\n')
def _record_items(lib, basename, items):
"""Records relative paths to the given items for each feed format
"""
feedsdir = bytestring_path(config['importfeeds']['dir'].as_filename())
formats = config['importfeeds']['formats'].as_str_seq()
relative_to = config['importfeeds']['relative_to'].get() \
or config['importfeeds']['dir'].as_filename()
relative_to = bytestring_path(relative_to)
class ImportFeedsPlugin(BeetsPlugin):
def __init__(self):
super(ImportFeedsPlugin, self).__init__()
paths = []
for item in items:
if config['importfeeds']['absolute_path']:
paths.append(item.path)
self.config.add({
'formats': [],
'm3u_name': u'imported.m3u',
'dir': None,
'relative_to': None,
'absolute_path': False,
})
feeds_dir = self.config['dir'].get()
if feeds_dir:
feeds_dir = os.path.expanduser(bytestring_path(feeds_dir))
self.config['dir'] = feeds_dir
if not os.path.exists(syspath(feeds_dir)):
os.makedirs(syspath(feeds_dir))
relative_to = self.config['relative_to'].get()
if relative_to:
self.config['relative_to'] = normpath(relative_to)
else:
try:
relpath = os.path.relpath(item.path, relative_to)
except ValueError:
# On Windows, it is sometimes not possible to construct a
# relative path (if the files are on different disks).
relpath = item.path
paths.append(relpath)
self.config['relative_to'] = feeds_dir
if 'm3u' in formats:
basename = bytestring_path(
config['importfeeds']['m3u_name'].get(unicode)
)
m3u_path = os.path.join(feedsdir, basename)
_write_m3u(m3u_path, paths)
self.register_listener('library_opened', self.library_opened)
self.register_listener('album_imported', self.album_imported)
self.register_listener('item_imported', self.item_imported)
if 'm3u_multi' in formats:
m3u_path = _build_m3u_filename(basename)
_write_m3u(m3u_path, paths)
def _record_items(self, lib, basename, items):
"""Records relative paths to the given items for each feed format
"""
feedsdir = bytestring_path(config['importfeeds']['dir'].as_filename())
formats = config['importfeeds']['formats'].as_str_seq()
relative_to = config['importfeeds']['relative_to'].get() \
or config['importfeeds']['dir'].as_filename()
relative_to = bytestring_path(relative_to)
if 'link' in formats:
for path in paths:
dest = os.path.join(feedsdir, os.path.basename(path))
if not os.path.exists(syspath(dest)):
os.symlink(syspath(path), syspath(dest))
paths = []
for item in items:
if config['importfeeds']['absolute_path']:
paths.append(item.path)
else:
try:
relpath = os.path.relpath(item.path, relative_to)
except ValueError:
# On Windows, it is sometimes not possible to construct a
# relative path (if the files are on different disks).
relpath = item.path
paths.append(relpath)
if 'echo' in formats:
log.info("Location of imported music:")
for path in paths:
log.info(" {0}", path)
if 'm3u' in formats:
basename = bytestring_path(
config['importfeeds']['m3u_name'].get(unicode)
)
m3u_path = os.path.join(feedsdir, basename)
_write_m3u(m3u_path, paths)
if 'm3u_multi' in formats:
m3u_path = _build_m3u_filename(basename)
_write_m3u(m3u_path, paths)
@ImportFeedsPlugin.listen('library_opened')
def library_opened(lib):
if config['importfeeds']['dir'].get() is None:
config['importfeeds']['dir'] = _get_feeds_dir(lib)
if 'link' in formats:
for path in paths:
dest = os.path.join(feedsdir, os.path.basename(path))
if not os.path.exists(syspath(dest)):
os.symlink(syspath(path), syspath(dest))
if 'echo' in formats:
self._log.info("Location of imported music:")
for path in paths:
self._log.info(" {0}", path)
@ImportFeedsPlugin.listen('album_imported')
def album_imported(lib, album):
_record_items(lib, album.album, album.items())
def library_opened(self, lib):
if self.config['dir'].get() is None:
self.config['dir'] = _get_feeds_dir(lib)
def album_imported(self, lib, album):
self._record_items(lib, album.album, album.items())
@ImportFeedsPlugin.listen('item_imported')
def item_imported(lib, item):
_record_items(lib, item.title, [item])
def item_imported(self, lib, item):
self._record_items(lib, item.title, [item])

View file

@ -16,15 +16,12 @@
automatically whenever tags are written.
"""
from beets import logging
from beets.plugins import BeetsPlugin
from beets import ui
from beets import util
from beets import config
from beets import mediafile
log = logging.getLogger(__name__)
_MUTAGEN_FORMATS = {
'asf': 'ASF',
'apev2': 'APEv2File',
@ -54,6 +51,7 @@ class ScrubPlugin(BeetsPlugin):
self.config.add({
'auto': True,
})
self.register_listener("write", self.write_item)
def commands(self):
def scrub_func(lib, opts, args):
@ -64,7 +62,8 @@ class ScrubPlugin(BeetsPlugin):
# Walk through matching files and remove tags.
for item in lib.items(ui.decargs(args)):
log.info(u'scrubbing: {0}', util.displayable_path(item.path))
self._log.info(u'scrubbing: {0}',
util.displayable_path(item.path))
# Get album art if we need to restore it.
if opts.write:
@ -73,14 +72,14 @@ class ScrubPlugin(BeetsPlugin):
art = mf.art
# Remove all tags.
_scrub(item.path)
self._scrub(item.path)
# Restore tags, if enabled.
if opts.write:
log.debug(u'writing new tags after scrub')
self._log.debug(u'writing new tags after scrub')
item.try_write()
if art:
log.info(u'restoring art')
self._log.info(u'restoring art')
mf = mediafile.MediaFile(item.path)
mf.art = art
mf.save()
@ -95,49 +94,46 @@ class ScrubPlugin(BeetsPlugin):
return [scrub_cmd]
@staticmethod
def _mutagen_classes():
"""Get a list of file type classes from the Mutagen module.
"""
classes = []
for modname, clsname in _MUTAGEN_FORMATS.items():
mod = __import__('mutagen.{0}'.format(modname),
fromlist=[clsname])
classes.append(getattr(mod, clsname))
return classes
def _mutagen_classes():
"""Get a list of file type classes from the Mutagen module.
"""
classes = []
for modname, clsname in _MUTAGEN_FORMATS.items():
mod = __import__('mutagen.{0}'.format(modname),
fromlist=[clsname])
classes.append(getattr(mod, clsname))
return classes
def _scrub(self, path):
"""Remove all tags from a file.
"""
for cls in self._mutagen_classes():
# Try opening the file with this type, but just skip in the
# event of any error.
try:
f = cls(util.syspath(path))
except Exception:
continue
if f.tags is None:
continue
# Remove the tag for this type.
try:
f.delete()
except NotImplementedError:
# Some Mutagen metadata subclasses (namely, ASFTag) do not
# support .delete(), presumably because it is impossible to
# remove them. In this case, we just remove all the tags.
for tag in f.keys():
del f[tag]
f.save()
except IOError as exc:
self._log.error(u'could not scrub {0}: {1}',
util.displayable_path(path), exc)
def _scrub(path):
"""Remove all tags from a file.
"""
for cls in _mutagen_classes():
# Try opening the file with this type, but just skip in the
# event of any error.
try:
f = cls(util.syspath(path))
except Exception:
continue
if f.tags is None:
continue
# Remove the tag for this type.
try:
f.delete()
except NotImplementedError:
# Some Mutagen metadata subclasses (namely, ASFTag) do not
# support .delete(), presumably because it is impossible to
# remove them. In this case, we just remove all the tags.
for tag in f.keys():
del f[tag]
f.save()
except IOError as exc:
log.error(u'could not scrub {0}: {1}',
util.displayable_path(path), exc)
# Automatically embed art into imported albums.
@ScrubPlugin.listen('write')
def write_item(path):
if not scrubbing and config['scrub']['auto']:
log.debug(u'auto-scrubbing {0}', util.displayable_path(path))
_scrub(path)
def write_item(self, path):
"""Automatically embed art into imported albums."""
if not scrubbing and config['scrub']['auto']:
self._log.debug(u'auto-scrubbing {0}', util.displayable_path(path))
self._scrub(path)