diff --git a/beets/importer.py b/beets/importer.py index 3dc6c4da6..9e3e7f193 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -675,42 +675,35 @@ def user_query(session): # As-tracks: transition to singleton workflow. if choice is action.TRACKS: # Set up a little pipeline for dealing with the singletons. - item_tasks = [] - def emitter(): + def emitter(task): for item in task.items: yield ImportTask.item_task(item) yield ImportTask.progress_sentinel(task.toppath, task.paths) - def collector(): - while True: - item_task = yield - item_tasks.append(item_task) - ipl = pipeline.Pipeline((emitter(), item_lookup(session), - item_query(session), collector())) - ipl.run_sequential() - task = pipeline.multiple(item_tasks) + + ipl = pipeline.Pipeline([ + emitter(task), + item_lookup(session), + item_query(session), + ]) + task = pipeline.multiple(ipl.pull()) continue # As albums: group items by albums and create task for each album if choice is action.ALBUMS: - album_tasks = [] def group(item): return (item.albumartist or item.artist, item.album) - def emitter(): + + def emitter(task): for _, items in itertools.groupby(task.items, group): yield ImportTask(items=list(items)) yield ImportTask.progress_sentinel(task.toppath, task.paths) - def collector(): - while True: - album_task = yield - album_tasks.append(album_task) + ipl = pipeline.Pipeline([ - emitter(), + emitter(task), initial_lookup(session), - user_query(session), - collector() + user_query(session) ]) - ipl.run_sequential() - task = pipeline.multiple(album_tasks) + task = pipeline.multiple(ipl.pull()) continue # Check for duplicates if we have a match (or ASIS). diff --git a/beets/util/pipeline.py b/beets/util/pipeline.py index 22531bc7d..c64454ffd 100644 --- a/beets/util/pipeline.py +++ b/beets/util/pipeline.py @@ -375,8 +375,9 @@ class Pipeline(object): def pull(self): """Yield elements from the end of the pipeline. Runs the stages sequentially until the last yields some messages. Each of the messages - is then yielded by ``pulled.next()``. Only the first coroutine in each - stage is used. + is then yielded by ``pulled.next()``. If the pipeline has a consumer, + that is the last stage does not yield any messages, then pull will not + yield any messages. Only the first coroutine in each stage is used """ coros = [stage[0] for stage in self.stages]