Use Pipline.push() in importer stages

This commit is contained in:
Thomas Scholtes 2014-02-01 15:01:05 +01:00
parent 565a284c03
commit 0abf41e2fb
2 changed files with 17 additions and 23 deletions

View file

@ -675,42 +675,35 @@ def user_query(session):
# As-tracks: transition to singleton workflow.
if choice is action.TRACKS:
# Set up a little pipeline for dealing with the singletons.
item_tasks = []
def emitter():
def emitter(task):
for item in task.items:
yield ImportTask.item_task(item)
yield ImportTask.progress_sentinel(task.toppath, task.paths)
def collector():
while True:
item_task = yield
item_tasks.append(item_task)
ipl = pipeline.Pipeline((emitter(), item_lookup(session),
item_query(session), collector()))
ipl.run_sequential()
task = pipeline.multiple(item_tasks)
ipl = pipeline.Pipeline([
emitter(task),
item_lookup(session),
item_query(session),
])
task = pipeline.multiple(ipl.pull())
continue
# As albums: group items by albums and create task for each album
if choice is action.ALBUMS:
album_tasks = []
def group(item):
return (item.albumartist or item.artist, item.album)
def emitter():
def emitter(task):
for _, items in itertools.groupby(task.items, group):
yield ImportTask(items=list(items))
yield ImportTask.progress_sentinel(task.toppath, task.paths)
def collector():
while True:
album_task = yield
album_tasks.append(album_task)
ipl = pipeline.Pipeline([
emitter(),
emitter(task),
initial_lookup(session),
user_query(session),
collector()
user_query(session)
])
ipl.run_sequential()
task = pipeline.multiple(album_tasks)
task = pipeline.multiple(ipl.pull())
continue
# Check for duplicates if we have a match (or ASIS).

View file

@ -375,8 +375,9 @@ class Pipeline(object):
def pull(self):
"""Yield elements from the end of the pipeline. Runs the stages
sequentially until the last yields some messages. Each of the messages
is then yielded by ``pulled.next()``. Only the first coroutine in each
stage is used.
is then yielded by ``pulled.next()``. If the pipeline has a consumer,
that is the last stage does not yield any messages, then pull will not
yield any messages. Only the first coroutine in each stage is used
"""
coros = [stage[0] for stage in self.stages]