mirror of
https://github.com/beetbox/beets.git
synced 2025-12-27 11:02:43 +01:00
Merge pull request #597 from johtso/fetchart-requests-plus-useragent
Switch fetchart to requests and send user-agent
This commit is contained in:
commit
7ddb8676c4
4 changed files with 92 additions and 95 deletions
|
|
@ -10,7 +10,7 @@ branches:
|
|||
|
||||
install:
|
||||
- travis_retry pip install .
|
||||
- travis_retry pip install pylast flask
|
||||
- travis_retry pip install pylast flask responses
|
||||
# unittest backport on Python < 2.7
|
||||
- "[[ $TRAVIS_PYTHON_VERSION == '2.6' ]] && pip install unittest2 || true"
|
||||
- travis_retry sudo apt-get update
|
||||
|
|
|
|||
|
|
@ -14,11 +14,13 @@
|
|||
|
||||
"""Fetches album art.
|
||||
"""
|
||||
import urllib
|
||||
import re
|
||||
from contextlib import closing
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
import re
|
||||
from tempfile import NamedTemporaryFile
|
||||
|
||||
import requests
|
||||
|
||||
from beets.plugins import BeetsPlugin
|
||||
from beets.util.artresizer import ArtResizer
|
||||
|
|
@ -33,31 +35,32 @@ DOWNLOAD_EXTENSION = '.jpg'
|
|||
|
||||
log = logging.getLogger('beets')
|
||||
|
||||
requests_session = requests.Session()
|
||||
requests_session.headers = {'User-Agent': 'beets'}
|
||||
|
||||
|
||||
def _fetch_image(url):
|
||||
"""Downloads an image from a URL and checks whether it seems to
|
||||
actually be an image. If so, returns a path to the downloaded image.
|
||||
Otherwise, returns None.
|
||||
"""
|
||||
# Generate a temporary filename with the correct extension.
|
||||
fd, fn = tempfile.mkstemp(suffix=DOWNLOAD_EXTENSION)
|
||||
os.close(fd)
|
||||
|
||||
log.debug(u'fetchart: downloading art: {0}'.format(url))
|
||||
try:
|
||||
_, headers = urllib.urlretrieve(url, filename=fn)
|
||||
except IOError:
|
||||
log.debug(u'error fetching art')
|
||||
return
|
||||
with closing(requests_session.get(url, stream=True)) as resp:
|
||||
if not resp.headers['Content-Type'] in CONTENT_TYPES:
|
||||
log.debug(u'fetchart: not an image')
|
||||
return
|
||||
|
||||
# Make sure it's actually an image.
|
||||
if headers.gettype() in CONTENT_TYPES:
|
||||
log.debug(u'fetchart: downloaded art to: {0}'.format(
|
||||
util.displayable_path(fn)
|
||||
))
|
||||
return fn
|
||||
else:
|
||||
log.debug(u'fetchart: not an image')
|
||||
# Generate a temporary file with the correct extension.
|
||||
with NamedTemporaryFile(suffix=DOWNLOAD_EXTENSION, delete=False) as fh:
|
||||
for chunk in resp.iter_content():
|
||||
fh.write(chunk)
|
||||
log.debug(u'fetchart: downloaded art to: {0}'.format(
|
||||
util.displayable_path(fh.name)
|
||||
))
|
||||
return fh.name
|
||||
except (IOError, requests.RequestException):
|
||||
log.debug(u'fetchart: error fetching art')
|
||||
|
||||
|
||||
# ART SOURCES ################################################################
|
||||
|
|
@ -98,16 +101,15 @@ AAO_PAT = r'href\s*=\s*"([^>"]*)"[^>]*title\s*=\s*"View larger image"'
|
|||
def aao_art(asin):
|
||||
"""Return art URL from AlbumArt.org given an ASIN."""
|
||||
# Get the page from albumart.org.
|
||||
url = '%s?%s' % (AAO_URL, urllib.urlencode({'asin': asin}))
|
||||
try:
|
||||
log.debug(u'fetchart: scraping art URL: {0}'.format(url))
|
||||
page = urllib.urlopen(url).read()
|
||||
except IOError:
|
||||
resp = requests_session.get(AAO_URL, params={'asin': asin})
|
||||
log.debug(u'fetchart: scraped art URL: {0}'.format(resp.url))
|
||||
except requests.RequestException:
|
||||
log.debug(u'fetchart: error scraping art page')
|
||||
return
|
||||
|
||||
# Search the page for the image URL.
|
||||
m = re.search(AAO_PAT, page)
|
||||
m = re.search(AAO_PAT, resp.text)
|
||||
if m:
|
||||
image_url = m.group(1)
|
||||
return image_url
|
||||
|
|
|
|||
1
setup.py
1
setup.py
|
|
@ -83,6 +83,7 @@ setup(name='beets',
|
|||
# Plugin (optional) dependencies:
|
||||
extras_require={
|
||||
'beatport': ['requests'],
|
||||
'fetchart': ['requests'],
|
||||
'chroma': ['pyacoustid'],
|
||||
'discogs': ['discogs-client'],
|
||||
'echonest_tempo': ['pyechonest'],
|
||||
|
|
|
|||
134
test/test_art.py
134
test/test_art.py
|
|
@ -14,6 +14,11 @@
|
|||
|
||||
"""Tests for the album art fetchers."""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
|
||||
import responses
|
||||
|
||||
import _common
|
||||
from _common import unittest
|
||||
from beetsplug import fetchart
|
||||
|
|
@ -21,35 +26,27 @@ from beets.autotag import AlbumInfo, AlbumMatch
|
|||
from beets import library
|
||||
from beets import importer
|
||||
from beets import config
|
||||
import os
|
||||
import shutil
|
||||
import StringIO
|
||||
|
||||
class MockHeaders(object):
|
||||
def __init__(self, typeval):
|
||||
self.typeval = typeval
|
||||
def gettype(self):
|
||||
return self.typeval
|
||||
class MockUrlRetrieve(object):
|
||||
def __init__(self, typeval, pathval='fetched_path'):
|
||||
self.pathval = pathval
|
||||
self.headers = MockHeaders(typeval)
|
||||
self.fetched = None
|
||||
def __call__(self, url, filename=None):
|
||||
self.fetched = url
|
||||
return filename or self.pathval, self.headers
|
||||
|
||||
class FetchImageTest(_common.TestCase):
|
||||
@responses.activate
|
||||
def run(self, *args, **kwargs):
|
||||
super(FetchImageTest, self).run(*args, **kwargs)
|
||||
|
||||
def mock_response(self, content_type):
|
||||
responses.add(responses.GET, 'http://example.com', content_type=content_type)
|
||||
|
||||
def test_invalid_type_returns_none(self):
|
||||
fetchart.urllib.urlretrieve = MockUrlRetrieve('')
|
||||
self.mock_response('image/watercolour')
|
||||
artpath = fetchart._fetch_image('http://example.com')
|
||||
self.assertEqual(artpath, None)
|
||||
|
||||
def test_jpeg_type_returns_path(self):
|
||||
fetchart.urllib.urlretrieve = MockUrlRetrieve('image/jpeg')
|
||||
self.mock_response('image/jpeg')
|
||||
artpath = fetchart._fetch_image('http://example.com')
|
||||
self.assertNotEqual(artpath, None)
|
||||
|
||||
|
||||
class FSArtTest(_common.TestCase):
|
||||
def setUp(self):
|
||||
super(FSArtTest, self).setUp()
|
||||
|
|
@ -81,32 +78,32 @@ class FSArtTest(_common.TestCase):
|
|||
fn = fetchart.art_in_path(self.dpath, ('art',), True)
|
||||
self.assertEqual(fn, None)
|
||||
|
||||
|
||||
class CombinedTest(_common.TestCase):
|
||||
ASIN = 'xxxx'
|
||||
MBID = 'releaseid'
|
||||
AMAZON_URL = 'http://images.amazon.com/images/P/{0}.01.LZZZZZZZ.jpg'.format(ASIN)
|
||||
AAO_URL = 'http://www.albumart.org/index_detail.php?asin={0}'.format(ASIN)
|
||||
CAA_URL = 'http://coverartarchive.org/release/{0}/front-500.jpg'.format(MBID)
|
||||
|
||||
def setUp(self):
|
||||
super(CombinedTest, self).setUp()
|
||||
|
||||
self.dpath = os.path.join(self.temp_dir, 'arttest')
|
||||
os.mkdir(self.dpath)
|
||||
self.old_urlopen = fetchart.urllib.urlopen
|
||||
fetchart.urllib.urlopen = self._urlopen
|
||||
self.page_text = ""
|
||||
self.urlopen_called = False
|
||||
|
||||
# Set up configuration.
|
||||
fetchart.FetchArtPlugin()
|
||||
|
||||
def tearDown(self):
|
||||
super(CombinedTest, self).tearDown()
|
||||
fetchart.urllib.urlopen = self.old_urlopen
|
||||
@responses.activate
|
||||
def run(self, *args, **kwargs):
|
||||
super(CombinedTest, self).run(*args, **kwargs)
|
||||
|
||||
def _urlopen(self, url):
|
||||
self.urlopen_called = True
|
||||
self.fetched_url = url
|
||||
return StringIO.StringIO(self.page_text)
|
||||
def mock_response(self, url, content_type='image/jpeg'):
|
||||
responses.add(responses.GET, url, content_type=content_type)
|
||||
|
||||
def test_main_interface_returns_amazon_art(self):
|
||||
fetchart.urllib.urlretrieve = MockUrlRetrieve('image/jpeg')
|
||||
album = _common.Bag(asin='xxxx')
|
||||
self.mock_response(self.AMAZON_URL)
|
||||
album = _common.Bag(asin=self.ASIN)
|
||||
artpath = fetchart.art_for_album(album, None)
|
||||
self.assertNotEqual(artpath, None)
|
||||
|
||||
|
|
@ -117,84 +114,81 @@ class CombinedTest(_common.TestCase):
|
|||
|
||||
def test_main_interface_gives_precedence_to_fs_art(self):
|
||||
_common.touch(os.path.join(self.dpath, 'art.jpg'))
|
||||
fetchart.urllib.urlretrieve = MockUrlRetrieve('image/jpeg')
|
||||
album = _common.Bag(asin='xxxx')
|
||||
self.mock_response(self.AMAZON_URL)
|
||||
album = _common.Bag(asin=self.ASIN)
|
||||
artpath = fetchart.art_for_album(album, [self.dpath])
|
||||
self.assertEqual(artpath, os.path.join(self.dpath, 'art.jpg'))
|
||||
|
||||
def test_main_interface_falls_back_to_amazon(self):
|
||||
fetchart.urllib.urlretrieve = MockUrlRetrieve('image/jpeg')
|
||||
album = _common.Bag(asin='xxxx')
|
||||
self.mock_response(self.AMAZON_URL)
|
||||
album = _common.Bag(asin=self.ASIN)
|
||||
artpath = fetchart.art_for_album(album, [self.dpath])
|
||||
self.assertNotEqual(artpath, None)
|
||||
self.assertFalse(artpath.startswith(self.dpath))
|
||||
|
||||
def test_main_interface_tries_amazon_before_aao(self):
|
||||
fetchart.urllib.urlretrieve = MockUrlRetrieve('image/jpeg')
|
||||
album = _common.Bag(asin='xxxx')
|
||||
self.mock_response(self.AMAZON_URL)
|
||||
album = _common.Bag(asin=self.ASIN)
|
||||
fetchart.art_for_album(album, [self.dpath])
|
||||
self.assertFalse(self.urlopen_called)
|
||||
self.assertEqual(len(responses.calls), 1)
|
||||
self.assertEqual(responses.calls[0].request.url, self.AMAZON_URL)
|
||||
|
||||
def test_main_interface_falls_back_to_aao(self):
|
||||
fetchart.urllib.urlretrieve = MockUrlRetrieve('text/html')
|
||||
album = _common.Bag(asin='xxxx')
|
||||
self.mock_response(self.AMAZON_URL, content_type='text/html')
|
||||
album = _common.Bag(asin=self.ASIN)
|
||||
fetchart.art_for_album(album, [self.dpath])
|
||||
self.assertTrue(self.urlopen_called)
|
||||
self.assertEqual(responses.calls[-1].request.url, self.AAO_URL)
|
||||
|
||||
def test_main_interface_uses_caa_when_mbid_available(self):
|
||||
mock_retrieve = MockUrlRetrieve('image/jpeg')
|
||||
fetchart.urllib.urlretrieve = mock_retrieve
|
||||
album = _common.Bag(mb_albumid='releaseid', asin='xxxx')
|
||||
self.mock_response(self.CAA_URL)
|
||||
album = _common.Bag(mb_albumid=self.MBID, asin=self.ASIN)
|
||||
artpath = fetchart.art_for_album(album, None)
|
||||
self.assertNotEqual(artpath, None)
|
||||
self.assertTrue('coverartarchive.org' in mock_retrieve.fetched)
|
||||
self.assertEqual(len(responses.calls), 1)
|
||||
self.assertEqual(responses.calls[0].request.url, self.CAA_URL)
|
||||
|
||||
def test_local_only_does_not_access_network(self):
|
||||
mock_retrieve = MockUrlRetrieve('image/jpeg')
|
||||
fetchart.urllib.urlretrieve = mock_retrieve
|
||||
album = _common.Bag(mb_albumid='releaseid', asin='xxxx')
|
||||
album = _common.Bag(mb_albumid=self.MBID, asin=self.ASIN)
|
||||
artpath = fetchart.art_for_album(album, [self.dpath], local_only=True)
|
||||
self.assertEqual(artpath, None)
|
||||
self.assertFalse(self.urlopen_called)
|
||||
self.assertFalse(mock_retrieve.fetched)
|
||||
self.assertEqual(len(responses.calls), 0)
|
||||
|
||||
def test_local_only_gets_fs_image(self):
|
||||
_common.touch(os.path.join(self.dpath, 'art.jpg'))
|
||||
mock_retrieve = MockUrlRetrieve('image/jpeg')
|
||||
fetchart.urllib.urlretrieve = mock_retrieve
|
||||
album = _common.Bag(mb_albumid='releaseid', asin='xxxx')
|
||||
album = _common.Bag(mb_albumid=self.MBID, asin=self.ASIN)
|
||||
artpath = fetchart.art_for_album(album, [self.dpath], None, local_only=True)
|
||||
self.assertEqual(artpath, os.path.join(self.dpath, 'art.jpg'))
|
||||
self.assertFalse(self.urlopen_called)
|
||||
self.assertFalse(mock_retrieve.fetched)
|
||||
self.assertEqual(len(responses.calls), 0)
|
||||
|
||||
|
||||
class AAOTest(_common.TestCase):
|
||||
def setUp(self):
|
||||
super(AAOTest, self).setUp()
|
||||
self.old_urlopen = fetchart.urllib.urlopen
|
||||
fetchart.urllib.urlopen = self._urlopen
|
||||
self.page_text = ''
|
||||
def tearDown(self):
|
||||
super(AAOTest, self).tearDown()
|
||||
fetchart.urllib.urlopen = self.old_urlopen
|
||||
ASIN = 'xxxx'
|
||||
AAO_URL = 'http://www.albumart.org/index_detail.php?asin={0}'.format(ASIN)
|
||||
|
||||
def _urlopen(self, url):
|
||||
return StringIO.StringIO(self.page_text)
|
||||
@responses.activate
|
||||
def run(self, *args, **kwargs):
|
||||
super(AAOTest, self).run(*args, **kwargs)
|
||||
|
||||
def mock_response(self, url, body):
|
||||
responses.add(responses.GET, url, body=body, content_type='text/html',
|
||||
match_querystring=True)
|
||||
|
||||
def test_aao_scraper_finds_image(self):
|
||||
self.page_text = """
|
||||
body = """
|
||||
<br />
|
||||
<a href="TARGET_URL" title="View larger image" class="thickbox" style="color: #7E9DA2; text-decoration:none;">
|
||||
<img src="http://www.albumart.org/images/zoom-icon.jpg" alt="View larger image" width="17" height="15" border="0"/></a>
|
||||
"""
|
||||
res = fetchart.aao_art('x')
|
||||
self.mock_response(self.AAO_URL, body)
|
||||
res = fetchart.aao_art(self.ASIN)
|
||||
self.assertEqual(res, 'TARGET_URL')
|
||||
|
||||
def test_aao_scraper_returns_none_when_no_image_present(self):
|
||||
self.page_text = "blah blah"
|
||||
res = fetchart.aao_art('x')
|
||||
self.mock_response(self.AAO_URL, 'blah blah')
|
||||
res = fetchart.aao_art(self.ASIN)
|
||||
self.assertEqual(res, None)
|
||||
|
||||
|
||||
class ArtImporterTest(_common.TestCase):
|
||||
def setUp(self):
|
||||
super(ArtImporterTest, self).setUp()
|
||||
|
|
|
|||
Loading…
Reference in a new issue