Convert fetchart plugin, with OO rewrite of sources

Art sources are now classes
This commit is contained in:
Bruno Cauet 2015-01-06 19:19:30 +01:00
parent 5c1cc6e7fc
commit 8097ff8c1d

View file

@ -21,7 +21,6 @@ from tempfile import NamedTemporaryFile
import requests
from beets import logging
from beets import plugins
from beets import importer
from beets import ui
@ -39,191 +38,166 @@ IMAGE_EXTENSIONS = ['png', 'jpg', 'jpeg']
CONTENT_TYPES = ('image/jpeg',)
DOWNLOAD_EXTENSION = '.jpg'
log = logging.getLogger(__name__)
requests_session = requests.Session()
requests_session.headers = {'User-Agent': 'beets'}
def _fetch_image(url):
    """Download the image at `url` into a temporary file.

    Returns the path of the downloaded file when the response carries a
    Content-Type header listed in CONTENT_TYPES. Returns None when the
    response is not an accepted image type, or when the request or the
    write to disk fails.
    """
    log.debug(u'downloading art: {0}', url)
    try:
        with closing(requests_session.get(url, stream=True)) as resp:
            if 'Content-Type' not in resp.headers \
                    or resp.headers['Content-Type'] not in CONTENT_TYPES:
                log.debug(u'not an image')
                return

            # Generate a temporary file with the correct extension.
            # delete=False because the path is handed back to the caller.
            with NamedTemporaryFile(suffix=DOWNLOAD_EXTENSION, delete=False) \
                    as fh:
                # iter_content() defaults to one-byte chunks; request
                # larger chunks so the body is not written byte by byte.
                for chunk in resp.iter_content(chunk_size=1024):
                    fh.write(chunk)
            log.debug(u'downloaded art to: {0}',
                      util.displayable_path(fh.name))
            return fh.name
    except (IOError, requests.RequestException):
        log.debug(u'error fetching art')
# ART SOURCES ################################################################
# Cover Art Archive.
class ArtSource(object):
def __init__(self, log):
self._log = log
CAA_URL = 'http://coverartarchive.org/release/{mbid}/front-500.jpg'
CAA_GROUP_URL = 'http://coverartarchive.org/release-group/{mbid}/front-500.jpg'
def get(self, album):
raise NotImplementedError()
def caa_art(album):
    """Yield Cover Art Archive front-cover URLs for `album`.

    The release URL is produced first, if the album has a MusicBrainz
    release ID; the release-group URL follows, if the album has a
    release group ID. An album with neither ID yields nothing.
    """
    candidates = (
        (CAA_URL, 'mb_albumid'),
        (CAA_GROUP_URL, 'mb_releasegroupid'),
    )
    for template, field in candidates:
        mbid = getattr(album, field)
        if mbid:
            yield template.format(mbid=mbid)
class CoverArtArchive(ArtSource):
    """Art source that builds Cover Art Archive URLs."""
    URL = 'http://coverartarchive.org/release/{mbid}/front-500.jpg'
    GROUP_URL = 'http://coverartarchive.org/release-group/{mbid}/front-500.jpg'

    def get(self, album):
        """Yield front-cover URLs for `album`.

        The release URL comes first (when the album has a MusicBrainz
        release ID), then the release-group URL (when the album has a
        release group ID).
        """
        release_id = album.mb_albumid
        if release_id:
            yield self.URL.format(mbid=release_id)

        group_id = album.mb_releasegroupid
        if group_id:
            yield self.GROUP_URL.format(mbid=group_id)
# Art from Amazon.
class Amazon(ArtSource):
URL = 'http://images.amazon.com/images/P/%s.%02i.LZZZZZZZ.jpg'
INDICES = (1, 2)
AMAZON_URL = 'http://images.amazon.com/images/P/%s.%02i.LZZZZZZZ.jpg'
AMAZON_INDICES = (1, 2)
def get(self, album):
"""Generate URLs using Amazon ID (ASIN) string.
"""
if album.asin:
for index in self.INDICES:
yield self.URL % (album.asin, index)
def art_for_asin(album):
    """Yield Amazon image URLs built from the album's ASIN.

    One URL is produced for each entry of AMAZON_INDICES. Nothing is
    yielded when the album has no ASIN.
    """
    if not album.asin:
        return
    for image_index in AMAZON_INDICES:
        yield AMAZON_URL % (album.asin, image_index)
