Merge pull request #1930 from wordofglass/fetchart_unify_sources

Fetchart: unify source handling, split out of #1917
This commit is contained in:
Adrian Sampson 2016-04-16 12:10:24 -07:00
commit 1215600d97
3 changed files with 383 additions and 291 deletions

View file

@ -41,9 +41,83 @@ IMAGE_EXTENSIONS = ['png', 'jpg', 'jpeg']
CONTENT_TYPES = ('image/jpeg', 'image/png')
DOWNLOAD_EXTENSION = '.jpg'
CANDIDATE_BAD = 0
CANDIDATE_EXACT = 1
CANDIDATE_DOWNSCALE = 2
class Candidate(object):
"""Holds information about a matching artwork, deals with validation of
dimension restrictions and resizing.
"""
CANDIDATE_BAD = 0
CANDIDATE_EXACT = 1
CANDIDATE_DOWNSCALE = 2
MATCH_EXACT = 0
MATCH_FALLBACK = 1
def __init__(self, log, path=None, url=None, source=u'',
match=None, size=None):
self._log = log
self.path = path
self.url = url
self.source = source
self.check = None
self.match = match
self.size = size
def _validate(self, extra):
"""Determine whether the candidate artwork is valid based on
its dimensions (width and ratio).
Return `CANDIDATE_BAD` if the file is unusable.
Return `CANDIDATE_EXACT` if the file is usable as-is.
Return `CANDIDATE_DOWNSCALE` if the file must be resized.
"""
if not self.path:
return self.CANDIDATE_BAD
if not (extra['enforce_ratio'] or
extra['minwidth'] or
extra['maxwidth']):
return self.CANDIDATE_EXACT
# get_size returns None if no local imaging backend is available
if not self.size:
self.size = ArtResizer.shared.get_size(self.path)
self._log.debug(u'image size: {}', self.size)
if not self.size:
self._log.warning(u'Could not get size of image (please see '
u'documentation for dependencies). '
u'The configuration options `minwidth` and '
u'`enforce_ratio` may be violated.')
return self.CANDIDATE_EXACT
# Check minimum size.
if extra['minwidth'] and self.size[0] < extra['minwidth']:
self._log.debug(u'image too small ({} < {})',
self.size[0], extra['minwidth'])
return self.CANDIDATE_BAD
# Check aspect ratio.
if extra['enforce_ratio'] and self.size[0] != self.size[1]:
self._log.debug(u'image is not square ({} != {})',
self.size[0], self.size[1])
return self.CANDIDATE_BAD
# Check maximum size.
if extra['maxwidth'] and self.size[0] > extra['maxwidth']:
self._log.debug(u'image needs resizing ({} > {})',
self.size[0], extra['maxwidth'])
return self.CANDIDATE_DOWNSCALE
return self.CANDIDATE_EXACT
def validate(self, extra):
self.check = self._validate(extra)
return self.check
def resize(self, extra):
if extra['maxwidth'] and self.check == self.CANDIDATE_DOWNSCALE:
self.path = ArtResizer.shared.resize(extra['maxwidth'], self.path)
def _logged_get(log, *args, **kwargs):
@ -99,43 +173,105 @@ class ArtSource(RequestMixin):
self._log = log
self._config = config
def get(self, album):
def get(self, album, extra):
raise NotImplementedError()
def _candidate(self, **kwargs):
return Candidate(source=self.NAME, log=self._log, **kwargs)
def fetch_image(self, candidate, extra):
raise NotImplementedError()
class CoverArtArchive(ArtSource):
"""Cover Art Archive"""
class LocalArtSource(ArtSource):
IS_LOCAL = True
LOC_STR = u'local'
def fetch_image(self, candidate, extra):
pass
class RemoteArtSource(ArtSource):
IS_LOCAL = False
LOC_STR = u'remote'
def fetch_image(self, candidate, extra):
"""Downloads an image from a URL and checks whether it seems to
actually be an image. If so, returns a path to the downloaded image.
Otherwise, returns None.
"""
if extra['maxwidth']:
candidate.url = ArtResizer.shared.proxy_url(extra['maxwidth'],
candidate.url)
try:
with closing(self.request(candidate.url, stream=True,
message=u'downloading image')) as resp:
if 'Content-Type' not in resp.headers \
or resp.headers['Content-Type'] not in CONTENT_TYPES:
self._log.debug(
u'not a supported image: {}',
resp.headers.get('Content-Type') or u'no content type',
)
candidate.path = None
return
# Generate a temporary file with the correct extension.
with NamedTemporaryFile(suffix=DOWNLOAD_EXTENSION,
delete=False) as fh:
for chunk in resp.iter_content(chunk_size=1024):
fh.write(chunk)
self._log.debug(u'downloaded art to: {0}',
util.displayable_path(fh.name))
candidate.path = fh.name
return
except (IOError, requests.RequestException, TypeError) as exc:
# Handling TypeError works around a urllib3 bug:
# https://github.com/shazow/urllib3/issues/556
self._log.debug(u'error fetching art: {}', exc)
candidate.path = None
return
class CoverArtArchive(RemoteArtSource):
NAME = u"Cover Art Archive"
URL = 'http://coverartarchive.org/release/{mbid}/front'
GROUP_URL = 'http://coverartarchive.org/release-group/{mbid}/front'
def get(self, album):
def get(self, album, extra):
"""Return the Cover Art Archive and Cover Art Archive release group URLs
using album MusicBrainz release ID and release group ID.
"""
if album.mb_albumid:
yield self.URL.format(mbid=album.mb_albumid)
yield self._candidate(url=self.URL.format(mbid=album.mb_albumid),
match=Candidate.MATCH_EXACT)
if album.mb_releasegroupid:
yield self.GROUP_URL.format(mbid=album.mb_releasegroupid)
yield self._candidate(
url=self.GROUP_URL.format(mbid=album.mb_releasegroupid),
match=Candidate.MATCH_FALLBACK)
class Amazon(ArtSource):
class Amazon(RemoteArtSource):
NAME = u"Amazon"
URL = 'http://images.amazon.com/images/P/%s.%02i.LZZZZZZZ.jpg'
INDICES = (1, 2)
def get(self, album):
def get(self, album, extra):
"""Generate URLs using Amazon ID (ASIN) string.
"""
if album.asin:
for index in self.INDICES:
yield self.URL % (album.asin, index)
yield self._candidate(url=self.URL % (album.asin, index),
match=Candidate.MATCH_EXACT)
class AlbumArtOrg(ArtSource):
"""AlbumArt.org scraper"""
class AlbumArtOrg(RemoteArtSource):
NAME = u"AlbumArt.org scraper"
URL = 'http://www.albumart.org/index_detail.php'
PAT = r'href\s*=\s*"([^>"]*)"[^>]*title\s*=\s*"View larger image"'
def get(self, album):
def get(self, album, extra):
"""Return art URL from AlbumArt.org using album ASIN.
"""
if not album.asin:
@ -152,15 +288,21 @@ class AlbumArtOrg(ArtSource):
m = re.search(self.PAT, resp.text)
if m:
image_url = m.group(1)
yield image_url
yield self._candidate(url=image_url, match=Candidate.MATCH_EXACT)
else:
self._log.debug(u'no image found on page')
class GoogleImages(ArtSource):
class GoogleImages(RemoteArtSource):
NAME = u"Google Images"
URL = u'https://www.googleapis.com/customsearch/v1'
def get(self, album):
def __init__(self, *args, **kwargs):
super(GoogleImages, self).__init__(*args, **kwargs)
self.key = self._config['google_key'].get(),
self.cx = self._config['google_engine'].get(),
def get(self, album, extra):
"""Return art URL from google custom search engine
given an album title and interpreter.
"""
@ -168,8 +310,8 @@ class GoogleImages(ArtSource):
return
search_string = (album.albumartist + ',' + album.album).encode('utf-8')
response = self.request(self.URL, params={
'key': self._config['google_key'].get(),
'cx': self._config['google_engine'].get(),
'key': self.key,
'cx': self.cx,
'q': search_string,
'searchType': 'image'
})
@ -189,25 +331,30 @@ class GoogleImages(ArtSource):
if 'items' in data.keys():
for item in data['items']:
yield item['link']
yield self._candidate(url=item['link'],
match=Candidate.MATCH_EXACT)
class FanartTV(ArtSource):
class FanartTV(RemoteArtSource):
"""Art from fanart.tv requested using their API"""
NAME = u"fanart.tv"
API_URL = 'http://webservice.fanart.tv/v3/'
API_ALBUMS = API_URL + 'music/albums/'
PROJECT_KEY = '61a7d0ab4e67162b7a0c7c35915cd48e'
def get(self, album):
def __init__(self, *args, **kwargs):
super(FanartTV, self).__init__(*args, **kwargs)
self.client_key = self._config['fanarttv_key'].get()
def get(self, album, extra):
if not album.mb_releasegroupid:
return
response = self.request(
self.API_ALBUMS + album.mb_releasegroupid,
headers={
'api-key': self.PROJECT_KEY,
'client-key': self._config['fanarttv_key'].get()
})
headers={'api-key': self.PROJECT_KEY,
'client-key': self.client_key})
try:
data = response.json()
@ -241,12 +388,17 @@ class FanartTV(ArtSource):
matches.sort(key=lambda x: x[u'likes'], reverse=True)
for item in matches:
yield item[u'url']
# fanart.tv has a strict size requirement for album art to be
# uploaded
yield self._candidate(url=item[u'url'],
match=Candidate.MATCH_EXACT,
size=(1000, 1000))
class ITunesStore(ArtSource):
# Art from the iTunes Store.
def get(self, album):
class ITunesStore(RemoteArtSource):
NAME = u"iTunes Store"
def get(self, album, extra):
"""Return art URL from iTunes Store given an album title.
"""
if not (album.albumartist and album.album):
@ -271,15 +423,15 @@ class ITunesStore(ArtSource):
if itunes_album.get_artwork()['100']:
small_url = itunes_album.get_artwork()['100']
big_url = small_url.replace('100x100', '1200x1200')
yield big_url
yield self._candidate(url=big_url, match=Candidate.MATCH_EXACT)
else:
self._log.debug(u'album has no artwork in iTunes Store')
except IndexError:
self._log.debug(u'album not found in iTunes Store')
class Wikipedia(ArtSource):
# Art from Wikipedia (queried through DBpedia)
class Wikipedia(RemoteArtSource):
NAME = u"Wikipedia (queried through DBpedia)"
DBPEDIA_URL = 'http://dbpedia.org/sparql'
WIKIPEDIA_URL = 'http://en.wikipedia.org/w/api.php'
SPARQL_QUERY = '''PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
@ -304,7 +456,7 @@ class Wikipedia(ArtSource):
}}
Limit 1'''
def get(self, album):
def get(self, album, extra):
if not (album.albumartist and album.album):
return
@ -396,14 +548,16 @@ class Wikipedia(ArtSource):
results = data['query']['pages']
for _, result in results.iteritems():
image_url = result['imageinfo'][0]['url']
yield image_url
yield self._candidate(url=image_url,
match=Candidate.MATCH_EXACT)
except (ValueError, KeyError, IndexError):
self._log.debug(u'wikipedia: error scraping imageinfo')
return
class FileSystem(ArtSource):
"""Art from the filesystem"""
class FileSystem(LocalArtSource):
NAME = u"Filesystem"
@staticmethod
def filename_priority(filename, cover_names):
"""Sort order for image names.
@ -414,43 +568,58 @@ class FileSystem(ArtSource):
"""
return [idx for (idx, x) in enumerate(cover_names) if x in filename]
def get(self, path, cover_names, cautious):
"""Look for album art files in a specified directory.
def get(self, album, extra):
"""Look for album art files in the specified directories.
"""
if not os.path.isdir(path):
paths = extra['paths']
if not paths:
return
# Find all files that look like images in the directory.
images = []
for fn in os.listdir(path):
for ext in IMAGE_EXTENSIONS:
if fn.lower().endswith(b'.' + ext.encode('utf8')) and \
os.path.isfile(os.path.join(path, fn)):
images.append(fn)
# Look for "preferred" filenames.
images = sorted(images,
key=lambda x: self.filename_priority(x, cover_names))
cover_names = extra['cover_names']
cover_pat = br"(\b|_)({0})(\b|_)".format(b'|'.join(cover_names))
for fn in images:
if re.search(cover_pat, os.path.splitext(fn)[0], re.I):
self._log.debug(u'using well-named art file {0}',
util.displayable_path(fn))
return os.path.join(path, fn)
cautious = extra['cautious']
# Fall back to any image in the folder.
if images and not cautious:
self._log.debug(u'using fallback art file {0}',
util.displayable_path(images[0]))
return os.path.join(path, images[0])
for path in paths:
if not os.path.isdir(path):
continue
# Find all files that look like images in the directory.
images = []
for fn in os.listdir(path):
for ext in IMAGE_EXTENSIONS:
if fn.lower().endswith(b'.' + ext.encode('utf8')) and \
os.path.isfile(os.path.join(path, fn)):
images.append(fn)
# Look for "preferred" filenames.
images = sorted(images,
key=lambda x:
self.filename_priority(x, cover_names))
remaining = []
for fn in images:
if re.search(cover_pat, os.path.splitext(fn)[0], re.I):
self._log.debug(u'using well-named art file {0}',
util.displayable_path(fn))
yield self._candidate(path=os.path.join(path, fn),
match=Candidate.MATCH_EXACT)
else:
remaining.append(fn)
# Fall back to any image in the folder.
if remaining and not cautious:
self._log.debug(u'using fallback art file {0}',
util.displayable_path(remaining[0]))
yield self._candidate(path=os.path.join(path, remaining[0]),
match=Candidate.MATCH_FALLBACK)
# Try each source in turn.
SOURCES_ALL = [u'coverart', u'itunes', u'amazon', u'albumart',
SOURCES_ALL = [u'filesystem',
u'coverart', u'itunes', u'amazon', u'albumart',
u'wikipedia', u'google', u'fanarttv']
ART_SOURCES = {
u'filesystem': FileSystem,
u'coverart': CoverArtArchive,
u'itunes': ITunesStore,
u'albumart': AlbumArtOrg,
@ -459,6 +628,7 @@ ART_SOURCES = {
u'google': GoogleImages,
u'fanarttv': FanartTV,
}
SOURCE_NAMES = {v: k for k, v in ART_SOURCES.items()}
# PLUGIN LOGIC ###############################################################
@ -472,10 +642,10 @@ class FetchArtPlugin(plugins.BeetsPlugin, RequestMixin):
'minwidth': 0,
'maxwidth': 0,
'enforce_ratio': False,
'remote_priority': False,
'cautious': False,
'cover_names': ['cover', 'front', 'art', 'album', 'folder'],
'sources': ['coverart', 'itunes', 'amazon', 'albumart'],
'sources': ['filesystem',
'coverart', 'itunes', 'amazon', 'albumart'],
'google_key': None,
'google_engine': u'001442825323518660753:hrh5ch1gjzm',
'fanarttv_key': None
@ -491,6 +661,13 @@ class FetchArtPlugin(plugins.BeetsPlugin, RequestMixin):
self.maxwidth = self.config['maxwidth'].get(int)
self.enforce_ratio = self.config['enforce_ratio'].get(bool)
cover_names = self.config['cover_names'].as_str_seq()
self.cover_names = map(util.bytestring_path, cover_names)
self.cautious = self.config['cautious'].get(bool)
self.src_removed = (config['import']['delete'].get(bool) or
config['import']['move'].get(bool))
if self.config['auto']:
# Enable two import hooks when fetching is enabled.
self.import_stages = [self.fetch_art]
@ -511,9 +688,18 @@ class FetchArtPlugin(plugins.BeetsPlugin, RequestMixin):
u'available shorter after its upload. See the documentation.')
sources_name = plugins.sanitize_choices(
self.config['sources'].as_str_seq(), available_sources)
if 'remote_priority' in self.config:
self._log.warning(
u'The `fetch_art.remote_priority` configuration option has '
u'been deprecated, see the documentation.')
if self.config['remote_priority'].get(bool):
try:
self.sources_name.remove[u'filesystem']
sources_name.append[u'filesystem']
except ValueError:
pass
self.sources = [ART_SOURCES[s](self._log, self.config)
for s in sources_name]
self.fs_source = FileSystem(self._log, self.config)
# Asynchronous; after music is added to the library.
def fetch_art(self, session, task):
@ -532,10 +718,10 @@ class FetchArtPlugin(plugins.BeetsPlugin, RequestMixin):
# For any other choices (e.g., TRACKS), do nothing.
return
path = self.art_for_album(task.album, task.paths, local)
candidate = self.art_for_album(task.album, task.paths, local)
if path:
self.art_paths[task] = path
if candidate:
self.art_paths[task] = candidate.path
# Synchronous; after music files are put in place.
def assign_art(self, session, task):
@ -544,11 +730,9 @@ class FetchArtPlugin(plugins.BeetsPlugin, RequestMixin):
path = self.art_paths.pop(task)
album = task.album
src_removed = (config['import']['delete'].get(bool) or
config['import']['move'].get(bool))
album.set_art(path, not src_removed)
album.set_art(path, not self.src_removed)
album.store()
if src_removed:
if self.src_removed:
task.prune(path)
# Manual album art fetching.
@ -567,82 +751,6 @@ class FetchArtPlugin(plugins.BeetsPlugin, RequestMixin):
# Utilities converted from functions to methods on logging overhaul
def _fetch_image(self, url):
"""Downloads an image from a URL and checks whether it seems to
actually be an image. If so, returns a path to the downloaded image.
Otherwise, returns None.
"""
try:
with closing(self.request(url, stream=True,
message=u'downloading image')) as resp:
if 'Content-Type' not in resp.headers \
or resp.headers['Content-Type'] not in CONTENT_TYPES:
self._log.debug(
u'not a supported image: {}',
resp.headers.get('Content-Type') or u'no content type',
)
return None
# Generate a temporary file with the correct extension.
with NamedTemporaryFile(suffix=DOWNLOAD_EXTENSION,
delete=False) as fh:
for chunk in resp.iter_content(chunk_size=1024):
fh.write(chunk)
self._log.debug(u'downloaded art to: {0}',
util.displayable_path(fh.name))
return fh.name
except (IOError, requests.RequestException, TypeError) as exc:
# Handling TypeError works around a urllib3 bug:
# https://github.com/shazow/urllib3/issues/556
self._log.debug(u'error fetching art: {}', exc)
return None
def _is_valid_image_candidate(self, candidate):
"""Determine whether the given candidate artwork is valid based on
its dimensions (width and ratio).
Return `CANDIDATE_BAD` if the file is unusable.
Return `CANDIDATE_EXACT` if the file is usable as-is.
Return `CANDIDATE_DOWNSCALE` if the file must be resized.
"""
if not candidate:
return CANDIDATE_BAD
if not (self.enforce_ratio or self.minwidth or self.maxwidth):
return CANDIDATE_EXACT
# get_size returns None if no local imaging backend is available
size = ArtResizer.shared.get_size(candidate)
self._log.debug(u'image size: {}', size)
if not size:
self._log.warning(u'Could not get size of image (please see '
u'documentation for dependencies). '
u'The configuration options `minwidth` and '
u'`enforce_ratio` may be violated.')
return CANDIDATE_EXACT
# Check minimum size.
if self.minwidth and size[0] < self.minwidth:
self._log.debug(u'image too small ({} < {})',
size[0], self.minwidth)
return CANDIDATE_BAD
# Check aspect ratio.
if self.enforce_ratio and size[0] != size[1]:
self._log.debug(u'image is not square ({} != {})',
size[0], size[1])
return CANDIDATE_BAD
# Check maximum size.
if self.maxwidth and size[0] > self.maxwidth:
self._log.debug(u'image needs resizing ({} > {})',
size[0], self.maxwidth)
return CANDIDATE_DOWNSCALE
return CANDIDATE_EXACT
def art_for_album(self, album, paths, local_only=False):
"""Given an Album object, returns a path to downloaded art for the
album (or None if no art is found). If `maxwidth`, then images are
@ -651,36 +759,37 @@ class FetchArtPlugin(plugins.BeetsPlugin, RequestMixin):
are made.
"""
out = None
check = None
# Local art.
cover_names = self.config['cover_names'].as_str_seq()
cover_names = map(util.bytestring_path, cover_names)
cautious = self.config['cautious'].get(bool)
if paths:
for path in paths:
candidate = self.fs_source.get(path, cover_names, cautious)
check = self._is_valid_image_candidate(candidate)
if check:
out = candidate
self._log.debug(u'found local image {}', out)
# all the information any of the sources might need
extra = {'paths': paths,
'cover_names': self.cover_names,
'cautious': self.cautious,
'enforce_ratio': self.enforce_ratio,
'minwidth': self.minwidth,
'maxwidth': self.maxwidth}
for source in self.sources:
if source.IS_LOCAL or not local_only:
self._log.debug(
u'trying source {0} for album {1.albumartist} - {1.album}',
SOURCE_NAMES[type(source)],
album,
)
# URLs might be invalid at this point, or the image may not
# fulfill the requirements
for candidate in source.get(album, extra):
source.fetch_image(candidate, extra)
if candidate.validate(extra):
out = candidate
self._log.debug(
u'using {0.LOC_STR} image {1}'.format(
source, util.displayable_path(out.path)))
break
if out:
break
# Web art sources.
remote_priority = self.config['remote_priority'].get(bool)
if not local_only and (remote_priority or not out):
for url in self._source_urls(album):
if self.maxwidth:
url = ArtResizer.shared.proxy_url(self.maxwidth, url)
candidate = self._fetch_image(url)
check = self._is_valid_image_candidate(candidate)
if check:
out = candidate
self._log.debug(u'using remote image {}', out)
break
if self.maxwidth and out and check == CANDIDATE_DOWNSCALE:
out = ArtResizer.shared.resize(self.maxwidth, out)
if out:
out.resize(extra)
return out
@ -697,30 +806,12 @@ class FetchArtPlugin(plugins.BeetsPlugin, RequestMixin):
# sources.
local_paths = None if force else [album.path]
path = self.art_for_album(album, local_paths)
if path:
album.set_art(path, False)
candidate = self.art_for_album(album, local_paths)
if candidate:
album.set_art(candidate.path, False)
album.store()
message = ui.colorize('text_success', u'found album art')
else:
message = ui.colorize('text_error', u'no art found')
self._log.info(u'{0}: {1}', album, message)
def _source_urls(self, album):
"""Generate possible source URLs for an album's art. The URLs are
not guaranteed to work so they each need to be attempted in turn.
This allows the main `art_for_album` function to abort iteration
through this sequence early to avoid the cost of scraping when not
necessary.
"""
source_names = {v: k for k, v in ART_SOURCES.items()}
for source in self.sources:
self._log.debug(
u'trying source {0} for album {1.albumartist} - {1.album}',
source_names[type(source)],
album,
)
urls = source.get(album)
for url in urls:
yield url

View file

@ -43,15 +43,14 @@ file. The available options are:
pixels. The height is recomputed so that the aspect ratio is preserved.
- **enforce_ratio**: Only images with a width:height ratio of 1:1 are
considered as valid album art candidates. Default: ``no``.
- **remote_priority**: Query remote sources every time and use local image only
as fallback.
Default: ``no``; remote (Web) art sources are only queried if no local art is
found in the filesystem.
- **sources**: List of sources to search for images. An asterisk `*` expands
to all available sources.
Default: ``coverart itunes amazon albumart``, i.e., everything but
Default: ``filesystem coverart itunes amazon albumart``, i.e., everything but
``wikipedia``, ``google`` and ``fanarttv``. Enable those sources for more
matches at the cost of some speed.
matches at the cost of some speed. They are searched in the given order,
thus in the default config, no remote (Web) art source are queried if
local art is found in the filesystem. To use a local image as fallback,
move it to the end of the list.
- **google_key**: Your Google API key (to enable the Google Custom Search
backend).
Default: None.

View file

@ -29,7 +29,6 @@ from beetsplug import fetchart
from beets.autotag import AlbumInfo, AlbumMatch
from beets import library
from beets import importer
from beets import config
from beets import logging
from beets import util
from beets.util.artresizer import ArtResizer, WEBPROXY
@ -45,23 +44,33 @@ class UseThePlugin(_common.TestCase):
class FetchImageTest(UseThePlugin):
URL = 'http://example.com'
@responses.activate
def run(self, *args, **kwargs):
super(FetchImageTest, self).run(*args, **kwargs)
def mock_response(self, content_type):
responses.add(responses.GET, 'http://example.com',
responses.add(responses.GET, self.URL,
content_type=content_type)
def setUp(self):
super(FetchImageTest, self).setUp()
self.dpath = os.path.join(self.temp_dir, 'arttest')
self.source = fetchart.RemoteArtSource(logger, self.plugin.config)
self.extra = {'maxwidth': 0}
def test_invalid_type_returns_none(self):
self.mock_response('image/watercolour')
artpath = self.plugin._fetch_image('http://example.com')
self.assertEqual(artpath, None)
candidate = fetchart.Candidate(logger, url=self.URL)
self.source.fetch_image(candidate, self.extra)
self.assertEqual(candidate.path, None)
def test_jpeg_type_returns_path(self):
self.mock_response('image/jpeg')
artpath = self.plugin._fetch_image('http://example.com')
self.assertNotEqual(artpath, None)
candidate = fetchart.Candidate(logger, url=self.URL)
self.source.fetch_image(candidate, self.extra)
self.assertNotEqual(candidate.path, None)
class FSArtTest(UseThePlugin):
@ -71,38 +80,45 @@ class FSArtTest(UseThePlugin):
os.mkdir(self.dpath)
self.source = fetchart.FileSystem(logger, self.plugin.config)
self.extra = {'cautious': False,
'cover_names': ('art',),
'paths': [self.dpath]}
def test_finds_jpg_in_directory(self):
_common.touch(os.path.join(self.dpath, 'a.jpg'))
fn = self.source.get(self.dpath, ('art',), False)
self.assertEqual(fn, os.path.join(self.dpath, 'a.jpg'))
candidate = next(self.source.get(None, self.extra))
self.assertEqual(candidate.path, os.path.join(self.dpath, 'a.jpg'))
def test_appropriately_named_file_takes_precedence(self):
_common.touch(os.path.join(self.dpath, 'a.jpg'))
_common.touch(os.path.join(self.dpath, 'art.jpg'))
fn = self.source.get(self.dpath, ('art',), False)
self.assertEqual(fn, os.path.join(self.dpath, 'art.jpg'))
candidate = next(self.source.get(None, self.extra))
self.assertEqual(candidate.path, os.path.join(self.dpath, 'art.jpg'))
def test_non_image_file_not_identified(self):
_common.touch(os.path.join(self.dpath, 'a.txt'))
fn = self.source.get(self.dpath, ('art',), False)
self.assertEqual(fn, None)
with self.assertRaises(StopIteration):
next(self.source.get(None, self.extra))
def test_cautious_skips_fallback(self):
_common.touch(os.path.join(self.dpath, 'a.jpg'))
fn = self.source.get(self.dpath, ('art',), True)
self.assertEqual(fn, None)
self.extra['cautious'] = True
with self.assertRaises(StopIteration):
next(self.source.get(None, self.extra))
def test_empty_dir(self):
fn = self.source.get(self.dpath, ('art',), True)
self.assertEqual(fn, None)
with self.assertRaises(StopIteration):
next(self.source.get(None, self.extra))
def test_precedence_amongst_correct_files(self):
_common.touch(os.path.join(self.dpath, 'back.jpg'))
_common.touch(os.path.join(self.dpath, 'front.jpg'))
_common.touch(os.path.join(self.dpath, 'front-cover.jpg'))
fn = self.source.get(self.dpath, ('cover', 'front', 'back'), False)
self.assertEqual(fn, os.path.join(self.dpath, 'front-cover.jpg'))
images = ['front-cover.jpg', 'front.jpg', 'back.jpg']
paths = [os.path.join(self.dpath, i) for i in images]
for p in paths:
_common.touch(p)
self.extra['cover_names'] = ['cover', 'front', 'back']
candidates = [candidate.path for candidate in
self.source.get(None, self.extra)]
self.assertEqual(candidates, paths)
class CombinedTest(UseThePlugin):
@ -130,27 +146,28 @@ class CombinedTest(UseThePlugin):
def test_main_interface_returns_amazon_art(self):
self.mock_response(self.AMAZON_URL)
album = _common.Bag(asin=self.ASIN)
artpath = self.plugin.art_for_album(album, None)
self.assertNotEqual(artpath, None)
candidate = self.plugin.art_for_album(album, None)
self.assertIsNotNone(candidate)
def test_main_interface_returns_none_for_missing_asin_and_path(self):
album = _common.Bag()
artpath = self.plugin.art_for_album(album, None)
self.assertEqual(artpath, None)
candidate = self.plugin.art_for_album(album, None)
self.assertIsNone(candidate)
def test_main_interface_gives_precedence_to_fs_art(self):
_common.touch(os.path.join(self.dpath, 'art.jpg'))
self.mock_response(self.AMAZON_URL)
album = _common.Bag(asin=self.ASIN)
artpath = self.plugin.art_for_album(album, [self.dpath])
self.assertEqual(artpath, os.path.join(self.dpath, 'art.jpg'))
candidate = self.plugin.art_for_album(album, [self.dpath])
self.assertIsNotNone(candidate)
self.assertEqual(candidate.path, os.path.join(self.dpath, 'art.jpg'))
def test_main_interface_falls_back_to_amazon(self):
self.mock_response(self.AMAZON_URL)
album = _common.Bag(asin=self.ASIN)
artpath = self.plugin.art_for_album(album, [self.dpath])
self.assertNotEqual(artpath, None)
self.assertFalse(artpath.startswith(self.dpath))
candidate = self.plugin.art_for_album(album, [self.dpath])
self.assertIsNotNone(candidate)
self.assertFalse(candidate.path.startswith(self.dpath))
def test_main_interface_tries_amazon_before_aao(self):
self.mock_response(self.AMAZON_URL)
@ -168,24 +185,23 @@ class CombinedTest(UseThePlugin):
def test_main_interface_uses_caa_when_mbid_available(self):
self.mock_response(self.CAA_URL)
album = _common.Bag(mb_albumid=self.MBID, asin=self.ASIN)
artpath = self.plugin.art_for_album(album, None)
self.assertNotEqual(artpath, None)
candidate = self.plugin.art_for_album(album, None)
self.assertIsNotNone(candidate)
self.assertEqual(len(responses.calls), 1)
self.assertEqual(responses.calls[0].request.url, self.CAA_URL)
def test_local_only_does_not_access_network(self):
album = _common.Bag(mb_albumid=self.MBID, asin=self.ASIN)
artpath = self.plugin.art_for_album(album, [self.dpath],
local_only=True)
self.assertEqual(artpath, None)
self.plugin.art_for_album(album, None, local_only=True)
self.assertEqual(len(responses.calls), 0)
def test_local_only_gets_fs_image(self):
_common.touch(os.path.join(self.dpath, 'art.jpg'))
album = _common.Bag(mb_albumid=self.MBID, asin=self.ASIN)
artpath = self.plugin.art_for_album(album, [self.dpath],
local_only=True)
self.assertEqual(artpath, os.path.join(self.dpath, 'art.jpg'))
candidate = self.plugin.art_for_album(album, [self.dpath],
local_only=True)
self.assertIsNotNone(candidate)
self.assertEqual(candidate.path, os.path.join(self.dpath, 'art.jpg'))
self.assertEqual(len(responses.calls), 0)
@ -196,6 +212,7 @@ class AAOTest(UseThePlugin):
def setUp(self):
super(AAOTest, self).setUp()
self.source = fetchart.AlbumArtOrg(logger, self.plugin.config)
self.extra = dict()
@responses.activate
def run(self, *args, **kwargs):
@ -215,20 +232,21 @@ class AAOTest(UseThePlugin):
"""
self.mock_response(self.AAO_URL, body)
album = _common.Bag(asin=self.ASIN)
res = self.source.get(album)
self.assertEqual(list(res)[0], 'TARGET_URL')
candidate = next(self.source.get(album, self.extra))
self.assertEqual(candidate.url, 'TARGET_URL')
def test_aao_scraper_returns_no_result_when_no_image_present(self):
self.mock_response(self.AAO_URL, b'blah blah')
album = _common.Bag(asin=self.ASIN)
res = self.source.get(album)
self.assertEqual(list(res), [])
with self.assertRaises(StopIteration):
next(self.source.get(album, self.extra))
class GoogleImageTest(UseThePlugin):
def setUp(self):
super(GoogleImageTest, self).setUp()
self.source = fetchart.GoogleImages(logger, self.plugin.config)
self.extra = dict()
@responses.activate
def run(self, *args, **kwargs):
@ -242,22 +260,22 @@ class GoogleImageTest(UseThePlugin):
album = _common.Bag(albumartist="some artist", album="some album")
json = b'{"items": [{"link": "url_to_the_image"}]}'
self.mock_response(fetchart.GoogleImages.URL, json)
result_url = self.source.get(album)
self.assertEqual(list(result_url)[0], 'url_to_the_image')
candidate = next(self.source.get(album, self.extra))
self.assertEqual(candidate.url, 'url_to_the_image')
def test_google_art_returns_no_result_when_error_received(self):
album = _common.Bag(albumartist="some artist", album="some album")
json = b'{"error": {"errors": [{"reason": "some reason"}]}}'
self.mock_response(fetchart.GoogleImages.URL, json)
result_url = self.source.get(album)
self.assertEqual(list(result_url), [])
with self.assertRaises(StopIteration):
next(self.source.get(album, self.extra))
def test_google_art_returns_no_result_with_malformed_response(self):
album = _common.Bag(albumartist="some artist", album="some album")
json = b"""bla blup"""
self.mock_response(fetchart.GoogleImages.URL, json)
result_url = self.source.get(album)
self.assertEqual(list(result_url), [])
with self.assertRaises(StopIteration):
next(self.source.get(album, self.extra))
class FanartTVTest(UseThePlugin):
@ -304,6 +322,7 @@ class FanartTVTest(UseThePlugin):
def setUp(self):
super(FanartTVTest, self).setUp()
self.source = fetchart.FanartTV(logger, self.plugin.config)
self.extra = dict()
@responses.activate
def run(self, *args, **kwargs):
@ -317,22 +336,22 @@ class FanartTVTest(UseThePlugin):
album = _common.Bag(mb_releasegroupid=u'thereleasegroupid')
self.mock_response(fetchart.FanartTV.API_ALBUMS + u'thereleasegroupid',
self.RESPONSE_MULTIPLE)
result_url = self.source.get(album)
self.assertEqual(list(result_url)[0], 'http://example.com/1.jpg')
candidate = next(self.source.get(album, self.extra))
self.assertEqual(candidate.url, 'http://example.com/1.jpg')
def test_fanarttv_returns_no_result_when_error_received(self):
album = _common.Bag(mb_releasegroupid=u'thereleasegroupid')
self.mock_response(fetchart.FanartTV.API_ALBUMS + u'thereleasegroupid',
self.RESPONSE_ERROR)
result_url = self.source.get(album)
self.assertEqual(list(result_url), [])
with self.assertRaises(StopIteration):
next(self.source.get(album, self.extra))
def test_fanarttv_returns_no_result_with_malformed_response(self):
album = _common.Bag(mb_releasegroupid=u'thereleasegroupid')
self.mock_response(fetchart.FanartTV.API_ALBUMS + u'thereleasegroupid',
self.RESPONSE_MALFORMED)
result_url = self.source.get(album)
self.assertEqual(list(result_url), [])
with self.assertRaises(StopIteration):
next(self.source.get(album, self.extra))
@_common.slow_test()
@ -344,7 +363,7 @@ class ArtImporterTest(UseThePlugin):
self.art_file = os.path.join(self.temp_dir, 'tmpcover.jpg')
_common.touch(self.art_file)
self.old_afa = self.plugin.art_for_album
self.afa_response = self.art_file
self.afa_response = fetchart.Candidate(logger, path=self.art_file)
def art_for_album(i, p, local_only=False):
return self.afa_response
@ -423,19 +442,14 @@ class ArtImporterTest(UseThePlugin):
self.assertExists(self.art_file)
def test_delete_original_file(self):
config['import']['delete'] = True
self._fetch_art(True)
self.assertNotExists(self.art_file)
def test_move_original_file(self):
config['import']['move'] = True
self.plugin.src_removed = True
self._fetch_art(True)
self.assertNotExists(self.art_file)
def test_do_not_delete_original_if_already_in_place(self):
artdest = os.path.join(os.path.dirname(self.i.path), 'cover.jpg')
shutil.copyfile(self.art_file, artdest)
self.afa_response = artdest
self.afa_response = fetchart.Candidate(logger, path=artdest)
self._fetch_art(True)
def test_fetch_art_if_imported_file_deleted(self):
@ -463,49 +477,37 @@ class ArtForAlbumTest(UseThePlugin):
def setUp(self):
super(ArtForAlbumTest, self).setUp()
self.old_fs_source_get = self.plugin.fs_source.get
self.old_fetch_img = self.plugin._fetch_image
self.old_source_urls = self.plugin._source_urls
self.old_fs_source_get = fetchart.FileSystem.get
def fs_source_get(*_):
return self.image_file
def fs_source_get(_self, album, extra):
if extra['paths']:
yield fetchart.Candidate(logger, path=self.image_file)
def source_urls(_):
return ['']
fetchart.FileSystem.get = fs_source_get
def fetch_img(_):
return self.image_file
self.plugin.fs_source.get = fs_source_get
self.plugin._source_urls = source_urls
self.plugin._fetch_image = fetch_img
self.album = _common.Bag()
def tearDown(self):
self.plugin.fs_source.get = self.old_fs_source_get
self.plugin._source_urls = self.old_source_urls
self.plugin._fetch_image = self.old_fetch_img
fetchart.FileSystem.get = self.old_fs_source_get
super(ArtForAlbumTest, self).tearDown()
def _assertImageIsValidArt(self, image_file, should_exist):
self.assertExists(image_file)
self.image_file = image_file
local_artpath = self.plugin.art_for_album(None, [''], True)
remote_artpath = self.plugin.art_for_album(None, [], False)
self.assertEqual(local_artpath, remote_artpath)
candidate = self.plugin.art_for_album(self.album, [''], True)
if should_exist:
self.assertEqual(local_artpath, self.image_file)
self.assertExists(local_artpath)
return local_artpath
self.assertNotEqual(candidate, None)
self.assertEqual(candidate.path, self.image_file)
self.assertExists(candidate.path)
else:
self.assertIsNone(local_artpath)
self.assertIsNone(candidate)
def _assertImageResized(self, image_file, should_resize):
self.image_file = image_file
with patch.object(ArtResizer.shared, 'resize') as mock_resize:
self.plugin.art_for_album(None, [''], True)
self.plugin.art_for_album(self.album, [''], True)
self.assertEqual(mock_resize.called, should_resize)
def _require_backend(self):