Improved multi-disc album detection.

- Remove "part", "volume", "vol." multi-disc markers. These are often
  part of album titles, and not necessarily indicative of a multi-disc
  album. Only look for "CD X" and "disc X" (case insensitive), ignoring
  white space and other non-word characters.

- Don't only expect each disc to be in a subdirectory of a common parent
  directory, with all siblings belonging to the same release. Also match
  any consecutive siblings (even when the parent contains other albums)
  that are named with the same prefix and multi-disc marker.

- The `albums_in_dir(path)` function now always yields a list of paths
  along with each list of items. `ItemTask.path` is now always a list of
  paths.

- The `displayable_path(path)` function now accepts a list of paths, and
  will join them with "; " by default. This can be changed with the
  `separator` argument.

- The `sorted_walk()` function now does a case insensitive sort on
  directories, but still returns case sensitive results. This allows
  better multi-disc album detection.

- The `art_for_album()` function now takes a list of paths as its second
  argument, instead of a single path.
This commit is contained in:
Tai Lee 2013-02-03 21:20:29 +11:00
parent 4f7e738f82
commit 23cd5453d9
7 changed files with 113 additions and 70 deletions

View file

@ -32,20 +32,19 @@ from .match import \
log = logging.getLogger('beets')
# Constants for directory walker.
MULTIDISC_MARKERS = (r'part', r'volume', r'vol\.', r'disc', r'cd')
MULTIDISC_PAT_FMT = r'%s\s*\d'
MULTIDISC_MARKERS = (r'disc', r'cd')
MULTIDISC_PAT_FMT = r'^(.*%s[\W_]*)\d'
# Additional utilities for the main interface.
def albums_in_dir(path):
"""Recursively searches the given directory and returns an iterable
of (path, items) where path is a containing directory and items is
of (paths, items) where paths is a list of directories and items is
a list of Items that is probably an album. Specifically, any folder
containing any media files is an album.
"""
collapse_root = None
collapse_items = None
collapse_pat = collapse_paths = collapse_items = multidisc = None
for root, dirs, files in sorted_walk(path,
ignore=config['ignore'].as_str_seq()):
@ -64,45 +63,68 @@ def albums_in_dir(path):
items.append(i)
# If we're collapsing, test to see whether we should continue to
# collapse. If so, just add to the collapsed item set;
# collapse. If so, just add to the collapsed paths and items;
# otherwise, end the collapse and continue as normal.
if collapse_root is not None:
if collapse_root in ancestry(root):
if collapse_paths:
if collapse_paths[0] in ancestry(root) or \
collapse_pat.match(os.path.basename(root)):
# Still collapsing.
collapse_paths.append(root)
collapse_items += items
continue
else:
# Collapse finished. Yield the collapsed directory and
# proceed to process the current one.
if collapse_items:
yield collapse_root, collapse_items
collapse_root = collapse_items = None
yield collapse_paths, collapse_items
collapse_pat = collapse_paths = collapse_items = \
multidisc = None
# Does the current directory look like a multi-disc album? If
# so, begin collapsing here.
if dirs and not items: # Must be only directories.
multidisc = False
for marker in MULTIDISC_MARKERS:
pat = MULTIDISC_PAT_FMT % marker
if all(re.search(pat, dirname, re.I) for dirname in dirs):
multidisc = True
# Does the current directory look like the start of a multi-disc
# album? If so, begin collapsing here.
for marker in MULTIDISC_MARKERS:
marker_pat = re.compile(MULTIDISC_PAT_FMT % marker, re.I)
# Is this directory the first in a flattened multi-disc album?
match = marker_pat.match(os.path.basename(root))
if match:
multidisc = True
collapse_pat = re.compile(r'^%s\d' %
re.escape(match.groups()[0]), re.I)
break
# Is this directory the root of a nested multi-disc album?
elif dirs and not items:
multidisc = True
for dirname in dirs:
if collapse_pat:
if collapse_pat.match(dirname):
continue
else:
match = marker_pat.match(dirname)
if match:
collapse_pat = re.compile(r'^%s\d' %
re.escape(match.groups()[0]), re.I)
continue
multidisc = False
break
if multidisc:
break
# This becomes True only when all directories match a
# pattern for a single marker.
if multidisc:
# Start collapsing; continue to the next iteration.
collapse_root = root
collapse_items = []
continue
# This becomes True only when all sub-directories match a
# pattern for a single marker with a common prefix, or when
# this directory matches a multidisc marker pattern.
if multidisc:
# Start collapsing; continue to the next iteration.
collapse_paths = [root]
collapse_items = items
continue
# If it's nonempty, yield it.
if items:
yield root, items
yield [root], items
# Clear out any unfinished collapse.
if collapse_root is not None and collapse_items:
yield collapse_root, collapse_items
if collapse_paths and collapse_items:
yield collapse_paths, collapse_items
def apply_item_metadata(item, track_info):
"""Set an item's metadata from its matched TrackInfo object.

View file

@ -200,7 +200,7 @@ def history_add(path):
if HISTORY_KEY not in state:
state[HISTORY_KEY] = set()
state[HISTORY_KEY].add(path)
state[HISTORY_KEY].add(tuple(path))
_save_state(state)
def history_get():
@ -271,21 +271,21 @@ class ImportSession(object):
``duplicate``, then this is a secondary choice after a duplicate was
detected and a decision was made.
"""
path = task.path if task.is_album else task.item.path
paths = task.path if task.is_album else [task.item.path]
if duplicate:
# Duplicate: log all three choices (skip, keep both, and trump).
if task.remove_duplicates:
self.tag_log('duplicate-replace', path)
self.tag_log('duplicate-replace', displayable_path(paths))
elif task.choice_flag in (action.ASIS, action.APPLY):
self.tag_log('duplicate-keep', path)
self.tag_log('duplicate-keep', displayable_path(paths))
elif task.choice_flag is (action.SKIP):
self.tag_log('duplicate-skip', path)
self.tag_log('duplicate-skip', displayable_path(paths))
else:
# Non-duplicate: log "skip" and "asis" choices.
if task.choice_flag is action.ASIS:
self.tag_log('asis', path)
self.tag_log('asis', displayable_path(paths))
elif task.choice_flag is action.SKIP:
self.tag_log('skip', path)
self.tag_log('skip', displayable_path(paths))
def should_resume(self, path):
raise NotImplementedError
@ -575,7 +575,7 @@ def read_tasks(session):
continue
# When incremental, skip paths in the history.
if config['import']['incremental'] and path in history_dirs:
if config['import']['incremental'] and tuple(path) in history_dirs:
log.debug(u'Skipping previously-imported path: %s' %
displayable_path(path))
incremental_skipped += 1
@ -613,7 +613,7 @@ def query_tasks(session):
log.debug('yielding album %i: %s - %s' %
(album.id, album.albumartist, album.album))
items = list(album.items())
yield ImportTask(None, album.item_dir(), items)
yield ImportTask(None, [album.item_dir()], items)
def initial_lookup(session):
"""A coroutine for performing the initial MusicBrainz lookup for an
@ -629,7 +629,7 @@ def initial_lookup(session):
plugins.send('import_task_start', session=session, task=task)
log.debug('Looking up: %s' % task.path)
log.debug('Looking up: %s' % displayable_path(task.path))
try:
task.set_candidates(*autotag.tag_album(task.items,
config['import']['timid']))
@ -695,7 +695,7 @@ def show_progress(session):
if task.sentinel:
continue
log.info(task.path)
log.info(displayable_path(task.path))
# Behave as if ASIS were selected.
task.set_null_candidates()

View file

@ -511,7 +511,7 @@ class TerminalImportSession(importer.ImportSession):
"""
# Show what we're tagging.
print_()
print_(task.path)
print_(displayable_path(task.path, u'\n'))
# Take immediate action if appropriate.
action = _summary_judment(task.rec)