class AlbumArtOrg(ArtSource):
"""AlbumArt.org scraper"""
URL = 'http://www.albumart.org/index_detail.php'
PAT = r'href\s*=\s*"([^>"]*)"[^>]*title\s*=\s*"View larger image"'
# AlbumArt.org scraper.
AAO_URL = 'http://www.albumart.org/index_detail.php'
AAO_PAT = r'href\s*=\s*"([^>"]*)"[^>]*title\s*=\s*"View larger image"'
def aao_art(album):
    """Yield the image URL scraped from AlbumArt.org for the album's ASIN.

    Yields at most one URL. Nothing is yielded when the album has no
    ASIN, when the page request fails, or when the page does not match
    AAO_PAT.
    """
    asin = album.asin
    if not asin:
        return

    # Fetch the detail page for this ASIN.
    try:
        page = requests_session.get(AAO_URL, params={'asin': asin})
        log.debug(u'scraped art URL: {0}', page.url)
    except requests.RequestException:
        log.debug(u'error scraping art page')
        return

    # Pull the image link out of the page markup.
    match = re.search(AAO_PAT, page.text)
    if match is None:
        log.debug(u'no image found on page')
        return
    yield match.group(1)
# Google Images scraper.
GOOGLE_URL = 'https://ajax.googleapis.com/ajax/services/search/images'
def google_art(album):
    """Yield image URLs from the Google image search API for `album`.

    The query is built from the album artist and the album title.
    Nothing is yielded when either is missing or when the request fails.
    If the response cannot be parsed, the failure is logged and only the
    URLs extracted before the failure are yielded.
    """
    if not (album.albumartist and album.album):
        return
    search_string = (album.albumartist + ',' + album.album).encode('utf-8')

    try:
        response = requests_session.get(GOOGLE_URL, params={
            'v': '1.0',
            'q': search_string,
            'start': '0',
        })
    except requests.RequestException:
        log.debug(u'error scraping art page')
        return

    # Collect the URLs inside the try block but yield them outside it,
    # so the except clause never sees exceptions raised at a yield
    # (for example GeneratorExit when the consumer stops early).
    urls = []
    try:
        for result in response.json()['responseData']['results']:
            urls.append(result['unescapedUrl'])
    except (ValueError, KeyError, TypeError):
        # ValueError: body is not JSON. KeyError/TypeError: the decoded
        # data does not have the expected structure.
        log.debug(u'error scraping art page')

    for url in urls:
        yield url
# Art from the iTunes Store.
def itunes_art(album):
"""Return art URL from iTunes Store given an album title.
"""
search_string = (album.albumartist + ' ' + album.album).encode('utf-8')
try:
# Isolate bugs in the iTunes library while searching.
def get(self, album):
"""Return art URL from AlbumArt.org using album ASIN.
"""
if not album.asin:
return
# Get the page from albumart.org.
try:
itunes_album = itunes.search_album(search_string)[0]
except Exception as exc:
log.debug('iTunes search failed: {0}', exc)
resp = requests_session.get(self.URL, params={'asin': album.asin})
self._log.debug(u'scraped art URL: {0}', resp.url)
except requests.RequestException:
self._log.debug(u'error scraping art page')
return
if itunes_album.get_artwork()['100']:
small_url = itunes_album.get_artwork()['100']
big_url = small_url.replace('100x100', '1200x1200')
yield big_url
# Search the page for the image URL.
m = re.search(self.PAT, resp.text)
if m:
image_url = m.group(1)
yield image_url
else:
log.debug(u'album has no artwork in iTunes Store')
except IndexError:
log.debug(u'album not found in iTunes Store')
self._log.debug(u'no image found on page')
# Art from the filesystem.
class GoogleImages(ArtSource):
    """Art source that queries the Google image search API."""
    URL = 'https://ajax.googleapis.com/ajax/services/search/images'

    def get(self, album):
        """Yield image URLs found for the album's artist and title.

        Nothing is yielded when either the album artist or the album
        title is missing, or when the request fails. If the response
        cannot be parsed, the failure is logged and only the URLs
        extracted before the failure are yielded.
        """
        if not (album.albumartist and album.album):
            return
        search_string = (album.albumartist + ',' + album.album).encode('utf-8')

        try:
            response = requests_session.get(self.URL, params={
                'v': '1.0',
                'q': search_string,
                'start': '0',
            })
        except requests.RequestException:
            self._log.debug(u'error scraping art page')
            return

        # Collect the URLs inside the try block but yield them outside
        # it, so the except clause never sees exceptions raised at a
        # yield (for example GeneratorExit when the consumer stops early).
        urls = []
        try:
            for result in response.json()['responseData']['results']:
                urls.append(result['unescapedUrl'])
        except (ValueError, KeyError, TypeError):
            # ValueError: body is not JSON. KeyError/TypeError: the
            # decoded data does not have the expected structure.
            self._log.debug(u'error scraping art page')

        for url in urls:
            yield url
