From e478ae740d70db123e2890e6072fa96d3b8ba594 Mon Sep 17 00:00:00 2001 From: Adrian Sampson Date: Sat, 9 Apr 2011 19:53:52 -0700 Subject: [PATCH] refactor importer to use a ImportTask class instead of a tuple --- beets/ui/commands.py | 220 ++++++++++++++++++++++++------------------- test/test_ui.py | 21 ++--- 2 files changed, 132 insertions(+), 109 deletions(-) diff --git a/beets/ui/commands.py b/beets/ui/commands.py index d3c37f504..3b6f93c7c 100755 --- a/beets/ui/commands.py +++ b/beets/ui/commands.py @@ -135,6 +135,7 @@ CHOICE_SKIP = 'CHOICE_SKIP' CHOICE_ASIS = 'CHOICE_ASIS' CHOICE_TRACKS = 'CHOICE_TRACKS' CHOICE_MANUAL = 'CHOICE_MANUAL' +CHOICE_ALBUM = 'CHOICE_ALBUM' def choose_candidate(cur_artist, cur_album, candidates, rec, color=True): """Given current metadata and a sorted list of (distance, candidate) pairs, ask the user for a selection @@ -304,27 +305,17 @@ def _reopen_lib(lib): else: return lib -def _duplicate_check(lib, choice, info, cur_artist, cur_album): +def _duplicate_check(lib, artist, album): """Check whether the match already exists in the library.""" - if choice is CHOICE_TRACKS: - #TODO currently no track-level duplicate detection. + if artist is None: + # As-is import with no artist. Skip check. return False - if choice is CHOICE_ASIS and cur_artist is None: - return False - - if choice is CHOICE_ASIS: - artist = cur_artist - album = cur_album - else: - artist = info['artist'] - album = info['album'] for album_cand in lib.albums(artist): if album_cand.album == album: return True return False - # Utilities for reading and writing the beets progress file, which # allows long tagging tasks to be resumed when they pause (or crash). PROGRESS_KEY = 'tagprogress' @@ -359,15 +350,65 @@ def progress_get(toppath): return None return state[PROGRESS_KEY].get(toppath) +class ImportTask(object): + """Represents a single directory to be imported along with its + intermediate state. + """ + __slots__ = ['toppath', 'path', 'items', 'sentinel', + 'cur_artist', 'cur_album', 'candidates', 'rec', + 'choice_flag', 'info'] + def __init__(self, toppath, path=None, items=None): + self.toppath = toppath + self.path = path + self.items = items + self.sentinel = False + + @classmethod + def done_sentinel(cls, toppath): + """Create an ImportTask that indicates the end of a top-level + directory import. + """ + obj = cls(toppath) + obj.sentinel = True + return obj + + def set_match(self, cur_artist, cur_album, candidates, rec): + """Sets the candidates matched by the autotag.tag_album method. + """ + assert not self.sentinel + self.cur_artist = cur_artist + self.cur_album = cur_album + self.candidates = candidates + self.rec = rec + + def set_null_match(self): + """Set the candidate to indicate no match was found.""" + self.set_match(None, None, None, None) + + def set_choice(self, choice): + """Given either an (info, items) tuple or a CHOICE_ constant, + indicates that an action has been selected by the user (or + automatically). + """ + assert not self.sentinel + assert choice != CHOICE_MANUAL # Not part of the task structure. + assert choice != CHOICE_ALBUM # Only used internally. + if choice in (CHOICE_SKIP, CHOICE_ASIS, CHOICE_TRACKS): + self.choice_flag = choice + self.info = None + if choice == CHOICE_SKIP: + self.items = None # Items no longer needed. + else: + info, items = choice + self.items = items # Reordered items list. + self.info = info + self.choice_flag = CHOICE_ALBUM # Implicit choice. + # Core autotagger pipeline stages. -# This sentinel is passed along as the "path" when a directory has finished -# tagging. -DONE_SENTINEL = '__IMPORT_DONE_SENTINEL__' - def read_albums(paths, resume): - """A generator yielding all the albums (as sets of Items) found in - the user-specified list of paths. `progress` specifies whether + """A generator yielding all the albums (as ImportTask objects) found + in the user-specified list of paths. `progress` specifies whether the resuming feature should be used. It may be True (resume if possible), False (never resume), or None (ask). """ @@ -416,10 +457,10 @@ def read_albums(paths, resume): resume_dir = None continue - yield toppath, path, items + yield ImportTask(toppath, path, items) # Indicate the directory is finished. - yield toppath, DONE_SENTINEL, None + yield ImportTask.done_sentinel(toppath) def initial_lookup(): """A coroutine for performing the initial MusicBrainz lookup for an @@ -427,20 +468,18 @@ def initial_lookup(): (items, cur_artist, cur_album, candidates, rec) tuples. If no match is found, all of the yielded parameters (except items) are None. """ - toppath, path, items = yield - log.debug('Looking up: %s' % path) + task = yield + log.debug('Looking up: %s' % task.path) while True: - if path is DONE_SENTINEL: - cur_artist, cur_album, candidates, rec = None, None, None, None - else: - try: - cur_artist, cur_album, candidates, rec = \ - autotag.tag_album(items) - except autotag.AutotagError: - cur_artist, cur_album, candidates, rec = \ - None, None, None, None - toppath, path, items = yield toppath, path, items, cur_artist, \ - cur_album, candidates, rec + if task.sentinel: + task = yield task + continue + + try: + task.set_match(*autotag.tag_album(task.items)) + except autotag.AutotagError: + task.set_null_match() + task = yield task def user_query(lib, logfile, color, quiet, quiet_fallback): """A coroutine for interfacing with the user about the tagging @@ -457,12 +496,10 @@ def user_query(lib, logfile, color, quiet, quiet_fallback): """ lib = _reopen_lib(lib) first = True - out = None + task = None while True: - toppath, path, items, cur_artist, cur_album, candidates, rec = yield out - - if path is DONE_SENTINEL: - out = toppath, path, None, None + task = yield task + if task.sentinel: continue # Empty lines between albums. @@ -470,38 +507,32 @@ def user_query(lib, logfile, color, quiet, quiet_fallback): print_() first = False # Show current album path. - print_(path) + print_(task.path) # Ask the user for a choice. - choice = choose_match(path, items, cur_artist, cur_album, candidates, - rec, color, quiet, quiet_fallback) + choice = choose_match(task.path, task.items, task.cur_artist, + task.cur_album, task.candidates, task.rec, + color, quiet, quiet_fallback) + task.set_choice(choice) - # The "give-up" options. + # Log certain choices. if choice is CHOICE_ASIS: - tag_log(logfile, 'asis', path) - info = CHOICE_ASIS - elif choice is CHOICE_TRACKS: - info = CHOICE_TRACKS + tag_log(logfile, 'asis', task.path) elif choice is CHOICE_SKIP: - tag_log(logfile, 'skip', path) - # Yield None, indicating that the pipeline should not - # progress. - out = toppath, path, items, None - continue - else: - # We have a real candidate. Get the info dictionary and - # replace the items list with an ordered list. - info, items = choice + tag_log(logfile, 'skip', task.path) - # Ensure that we don't have the album already. - if _duplicate_check(lib, choice, info, cur_artist, cur_album): - tag_log(logfile, 'duplicate', path) - print_("This album is already in the library!") - out = toppath, path, items, None - continue - - # Yield the result and get the next chunk of work. - out = toppath, path, items, info + # Check for duplicates if we have a match. + if choice == CHOICE_ASIS or isinstance(choice, tuple): + if choice == CHOICE_ASIS: + artist = task.cur_artist + album = task.cur_album + else: + artist = task.info['artist'] + album = task.info['album'] + if _duplicate_check(lib, artist, album): + tag_log(logfile, 'duplicate', task.path) + print_("This album is already in the library!") + task.set_choice(CHOICE_SKIP) def apply_choices(lib, copy, write, art, delete, progress): """A coroutine for applying changes to albums during the autotag @@ -512,48 +543,45 @@ def apply_choices(lib, copy, write, art, delete, progress): """ lib = _reopen_lib(lib) while True: - # Get next chunk of work. - toppath, path, items, info = yield + task = yield # Check for "path finished" message. - if path is DONE_SENTINEL: + if task.sentinel: if progress: # Mark path as complete. - progress_set(toppath, None) + progress_set(task.toppath, None) continue - # Only process the items if info is not None (indicating a - # skip). - if info is not None: + # Only process the items if we're not skipping. + if task.choice_flag != CHOICE_SKIP: # Change metadata, move, and copy. - if info not in (CHOICE_ASIS, CHOICE_TRACKS): - autotag.apply_metadata(items, info) + if task.choice_flag == CHOICE_ALBUM: + autotag.apply_metadata(task.items, task.info) if copy and delete: - old_paths = [os.path.realpath(item.path) for item in items] - for item in items: + old_paths = [os.path.realpath(item.path) + for item in task.items] + for item in task.items: if copy: item.move(lib, True) - if write and info not in (CHOICE_ASIS, CHOICE_TRACKS): + if write and task.choice_flag == CHOICE_ALBUM: item.write() # Add items to library. We consolidate this at the end to avoid # locking while we do the copying and tag updates. - if info == CHOICE_TRACKS: + if task.choice_flag == CHOICE_TRACKS: # Add tracks. - is_album = False - for item in items: + for item in task.items: lib.add(item) else: # Add an album. - is_album = True - albuminfo = lib.add_album(items, infer_aa = - (info is CHOICE_ASIS)) + albuminfo = lib.add_album(task.items, + infer_aa = (task.choice_flag == CHOICE_ASIS)) # Get album art if requested. - if is_album and art and info is not CHOICE_ASIS: - artpath = beets.autotag.art.art_for_album(info) + if art and task.choice_flag == CHOICE_ALBUM: + artpath = beets.autotag.art.art_for_album(task.info) if artpath: albuminfo.set_art(artpath) @@ -561,15 +589,15 @@ def apply_choices(lib, copy, write, art, delete, progress): lib.save() # Announce that we've added an album. - if is_album: + if task.choice_flag in (CHOICE_ALBUM, CHOICE_ASIS): plugins.send('album_imported', album=albuminfo) else: - for item in items: + for item in task.items: plugins.send('item_imported', lib=lib, item=item) # Finally, delete old files. if copy and delete: - new_paths = [os.path.realpath(item.path) for item in items] + new_paths = [os.path.realpath(item.path) for item in task.items] for old_path in old_paths: # Only delete files that were actually moved. if old_path not in new_paths: @@ -577,7 +605,7 @@ def apply_choices(lib, copy, write, art, delete, progress): # Update progress. if progress: - progress_set(toppath, path) + progress_set(task.toppath, task.path) # Non-autotagged import (always sequential). @@ -585,27 +613,27 @@ def simple_import(lib, paths, copy, delete, resume): """Add files from the paths to the library without changing any tags. """ - for toppath, path, items in read_albums(paths, resume): - if items is None: + for task in read_albums(paths, resume): + if task.sentinel: continue if copy: if delete: - old_paths = [os.path.realpath(item.path) for item in items] - for item in items: + old_paths = [os.path.realpath(item.path) for item in task.items] + for item in task.items: item.move(lib, True) - album = lib.add_album(items, True) + album = lib.add_album(task.items, True) lib.save() # Announce that we added an album. plugins.send('album_imported', album=album) if resume is not False: - progress_set(toppath, path) + progress_set(task.toppath, task.path) if copy and delete: - new_paths = [os.path.realpath(item.path) for item in items] + new_paths = [os.path.realpath(item.path) for item in task.items] for old_path in old_paths: # Only delete files that were actually moved. if old_path not in new_paths: diff --git a/test/test_ui.py b/test/test_ui.py index 2f10fd8fe..bdf5d58a5 100644 --- a/test/test_ui.py +++ b/test/test_ui.py @@ -156,8 +156,9 @@ class ImportApplyTest(unittest.TestCase): shutil.rmtree(self.libdir) def call_apply(self, coro, items, info): - coro.send((None, None, # Only used for progress. - items, info)) + task = commands.ImportTask(None, None, None) + task.set_choice((info, items)) + coro.send(task) def test_apply_no_delete(self): coro = commands.apply_choices(self.lib, True, False, False, @@ -180,24 +181,18 @@ class DuplicateCheckTest(unittest.TestCase): self.album = self.lib.add_album([self.i], True) def test_duplicate_album(self): - info = {'artist': self.i.albumartist, 'album': self.i.album} - res = commands._duplicate_check(self.lib, None, info, None, None) + res = commands._duplicate_check(self.lib, self.i.albumartist, + self.i.album) self.assertTrue(res) def test_different_album(self): - info = {'artist': 'xxx', 'album': 'yyy'} - res = commands._duplicate_check(self.lib, None, info, None, None) + res = commands._duplicate_check(self.lib, 'xxx', 'yyy') self.assertFalse(res) - def test_duplicate_asis(self): - res = commands._duplicate_check(self.lib, commands.CHOICE_ASIS, - None, self.i.albumartist, self.i.album) - self.assertTrue(res) - def test_duplicate_va_album(self): self.album.albumartist = 'an album artist' - info = {'artist': 'an album artist', 'album': self.i.album} - res = commands._duplicate_check(self.lib, None, info, None, None) + res = commands._duplicate_check(self.lib, 'an album artist', + self.i.album) self.assertTrue(res) class ListTest(unittest.TestCase):