View file

@ -169,8 +169,9 @@ def sorted_walk(path, ignore=()):
else:
files.append(base)
# Sort lists and yield the current level.
dirs.sort()
# Sort lists and yield the current level. Do case insensitve sort on dirs,
# to improve multi-disc album detection.
dirs = [d2 for d1, d2 in sorted([(d.lower(), d) for d in dirs])]
files.sort()
yield (path, dirs, files)
@ -295,10 +296,12 @@ def bytestring_path(path, pathmod=None):
except (UnicodeError, LookupError):
return path.encode('utf8')
def displayable_path(path):
def displayable_path(path, separator=u'; '):
"""Attempts to decode a bytestring path to a unicode object for the
purpose of displaying it to the user.
"""
if isinstance(path, list):
return separator.join(displayable_path(p) for p in path)
if isinstance(path, unicode):
return path
elif not isinstance(path, str):

View file

@ -162,7 +162,7 @@ def _source_urls(album):
if url:
yield url
def art_for_album(album, path, maxwidth=None, local_only=False):
def art_for_album(album, paths, maxwidth=None, local_only=False):
"""Given an Album object, returns a path to downloaded art for the
album (or None if no art is found). If `maxwidth`, then images are
resized to this maximum pixel size. If `local_only`, then only local
@ -172,8 +172,11 @@ def art_for_album(album, path, maxwidth=None, local_only=False):
out = None
# Local art.
if isinstance(path, basestring):
out = art_in_path(path)
if paths:
for path in paths:
out = art_in_path(path)
if out:
break
# Web art sources.
if not local_only and not out:

View file

@ -105,26 +105,26 @@ class CombinedTest(unittest.TestCase):
_common.touch(os.path.join(self.dpath, 'a.jpg'))
fetchart.urllib.urlretrieve = MockUrlRetrieve('image/jpeg')
album = _common.Bag(asin='xxxx')
artpath = fetchart.art_for_album(album, self.dpath)
artpath = fetchart.art_for_album(album, [self.dpath])
self.assertEqual(artpath, os.path.join(self.dpath, 'a.jpg'))
def test_main_interface_falls_back_to_amazon(self):
fetchart.urllib.urlretrieve = MockUrlRetrieve('image/jpeg')
album = _common.Bag(asin='xxxx')
artpath = fetchart.art_for_album(album, self.dpath)
artpath = fetchart.art_for_album(album, [self.dpath])
self.assertNotEqual(artpath, None)
self.assertFalse(artpath.startswith(self.dpath))
def test_main_interface_tries_amazon_before_aao(self):
fetchart.urllib.urlretrieve = MockUrlRetrieve('image/jpeg')
album = _common.Bag(asin='xxxx')
fetchart.art_for_album(album, self.dpath)
fetchart.art_for_album(album, [self.dpath])
self.assertFalse(self.urlopen_called)
def test_main_interface_falls_back_to_aao(self):
fetchart.urllib.urlretrieve = MockUrlRetrieve('text/html')
album = _common.Bag(asin='xxxx')
fetchart.art_for_album(album, self.dpath)
fetchart.art_for_album(album, [self.dpath])
self.assertTrue(self.urlopen_called)
def test_main_interface_uses_caa_when_mbid_available(self):
@ -139,7 +139,7 @@ class CombinedTest(unittest.TestCase):
mock_retrieve = MockUrlRetrieve('image/jpeg')
fetchart.urllib.urlretrieve = mock_retrieve
album = _common.Bag(mb_albumid='releaseid', asin='xxxx')
artpath = fetchart.art_for_album(album, self.dpath, local_only=True)
artpath = fetchart.art_for_album(album, [self.dpath], local_only=True)
self.assertEqual(artpath, None)
self.assertFalse(self.urlopen_called)
self.assertFalse(mock_retrieve.fetched)
@ -149,7 +149,7 @@ class CombinedTest(unittest.TestCase):
mock_retrieve = MockUrlRetrieve('image/jpeg')
fetchart.urllib.urlretrieve = mock_retrieve
album = _common.Bag(mb_albumid='releaseid', asin='xxxx')
artpath = fetchart.art_for_album(album, self.dpath, local_only=True)
artpath = fetchart.art_for_album(album, [self.dpath], local_only=True)
self.assertEqual(artpath, os.path.join(self.dpath, 'a.jpg'))
self.assertFalse(self.urlopen_called)
self.assertFalse(mock_retrieve.fetched)

View file

@ -322,19 +322,29 @@ class MultiDiscAlbumsInDirTest(unittest.TestCase):
os.mkdir(self.base)
self.dirs = [
# Nested album with non-consecutive disc numbers and a subtitle.
os.path.join(self.base, 'album1'),
os.path.join(self.base, 'album1', 'disc 1'),
os.path.join(self.base, 'album1', 'disc 2'),
os.path.join(self.base, 'dir2'),
os.path.join(self.base, 'dir2', 'disc 1'),
os.path.join(self.base, 'dir2', 'something'),
os.path.join(self.base, 'album1', 'cd 1'),
os.path.join(self.base, 'album1', 'cd 3 - bonus'),
# Nested album with a single disc and punctuation.
os.path.join(self.base, 'album2'),
os.path.join(self.base, 'album2', 'cd _ 1'),
# Flattened album with case typo.
os.path.join(self.base, 'artist'),
os.path.join(self.base, 'artist', 'CAT disc 1'),
os.path.join(self.base, 'artist', 'Cat disc 2'),
# Prevent "CAT" being collapsed as a nested multi-disc album,
# and test case insensitive sorting.
os.path.join(self.base, 'artist', 'CATS'),
]
self.files = [
os.path.join(self.base, 'album1', 'disc 1', 'song1.mp3'),
os.path.join(self.base, 'album1', 'disc 2', 'song2.mp3'),
os.path.join(self.base, 'album1', 'disc 2', 'song3.mp3'),
os.path.join(self.base, 'dir2', 'disc 1', 'song4.mp3'),
os.path.join(self.base, 'dir2', 'something', 'song5.mp3'),
os.path.join(self.base, 'album1', 'cd 1', 'song1.mp3'),
os.path.join(self.base, 'album1', 'cd 3 - bonus', 'song2.mp3'),
os.path.join(self.base, 'album1', 'cd 3 - bonus', 'song3.mp3'),
os.path.join(self.base, 'album2', 'cd _ 1', 'song4.mp3'),
os.path.join(self.base, 'artist', 'CAT disc 1', 'song5.mp3'),
os.path.join(self.base, 'artist', 'Cat disc 2', 'song6.mp3'),
os.path.join(self.base, 'artist', 'CATS', 'song7.mp3'),
]
for path in self.dirs:
@ -347,23 +357,28 @@ class MultiDiscAlbumsInDirTest(unittest.TestCase):
def test_coalesce_multi_disc_album(self):
albums = list(autotag.albums_in_dir(self.base))
self.assertEquals(len(albums), 3)
self.assertEquals(len(albums), 4)
# Nested album with non-consecutive disc numbers.
root, items = albums[0]
self.assertEquals(root, os.path.join(self.base, 'album1'))
self.assertEquals(root, self.dirs[0:3])
self.assertEquals(len(items), 3)
def test_separate_red_herring(self):
albums = list(autotag.albums_in_dir(self.base))
# Nested album with a single disc and punctuation.
root, items = albums[1]
self.assertEquals(root, os.path.join(self.base, 'dir2', 'disc 1'))
self.assertEquals(root, self.dirs[3:5])
self.assertEquals(len(items), 1)
# Flattened album with case typo.
root, items = albums[2]
self.assertEquals(root, os.path.join(self.base, 'dir2', 'something'))
self.assertEquals(root, self.dirs[6:8])
self.assertEquals(len(items), 2)
# Non-multi-disc album (sorted in between "cat" album).
root, items = albums[3]
self.assertEquals(root, self.dirs[8:])
self.assertEquals(len(items), 1)
def test_do_not_yield_empty_album(self):
# Remove all the MP3s.
for path in self.files:
os.remove(path)
albums = list(autotag.albums_in_dir(self.base))
self.assertEquals(len(albums), 0)