def filename_priority(filename, cover_names):
"""Sort order for image names.
class ITunesStore(ArtSource):
# Art from the iTunes Store.
def get(self, album):
"""Return art URL from iTunes Store given an album title.
"""
search_string = (album.albumartist + ' ' + album.album).encode('utf-8')
try:
# Isolate bugs in the iTunes library while searching.
try:
itunes_album = itunes.search_album(search_string)[0]
except Exception as exc:
self._log.debug('iTunes search failed: {0}', exc)
return
Return indexes of cover names found in the image filename. This
means that images with lower-numbered and more keywords will have higher
priority.
"""
return [idx for (idx, x) in enumerate(cover_names) if x in filename]
if itunes_album.get_artwork()['100']:
small_url = itunes_album.get_artwork()['100']
big_url = small_url.replace('100x100', '1200x1200')
yield big_url
else:
self._log.debug(u'album has no artwork in iTunes Store')
except IndexError:
self._log.debug(u'album not found in iTunes Store')
def art_in_path(path, cover_names, cautious):
"""Look for album art files in a specified directory.
"""
if not os.path.isdir(path):
return
class FileSystem(ArtSource):
"""Art from the filesystem"""
@staticmethod
def filename_priority(filename, cover_names):
"""Sort order for image names.
# Find all files that look like images in the directory.
images = []
for fn in os.listdir(path):
for ext in IMAGE_EXTENSIONS:
if fn.lower().endswith('.' + ext):
images.append(fn)
Return indexes of cover names found in the image filename. This
means that images with lower-numbered and more keywords will have
higher priority.
"""
return [idx for (idx, x) in enumerate(cover_names) if x in filename]
# Look for "preferred" filenames.
images = sorted(images, key=lambda x: filename_priority(x, cover_names))
cover_pat = r"(\b|_)({0})(\b|_)".format('|'.join(cover_names))
for fn in images:
if re.search(cover_pat, os.path.splitext(fn)[0], re.I):
log.debug(u'using well-named art file {0}',
util.displayable_path(fn))
return os.path.join(path, fn)
def get(self, path, cover_names, cautious):
"""Look for album art files in a specified directory.
"""
if not os.path.isdir(path):
return
# Fall back to any image in the folder.
if images and not cautious:
log.debug(u'using fallback art file {0}',
util.displayable_path(images[0]))
return os.path.join(path, images[0])
# Find all files that look like images in the directory.
images = []
for fn in os.listdir(path):
for ext in IMAGE_EXTENSIONS:
if fn.lower().endswith('.' + ext):
images.append(fn)
# Look for "preferred" filenames.
images = sorted(images,
key=lambda x: self.filename_priority(x, cover_names))
cover_pat = r"(\b|_)({0})(\b|_)".format('|'.join(cover_names))
for fn in images:
if re.search(cover_pat, os.path.splitext(fn)[0], re.I):
self._log.debug(u'using well-named art file {0}',
util.displayable_path(fn))
return os.path.join(path, fn)
# Fall back to any image in the folder.
if images and not cautious:
self._log.debug(u'using fallback art file {0}',
util.displayable_path(images[0]))
return os.path.join(path, images[0])
# Try each source in turn.
@ -231,90 +205,16 @@ def art_in_path(path, cover_names, cautious):
SOURCES_ALL = [u'coverart', u'itunes', u'amazon', u'albumart', u'google']
ART_FUNCS = {
u'coverart': caa_art,
u'itunes': itunes_art,
u'albumart': aao_art,
u'amazon': art_for_asin,
u'google': google_art,
u'coverart': CoverArtArchive,
u'itunes': ITunesStore,
u'albumart': AlbumArtOrg,
u'amazon': Amazon,
u'google': GoogleImages,
}
def _source_urls(album, sources=SOURCES_ALL):
    """Lazily yield candidate art URLs for `album`.

    Each name in `sources` is looked up in ART_FUNCS and the resulting
    callable is asked for URLs, in order. A URL may not work, so the
    caller has to try them one after another. Because this is a
    generator, a caller that stops iterating early does not pay for the
    sources it never reached.
    """
    for source_name in sources:
        fetch_urls = ART_FUNCS[source_name]
        for candidate in fetch_urls(album):
            yield candidate
