diff --git a/beets/importer.py b/beets/importer.py index f8134870e..344e382dd 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -58,107 +58,6 @@ class ImportAbort(Exception): # Utilities. -def _duplicate_check(lib, task): - """Check whether an album already exists in the library. Returns a - list of Album objects (empty if no duplicates are found). - """ - assert task.choice_flag in (action.ASIS, action.APPLY) - artist, album = task.chosen_ident() - - if artist is None: - # As-is import with no artist. Skip check. - return [] - - found_albums = [] - cur_paths = set(i.path for i in task.items if i) - for album_cand in lib.albums(dbcore.MatchQuery('albumartist', artist)): - if album_cand.album == album: - # Check whether the album is identical in contents, in which - # case it is not a duplicate (will be replaced). - other_paths = set(i.path for i in album_cand.items()) - if other_paths == cur_paths: - continue - found_albums.append(album_cand) - return found_albums - - -def _item_duplicate_check(lib, task): - """Check whether an item already exists in the library. Returns a - list of Item objects. - """ - assert task.choice_flag in (action.ASIS, action.APPLY) - artist, title = task.chosen_ident() - - found_items = [] - query = dbcore.AndQuery(( - dbcore.MatchQuery('artist', artist), - dbcore.MatchQuery('title', title), - )) - for other_item in lib.items(query): - # Existing items not considered duplicates. - if other_item.path == task.item.path: - continue - found_items.append(other_item) - return found_items - - -def _infer_album_fields(task): - """Given an album and an associated import task, massage the - album-level metadata. This ensures that the album artist is set - and that the "compilation" flag is set automatically. - """ - assert task.is_album - assert task.items - - changes = {} - - if task.choice_flag == action.ASIS: - # Taking metadata "as-is". Guess whether this album is VA. - plur_albumartist, freq = util.plurality( - [i.albumartist or i.artist for i in task.items] - ) - if freq == len(task.items) or \ - (freq > 1 and - float(freq) / len(task.items) >= SINGLE_ARTIST_THRESH): - # Single-artist album. - changes['albumartist'] = plur_albumartist - changes['comp'] = False - else: - # VA. - changes['albumartist'] = VARIOUS_ARTISTS - changes['comp'] = True - - elif task.choice_flag == action.APPLY: - # Applying autotagged metadata. Just get AA from the first - # item. - for item in task.items: - if item is not None: - first_item = item - break - else: - assert False, "all items are None" - if not first_item.albumartist: - changes['albumartist'] = first_item.artist - if not first_item.mb_albumartistid: - changes['mb_albumartistid'] = first_item.mb_artistid - - else: - assert False - - # Apply new metadata. - for item in task.items: - if item is not None: - for k, v in changes.iteritems(): - setattr(item, k, v) - - -def _resume(): - """Check whether an import should resume and return a boolean or the - string 'ask' indicating that the user should be queried. - """ - return config['import']['resume'].as_choice([True, False, 'ask']) - - def _open_state(): """Reads the state file, returning a dictionary.""" try: @@ -251,17 +150,20 @@ class ImportSession(object): self.logfile = logfile self.paths = paths self.query = query + self.seen_idents = set() # Normalize the paths. if self.paths: self.paths = map(normpath, self.paths) - def _amend_config(self): - """Make implied changes the importer configuration. 
+ def set_config(self, config): + """Set `config` property from global import config and make + implied changes. """ # FIXME: Maybe this function should not exist and should instead # provide "decision wrappers" like "should_resume()", etc. - iconfig = config['import'] + iconfig = dict(config) + self.config = iconfig # Incremental and progress are mutually exclusive. if iconfig['incremental']: @@ -281,6 +183,8 @@ class ImportSession(object): if not iconfig['copy']: iconfig['delete'] = False + self.want_resume = config['resume'].as_choice([True, False, 'ask']) + def tag_log(self, status, paths): """Log a message about a given album to logfile. The status should reflect the reason the album couldn't be tagged. @@ -295,10 +199,10 @@ class ImportSession(object): ``duplicate``, then this is a secondary choice after a duplicate was detected and a decision was made. """ - paths = task.paths if task.is_album else [task.item.path] + paths = task.paths if duplicate: # Duplicate: log all three choices (skip, keep both, and trump). - if task.remove_duplicates: + if task.should_remove_duplicates: self.tag_log('duplicate-replace', paths) elif task.choice_flag in (action.ASIS, action.APPLY): self.tag_log('duplicate-keep', paths) @@ -326,35 +230,30 @@ class ImportSession(object): def run(self): """Run the import task. """ - self._amend_config() + self.set_config(config['import']) # Set up the pipeline. if self.query is None: stages = [read_tasks(self)] else: stages = [query_tasks(self)] - if config['import']['singletons']: - # Singleton importer. - if config['import']['autotag']: - stages += [item_lookup(self), item_query(self)] - else: - stages += [item_progress(self)] + + if self.config['group_albums'] and \ + not self.config['singletons']: + # Split directory tasks into one task for each album + stages += [group_albums(self)] + if self.config['autotag']: + # Only look up and query the user when autotagging. + + # FIXME We should also resolve duplicates when not + # autotagging. + stages += [lookup_candidates(self), user_query(self)] else: - # Whole-album importer. - if config['import']['group_albums']: - # Split directory tasks into one task for each album - stages += [group_albums(self)] - if config['import']['autotag']: - # Only look up and query the user when autotagging. - stages += [initial_lookup(self), user_query(self)] - else: - # When not autotagging, just display progress. - stages += [show_progress(self)] + stages += [import_asis(self)] stages += [apply_choices(self)] for stage_func in plugins.import_stages(): stages.append(plugin_stage(self, stage_func)) stages += [manipulate_files(self)] - stages += [finalize(self)] pl = pipeline.Pipeline(stages) # Run the pipeline. @@ -378,48 +277,10 @@ class ImportTask(object): self.toppath = toppath self.paths = paths self.items = items - self.sentinel = False - self.remove_duplicates = False - self.is_album = True self.choice_flag = None - - @classmethod - def done_sentinel(cls, toppath): - """Create an ImportTask that indicates the end of a top-level - directory import. - """ - obj = cls(toppath) - obj.sentinel = True - return obj - - @classmethod - def progress_sentinel(cls, toppath, paths): - """Create a task indicating that a single directory in a larger - import has finished. This is only required for singleton - imports; progress is implied for album imports. 
- """ - obj = cls(toppath, paths) - obj.sentinel = True - return obj - - @classmethod - def item_task(cls, item): - """Creates an ImportTask for a single item.""" - obj = cls() - obj.item = item - obj.is_album = False - return obj - - def set_candidates(self, cur_artist, cur_album, candidates, rec): - """Sets the candidates for this album matched by the - `autotag.tag_album` method. - """ - assert self.is_album - assert not self.sentinel - self.cur_artist = cur_artist - self.cur_album = cur_album - self.candidates = candidates - self.rec = rec + # TODO remove this eventually + self.should_remove_duplicates = False + self.is_album = True def set_null_candidates(self): """Set the candidates to indicate no album match was found. @@ -429,18 +290,10 @@ class ImportTask(object): self.candidates = None self.rec = None - def set_item_candidates(self, candidates, rec): - """Set the match for a single-item task.""" - assert not self.is_album - assert self.item is not None - self.candidates = candidates - self.rec = rec - def set_choice(self, choice): """Given an AlbumMatch or TrackMatch object or an action constant, indicates that an action has been selected for this task. """ - assert not self.sentinel # Not part of the task structure: assert choice not in (action.MANUAL, action.MANUAL_ID) assert choice != action.APPLY # Only used internally. @@ -448,10 +301,6 @@ class ImportTask(object): self.choice_flag = choice self.match = None else: - if self.is_album: - assert isinstance(choice, autotag.AlbumMatch) - else: - assert isinstance(choice, autotag.TrackMatch) self.choice_flag = action.APPLY # Implicit choice. self.match = choice @@ -459,36 +308,23 @@ class ImportTask(object): """Updates the progress state to indicate that this album has finished. """ - if self.sentinel and self.paths is None: - # "Done" sentinel. - progress_set(self.toppath, None) - elif self.sentinel or self.is_album: - # "Directory progress" sentinel for singletons or a real - # album task, which implies the same. - progress_set(self.toppath, self.paths) + progress_set(self.toppath, self.paths) def save_history(self): """Save the directory in the history for incremental imports. """ - if self.is_album and self.paths and not self.sentinel: + if self.paths: history_add(self.paths) # Logical decisions. - def should_write_tags(self): - """Should new info be written to the files' metadata?""" - if self.choice_flag == action.APPLY: - return True - elif self.choice_flag in (action.ASIS, action.TRACKS, action.SKIP): - return False - else: - assert False + @property + def apply(self): + return self.choice_flag == action.APPLY - def should_skip(self): - """After a choice has been made, returns True if this is a - sentinel or it has been marked for skipping. - """ - return self.sentinel or self.choice_flag == action.SKIP + @property + def skip(self): + return self.choice_flag == action.SKIP # Convenient data. @@ -499,17 +335,10 @@ class ImportTask(object): (in which case the data comes from the files' current metadata) or APPLY (data comes from the choice). 
""" - assert self.choice_flag in (action.ASIS, action.APPLY) - if self.is_album: - if self.choice_flag is action.ASIS: - return (self.cur_artist, self.cur_album) - elif self.choice_flag is action.APPLY: - return (self.match.info.artist, self.match.info.album) - else: - if self.choice_flag is action.ASIS: - return (self.item.artist, self.item.title) - elif self.choice_flag is action.APPLY: - return (self.match.info.artist, self.match.info.title) + if self.choice_flag is action.ASIS: + return (self.cur_artist, self.cur_album) + elif self.choice_flag is action.APPLY: + return (self.match.info.artist, self.match.info.album) def imported_items(self): """Return a list of Items that should be added to the library. @@ -517,20 +346,230 @@ class ImportTask(object): selected match or everything if the choice is ASIS. If this is a singleton task, return a list containing the item. """ - if self.is_album: - if self.choice_flag == action.ASIS: - return list(self.items) - elif self.choice_flag == action.APPLY: - return self.match.mapping.keys() - else: - assert False + if self.choice_flag == action.ASIS: + return list(self.items) + elif self.choice_flag == action.APPLY: + return self.match.mapping.keys() else: - return [self.item] + assert False - def cleanup(self): - """Perform clean up during `finalize` stage. + def apply_metadata(self): + """Copy metadata from match info to the items. """ - pass + autotag.apply_metadata(self.match.info, self.match.mapping) + + def duplicate_items(self, lib): + duplicate_items = [] + for album in self.find_duplicates(lib): + duplicate_items += album.items() + return duplicate_items + + def remove_duplicates(self, lib): + duplicate_items = self.duplicate_items(lib) + log.debug('removing %i old duplicated items' % + len(duplicate_items)) + for item in duplicate_items: + item.remove() + if lib.directory in util.ancestry(item.path): + log.debug(u'deleting duplicate %s' % + util.displayable_path(item.path)) + util.remove(item.path) + util.prune_dirs(os.path.dirname(item.path), + lib.directory) + + def finalize(self, session): + """Save progress, clean up files, and emit plugin event. + """ + # FIXME the session argument is unfortunate. It should be + # present as an attribute of the task. + + # Update progress. + if session.want_resume: + self.save_progress() + if session.config['incremental']: + self.save_history() + + self.cleanup(copy=session.config['copy'], + delete=session.config['delete'], + move=session.config['move']) + self._emit_imported(session.lib) + + def cleanup(self, copy=False, delete=False, move=False): + """Remove and prune imported paths. + """ + # FIXME Maybe the keywords should be task properties. + + # FIXME This shouldn't be here. Skipping should be handled in + # the stages. + if self.skip: + return + items = self.imported_items() + + # When copying and deleting originals, delete old files. + if copy and delete: + new_paths = [os.path.realpath(item.path) for item in items] + for old_path in self.old_paths: + # Only delete files that were actually copied. + if old_path not in new_paths: + util.remove(syspath(old_path), False) + self.prune(old_path) + + # When moving, prune empty directories containing the original files. + elif move: + for old_path in self.old_paths: + self.prune(old_path) + + def _emit_imported(self, lib): + # FIXME This shouldn't be here. Skipping should be handled in + # the stages. 
+ if self.skip: + return + plugins.send('album_imported', lib=lib, album=self.album) + + def lookup_candidates(self): + """Retrieve and store candidates for this album. + """ + artist, album, candidates, recommendation = \ + autotag.tag_album(self.items) + self.cur_artist = artist + self.cur_album = album + self.candidates = candidates + self.rec = recommendation + + def find_duplicates(self, lib): + """Return a list of albums from `lib` with the same artist and + album name as the task. + """ + artist, album = self.chosen_ident() + + if artist is None: + # As-is import with no artist. Skip check. + return [] + + duplicates = [] + task_paths = set(i.path for i in self.items if i) + duplicate_query = dbcore.AndQuery(( + dbcore.MatchQuery('albumartist', artist), + dbcore.MatchQuery('album', album), + )) + + for album in lib.albums(duplicate_query): + # Check whether the album is identical in contents, in which + # case it is not a duplicate (will be replaced). + album_paths = set(i.path for i in album.items()) + if album_paths != task_paths: + duplicates.append(album) + return duplicates + + def infer_album_fields(self): + """Make the some album fields equal across `self.items` + """ + changes = {} + + if self.choice_flag == action.ASIS: + # Taking metadata "as-is". Guess whether this album is VA. + plur_albumartist, freq = util.plurality( + [i.albumartist or i.artist for i in self.items] + ) + if freq == len(self.items) or \ + (freq > 1 and + float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH): + # Single-artist album. + changes['albumartist'] = plur_albumartist + changes['comp'] = False + else: + # VA. + changes['albumartist'] = VARIOUS_ARTISTS + changes['comp'] = True + + elif self.choice_flag == action.APPLY: + # Applying autotagged metadata. Just get AA from the first + # item. + if not self.items[0].albumartist: + changes['albumartist'] = self.items[0].artist + if not self.items[0].mb_albumartistid: + changes['mb_albumartistid'] = self.items[0].mb_artistid + + # Apply new metadata. + for item in self.items: + item.update(changes) + + def manipulate_files(self, move=False, copy=False, write=False, + session=None): + items = self.imported_items() + # Save the original paths of all items for deletion and pruning + # in the next step (finalization). + self.old_paths = [item.path for item in items] + for item in items: + if session.config['move']: + # Just move the file. + item.move(False) + elif session.config['copy']: + # If it's a reimport, move in-library files and copy + # out-of-library files. Otherwise, copy and keep track + # of the old path. + old_path = item.path + if self.replaced_items[item]: + # This is a reimport. Move in-library files and copy + # out-of-library files. + if session.lib.directory in util.ancestry(old_path): + item.move(False) + # We moved the item, so remove the + # now-nonexistent file from old_paths. + self.old_paths.remove(old_path) + else: + item.move(True) + else: + # A normal import. Just copy files and keep track of + # old paths. + item.move(True) + + if session.config['write'] and self.apply: + item.try_write() + + with session.lib.transaction(): + for item in self.imported_items(): + item.store() + + plugins.send('import_task_files', session=session, task=self) + + def add(self, lib): + """Add the items as an album to the library and remove replaced items. 
+ """ + with lib.transaction(): + self.remove_replaced(lib) + self.album = lib.add_album(self.imported_items()) + + def remove_replaced(self, lib): + """Removes all the items from the library that have the same + path as an item from this task. + + Records the replaced items in the `replaced_items` dictionary + """ + self.replaced_items = defaultdict(list) + for item in self.imported_items(): + dup_items = lib.items(dbcore.query.BytesQuery('path', item.path)) + self.replaced_items[item] = dup_items + for dup_item in dup_items: + log.debug('replacing item %i: %s' % + (dup_item.id, displayable_path(item.path))) + dup_item.remove() + log.debug('%i of %i items replaced' % (len(self.replaced_items), + len(self.imported_items()))) + + def choose_match(self, session): + """Ask the session which match should apply and apply it. + """ + choice = session.choose_match(self) + self.set_choice(choice) + session.log_choice(self) + + def reload(self): + """Reload albums and items from the database. + """ + for item in self.imported_items(): + item.load() + self.album.load() # Utilities. @@ -547,7 +586,134 @@ class ImportTask(object): clutter=config['clutter'].as_str_seq()) -class ArchiveImportTask(ImportTask): +class SingletonImportTask(ImportTask): + """ImportTask for a single track that is not associated to an album. + """ + + def __init__(self, item): + super(SingletonImportTask, self).__init__(paths=[item.path]) + self.item = item + self.is_album = False + self.paths = [item.path] + + def chosen_ident(self): + assert self.choice_flag in (action.ASIS, action.APPLY) + if self.choice_flag is action.ASIS: + return (self.item.artist, self.item.title) + elif self.choice_flag is action.APPLY: + return (self.match.info.artist, self.match.info.title) + + def imported_items(self): + return [self.item] + + def save_progress(self): + # TODO we should also save progress for singletons + pass + + def save_history(self): + # TODO we should also save history for singletons + pass + + def apply_metadata(self): + autotag.apply_item_metadata(self.item, self.match.info) + + def _emit_imported(self, lib): + # FIXME This shouldn't be here. Skipped tasks should be removed from + # the pipeline. + if self.skip: + return + for item in self.imported_items(): + plugins.send('item_imported', lib=lib, item=item) + + def lookup_candidates(self): + candidates, recommendation = autotag.tag_item(self.item) + self.candidates = candidates + self.rec = recommendation + + def find_duplicates(self, lib): + """Return a list of items from `lib` that have the same artist + and title as the task. + """ + artist, title = self.chosen_ident() + + found_items = [] + query = dbcore.AndQuery(( + dbcore.MatchQuery('artist', artist), + dbcore.MatchQuery('title', title), + )) + for other_item in lib.items(query): + # Existing items not considered duplicates. + if other_item.path != self.item.path: + found_items.append(other_item) + return found_items + + duplicate_items = find_duplicates + + def add(self, lib): + with lib.transaction(): + self.remove_replaced(lib) + lib.add(self.item) + + def infer_album_fields(self): + raise NotImplementedError + + def choose_match(self, session): + """Ask the session which match should apply and apply it. + """ + choice = session.choose_item(self) + self.set_choice(choice) + session.log_choice(self) + + def reload(self): + self.item.load() + + +# FIXME The inheritance relationships are inverted. This is why there +# are so many methods which pass. We should introduce a new +# BaseImportTask class. 
+class SentinelImportTask(ImportTask): + """This class marks the progress of an import and does not import + any items itself. + + If only `toppath` is set the task indicats the end of a top-level + directory import. If the `paths` argument is givent, too, the task + indicates the progress in the `toppath` import. + """ + + def __init__(self, toppath=None, paths=None): + self.toppath = toppath + self.paths = paths + # TODO Remove the remaining attributes eventually + self.items = None + self.should_remove_duplicates = False + self.is_album = True + self.choice_flag = None + + def save_history(self): + pass + + def save_progress(self): + if self.paths is None: + # "Done" sentinel. + progress_set(self.toppath, None) + else: + # "Directory progress" sentinel for singletons + progress_set(self.toppath, self.paths) + + def skip(self): + return True + + def set_choice(self, choice): + raise NotImplementedError + + def cleanup(self, **kwargs): + pass + + def _emit_imported(self, session): + pass + + +class ArchiveImportTask(SentinelImportTask): """Additional methods for handling archives. Use when `toppath` points to a `zip`, `tar`, or `rar` archive. @@ -555,7 +721,6 @@ class ArchiveImportTask(ImportTask): def __init__(self, toppath): super(ArchiveImportTask, self).__init__(toppath) - self.sentinel = True self.extracted = False @classmethod @@ -595,7 +760,7 @@ class ArchiveImportTask(ImportTask): return cls._handlers - def cleanup(self): + def cleanup(self, **kwargs): """Removes the temporary directory the archive was extracted to. """ if self.extracted: @@ -626,28 +791,8 @@ def read_tasks(session): in the user-specified list of paths. In the case of a singleton import, yields single-item tasks instead. """ - # Look for saved progress. - if _resume(): - resume_dirs = {} - for path in session.paths: - resume_dir = progress_get(path) - if resume_dir: - - # Either accept immediately or prompt for input to decide. - if _resume() is True: - do_resume = True - log.warn('Resuming interrupted import of %s' % path) - else: - do_resume = session.should_resume(path) - - if do_resume: - resume_dirs[path] = resume_dir - else: - # Clear progress; we're starting from the top. - progress_set(path, None) - # Look for saved incremental directories. - if config['import']['incremental']: + if session.config['incremental']: incremental_skipped = 0 history_dirs = history_get() @@ -655,7 +800,7 @@ def read_tasks(session): # Extract archives. archive_task = None if ArchiveImportTask.is_archive(syspath(toppath)): - if not (config['import']['move'] or config['import']['copy']): + if not (session.config['move'] or session.config['copy']): log.warn("Archive importing requires either " "'copy' or 'move' to be enabled.") continue @@ -681,27 +826,38 @@ def read_tasks(session): util.displayable_path(toppath) )) continue - if config['import']['singletons']: - yield ImportTask.item_task(item) + if session.config['singletons']: + yield SingletonImportTask(item) else: yield ImportTask(toppath, [toppath], [item]) continue # A flat album import merges all items into one album. 
- if config['import']['flat'] and not config['import']['singletons']: + if session.config['flat'] and not session.config['singletons']: all_items = [] for _, items in autotag.albums_in_dir(toppath): all_items += items yield ImportTask(toppath, [toppath], all_items) - yield ImportTask.done_sentinel(toppath) + yield SentinelImportTask(toppath) continue + resume_dir = None + if session.want_resume: + resume_dir = progress_get(toppath) + if resume_dir: + # Either accept immediately or prompt for input to decide. + if session.want_resume is True or \ + session.should_resume(toppath): + log.warn('Resuming interrupted import of %s' % toppath) + else: + # Clear progress; we're starting from the top. + resume_dir = None + progress_set(toppath, None) + # Produce paths under this directory. - if _resume(): - resume_dir = resume_dirs.get(toppath) for paths, items in autotag.albums_in_dir(toppath): # Skip according to progress. - if _resume() and resume_dir: + if session.want_resume and resume_dir: # We're fast-forwarding to resume a previous tagging. if paths == resume_dir: # We've hit the last good path! Turn off the @@ -710,7 +866,7 @@ def read_tasks(session): continue # When incremental, skip paths in the history. - if config['import']['incremental'] \ + if session.config['incremental'] \ and tuple(paths) in history_dirs: log.debug(u'Skipping previously-imported path: %s' % displayable_path(paths)) @@ -718,22 +874,22 @@ def read_tasks(session): continue # Yield all the necessary tasks. - if config['import']['singletons']: + if session.config['singletons']: for item in items: - yield ImportTask.item_task(item) - yield ImportTask.progress_sentinel(toppath, paths) + yield SingletonImportTask(item) + yield SentinelImportTask(toppath, paths) else: yield ImportTask(toppath, paths, items) # Indicate the directory is finished. # FIXME hack to delete extracted archives if archive_task is None: - yield ImportTask.done_sentinel(toppath) + yield SentinelImportTask(toppath) else: yield archive_task # Show skipped directories. - if config['import']['incremental'] and incremental_skipped: + if session.config['incremental'] and incremental_skipped: log.info(u'Incremental import: skipped %i directories.' % incremental_skipped) @@ -743,10 +899,10 @@ def query_tasks(session): Instead of finding files from the filesystem, a query is used to match items from the library. """ - if config['import']['singletons']: + if session.config['singletons']: # Search for items. for item in session.lib.items(session.query): - yield ImportTask.item_task(item) + yield SingletonImportTask(item) else: # Search for albums. @@ -754,30 +910,35 @@ def query_tasks(session): log.debug('yielding album %i: %s - %s' % (album.id, album.albumartist, album.album)) items = list(album.items()) + + # Clear IDs from re-tagged items so they appear "fresh" when + # we add them back to the library. + for item in items: + item.id = None + item.album_id = None + yield ImportTask(None, [album.item_dir()], items) -def initial_lookup(session): +@pipeline.mutator_stage +def lookup_candidates(session, task): """A coroutine for performing the initial MusicBrainz lookup for an album. It accepts lists of Items and yields (items, cur_artist, cur_album, candidates, rec) tuples. If no match is found, all of the yielded parameters (except items) are None. """ - task = None - while True: - task = yield task - if task.should_skip(): - continue + if task.skip: + # FIXME This gets duplicated a lot. We need a better + # abstraction. 
+ return - plugins.send('import_task_start', session=session, task=task) - - log.debug('Looking up: %s' % displayable_path(task.paths)) - task.set_candidates( - *autotag.tag_album(task.items) - ) + plugins.send('import_task_start', session=session, task=task) + log.debug('Looking up: %s' % displayable_path(task.paths)) + task.lookup_candidates() -def user_query(session): +@pipeline.stage +def user_query(session, task): """A coroutine for interfacing with the user about the tagging process. @@ -790,350 +951,129 @@ def user_query(session): acces to the choice via the ``taks.choice_flag`` property and may choose to change it. """ - recent = set() - task = None - while True: - task = yield task - if task.should_skip(): - continue + if task.skip: + return task - # Ask the user for a choice. - choice = session.choose_match(task) - task.set_choice(choice) - session.log_choice(task) - plugins.send('import_task_choice', session=session, task=task) + # Ask the user for a choice. + task.choose_match(session) + plugins.send('import_task_choice', session=session, task=task) - # As-tracks: transition to singleton workflow. - if task.choice_flag is action.TRACKS: - # Set up a little pipeline for dealing with the singletons. - def emitter(task): - for item in task.items: - yield ImportTask.item_task(item) - yield ImportTask.progress_sentinel(task.toppath, task.paths) + # As-tracks: transition to singleton workflow. + if task.choice_flag is action.TRACKS: + # Set up a little pipeline for dealing with the singletons. + def emitter(task): + for item in task.items: + yield SingletonImportTask(item) + yield SentinelImportTask(task.toppath, task.paths) - ipl = pipeline.Pipeline([ - emitter(task), - item_lookup(session), - item_query(session), - ]) - task = pipeline.multiple(ipl.pull()) - continue + ipl = pipeline.Pipeline([ + emitter(task), + lookup_candidates(session), + user_query(session), + ]) + return pipeline.multiple(ipl.pull()) - # As albums: group items by albums and create task for each album - if task.choice_flag is action.ALBUMS: - def emitter(task): - yield task + # As albums: group items by albums and create task for each album + if task.choice_flag is action.ALBUMS: + ipl = pipeline.Pipeline([ + iter([task]), + group_albums(session), + lookup_candidates(session), + user_query(session) + ]) + return pipeline.multiple(ipl.pull()) - ipl = pipeline.Pipeline([ - emitter(task), - group_albums(session), - initial_lookup(session), - user_query(session) - ]) - task = pipeline.multiple(ipl.pull()) - continue - - # Check for duplicates if we have a match (or ASIS). - if task.choice_flag in (action.ASIS, action.APPLY): - ident = task.chosen_ident() - # The "recent" set keeps track of identifiers for recently - # imported albums -- those that haven't reached the database - # yet. - if ident in recent or _duplicate_check(session.lib, task): - session.resolve_duplicate(task) - session.log_choice(task, True) - recent.add(ident) + resolve_duplicates(session, task) + return task -def show_progress(session): - """This stage replaces the initial_lookup and user_query stages - when the importer is run without autotagging. It displays the album - name and artist as the files are added. +def resolve_duplicates(session, task): + """Check if a task conflicts with items or albums already imported + and ask the session to resolve this. """ - task = None - while True: - task = yield task - if task.should_skip(): - continue - - log.info(displayable_path(task.paths)) - - # Behave as if ASIS were selected. 
- task.set_null_candidates() - task.set_choice(action.ASIS) + if task.choice_flag in (action.ASIS, action.APPLY): + ident = task.chosen_ident() + if ident in session.seen_idents or task.find_duplicates(session.lib): + session.resolve_duplicate(task) + session.log_choice(task, True) + session.seen_idents.add(ident) -def apply_choices(session): +@pipeline.mutator_stage +def import_asis(session, task): + """Select the `action.ASIS` choice for all tasks. + + This stage replaces the initial_lookup and user_query stages + when the importer is run without autotagging. + """ + if task.skip: + return + + log.info(displayable_path(task.paths)) + + # Behave as if ASIS were selected. + task.set_null_candidates() + task.set_choice(action.ASIS) + + +@pipeline.mutator_stage +def apply_choices(session, task): """A coroutine for applying changes to albums and singletons during the autotag process. """ - task = None - while True: - task = yield task - if task.should_skip(): - continue + if task.skip: + return - items = task.imported_items() - # Clear IDs in case the items are being re-tagged. - for item in items: - item.id = None - item.album_id = None + # Change metadata. + if task.apply: + task.apply_metadata() + plugins.send('import_task_apply', session=session, task=task) - # Change metadata. - if task.should_write_tags(): - if task.is_album: - autotag.apply_metadata( - task.match.info, task.match.mapping - ) - else: - autotag.apply_item_metadata(task.item, task.match.info) - plugins.send('import_task_apply', session=session, task=task) + # Infer album-level fields. + if task.is_album: + task.infer_album_fields() - # Infer album-level fields. - if task.is_album: - _infer_album_fields(task) - - # Find existing item entries that these are replacing (for - # re-imports). Old album structures are automatically cleaned up - # when the last item is removed. - task.replaced_items = defaultdict(list) - for item in items: - dup_items = session.lib.items( - dbcore.query.BytesQuery('path', item.path) - ) - for dup_item in dup_items: - task.replaced_items[item].append(dup_item) - log.debug('replacing item %i: %s' % - (dup_item.id, displayable_path(item.path))) - log.debug('%i of %i items replaced' % (len(task.replaced_items), - len(items))) - - # Find old items that should be replaced as part of a duplicate - # resolution. - duplicate_items = [] - if task.remove_duplicates: - if task.is_album: - for album in _duplicate_check(session.lib, task): - duplicate_items += album.items() - else: - duplicate_items = _item_duplicate_check(session.lib, task) - log.debug('removing %i old duplicated items' % - len(duplicate_items)) - - # Delete duplicate files that are located inside the library - # directory. - task.duplicate_paths = [] - for duplicate_path in [i.path for i in duplicate_items]: - if session.lib.directory in util.ancestry(duplicate_path): - # Mark the path for deletion in the manipulate_files - # stage. - task.duplicate_paths.append(duplicate_path) - - # Add items -- before path changes -- to the library. We add the - # items now (rather than at the end) so that album structures - # are in place before calls to destination(). - with session.lib.transaction(): - # Remove old items. - for replaced in task.replaced_items.itervalues(): - for item in replaced: - item.remove() - for item in duplicate_items: - item.remove() - - # Add new ones. - if task.is_album: - # Add an album. - album = session.lib.add_album(items) - task.album_id = album.id - else: - # Add tracks. 
- for item in items: - session.lib.add(item) + task.add(session.lib) -def plugin_stage(session, func): +@pipeline.mutator_stage +def plugin_stage(session, func, task): """A coroutine (pipeline stage) that calls the given function with each non-skipped import task. These stages occur between applying metadata changes and moving/copying/writing files. """ - task = None - while True: - task = yield task - if task.should_skip(): - continue - func(session, task) + if task.skip: + return - # Stage may modify DB, so re-load cached item data. - for item in task.imported_items(): - item.load() + func(session, task) + + # Stage may modify DB, so re-load cached item data. + # FIXME Importer plugins should not modify the database but instead + # the albums and items attached to tasks. + task.reload() -def manipulate_files(session): +@pipeline.stage +def manipulate_files(session, task): """A coroutine (pipeline stage) that performs necessary file - manipulations *after* items have been added to the library. + manipulations *after* items have been added to the library and + finalizes each task. """ - task = None - while True: - task = yield task - if task.should_skip(): - continue + if task.skip: + return - # Remove duplicate files marked for deletion. - if task.remove_duplicates: - for duplicate_path in task.duplicate_paths: - log.debug(u'deleting replaced duplicate %s' % - util.displayable_path(duplicate_path)) - util.remove(duplicate_path) - util.prune_dirs(os.path.dirname(duplicate_path), - session.lib.directory) + if task.should_remove_duplicates: + task.remove_duplicates(session.lib) - # Move/copy/write files. - items = task.imported_items() - # Save the original paths of all items for deletion and pruning - # in the next step (finalization). - task.old_paths = [item.path for item in items] - for item in items: - if config['import']['move']: - # Just move the file. - item.move(False) - elif config['import']['copy']: - # If it's a reimport, move in-library files and copy - # out-of-library files. Otherwise, copy and keep track - # of the old path. - old_path = item.path - if task.replaced_items[item]: - # This is a reimport. Move in-library files and copy - # out-of-library files. - if session.lib.directory in util.ancestry(old_path): - item.move(False) - # We moved the item, so remove the - # now-nonexistent file from old_paths. - task.old_paths.remove(old_path) - else: - item.move(True) - else: - # A normal import. Just copy files and keep track of - # old paths. - item.move(True) + task.manipulate_files( + move=session.config['move'], + copy=session.config['copy'], + write=session.config['write'], + session=session, + ) - if config['import']['write'] and task.should_write_tags(): - item.try_write() - - # Save new paths. - with session.lib.transaction(): - for item in items: - item.store() - - # Plugin event. - plugins.send('import_task_files', session=session, task=task) - - -def finalize(session): - """A coroutine that finishes up importer tasks. In particular, the - coroutine sends plugin events, deletes old files, and saves - progress. This is a "terminal" coroutine (it yields None). - """ - while True: - task = yield - if task.should_skip(): - if _resume(): - task.save_progress() - if config['import']['incremental']: - task.save_history() - task.cleanup() - continue - - items = task.imported_items() - - # When copying and deleting originals, delete old files. 
- if config['import']['copy'] and config['import']['delete']: - new_paths = [os.path.realpath(item.path) for item in items] - for old_path in task.old_paths: - # Only delete files that were actually copied. - if old_path not in new_paths: - util.remove(syspath(old_path), False) - task.prune(old_path) - - # When moving, prune empty directories containing the original - # files. - elif config['import']['move']: - for old_path in task.old_paths: - task.prune(old_path) - - # Update progress. - if _resume(): - task.save_progress() - if config['import']['incremental']: - task.save_history() - task.cleanup() - - # Announce that we've added an album. - if task.is_album: - album = session.lib.get_album(task.album_id) - plugins.send('album_imported', - lib=session.lib, album=album) - else: - for item in items: - plugins.send('item_imported', - lib=session.lib, item=item) - - -# Singleton pipeline stages. - -def item_lookup(session): - """A coroutine used to perform the initial MusicBrainz lookup for - an item task. - """ - task = None - while True: - task = yield task - if task.should_skip(): - continue - - plugins.send('import_task_start', session=session, task=task) - - task.set_item_candidates(*autotag.tag_item(task.item)) - - -def item_query(session): - """A coroutine that queries the user for input on single-item - lookups. - """ - task = None - recent = set() - while True: - task = yield task - if task.should_skip(): - continue - - choice = session.choose_item(task) - task.set_choice(choice) - session.log_choice(task) - plugins.send('import_task_choice', session=session, task=task) - - # Duplicate check. - if task.choice_flag in (action.ASIS, action.APPLY): - ident = task.chosen_ident() - if ident in recent or _item_duplicate_check(session.lib, task): - session.resolve_duplicate(task) - session.log_choice(task, True) - recent.add(ident) - - -def item_progress(session): - """Skips the lookup and query stages in a non-autotagged singleton - import. Just shows progress. - """ - task = None - log.info('Importing items:') - while True: - task = yield task - if task.should_skip(): - continue - - log.info(displayable_path(task.item.path)) - task.set_null_candidates() - task.set_choice(action.ASIS) + # Progress, cleanup, and event. + task.finalize(session) def group_albums(session): @@ -1146,11 +1086,11 @@ def group_albums(session): task = None while True: task = yield task - if task.should_skip(): + if task.skip: continue tasks = [] for _, items in itertools.groupby(task.items, group): tasks.append(ImportTask(items=list(items))) - tasks.append(ImportTask.progress_sentinel(task.toppath, task.paths)) + tasks.append(SentinelImportTask(task.toppath, task.paths)) task = pipeline.multiple(tasks) diff --git a/beets/ui/commands.py b/beets/ui/commands.py index 24c798e94..b660cd9e4 100644 --- a/beets/ui/commands.py +++ b/beets/ui/commands.py @@ -740,7 +740,7 @@ class TerminalImportSession(importer.ImportSession): pass elif sel == 'r': # Remove old. - task.remove_duplicates = True + task.should_remove_duplicates = True else: assert False diff --git a/beets/util/pipeline.py b/beets/util/pipeline.py index 95b77b4da..d267789c8 100644 --- a/beets/util/pipeline.py +++ b/beets/util/pipeline.py @@ -137,6 +137,51 @@ def multiple(messages): return MultiMessage(messages) +def stage(func): + """Decorate a function to become a simple stage. + + >>> @stage + ... def add(n, i): + ... return i + n + >>> pipe = Pipeline([ + ... iter([1, 2, 3]), + ... add(2), + ... 
]) + >>> list(pipe.pull()) + [3, 4, 5] + """ + + def coro(*args): + task = None + while True: + task = yield task + task = func(*(args + (task,))) + return coro + + +def mutator_stage(func): + """Decorate a function that manipulates items in a coroutine to + become a simple stage. + + >>> @mutator_stage + ... def setkey(key, item): + ... item[key] = True + >>> pipe = Pipeline([ + ... iter([{'x': False}, {'a': False}]), + ... setkey('x'), + ... ]) + >>> list(pipe.pull()) + [{'x': True}, {'a': False, 'x': True}] + """ + + def coro(*args): + task = None + while True: + task = yield task + func(*(args + (task,))) + return coro + + def _allmsgs(obj): """Returns a list of all the messages encapsulated in obj. If obj is a MultiMessage, returns its enclosed messages. If obj is BUBBLE, diff --git a/beetsplug/fetchart.py b/beetsplug/fetchart.py index 1b47e07c3..4b4f53bea 100644 --- a/beetsplug/fetchart.py +++ b/beetsplug/fetchart.py @@ -278,8 +278,7 @@ class FetchArtPlugin(BeetsPlugin): # For any other choices (e.g., TRACKS), do nothing. return - album = session.lib.get_album(task.album_id) - path = art_for_album(album, task.paths, self.maxwidth, local) + path = art_for_album(task.album, task.paths, self.maxwidth, local) if path: self.art_paths[task] = path @@ -290,7 +289,7 @@ class FetchArtPlugin(BeetsPlugin): if task in self.art_paths: path = self.art_paths.pop(task) - album = session.lib.get_album(task.album_id) + album = task.album src_removed = (config['import']['delete'].get(bool) or config['import']['move'].get(bool)) album.set_art(path, not src_removed) diff --git a/beetsplug/lastgenre/__init__.py b/beetsplug/lastgenre/__init__.py index eea428dfa..6646e6af3 100644 --- a/beetsplug/lastgenre/__init__.py +++ b/beetsplug/lastgenre/__init__.py @@ -382,7 +382,7 @@ class LastGenrePlugin(plugins.BeetsPlugin): def imported(self, session, task): """Event hook called when an import task finishes.""" if task.is_album: - album = session.lib.get_album(task.album_id) + album = task.album album.genre, src = self._get_genre(album) log.debug(u'added last.fm album genre ({0}): {1}'.format( src, album.genre diff --git a/beetsplug/replaygain.py b/beetsplug/replaygain.py index 87a4eb2dc..acf7afc86 100644 --- a/beetsplug/replaygain.py +++ b/beetsplug/replaygain.py @@ -601,8 +601,7 @@ class ReplayGainPlugin(BeetsPlugin): return if task.is_album: - album = session.lib.get_album(task.album_id) - self.handle_album(album, False) + self.handle_album(task.album, False) else: self.handle_track(task.item, False) diff --git a/test/helper.py b/test/helper.py index 2f65b2e7d..16cfd11d3 100644 --- a/test/helper.py +++ b/test/helper.py @@ -349,7 +349,7 @@ class TestImportSession(importer.ImportSession): if res == self.Resolution.SKIP: task.set_choice(importer.action.SKIP) elif res == self.Resolution.REMOVE: - task.remove_duplicates = True + task.should_remove_duplicates = True def generate_album_info(album_id, track_ids): diff --git a/test/test_art.py b/test/test_art.py index 2a4293a8f..01e786e6e 100644 --- a/test/test_art.py +++ b/test/test_art.py @@ -222,7 +222,7 @@ class ArtImporterTest(_common.TestCase): # Import task for the coroutine. 
self.task = importer.ImportTask(None, None, [self.i]) self.task.is_album = True - self.task.album_id = self.album.id + self.task.album = self.album info = AlbumInfo( album = 'some album', album_id = 'albumid', diff --git a/test/test_ihate.py b/test/test_ihate.py index 0b7f9affc..b9c6eb114 100644 --- a/test/test_ihate.py +++ b/test/test_ihate.py @@ -15,10 +15,7 @@ class IHatePluginTest(unittest.TestCase): genre='TestGenre', album=u'TestAlbum', artist=u'TestArtist') - task = importer.ImportTask() - task.items = [test_item] - task.item = test_item - task.is_album = False + task = importer.SingletonImportTask(test_item) # Empty query should let it pass. self.assertFalse(IHatePlugin.do_i_hate_this(task, match_pattern)) diff --git a/test/test_importer.py b/test/test_importer.py index 50a8c1d3b..371e9ab1c 100644 --- a/test/test_importer.py +++ b/test/test_importer.py @@ -867,12 +867,9 @@ class InferAlbumDataTest(_common.TestCase): items=self.items) self.task.set_null_candidates() - def _infer(self): - importer._infer_album_fields(self.task) - def test_asis_homogenous_single_artist(self): self.task.set_choice(importer.action.ASIS) - self._infer() + self.task.infer_album_fields() self.assertFalse(self.items[0].comp) self.assertEqual(self.items[0].albumartist, self.items[2].artist) @@ -881,7 +878,7 @@ class InferAlbumDataTest(_common.TestCase): self.items[1].artist = 'some other artist' self.task.set_choice(importer.action.ASIS) - self._infer() + self.task.infer_album_fields() self.assertTrue(self.items[0].comp) self.assertEqual(self.items[0].albumartist, 'Various Artists') @@ -891,7 +888,7 @@ class InferAlbumDataTest(_common.TestCase): self.items[1].artist = 'some other artist' self.task.set_choice(importer.action.ASIS) - self._infer() + self.task.infer_album_fields() for item in self.items: self.assertTrue(item.comp) @@ -901,7 +898,7 @@ class InferAlbumDataTest(_common.TestCase): self.items[0].artist = 'another artist' self.task.set_choice(importer.action.ASIS) - self._infer() + self.task.infer_album_fields() self.assertFalse(self.items[0].comp) self.assertEqual(self.items[0].albumartist, self.items[2].artist) @@ -914,7 +911,7 @@ class InferAlbumDataTest(_common.TestCase): item.mb_albumartistid = 'some album artist id' self.task.set_choice(importer.action.ASIS) - self._infer() + self.task.infer_album_fields() self.assertEqual(self.items[0].albumartist, 'some album artist') @@ -924,7 +921,7 @@ class InferAlbumDataTest(_common.TestCase): def test_apply_gets_artist_and_id(self): self.task.set_choice(AlbumMatch(0, None, {}, set(), set())) # APPLY - self._infer() + self.task.infer_album_fields() self.assertEqual(self.items[0].albumartist, self.items[0].artist) self.assertEqual(self.items[0].mb_albumartistid, @@ -936,7 +933,7 @@ class InferAlbumDataTest(_common.TestCase): item.mb_albumartistid = 'some album artist id' self.task.set_choice(AlbumMatch(0, None, {}, set(), set())) # APPLY - self._infer() + self.task.infer_album_fields() self.assertEqual(self.items[0].albumartist, 'some album artist') @@ -947,16 +944,9 @@ class InferAlbumDataTest(_common.TestCase): self.items = [self.items[0]] self.task.items = self.items self.task.set_choice(importer.action.ASIS) - self._infer() + self.task.infer_album_fields() self.assertFalse(self.items[0].comp) - def test_first_item_null_apply(self): - self.items[0] = None - self.task.set_choice(AlbumMatch(0, None, {}, set(), set())) # APPLY - self._infer() - self.assertFalse(self.items[1].comp) - self.assertEqual(self.items[1].albumartist, self.items[2].artist) - class 
ImportDuplicateAlbumTest(unittest.TestCase, TestHelper): diff --git a/test/test_pipeline.py b/test/test_pipeline.py index cd371af12..0c4de6836 100644 --- a/test/test_pipeline.py +++ b/test/test_pipeline.py @@ -208,6 +208,32 @@ class MultiMessageTest(unittest.TestCase): self.assertEqual(list(pl.pull()), [0, 0, 1, -1, 2, -2, 3, -3, 4, -4]) +class StageDecoratorTest(unittest.TestCase): + + def test_stage_decorator(self): + @pipeline.stage + def add(n, i): + return i + n + + pl = pipeline.Pipeline([ + iter([1, 2, 3]), + add(2) + ]) + self.assertEqual(list(pl.pull()), [3, 4, 5]) + + def test_mutator_stage_decorator(self): + @pipeline.mutator_stage + def setkey(key, item): + item[key] = True + + pl = pipeline.Pipeline([ + iter([{'x': False}, {'a': False}]), + setkey('x'), + ]) + self.assertEqual(list(pl.pull()), + [{'x': True}, {'a': False, 'x': True}]) + + def suite(): return unittest.TestLoader().loadTestsFromName(__name__) diff --git a/test/test_ui.py b/test/test_ui.py index 54cbb533f..d28587c0a 100644 --- a/test/test_ui.py +++ b/test/test_ui.py @@ -473,32 +473,6 @@ class PrintTest(_common.TestCase): del os.environ['LC_CTYPE'] -class AutotagTest(_common.TestCase): - def setUp(self): - super(AutotagTest, self).setUp() - self.io.install() - - def _no_candidates_test(self, result): - task = importer.ImportTask( - 'toppath', - 'path', - [_common.item()], - ) - task.set_candidates('artist', 'album', [], autotag.Recommendation.none) - session = _common.import_session(cli=True) - res = session.choose_match(task) - self.assertEqual(res, result) - self.assertTrue('No match' in self.io.getoutput()) - - def test_choose_match_with_no_candidates_skip(self): - self.io.addinput('s') - self._no_candidates_test(importer.action.SKIP) - - def test_choose_match_with_no_candidates_asis(self): - self.io.addinput('u') - self._no_candidates_test(importer.action.ASIS) - - class ImportTest(_common.TestCase): def test_quiet_timid_disallowed(self): config['import']['quiet'] = True
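# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch): the two decorators added to
# beets/util/pipeline.py above turn plain functions into the coroutine stages
# that Pipeline expects. This minimal, self-contained example assumes the
# patched beets.util.pipeline is importable; it mirrors the doctests and the
# StageDecoratorTest introduced by this change.
from beets.util import pipeline


@pipeline.stage
def add(n, i):
    # A @stage function returns the (possibly new) task to pass downstream.
    return i + n


@pipeline.mutator_stage
def setkey(key, item):
    # A @mutator_stage function modifies the task in place; the wrapper
    # forwards the same object to the next stage.
    item[key] = True


if __name__ == '__main__':
    pl = pipeline.Pipeline([iter([1, 2, 3]), add(2)])
    assert list(pl.pull()) == [3, 4, 5]

    pl = pipeline.Pipeline([iter([{'x': False}, {'a': False}]), setkey('x')])
    assert list(pl.pull()) == [{'x': True}, {'a': False, 'x': True}]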
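# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch): the module-level helpers that
# this patch removes from beets/importer.py (_duplicate_check,
# _item_duplicate_check, _infer_album_fields) now live on the task objects.
# The driver function below is hypothetical; it condenses what the
# apply_choices, user_query, and manipulate_files stages above actually do,
# assuming `task` and `lib` come from a running ImportSession.


def drive_task(task, lib):
    if task.skip:
        return

    # Metadata comes from the match (APPLY) or stays as-is (ASIS).
    if task.apply:
        task.apply_metadata()
    if task.is_album:
        task.infer_album_fields()

    # Duplicate detection is now a task method; SingletonImportTask
    # overrides it to match items instead of albums. The real pipeline asks
    # the session to resolve duplicates rather than removing them outright.
    if task.find_duplicates(lib):
        task.should_remove_duplicates = True

    # Adding to the library and removing replaced or duplicate entries are
    # task methods as well.
    task.add(lib)
    if task.should_remove_duplicates:
        task.remove_duplicates(lib)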
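# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch): the plugin-facing side of the
# change. Import stages used to fetch the album with
# session.lib.get_album(task.album_id); after this patch they read it off the
# task directly, as the fetchart, lastgenre, and replaygain hunks above do.
# The plugin below is hypothetical.
from beets import plugins


class TaskApiExamplePlugin(plugins.BeetsPlugin):
    def __init__(self):
        super(TaskApiExamplePlugin, self).__init__()
        self.import_stages = [self.stage]

    def stage(self, session, task):
        # Plugin stages run after ImportTask.add(), so the Album object is
        # already attached to album tasks.
        if task.is_album:
            print 'imported album: %s' % task.album.album
        else:
            print 'imported item: %s' % task.item.title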