abstract pipeline implementation into its own module

Adrian Sampson 2010-07-31 19:12:10 -07:00
parent 45ee9b210c
commit ee6b15b367
2 changed files with 292 additions and 148 deletions


@@ -27,6 +27,7 @@ from beets import autotag
from beets import library
from beets.mediafile import UnreadableFileError, FileTypeError
import beets.autotag.art
+ from beets.ui import pipeline
# Global logger.
log = logging.getLogger('beets')
@@ -197,7 +198,22 @@ def choose_match(items, cur_artist, cur_album, candidates, rec):
except autotag.AutotagError:
candidates, rec = None, None
- # Core autotagger generators and coroutines.
+ def _reopen_lib(lib):
+     """Because of limitations in SQLite, a given Library is bound to
+     the thread in which it was created. This function reopens Library
+     objects so that they can be used from separate threads.
+     """
+     if isinstance(lib, library.Library):
+         return library.Library(
+             lib.path,
+             lib.directory,
+             lib.path_format,
+             lib.art_filename,
+         )
+     else:
+         return lib
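
(Editorial sketch, not part of this commit: the SQLite restriction that motivates _reopen_lib is easy to demonstrate standalone. A connection created in one thread refuses use from another, so each pipeline thread needs its own Library.)

    import sqlite3
    import threading

    conn = sqlite3.connect(':memory:')

    def poke():
        try:
            conn.execute('SELECT 1')
        except sqlite3.ProgrammingError, exc:
            print 'rejected:', exc

    t = threading.Thread(target=poke)
    t.start()
    t.join()
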
+ # Core autotagger pipeline stages.
def read_albums(paths):
"""A generator yielding all the albums (as sets of Items) found in
@@ -215,8 +231,8 @@ def read_albums(paths):
def initial_lookup():
"""A coroutine for performing the initial MusicBrainz lookup for an
album. It accepts lists of Items and yields
- (cur_artist, cur_album, candidates, rec) tuples. If no match is found,
- all of the yielded parameters are None.
+ (items, cur_artist, cur_album, candidates, rec) tuples. If no match
+ is found, all of the yielded parameters (except items) are None.
"""
items = yield
while True:
@@ -224,7 +240,7 @@ def initial_lookup():
cur_artist, cur_album, candidates, rec = autotag.tag_album(items)
except autotag.AutotagError:
cur_artist, cur_album, candidates, rec = None, None, None, None
- items = yield(cur_artist, cur_album, candidates, rec)
+ items = yield (items, cur_artist, cur_album, candidates, rec)
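
(A sketch of driving this stage by hand, outside the pipeline; items here is a hypothetical set of Items:)

    coro = initial_lookup()
    coro.next()                  # prime the coroutine up to its first yield
    items, cur_artist, cur_album, candidates, rec = coro.send(items)
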
def user_query(lib, logfile=None):
"""A coroutine for interfacing with the user about the tagging
@@ -233,9 +249,11 @@ def user_query(lib, logfile=None):
accepts (items, cur_artist, cur_album, candidates, rec) tuples.
items is a set of Items in the album to be tagged; the remaining
parameters are the result of an initial lookup from MusicBrainz.
- The coroutine yields either a candidate info dict, CHOICE_ASIS, or
- None (indicating that the items should not be imported).
+ The coroutine yields (items, info) pairs where info is either a
+ candidate info dict or CHOICE_ASIS. May also yield pipeline.BUBBLE,
+ indicating that the items should not be imported.
"""
+ lib = _reopen_lib(lib)
items, cur_artist, cur_album, candidates, rec = yield
first = True
while True:
@@ -254,7 +272,8 @@ def user_query(lib, logfile=None):
tag_log(logfile, 'skip', items)
# Yield None, indicating that the pipeline should not
# progress.
- items, cur_artist, cur_album, candidates, rec = yield None
+ items, cur_artist, cur_album, candidates, rec = \
+     yield pipeline.BUBBLE
continue
# Ensure that we don't have the album already.
@@ -271,11 +290,12 @@ def user_query(lib, logfile=None):
if count >= 1:
print_("This album (%s - %s) is already in the library!" %
(artist, album))
- items, cur_artist, cur_album, candidates, rec = yield None
+ items, cur_artist, cur_album, candidates, rec = \
+     yield pipeline.BUBBLE
continue
# Yield the result and get the next chunk of work.
- items, cur_artist, cur_album, candidates, rec = yield info
+ items, cur_artist, cur_album, candidates, rec = yield (items, info)
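
(Whatever drives this stage must check for pipeline.BUBBLE by identity before unpacking, which is exactly what the pipeline module does internally; a rough sketch with hypothetical primed coroutines user_coro and apply_coro:)

    out = user_coro.send((items, cur_artist, cur_album, candidates, rec))
    if out is not pipeline.BUBBLE:
        apply_coro.send(out)     # out is the (items, info) pair
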
def apply_choices(lib, copy, write, art):
"""A coroutine for applying changes to albums during the autotag
@@ -284,6 +304,7 @@ def apply_choices(lib, copy, write, art):
nothing. items is the set of Items to import; info is either a
candidate info dictionary or CHOICE_ASIS.
"""
+ lib = _reopen_lib(lib)
while True:
# Get next chunk of work.
items, info = yield
@@ -310,142 +331,6 @@ def apply_choices(lib, copy, write, art):
# Write the database after each album.
lib.save()
# Importer main functions.
def autotag_sequential(lib, paths, copy, write, logfile, art):
"""Autotags and imports the albums in the directory in a single-
threaded manner. lib is the Library to import into. If copy, then
items are copied into the destination directory. If write, then
new metadata is written back to the files' tags. If logfile is
provided, then a log message will be added there if the album is
untaggable. If art, then attempt to download cover art for the
album.
"""
# Set up the various coroutines.
init_lookup_coro = initial_lookup()
init_lookup_coro.next()
user_coro = user_query(lib, logfile)
user_coro.next()
apply_coro = apply_choices(lib, copy, write, art)
apply_coro.next()
# Crawl albums and send them through the pipeline one at a time.
for items in read_albums(paths):
cur_artist, cur_album, candidates, rec = \
init_lookup_coro.send(items)
info = user_coro.send((items, cur_artist, cur_album, candidates, rec))
if info is None:
# User-query coroutine yields None when the album
# should be skipped. Bypass the rest of the pipeline.
continue
apply_coro.send((items, info))
def _reopen_lib(lib):
if isinstance(lib, library.Library):
return library.Library(
lib.path,
lib.directory,
lib.path_format,
lib.art_filename,
)
else:
return lib
CHANNEL_POISON = 'CHANNEL_POISON'
class ReadAlbumsThread(Thread):
def __init__(self, paths, out_queue):
super(ReadAlbumsThread, self).__init__()
self.gen = read_albums(paths)
self.out_queue = out_queue
def run(self):
for items in self.gen:
self.out_queue.put(items)
self.out_queue.put(CHANNEL_POISON)
class InitialLookupThread(Thread):
def __init__(self, in_queue, out_queue):
super(InitialLookupThread, self).__init__()
self.coro = initial_lookup()
self.coro.next()
self.in_queue, self.out_queue = in_queue, out_queue
def run(self):
while True:
items = self.in_queue.get()
if items is CHANNEL_POISON:
break
cur_artist, cur_album, candidates, rec = self.coro.send(items)
self.out_queue.put((items, cur_artist, cur_album,
candidates, rec))
self.out_queue.put(CHANNEL_POISON)
class UIThread(Thread):
def __init__(self, lib, logfile, in_queue, out_queue):
super(UIThread, self).__init__()
self.lib, self.logfile = lib, logfile
self.in_queue, self.out_queue = in_queue, out_queue
def run(self):
self.coro = user_query(_reopen_lib(self.lib), self.logfile)
self.coro.next()
while True:
msg = self.in_queue.get()
if msg is CHANNEL_POISON:
break
items = msg[0]
info = self.coro.send(msg)
if info is None:
# Skip this album.
continue
self.out_queue.put((items, info))
self.out_queue.put(CHANNEL_POISON)
class ApplyThread(Thread):
def __init__(self, lib, copy, write, art, in_queue):
super(ApplyThread, self).__init__()
self.lib, self.copy, self.write, self.art = lib, copy, write, art
self.in_queue = in_queue
def run(self):
self.coro = apply_choices(_reopen_lib(self.lib), self.copy,
self.write, self.art)
self.coro.next()
while True:
msg = self.in_queue.get()
if msg is CHANNEL_POISON:
break
self.coro.send(msg)
CHANNEL_SIZE = 10
def autotag_threaded(lib, paths, copy, write, logfile, art):
"""Autotags and imports albums using multiple threads. A drop-in
replacement for autotag_sequential.
"""
q_read_to_lookup = Queue(CHANNEL_SIZE)
q_lookup_to_user = Queue(CHANNEL_SIZE)
q_user_to_apply = Queue(CHANNEL_SIZE)
reader_thread = ReadAlbumsThread(paths, q_read_to_lookup)
lookup_thread = InitialLookupThread(q_read_to_lookup, q_lookup_to_user)
ui_thread = UIThread(lib, logfile, q_lookup_to_user, q_user_to_apply)
apply_thread = ApplyThread(lib, copy, write, art, q_user_to_apply)
reader_thread.start()
lookup_thread.start()
ui_thread.start()
apply_thread.start()
reader_thread.join()
lookup_thread.join()
ui_thread.join()
apply_thread.join()
def simple_import(lib, paths, copy):
"""Imports all the albums found in the paths without attempting to
autotag them. The behavior is similar to an import in which the
user always chooses the "as-is" option.
"""
for items in read_albums(paths):
if copy:
for item in items:
item.move(lib, True)
lib.add_album(items)
lib.save()
# The import command.
def import_files(lib, paths, copy, write, autot, logpath, art, threaded):
@@ -465,12 +350,25 @@ def import_files(lib, paths, copy, write, autot, logpath, art, threaded):
# Perform the import.
if autot:
# Autotag. Set up the pipeline.
+ pl = pipeline.Pipeline([
+     read_albums(paths),
+     initial_lookup(),
+     user_query(lib, logfile),
+     apply_choices(lib, copy, write, art),
+ ])
if threaded:
- autotag_threaded(lib, paths, copy, write, logfile, art)
+ pl.run_parallel()
else:
- autotag_sequential(lib, paths, copy, write, logfile, art)
+ pl.run_sequential()
else:
- simple_import(lib, paths, copy)
+ # Simple import without autotagging. Always sequential.
+ for items in read_albums(paths):
+     if copy:
+         for item in items:
+             item.move(lib, True)
+     lib.add_album(items)
+     lib.save()
# If we were logging, close the file.
if logfile:

beets/ui/pipeline.py (new file, 246 lines)

@@ -0,0 +1,246 @@
# This file is part of beets.
# Copyright 2010, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Simple but robust implementation of generator/coroutine-based
pipelines in Python. The pipelines may be run either sequentially
(single-threaded) or in parallel (one thread per pipeline stage).
This implementation supports pipeline bubbles (indications that the
processing for a certain item should abort). To use them, yield the
BUBBLE constant from any stage coroutine except the last.
In the parallel case, the implementation transparently handles thread
shutdown when the processing is complete and when a stage raises an
exception.
"""
from __future__ import with_statement # for Python 2.5
from Queue import Queue
from threading import Thread, Lock
BUBBLE = '__PIPELINE_BUBBLE__'
POISON = '__PIPELINE_POISON__'
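# Both sentinels are compared by identity ("msg is BUBBLE"), so any unique
# objects would work; the string values merely aid debugging.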
DEFAULT_QUEUE_SIZE = 16
class PipelineError(object):
"""An indication that an exception occurred in the pipeline. The
object is passed through the pipeline to shut down all threads
before it is raised again in the main thread.
"""
def __init__(self, exc):
self.exc = exc
class FirstPipelineThread(Thread):
"""The thread running the first stage in a parallel pipeline setup.
The coroutine should just be a generator.
"""
def __init__(self, coro, out_queue):
super(FirstPipelineThread, self).__init__()
self.coro = coro
self.out_queue = out_queue
self.abort_lock = Lock()
self.abort_flag = False
def run(self):
while True:
# Time to abort?
with self.abort_lock:
if self.abort_flag:
break
# Get the value from the generator.
try:
msg = self.coro.next()
except StopIteration:
break
except Exception, exc:
self.out_queue.put(PipelineError(exc))
return
# Bubbles must not be passed downstream as data; drop them here.
if msg is BUBBLE:
    continue
# Send it to the next stage.
self.out_queue.put(msg)
# Generator finished; shut down the pipeline.
self.out_queue.put(POISON)
def abort(self):
"""Shut down the pipeline by canceling this thread and
poisoning out_queue.
"""
with self.abort_lock:
self.abort_flag = True
class MiddlePipelineThread(Thread):
"""A thread running any stage in the pipeline except the first or
last.
"""
def __init__(self, coro, in_queue, out_queue):
super(MiddlePipelineThread, self).__init__()
self.coro = coro
self.in_queue = in_queue
self.out_queue = out_queue
def run(self):
# Prime the coroutine.
self.coro.next()
while True:
# Get the message from the previous stage.
msg = self.in_queue.get()
if msg is POISON:
break
elif isinstance(msg, PipelineError):
self.out_queue.put(msg)
return
# Invoke the current stage.
try:
out = self.coro.send(msg)
except Exception, exc:
self.out_queue.put(PipelineError(exc))
return
# Send message to next stage.
if out is BUBBLE:
continue
self.out_queue.put(out)
# Pipeline is shutting down normally.
self.out_queue.put(POISON)
class LastPipelineThread(Thread):
"""A thread running the last stage in a pipeline. The coroutine
should yield nothing.
"""
def __init__(self, coro, in_queue):
super(LastPipelineThread, self).__init__()
self.coro = coro
self.in_queue = in_queue
def run(self):
# Prime the coroutine.
self.coro.next()
while True:
# Get the message from the previous stage.
msg = self.in_queue.get()
if msg is POISON:
break
elif isinstance(msg, PipelineError):
self.exc = msg.exc
return
# Send to consumer.
try:
self.coro.send(msg)
except Exception, exc:
self.exc = exc
return
# No exception raised in pipeline.
self.exc = None
class Pipeline(object):
"""Represents a staged pattern of work. Each stage in the pipeline
is a coroutine that receives messages from the previous stage and
yields messages to be sent to the next stage.
"""
def __init__(self, stages):
"""Makes a new pipeline from a list of coroutines. There must
be at least two stages.
"""
if len(stages) < 2:
raise ValueError('pipeline must have at least two stages')
self.stages = stages
def run_sequential(self):
"""Run the pipeline sequentially in the current thread. The
stages are run one after the other.
"""
# "Prime" the coroutines.
for coro in self.stages[1:]:
coro.next()
# Begin the pipeline.
for msg in self.stages[0]:
for stage in self.stages[1:]:
msg = stage.send(msg)
if msg is BUBBLE:
# Don't continue to the next stage.
break
def run_parallel(self, queue_size=DEFAULT_QUEUE_SIZE):
"""Run the pipeline in parallel using one thread per stage. The
messages between the stages are stored in queues of the given
size.
"""
queues = [Queue(queue_size) for i in range(len(self.stages)-1)]
threads = [FirstPipelineThread(self.stages[0], queues[0])]
for i in range(1, len(self.stages)-1):
threads.append(MiddlePipelineThread(
self.stages[i], queues[i-1], queues[i]
))
threads.append(LastPipelineThread(self.stages[-1], queues[-1]))
# Start threads.
for thread in threads:
thread.start()
# Wait for termination.
try:
for thread in threads:
thread.join()
except:
# Shut down the pipeline by telling the first thread to
# poison its channel.
threads[0].abort()
raise
# Was there an exception?
exc = threads[-1].exc
if exc:
raise exc
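
(Editorial sketch, not part of the module: the error-propagation path described in the module docstring is observable with a deliberately failing stage; source and boom are hypothetical.)

    def source():
        for i in range(3):
            yield i

    def boom():
        num = yield
        raise RuntimeError('stage failed')

    try:
        Pipeline([source(), boom()]).run_parallel()
    except RuntimeError, exc:
        print 'raised again in the main thread:', exc
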
# Smoke test.
if __name__ == '__main__':
import time
def produce():
for i in range(5):
print 'generating', i
time.sleep(1)
yield i
def work():
num = yield
while True:
print 'processing', num
time.sleep(2)
num = yield num*2
def consume():
while True:
num = yield
time.sleep(1)
print 'received', num
ts_start = time.time()
Pipeline([produce(), work(), consume()]).run_sequential()
ts_middle = time.time()
Pipeline([produce(), work(), consume()]).run_parallel()
ts_end = time.time()
print 'Sequential time:', ts_middle - ts_start
print 'Parallel time:', ts_end - ts_middle
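
(Editorial addition, not in the commit: the smoke test covers both run modes but not bubbles. A hypothetical filter_odds stage shows the mechanism; odd numbers never reach consume().)

    def filter_odds():
        num = yield
        while True:
            if num % 2:
                num = yield BUBBLE   # drop this item from the pipeline
            else:
                num = yield num

    Pipeline([produce(), filter_odds(), consume()]).run_sequential()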