def art_for_album(album, paths, maxwidth=None, local_only=False):
    """Return a path to art for `album`, or None if no art is found.

    `paths` is an iterable of directories searched for local image
    files (it may be None or empty to skip the local search). Unless
    `local_only` is set, the configured web sources are tried when no
    local art was found, or always when the `remote_priority` option is
    enabled; a successfully downloaded image then replaces the local
    result. If `maxwidth` is given, candidate URLs are passed through
    the ArtResizer proxy and the final image is resized to that width.
    """
    out = None

    # Local art.
    cover_names = config['fetchart']['cover_names'].as_str_seq()
    # Build a real list: it is reused for every directory below, and a
    # lazy map object would be exhausted after the first one.
    cover_names = [util.bytestring_path(name) for name in cover_names]
    cautious = config['fetchart']['cautious'].get(bool)
    if paths:
        for path in paths:
            out = art_in_path(path, cover_names, cautious)
            if out:
                break

    # Web art sources.
    remote_priority = config['fetchart']['remote_priority'].get(bool)
    if not local_only and (remote_priority or not out):
        for url in _source_urls(album,
                                config['fetchart']['sources'].as_str_seq()):
            if maxwidth:
                url = ArtResizer.shared.proxy_url(maxwidth, url)
            candidate = _fetch_image(url)
            if candidate:
                # First successful download wins.
                out = candidate
                break

    if maxwidth and out:
        out = ArtResizer.shared.resize(maxwidth, out)
    return out
# PLUGIN LOGIC ###############################################################
def batch_fetch_art(lib, albums, force, maxwidth=None):
    """Fetch art for every album in `albums` and log one line per album.

    This backs the manual fetchart CLI command. Albums that already
    have art are left alone unless `force` is set.
    """
    for album in albums:
        if not album.artpath or force:
            # A normal run also searches the album directory for
            # images; a forced run skips the filesystem and goes
            # to the web sources.
            if force:
                local_paths = None
            else:
                local_paths = [album.path]

            path = art_for_album(album, local_paths, maxwidth)
            if not path:
                message = ui.colorize('red', 'no art found')
            else:
                album.set_art(path, False)
                album.store()
                message = ui.colorize('green', 'found album art')
        else:
            message = 'has album art'

        log.info(u'{0} - {1}: {2}', album.albumartist, album.album, message)
class FetchArtPlugin(plugins.BeetsPlugin):
def __init__(self):
super(FetchArtPlugin, self).__init__()
@ -342,8 +242,10 @@ class FetchArtPlugin(plugins.BeetsPlugin):
available_sources = list(SOURCES_ALL)
if not HAVE_ITUNES and u'itunes' in available_sources:
available_sources.remove(u'itunes')
self.config['sources'] = plugins.sanitize_choices(
sources_name = plugins.sanitize_choices(
self.config['sources'].as_str_seq(), available_sources)
self.sources = [ART_FUNCS[s](self._log) for s in sources_name]
self.fs_source = FileSystem(self._log)
# Asynchronous; after music is added to the library.
def fetch_art(self, session, task):
@ -359,7 +261,7 @@ class FetchArtPlugin(plugins.BeetsPlugin):
# For any other choices (e.g., TRACKS), do nothing.
return
path = art_for_album(task.album, task.paths, self.maxwidth, local)
path = self.art_for_album(task.album, task.paths, local)
if path:
self.art_paths[task] = path
@ -386,7 +288,102 @@ class FetchArtPlugin(plugins.BeetsPlugin):
help='re-download art when already present')
def func(lib, opts, args):
batch_fetch_art(lib, lib.albums(ui.decargs(args)), opts.force,
self.maxwidth)
self.batch_fetch_art(lib, lib.albums(ui.decargs(args)), opts.force)
cmd.func = func
return [cmd]
# Utilities converted from functions to methods on logging overhaul
def _fetch_image(self, url):
    """Download the image at `url` into a temporary file.

    Returns the path of the downloaded file when the response carries a
    Content-Type header listed in CONTENT_TYPES. Returns None when the
    response is not an accepted image type, or when the request or the
    write to disk fails.
    """
    self._log.debug(u'downloading art: {0}', url)
    try:
        with closing(requests_session.get(url, stream=True)) as resp:
            if 'Content-Type' not in resp.headers \
                    or resp.headers['Content-Type'] not in CONTENT_TYPES:
                self._log.debug(u'not an image')
                return

            # Generate a temporary file with the correct extension.
            # delete=False because the path is handed back to the caller.
            with NamedTemporaryFile(suffix=DOWNLOAD_EXTENSION,
                                    delete=False) as fh:
                # iter_content() defaults to one-byte chunks; request
                # larger chunks so the body is not written byte by byte.
                for chunk in resp.iter_content(chunk_size=1024):
                    fh.write(chunk)
            self._log.debug(u'downloaded art to: {0}',
                            util.displayable_path(fh.name))
            return fh.name
    except (IOError, requests.RequestException):
        self._log.debug(u'error fetching art')
def art_for_album(self, album, paths, local_only=False):
    """Return a path to art for `album`, or None if no art is found.

    `paths` is an iterable of directories searched for local image
    files through `self.fs_source` (it may be None or empty to skip the
    local search). Unless `local_only` is set, the web sources are
    tried when no local art was found, or always when the
    `remote_priority` option is enabled; a successfully downloaded
    image then replaces the local result. If `self.maxwidth` is set,
    candidate URLs are passed through the ArtResizer proxy and the
    final image is resized to that width.
    """
    out = None

    # Local art.
    cover_names = config['fetchart']['cover_names'].as_str_seq()
    # Build a real list: it is reused for every directory below, and a
    # lazy map object would be exhausted after the first one.
    cover_names = [util.bytestring_path(name) for name in cover_names]
    cautious = config['fetchart']['cautious'].get(bool)
    if paths:
        for path in paths:
            # FIXME
            out = self.fs_source.get(path, cover_names, cautious)
            if out:
                break

    # Web art sources.
    remote_priority = config['fetchart']['remote_priority'].get(bool)
    if not local_only and (remote_priority or not out):
        for url in self._source_urls(album):
            if self.maxwidth:
                url = ArtResizer.shared.proxy_url(self.maxwidth, url)
            candidate = self._fetch_image(url)
            if candidate:
                # First successful download wins.
                out = candidate
                break

    if self.maxwidth and out:
        out = ArtResizer.shared.resize(self.maxwidth, out)
    return out
def batch_fetch_art(self, lib, albums, force):
    """Fetch art for every album in `albums` and log one line per album.

    This backs the manual fetchart CLI command. Albums that already
    have art are left alone unless `force` is set.
    """
    for album in albums:
        if not album.artpath or force:
            # A normal run also searches the album directory for
            # images; a forced run skips the filesystem and goes
            # to the web sources.
            if force:
                local_paths = None
            else:
                local_paths = [album.path]

            path = self.art_for_album(album, local_paths)
            if not path:
                message = ui.colorize('red', 'no art found')
            else:
                album.set_art(path, False)
                album.store()
                message = ui.colorize('green', 'found album art')
        else:
            message = 'has album art'

        self._log.info(u'{0.albumartist} - {0.album}: {1}', album, message)
def _source_urls(self, album):
"""Generate possible source URLs for an album's art. The URLs are
not guaranteed to work so they each need to be attempted in turn.
This allows the main `art_for_album` function to abort iteration
through this sequence early to avoid the cost of scraping when not
necessary.
"""
for source in self.sources:
urls = source.get(album)
for url in urls:
yield url